diff --git a/frac/sealed/block_offsets.go b/frac/sealed/block_offsets.go index 2be59942..d644a0f7 100644 --- a/frac/sealed/block_offsets.go +++ b/frac/sealed/block_offsets.go @@ -6,13 +6,17 @@ import ( ) type BlockOffsets struct { - IDsTotal uint32 // todo: the best place for this field is Info block - Offsets []uint64 + Offsets []uint64 } func (b *BlockOffsets) Pack(buf []byte) []byte { buf = binary.LittleEndian.AppendUint32(buf, uint32(len(b.Offsets))) - buf = binary.LittleEndian.AppendUint32(buf, b.IDsTotal) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + // + // I've created a task which will require fraction binary version bumping + // to get rid of this: https://github.com/ozontech/seq-db/issues/409 + buf = binary.LittleEndian.AppendUint32(buf, 0) var prev uint64 for _, pos := range b.Offsets { @@ -26,13 +30,16 @@ func (b *BlockOffsets) Unpack(data []byte) error { if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing offsets count)") } + idsBlocksCount := binary.LittleEndian.Uint32(data) data = data[4:] if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing IDsTotal)") } - b.IDsTotal = binary.LittleEndian.Uint32(data) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + _ = binary.LittleEndian.Uint32(data) data = data[4:] offset := uint64(0) @@ -42,15 +49,20 @@ func (b *BlockOffsets) Unpack(data []byte) error { if n == 0 { return errors.New("blocks offset decoding error: varint returned 0") } + if n < 0 { return errors.New("blocks offset decoding error: varint overflow") } + data = data[n:] offset += uint64(delta) + b.Offsets = append(b.Offsets, offset) } + if uint32(len(b.Offsets)) != idsBlocksCount { return errors.New("blocks offset decoding error: offset count mismatch") } + return nil } diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index a4c9ecdb..1f0c05de 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -13,7 +13,6 @@ import ( type Table struct { MinBlockIDs []seq.ID // from max to min - IDBlocksTotal uint32 IDsTotal uint32 StartBlockIndex uint32 } diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 893b75a4..10d95a2d 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re l.skipSection() // skip token table blocks var err error - blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer) + blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info) if err != nil { logger.Fatal("legacy load ids error", zap.Error(err)) } @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() { } // loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets. -func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) { +func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) { var buf []byte data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf) @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab l.blockIndex++ table := seqids.Table{ - StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index - IDsTotal: offsets.IDsTotal, - IDBlocksTotal: uint32(len(offsets.Offsets)), + StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } for { @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } mid := seq.MID(h.GetExt1()) - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(h.GetExt1()) } @@ -184,10 +183,9 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers if err != nil { logger.Fatal("load offsets error", zap.Error(err)) } - blocksData.BlocksOffsets = blockOffsets.Offsets - blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) + blocksData.IDsTable = l.loadIDsTable(readers.ID, info) blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) if err != nil { logger.Fatal("load lids error", zap.Error(err)) @@ -227,10 +225,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets, // loadIDsTable scans block headers in the .id file to build seqids.Table. // Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers. -func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table { +func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table { table := seqids.Table{ StartBlockIndex: 0, - IDsTotal: idsTotal, + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } blocksCount, err := r.BlocksCount() @@ -248,7 +246,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio } var mid seq.MID - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(header.GetExt1()) } else { mid = seq.MID(header.GetExt1()) @@ -258,8 +256,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio MID: mid, RID: seq.RID(header.GetExt2()), }) - - table.IDBlocksTotal++ } return table diff --git a/indexwriter/index.go b/indexwriter/index.go index 12fedafe..f317d2d2 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -92,11 +92,7 @@ func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { } defer w.release() - offsets := sealed.BlockOffsets{ - IDsTotal: src.Info().DocsTotal + 1, - Offsets: src.BlockOffsets(), - } - + offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()} if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil { return err } @@ -214,6 +210,7 @@ func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { // packInfoBlock packs fraction information into an index block. func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { + s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID] s.buf1 = block.Pack(s.buf1[:0]) return newIndexBlock(s.buf1) // Info block is typically small, no compression } @@ -238,10 +235,6 @@ func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) inde // packBlocksOffsetsBlock packs document block offsets into a compressed index block. func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { - // Update IDs table for PreloadedData - s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs - s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks - // Packing block s.buf1 = block.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel)