Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2945,6 +2945,8 @@ func (d *DB) runCopyCompaction(
var wrote uint64
err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
var err error
writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat())
writerOpts.CompressionCounters = d.compressionCounters.Compressed.ForLevel(base.MakeLevel(c.outputLevel.level))
// TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
// or update category stats).
wrote, err = sstable.CopySpan(ctx,
Expand Down
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,8 @@ type DB struct {
// compaction concurrency
openedAt time.Time

compressionCounters block.CompressionCounters

iterTracker *inflight.Tracker
}

Expand Down Expand Up @@ -2013,6 +2015,9 @@ func (d *DB) Metrics() *Metrics {
blobCompressionMetrics := blobCompressionStatsAnnotator.Annotation(&vers.BlobFiles)
metrics.BlobFiles.Compression.MergeWith(&blobCompressionMetrics)

metrics.CompressionCounters.LogicalBytesCompressed = d.compressionCounters.LoadCompressed()
metrics.CompressionCounters.LogicalBytesDecompressed = d.compressionCounters.LoadDecompressed()

metrics.BlockCache = d.opts.Cache.Metrics()
metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
metrics.TableIters = d.fileCache.IterCount()
Expand Down
2 changes: 1 addition & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func TestMemTableReservation(t *testing.T) {
t.Fatalf("expected 2 refs, but found %d", refs)
}
// Verify the memtable reservation has caused our test block to be evicted.
if cv := tmpHandle.Peek(base.DiskFileNum(0), 0, cache.MakeLevel(0), cache.CategoryBackground); cv != nil {
if cv := tmpHandle.Peek(base.DiskFileNum(0), 0, base.MakeLevel(0), cache.CategoryBackground); cv != nil {
t.Fatalf("expected failure, but found success: %#v", cv)
}

Expand Down
2 changes: 1 addition & 1 deletion file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (h *fileCacheHandle) newIters(
internalOpts.readEnv.IsSharedIngested = env.IsSharedIngested
internalOpts.readEnv.InternalBounds = env.InternalBounds
if opts != nil && opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
internalOpts.readEnv.Block.Level = cache.MakeLevel(opts.layer.Level())
internalOpts.readEnv.Block.Level = base.MakeLevel(opts.layer.Level())
}

var iters iterSet
Expand Down
2 changes: 1 addition & 1 deletion flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, pendingOutputs)
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, &d.compressionCounters, pendingOutputs)
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 7 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func ingestLoad1(
fmv FormatMajorVersion,
readable objstorage.Readable,
cacheHandle *cache.Handle,
compressionCounters *block.CompressionCounters,
tableNum base.TableNum,
rangeKeyValidator rangeKeyIngestValidator,
) (
Expand All @@ -270,6 +271,9 @@ func ingestLoad1(
CacheHandle: cacheHandle,
FileNum: base.PhysicalTableDiskFileNum(tableNum),
}
if compressionCounters != nil {
o.CompressionCounters = &compressionCounters.Decompressed
}
r, err := sstable.NewReader(ctx, readable, o)
if err != nil {
return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())
Expand Down Expand Up @@ -498,6 +502,7 @@ func ingestLoad(
shared []SharedSSTMeta,
external []ExternalFile,
cacheHandle *cache.Handle,
compressionCounters *block.CompressionCounters,
pending []base.TableNum,
) (ingestLoadResult, error) {
localFileNums := pending[:len(paths)]
Expand Down Expand Up @@ -531,7 +536,7 @@ func ingestLoad(
if !shouldDisableRangeKeyChecks {
rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
}
m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, compressionCounters, localFileNums[i], rangeKeyValidator)
if err != nil {
return ingestLoadResult{}, err
}
Expand Down Expand Up @@ -1480,7 +1485,7 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
// Load the metadata for all the files being ingested. This step detects
// and elides empty sstables.
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
d.cacheHandle, pendingOutputs)
d.cacheHandle, &d.compressionCounters, pendingOutputs)
if err != nil {
return IngestOperationStats{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestIngestLoad(t *testing.T) {
FS: mem,
}
opts.WithFSDefaults()
lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, []base.TableNum{1})
lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, nil, []base.TableNum{1})
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestIngestLoadRand(t *testing.T) {
}
opts.WithFSDefaults()
opts.EnsureDefaults()
lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, pending)
lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, nil, pending)
require.NoError(t, err)

// Reset flaky stats.
Expand All @@ -272,7 +272,7 @@ func TestIngestLoadInvalid(t *testing.T) {
FS: mem,
}
opts.WithFSDefaults()
if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, []base.TableNum{1}); err == nil {
if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, nil, []base.TableNum{1}); err == nil {
t.Fatalf("expected error, but found success")
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package pebble

import "github.com/cockroachdb/pebble/internal/base"
import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/sstable/block"
)

// SeqNum exports the base.SeqNum type.
type SeqNum = base.SeqNum
Expand Down Expand Up @@ -80,3 +83,5 @@ type ShortAttribute = base.ShortAttribute
// LazyValue.Clone requires a pointer to a LazyFetcher struct to avoid
// allocations. No code outside Pebble needs to peer into a LazyFetcher.
type LazyFetcher = base.LazyFetcher

type CompressionCounters = block.CompressionCounters
44 changes: 44 additions & 0 deletions internal/base/level.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package base

import (
"fmt"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/invariants"
)

// Level identifies an LSM level. The zero value indicates that the level is
// uninitialized or unknown.
type Level struct {
	// v contains the level in the lowest 7 bits. The highest bit is set iff the
	// value is valid.
	v uint8
}

// validBit is the flag set in Level.v when the value was initialized via
// MakeLevel; a zero Level has it unset.
const validBit = 1 << 7

// Get returns the level number and whether the Level is valid. The returned
// level is only meaningful when ok is true.
func (l Level) Get() (level int, ok bool) {
	return int(l.v &^ validBit), l.Valid()
}

// Valid returns true if the Level was initialized (via MakeLevel), as opposed
// to being the zero value.
func (l Level) Valid() bool {
	return l.v&validBit != 0
}

// String implements fmt.Stringer, returning "L<n>" for a valid level and
// "n/a" for the zero (uninitialized) value.
func (l Level) String() string {
	if level, ok := l.Get(); ok {
		return fmt.Sprintf("L%d", level)
	}
	return "n/a"
}

// MakeLevel returns a valid Level for the given level number, which must be
// in [0, validBit).
func MakeLevel(l int) Level {
	// NB: the bounds check must be parenthesized. Without the parentheses,
	// && binds tighter than ||, so the expression parses as
	// (invariants.Enabled && l < 0) || (l >= validBit): the upper-bound check
	// would panic even in production builds while the lower-bound check would
	// run only under invariants.
	if invariants.Enabled && (l < 0 || l >= validBit) {
		panic(errors.AssertionFailedf("invalid level: %d", l))
	}
	return Level{uint8(l) | validBit}
}
6 changes: 3 additions & 3 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (c *Handle) Cache() *Cache {
// Peek supports the special CategoryHidden category, in which case the hit or
// miss is not recorded in metrics.
func (c *Handle) Peek(
fileNum base.DiskFileNum, offset uint64, level Level, category Category,
fileNum base.DiskFileNum, offset uint64, level base.Level, category Category,
) *Value {
k := makeKey(c.id, fileNum, offset)
return c.cache.getShard(k).get(k, level, category, true /* peekOnly */)
Expand All @@ -265,7 +265,7 @@ const CategoryHidden Category = -1
// Get retrieves the cache value for the specified file and offset, returning
// nil if no value is present.
func (c *Handle) Get(
fileNum base.DiskFileNum, offset uint64, level Level, category Category,
fileNum base.DiskFileNum, offset uint64, level base.Level, category Category,
) *Value {
k := makeKey(c.id, fileNum, offset)
return c.cache.getShard(k).get(k, level, category, false /* peekOnly */)
Expand Down Expand Up @@ -295,7 +295,7 @@ func (c *Handle) Get(
// While waiting, someone else may successfully read the value, which results
// in a valid Handle being returned. This is a case where cacheHit=false.
func (c *Handle) GetWithReadHandle(
ctx context.Context, fileNum base.DiskFileNum, offset uint64, level Level, category Category,
ctx context.Context, fileNum base.DiskFileNum, offset uint64, level base.Level, category Category,
) (
cv *Value,
rh ReadHandle,
Expand Down
21 changes: 12 additions & 9 deletions internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCache(t *testing.T) {
wantHit := fields[1][0] == 'h'

var hit bool
cv := h.Get(base.DiskFileNum(key), 0, MakeLevel(0), CategorySSTableData)
cv := h.Get(base.DiskFileNum(key), 0, base.MakeLevel(0), CategorySSTableData)
if cv == nil {
cv = Alloc(1)
cv.RawBuffer()[0] = fields[0][0]
Expand Down Expand Up @@ -81,14 +81,14 @@ func TestCachePeek(t *testing.T) {
setTestValue(h, 0, uint64(i), "a", 1)
}
for i := range size / 2 {
v := h.Get(base.DiskFileNum(0), uint64(i), MakeLevel(0), CategoryBackground)
v := h.Get(base.DiskFileNum(0), uint64(i), base.MakeLevel(0), CategoryBackground)
if v == nil {
t.Fatalf("expected to find block %d", i)
}
v.Release()
}
for i := size / 2; i < size; i++ {
v := h.Peek(base.DiskFileNum(0), uint64(i), MakeLevel(0), CategoryBackground)
v := h.Peek(base.DiskFileNum(0), uint64(i), base.MakeLevel(0), CategoryBackground)
if v == nil {
t.Fatalf("expected to find block %d", i)
}
Expand All @@ -100,7 +100,7 @@ func TestCachePeek(t *testing.T) {
}
// Verify that the Gets still find their values, despite the Peeks.
for i := range size / 2 {
v := h.Get(base.DiskFileNum(0), uint64(i), MakeLevel(0), CategoryBackground)
v := h.Get(base.DiskFileNum(0), uint64(i), base.MakeLevel(0), CategoryBackground)
if v == nil {
t.Fatalf("expected to find block %d", i)
}
Expand All @@ -124,12 +124,12 @@ func TestCacheDelete(t *testing.T) {
if expected, size := int64(10), cache.Size(); expected != size {
t.Fatalf("expected cache size %d, but found %d", expected, size)
}
if v := h.Get(base.DiskFileNum(0), 0, MakeLevel(0), CategorySSTableData); v == nil {
if v := h.Get(base.DiskFileNum(0), 0, base.MakeLevel(0), CategorySSTableData); v == nil {
t.Fatalf("expected to find block 0/0")
} else {
v.Release()
}
if v := h.Get(base.DiskFileNum(1), 0, MakeLevel(0), CategorySSTableData); v != nil {
if v := h.Get(base.DiskFileNum(1), 0, base.MakeLevel(0), CategorySSTableData); v != nil {
t.Fatalf("expected to not find block 1/0")
}
// Deleting a non-existing block does nothing.
Expand Down Expand Up @@ -196,11 +196,11 @@ func TestMultipleDBs(t *testing.T) {
if expected, size := int64(5), cache.Size(); expected != size {
t.Fatalf("expected cache size %d, but found %d", expected, size)
}
v := h1.Get(base.DiskFileNum(0), 0, MakeLevel(0), CategorySSTableData)
v := h1.Get(base.DiskFileNum(0), 0, base.MakeLevel(0), CategorySSTableData)
if v != nil {
t.Fatalf("expected not present, but found %#v", v)
}
v = h2.Get(base.DiskFileNum(0), 0, MakeLevel(0), CategorySSTableData)
v = h2.Get(base.DiskFileNum(0), 0, base.MakeLevel(0), CategorySSTableData)
if v := v.RawBuffer(); string(v) != "bbbbb" {
t.Fatalf("expected bbbbb, but found %s", v)
}
Expand Down Expand Up @@ -308,7 +308,10 @@ func BenchmarkCacheGet(b *testing.B) {
for pb.Next() {
randVal := pcg.Uint64()
offset := randVal % size
level := Level{levelPlusOne: int8((randVal >> 32) % NumLevels)}
var level base.Level
if l := int((randVal >> 32) % NumLevels); l > 0 {
level = base.MakeLevel(l - 1)
}
category := Category((randVal >> 48) % uint64(NumCategories))
v := h.Get(base.DiskFileNum(0), offset, level, category)
if v == nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/cache/clockpro.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *shard) init(maxSize int64) {
//
// If peekOnly is true, the state of the cache is not modified to reflect the
// access.
func (c *shard) get(k key, level Level, category Category, peekOnly bool) *Value {
func (c *shard) get(k key, level base.Level, category Category, peekOnly bool) *Value {
c.mu.RLock()
if e, _ := c.blocks.Get(k); e != nil {
if value := e.acquireValue(); value != nil {
Expand All @@ -148,14 +148,14 @@ func (c *shard) get(k key, level Level, category Category, peekOnly bool) *Value
}
c.mu.RUnlock()
if category != CategoryHidden {
c.counters[level.index()][category].hits.Add(1)
c.counters[levelIndex(level)][category].hits.Add(1)
}
return value
}
}
c.mu.RUnlock()
if category != CategoryHidden {
c.counters[level.index()][category].misses.Add(1)
c.counters[levelIndex(level)][category].misses.Add(1)
}
return nil
}
Expand All @@ -165,7 +165,7 @@ func (c *shard) get(k key, level Level, category Category, peekOnly bool) *Value
// is not in the cache (nil Value), a non-nil readEntry is returned (in which
// case the caller is responsible to dereference the entry, via one of
// unrefAndTryRemoveFromMap(), setReadValue(), setReadError()).
func (c *shard) getWithReadEntry(k key, level Level, category Category) (*Value, *readEntry) {
func (c *shard) getWithReadEntry(k key, level base.Level, category Category) (*Value, *readEntry) {
c.mu.RLock()
if e, _ := c.blocks.Get(k); e != nil {
if value := e.acquireValue(); value != nil {
Expand All @@ -174,13 +174,13 @@ func (c *shard) getWithReadEntry(k key, level Level, category Category) (*Value,
e.referenced.Store(true)
}
c.mu.RUnlock()
c.counters[level.index()][category].hits.Add(1)
c.counters[levelIndex(level)][category].hits.Add(1)
return value, nil
}
}
re := c.readShard.acquireReadEntry(k)
c.mu.RUnlock()
c.counters[level.index()][category].misses.Add(1)
c.counters[levelIndex(level)][category].misses.Add(1)
return nil, re
}

Expand Down
Loading