Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions frac/sealed/block_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
)

type BlockOffsets struct {
IDsTotal uint32 // todo: the best place for this field is Info block
Offsets []uint64
Offsets []uint64
}

func (b *BlockOffsets) Pack(buf []byte) []byte {
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(b.Offsets)))
buf = binary.LittleEndian.AppendUint32(buf, b.IDsTotal)

// NOTE(dkharms): Previously we stored here amount of documents ids.
//
// I've created a task which will require fraction binary version bumping
// to get rid of this: https://github.com/ozontech/seq-db/issues/409
buf = binary.LittleEndian.AppendUint32(buf, 0)

var prev uint64
for _, pos := range b.Offsets {
Expand All @@ -26,13 +30,16 @@ func (b *BlockOffsets) Unpack(data []byte) error {
if len(data) < 4 {
return errors.New("blocks offset decoding error: truncated header (missing offsets count)")
}

idsBlocksCount := binary.LittleEndian.Uint32(data)
data = data[4:]

if len(data) < 4 {
return errors.New("blocks offset decoding error: truncated header (missing IDsTotal)")
}
b.IDsTotal = binary.LittleEndian.Uint32(data)

// NOTE(dkharms): Previously we stored here amount of documents ids.
_ = binary.LittleEndian.Uint32(data)
data = data[4:]

offset := uint64(0)
Expand All @@ -42,15 +49,20 @@ func (b *BlockOffsets) Unpack(data []byte) error {
if n == 0 {
return errors.New("blocks offset decoding error: varint returned 0")
}

if n < 0 {
return errors.New("blocks offset decoding error: varint overflow")
}

data = data[n:]
offset += uint64(delta)

b.Offsets = append(b.Offsets, offset)
}

if uint32(len(b.Offsets)) != idsBlocksCount {
return errors.New("blocks offset decoding error: offset count mismatch")
}

return nil
}
1 change: 0 additions & 1 deletion frac/sealed/seqids/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

type Table struct {
MinBlockIDs []seq.ID // from max to min
IDBlocksTotal uint32
IDsTotal uint32
StartBlockIndex uint32
}
Expand Down
22 changes: 9 additions & 13 deletions frac/sealed_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re
l.skipSection() // skip token table blocks

var err error
blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer)
blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info)
if err != nil {
logger.Fatal("legacy load ids error", zap.Error(err))
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() {
}

// loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets.
func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) {
func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) {
var buf []byte

data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf)
Expand All @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab
l.blockIndex++

table := seqids.Table{
StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index
IDsTotal: offsets.IDsTotal,
IDBlocksTotal: uint32(len(offsets.Offsets)),
StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index
IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID]
}

for {
Expand All @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab
}

mid := seq.MID(h.GetExt1())
if fracVersion < config.BinaryDataV2 {
if info.BinaryDataVer < config.BinaryDataV2 {
mid = seq.MillisToMID(h.GetExt1())
}

Expand Down Expand Up @@ -184,10 +183,9 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers
if err != nil {
logger.Fatal("load offsets error", zap.Error(err))
}

blocksData.BlocksOffsets = blockOffsets.Offsets
blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer)

blocksData.IDsTable = l.loadIDsTable(readers.ID, info)
blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID)
if err != nil {
logger.Fatal("load lids error", zap.Error(err))
Expand Down Expand Up @@ -227,10 +225,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets,

// loadIDsTable scans block headers in the .id file to build seqids.Table.
// Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers.
func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table {
func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table {
table := seqids.Table{
StartBlockIndex: 0,
IDsTotal: idsTotal,
IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID]
}

blocksCount, err := r.BlocksCount()
Expand All @@ -248,7 +246,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
}

var mid seq.MID
if fracVersion < config.BinaryDataV2 {
if info.BinaryDataVer < config.BinaryDataV2 {
mid = seq.MillisToMID(header.GetExt1())
} else {
mid = seq.MID(header.GetExt1())
Expand All @@ -258,8 +256,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio
MID: mid,
RID: seq.RID(header.GetExt2()),
})

table.IDBlocksTotal++
}

return table
Expand Down
11 changes: 2 additions & 9 deletions indexwriter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error {
}
defer w.release()

offsets := sealed.BlockOffsets{
IDsTotal: src.Info().DocsTotal + 1,
Offsets: src.BlockOffsets(),
}

offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()}
if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil {
return err
}
Expand Down Expand Up @@ -214,6 +210,7 @@ func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock {

// packInfoBlock packs fraction information into an index block.
func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock {
s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID]
s.buf1 = block.Pack(s.buf1[:0])
return newIndexBlock(s.buf1) // Info block is typically small, no compression
}
Expand All @@ -238,10 +235,6 @@ func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) inde

// packBlocksOffsetsBlock packs document block offsets into a compressed index block.
func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock {
// Update IDs table for PreloadedData
s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs
s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks

// Packing block
s.buf1 = block.Pack(s.buf1[:0])
b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel)
Expand Down