Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions lmdbenv/limitscanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@ import (
"github.com/PowerDNS/lmdb-go/lmdbscan"
)

// ErrLimitReached is returned when the LimitScanner reaches the configured limit
var ErrLimitReached = errors.New("limit reached")

func NewLimitScanner(opt Options) (*LimitScanner, error) {
if opt.Txn == nil {
panic("limit scanner requires Options.Txn")
return nil, errors.New("limit scanner requires Options.Txn")
}
if opt.DBI == 0 {
panic("limit scanner requires Options.DBI")
return nil, errors.New("limit scanner requires Options.DBI")
}
if opt.LimitDurationCheckEvery <= 0 {
opt.LimitDurationCheckEvery = LimitDurationCheckEveryDefault
Expand Down Expand Up @@ -64,15 +61,15 @@ func (c LimitCursor) IsZero() bool {
// LimitScanner allows iteration over chunks of the LMDB for processing.
// The chunk size can either be given as a number or as a time limit.
type LimitScanner struct {
opt Options
sc *lmdbscan.Scanner
count int
deadline time.Time
err error
opt Options
sc *lmdbscan.Scanner
count int
deadline time.Time
limitReached bool
}

func (s *LimitScanner) Scan() bool {
if s.err != nil {
if s.limitReached {
return false
}

Expand All @@ -81,7 +78,7 @@ func (s *LimitScanner) Scan() bool {
// If the entry still exists, it will be the first one (basically
// rescanning the last entry). If it is gone, we will start from the
// next one after that.
//s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next)
// s.sc.SetNext(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange, lmdb.Next)
s.sc.Set(s.opt.Last.key, s.opt.Last.val, lmdb.SetRange)
if bytes.Equal(s.Key(), s.opt.Last.key) && bytes.Equal(s.Val(), s.opt.Last.val) {
// Advance one
Expand All @@ -91,15 +88,15 @@ func (s *LimitScanner) Scan() bool {

// Check number-of-records limit
if s.opt.LimitRecords > 0 && s.count >= s.opt.LimitRecords {
s.err = ErrLimitReached
s.limitReached = true
return false
}

// Check time limit
checkEvery := s.opt.LimitDurationCheckEvery
if checkEvery > 0 && s.count > 0 && s.count%checkEvery == 0 && !s.deadline.IsZero() {
if time.Now().After(s.deadline) {
s.err = ErrLimitReached
s.limitReached = true
return false
}
}
Expand All @@ -108,15 +105,14 @@ func (s *LimitScanner) Scan() bool {
return s.sc.Scan()
}

func (s *LimitScanner) Last() LimitCursor {
// Cursor returns the LimitCursor for the next LimitScanner to use in
// (Options).Cursor, and whether the limit was reached by this LimitScanner
// (meaning this cursor is actually useful).
func (s *LimitScanner) Cursor() (c LimitCursor, limitReached bool) {
return LimitCursor{
key: s.Key(),
val: s.Val(),
}
}

func (s *LimitScanner) Cursor() *lmdb.Cursor {
return s.sc.Cursor()
}, s.limitReached
}

func (s *LimitScanner) Key() []byte {
Expand All @@ -128,9 +124,6 @@ func (s *LimitScanner) Val() []byte {
}

func (s *LimitScanner) Err() error {
if s.err != nil {
return s.err
}
return s.sc.Err()
}

Expand Down
46 changes: 28 additions & 18 deletions lmdbenv/limitscanner/scanner_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package limitscanner

import (
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -33,28 +32,31 @@ func TestLimitScanner(t *testing.T) {
require.NoError(t, err)

var last LimitCursor
var limitReached bool
t.Run("limited-scan", func(t *testing.T) {
err = env.View(func(txn *lmdb.Txn) error {
ls, err := NewLimitScanner(Options{
Txn: txn,
DBI: dbi,
LimitRecords: 100,
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
count++
}
assert.Equal(t, 100, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Equal(t, "key-00100", string(last.key))
assert.Equal(t, "val-00100", string(last.val))

return ls.Err()
})
require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached")
require.NoError(t, err)
require.True(t, limitReached, "expected a limited scan")
})

t.Run("limited-scan-continued", func(t *testing.T) {
Expand All @@ -66,21 +68,22 @@ func TestLimitScanner(t *testing.T) {
LimitRecords: 100,
Last: last, // Note this added cursor
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
count++
}
assert.Equal(t, 100, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Equal(t, "key-00200", string(last.key))
assert.Equal(t, "val-00200", string(last.val))

return ls.Err()
})
require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached")
require.True(t, limitReached, "expected a limited scan")
})

t.Run("limited-scan-continued-deleted", func(t *testing.T) {
Expand All @@ -96,21 +99,23 @@ func TestLimitScanner(t *testing.T) {
LimitRecords: 10,
Last: last,
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
count++
}
assert.Equal(t, 10, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Equal(t, "key-00210", string(last.key))
assert.Equal(t, "val-00210", string(last.val))

return ls.Err()
})
require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached")
require.NoError(t, err)
require.True(t, limitReached, "expected a limited scan")
})

t.Run("limited-scan-final", func(t *testing.T) {
Expand All @@ -122,21 +127,23 @@ func TestLimitScanner(t *testing.T) {
LimitRecords: 100,
Last: last,
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
count++
}
assert.Equal(t, 40, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Nil(t, last.key)
assert.Nil(t, last.val)

return ls.Err()
})
require.NoError(t, err)
require.False(t, limitReached, "unexpected limited scan")
})

t.Run("limited-by-time", func(t *testing.T) {
Expand All @@ -148,7 +155,8 @@ func TestLimitScanner(t *testing.T) {
LimitDuration: time.Nanosecond, // very short
LimitDurationCheckEvery: 50, // check time every 50 records
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
Expand All @@ -158,13 +166,14 @@ func TestLimitScanner(t *testing.T) {
// before we realize we passed the short deadline.
assert.Equal(t, 50, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Equal(t, "key-00050", string(last.key))
assert.Equal(t, "val-00050", string(last.val))

return ls.Err()
})
require.True(t, errors.Is(err, ErrLimitReached), "expected ErrLimitReached")
require.NoError(t, err)
require.True(t, limitReached, "expected a limited scan")
})

t.Run("limited-by-plenty-of-time", func(t *testing.T) {
Expand All @@ -176,7 +185,8 @@ func TestLimitScanner(t *testing.T) {
LimitDuration: time.Second, // an eternity
LimitDurationCheckEvery: 50, // check time every 50 records
})
require.NoError(t, err)
assert.NoError(t, err)
defer ls.Close()

count := 0
for ls.Scan() {
Expand All @@ -186,17 +196,17 @@ func TestLimitScanner(t *testing.T) {
// (note that we deleted one before)
assert.Equal(t, 249, count)

last = ls.Last()
last, limitReached = ls.Cursor()
assert.Nil(t, last.key)
assert.Nil(t, last.val)

return ls.Err()
})
require.NoError(t, err)
require.False(t, limitReached, "unexpected limited scan")
})

return nil
})
require.NoError(t, err)

}
8 changes: 5 additions & 3 deletions syncer/sweeper/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Sweeper struct {
}

// Run runs the sweeper according to the configured schedule.
// It only runs when an error occurs or the context is closed.
// It only runs until an error occurs or the context is closed.
func (s *Sweeper) Run(ctx context.Context) error {
wait := s.conf.FirstInterval
for {
Expand Down Expand Up @@ -108,6 +108,7 @@ func (s *Sweeper) sweep(ctx context.Context) error {
l.Debug("Sweep DBI")

var last limitscanner.LimitCursor
var limitReached bool
for {
err := s.env.Update(func(txn *lmdb.Txn) error {
st.nTxn++
Expand All @@ -127,6 +128,7 @@ func (s *Sweeper) sweep(ctx context.Context) error {
if err != nil {
return err // configuration error
}
defer ls.Close()

// Actual cleaning
for ls.Scan() {
Expand Down Expand Up @@ -156,10 +158,10 @@ func (s *Sweeper) sweep(ctx context.Context) error {
}
}

last = ls.Last()
last, limitReached = ls.Cursor()
return ls.Err()
})
if errors.Is(err, limitscanner.ErrLimitReached) {
if limitReached {
l.Debug("Sweep limit reached, continuing after pause")
// Give the app some room to get a write lock before continuing
if err := utils.SleepContext(ctx, s.conf.ReleaseDuration); err != nil {
Expand Down
18 changes: 15 additions & 3 deletions syncer/sweeper/sweeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func BenchmarkSweeper(b *testing.B) {
RetentionDays: 2,
Interval: time.Second,
FirstInterval: 0,
LockDuration: 100 * time.Millisecond, // Forces split operation
LockDuration: 10 * time.Millisecond, // Forces split operation
ReleaseDuration: 0,
}

Expand All @@ -116,15 +116,15 @@ func BenchmarkSweeper(b *testing.B) {
var dbi lmdb.DBI
err := env.Update(func(txn *lmdb.Txn) error {
var err error
dbi, err = txn.CreateDBI("test1")
dbi, err = txn.CreateDBI(name)
return err
})
assert.NoError(b, err)
return dbi
}

now := time.Now()
past := now.Add(-50 * time.Hour) // longer than RetentionDays=2
past := now.Add(-3 * 24 * time.Hour) // longer than RetentionDays=2
nowTS := header.TimestampFromTime(now)
pastTS := header.TimestampFromTime(past)

Expand Down Expand Up @@ -155,6 +155,18 @@ func BenchmarkSweeper(b *testing.B) {
b.ResetTimer()
err := sweeper.sweep(b.Context())
b.StopTimer()

if b.N > 8 {
assert.NoError(b, env.Update(func(txn *lmdb.Txn) error {
stat, err := txn.Stat(mix)
if err != nil {
return err
}
assert.Greater(b, b.N-int(stat.Entries), b.N/4, "less than a 1/4 entries were cleaned")
return nil
}))
}

// This is the most interesting metric
b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "entries/s")

Expand Down