diff --git a/lmdbenv/limitscanner/scanner.go b/lmdbenv/limitscanner/scanner.go index 868881f..9e533a1 100644 --- a/lmdbenv/limitscanner/scanner.go +++ b/lmdbenv/limitscanner/scanner.go @@ -9,15 +9,12 @@ import ( "github.com/PowerDNS/lmdb-go/lmdbscan" ) -// ErrLimitReached is returned when the LimitScanner reaches the configured limit -var ErrLimitReached = errors.New("limit reached") - func NewLimitScanner(opt Options) (*LimitScanner, error) { if opt.Txn == nil { - panic("limit scanner requires Options.Txn") + return nil, errors.New("limit scanner requires Options.Txn") } if opt.DBI == 0 { - panic("limit scanner requires Options.DBI") + return nil, errors.New("limit scanner requires Options.DBI") } if opt.LimitDurationCheckEvery <= 0 { opt.LimitDurationCheckEvery = LimitDurationCheckEveryDefault @@ -64,15 +61,15 @@ func (c LimitCursor) IsZero() bool { // LimitScanner allows iteration over chunks of the LMDB for processing. // The chunk size can either be given as a number or as a time limit. type LimitScanner struct { - opt Options - sc *lmdbscan.Scanner - count int - deadline time.Time - err error + opt Options + sc *lmdbscan.Scanner + count int + deadline time.Time + limitReached bool } func (s *LimitScanner) Scan() bool { - if s.err != nil { + if s.limitReached { return false } @@ -81,7 +78,7 @@ func (s *LimitScanner) Scan() bool { // If the entry still exists, it will be the first one (basically // rescanning the last entry). If it is gone, we will start from the // next one after that. 
-		//s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next)
+		// s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next)
 		s.sc.Set(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange)
 		if bytes.Equal(s.Key(), s.opt.Last.key) && bytes.Equal(s.Val(), s.opt.Last.val) {
 			// Advance one
@@ -91,7 +88,7 @@ func (s *LimitScanner) Scan() bool {

 	// Check number-of-records limit
 	if s.opt.LimitRecords > 0 && s.count >= s.opt.LimitRecords {
-		s.err = ErrLimitReached
+		s.limitReached = true
 		return false
 	}

@@ -99,7 +96,7 @@ func (s *LimitScanner) Scan() bool {
 	checkEvery := s.opt.LimitDurationCheckEvery
 	if checkEvery > 0 && s.count > 0 && s.count%checkEvery == 0 && !s.deadline.IsZero() {
 		if time.Now().After(s.deadline) {
-			s.err = ErrLimitReached
+			s.limitReached = true
 			return false
 		}
 	}

@@ -108,15 +105,14 @@ func (s *LimitScanner) Scan() bool {
 	return s.sc.Scan()
 }

-func (s *LimitScanner) Last() LimitCursor {
+// Cursor returns the LimitCursor for the next LimitScanner to use in
+// (Options).Last, and whether the limit was reached by this LimitScanner
+// (meaning this cursor is actually useful).
+func (s *LimitScanner) Cursor() (c LimitCursor, limitReached bool) { return LimitCursor{ key: s.Key(), val: s.Val(), - } -} - -func (s *LimitScanner) Cursor() *lmdb.Cursor { - return s.sc.Cursor() + }, s.limitReached } func (s *LimitScanner) Key() []byte { @@ -128,9 +124,6 @@ func (s *LimitScanner) Val() []byte { } func (s *LimitScanner) Err() error { - if s.err != nil { - return s.err - } return s.sc.Err() } diff --git a/lmdbenv/limitscanner/scanner_test.go b/lmdbenv/limitscanner/scanner_test.go index 4d3fadd..a784077 100644 --- a/lmdbenv/limitscanner/scanner_test.go +++ b/lmdbenv/limitscanner/scanner_test.go @@ -1,7 +1,6 @@ package limitscanner import ( - "errors" "fmt" "testing" "time" @@ -33,6 +32,7 @@ func TestLimitScanner(t *testing.T) { require.NoError(t, err) var last LimitCursor + var limitReached bool t.Run("limited-scan", func(t *testing.T) { err = env.View(func(txn *lmdb.Txn) error { ls, err := NewLimitScanner(Options{ @@ -40,7 +40,8 @@ func TestLimitScanner(t *testing.T) { DBI: dbi, LimitRecords: 100, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -48,13 +49,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 100, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00100", string(last.key)) assert.Equal(t, "val-00100", string(last.val)) return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-continued", func(t *testing.T) { @@ -66,7 +68,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 100, Last: last, // Note this added cursor }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -74,13 +77,13 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 100, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00200", 
string(last.key)) assert.Equal(t, "val-00200", string(last.val)) return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-continued-deleted", func(t *testing.T) { @@ -96,7 +99,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 10, Last: last, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -104,13 +108,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 10, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00210", string(last.key)) assert.Equal(t, "val-00210", string(last.val)) return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-scan-final", func(t *testing.T) { @@ -122,7 +127,8 @@ func TestLimitScanner(t *testing.T) { LimitRecords: 100, Last: last, }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -130,13 +136,14 @@ func TestLimitScanner(t *testing.T) { } assert.Equal(t, 40, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Nil(t, last.key) assert.Nil(t, last.val) return ls.Err() }) require.NoError(t, err) + require.False(t, limitReached, "unexpected limited scan") }) t.Run("limited-by-time", func(t *testing.T) { @@ -148,7 +155,8 @@ func TestLimitScanner(t *testing.T) { LimitDuration: time.Nanosecond, // very short LimitDurationCheckEvery: 50, // check time every 50 records }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -158,13 +166,14 @@ func TestLimitScanner(t *testing.T) { // before we realize we passed the short deadline. 
assert.Equal(t, 50, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Equal(t, "key-00050", string(last.key)) assert.Equal(t, "val-00050", string(last.val)) return ls.Err() }) - require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached") + require.NoError(t, err) + require.True(t, limitReached, "expected a limited scan") }) t.Run("limited-by-plenty-of-time", func(t *testing.T) { @@ -176,7 +185,8 @@ func TestLimitScanner(t *testing.T) { LimitDuration: time.Second, // an eternity LimitDurationCheckEvery: 50, // check time every 50 records }) - require.NoError(t, err) + assert.NoError(t, err) + defer ls.Close() count := 0 for ls.Scan() { @@ -186,17 +196,17 @@ func TestLimitScanner(t *testing.T) { // (note that we deleted one before) assert.Equal(t, 249, count) - last = ls.Last() + last, limitReached = ls.Cursor() assert.Nil(t, last.key) assert.Nil(t, last.val) return ls.Err() }) require.NoError(t, err) + require.False(t, limitReached, "unexpected limited scan") }) return nil }) require.NoError(t, err) - } diff --git a/syncer/sweeper/sweeper.go b/syncer/sweeper/sweeper.go index 596e3ef..8094bcf 100644 --- a/syncer/sweeper/sweeper.go +++ b/syncer/sweeper/sweeper.go @@ -49,7 +49,7 @@ type Sweeper struct { } // Run runs the sweeper according to the configured schedule. -// It only runs when an error occurs or the context is closed. +// It only runs until an error occurs or the context is closed. 
func (s *Sweeper) Run(ctx context.Context) error { wait := s.conf.FirstInterval for { @@ -108,6 +108,7 @@ func (s *Sweeper) sweep(ctx context.Context) error { l.Debug("Sweep DBI") var last limitscanner.LimitCursor + var limitReached bool for { err := s.env.Update(func(txn *lmdb.Txn) error { st.nTxn++ @@ -127,6 +128,7 @@ func (s *Sweeper) sweep(ctx context.Context) error { if err != nil { return err // configuration error } + defer ls.Close() // Actual cleaning for ls.Scan() { @@ -156,10 +158,10 @@ func (s *Sweeper) sweep(ctx context.Context) error { } } - last = ls.Last() + last, limitReached = ls.Cursor() return ls.Err() }) - if errors.Is(err, limitscanner.ErrLimitReached) { + if limitReached { l.Debug("Sweep limit reached, continuing after pause") // Give the app some room to get a write lock before continuing if err := utils.SleepContext(ctx, s.conf.ReleaseDuration); err != nil { diff --git a/syncer/sweeper/sweeper_test.go b/syncer/sweeper/sweeper_test.go index 45bf50a..9a73d86 100644 --- a/syncer/sweeper/sweeper_test.go +++ b/syncer/sweeper/sweeper_test.go @@ -103,7 +103,7 @@ func BenchmarkSweeper(b *testing.B) { RetentionDays: 2, Interval: time.Second, FirstInterval: 0, - LockDuration: 100 * time.Millisecond, // Forces split operation + LockDuration: 10 * time.Millisecond, // Forces split operation ReleaseDuration: 0, } @@ -116,7 +116,7 @@ func BenchmarkSweeper(b *testing.B) { var dbi lmdb.DBI err := env.Update(func(txn *lmdb.Txn) error { var err error - dbi, err = txn.CreateDBI("test1") + dbi, err = txn.CreateDBI(name) return err }) assert.NoError(b, err) @@ -124,7 +124,7 @@ func BenchmarkSweeper(b *testing.B) { } now := time.Now() - past := now.Add(-50 * time.Hour) // longer than RetentionDays=2 + past := now.Add(-3 * 24 * time.Hour) // longer than RetentionDays=2 nowTS := header.TimestampFromTime(now) pastTS := header.TimestampFromTime(past) @@ -155,6 +155,18 @@ func BenchmarkSweeper(b *testing.B) { b.ResetTimer() err := sweeper.sweep(b.Context()) 
b.StopTimer() + + if b.N > 8 { + assert.NoError(b, env.Update(func(txn *lmdb.Txn) error { + stat, err := txn.Stat(mix) + if err != nil { + return err + } + assert.Greater(b, b.N-int(stat.Entries), b.N/4, "less than a 1/4 entries were cleaned") + return nil + })) + } + // This is the most interesting metric b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "entries/s")