Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark_runner/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "bufio"
// needed to run an insert or query benchmark.
type Benchmark interface {
// GetCmdDecoder returns the DocDecoder to use for this Benchmark
GetCmdDecoder(br *bufio.Reader) DocDecoder
GetCmdDecoder(br *bufio.Reader, maxTokenSizeMB uint) DocDecoder

// GetBatchFactory returns the BatchFactory to use for this Benchmark
GetBatchFactory() BatchFactory
Expand Down
6 changes: 4 additions & 2 deletions benchmark_runner/benchmark_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BenchmarkRunner struct {
start time.Time
end time.Time
file *os.File
maxTokenSizeMB uint
// time-based run support
Duration time.Duration

Expand Down Expand Up @@ -295,6 +296,7 @@ func GetBenchmarkRunnerWithBatchSize(batchSize uint) *BenchmarkRunner {
flag.Uint64Var(&loader.maxRPS, "max-rps", 0, "enable limiting the rate of queries per second, 0 = no limit. By default no limit is specified and the binaries will stress the DB up to the maximum. A normal \"modus operandi\" would be to initially stress the system ( no limit on RPS) and afterwards that we know the limit vary with lower rps configurations.")
flag.StringVar(&loader.JsonOutFile, "json-out-file", "", "Name of json output file to output benchmark results. If not set, will not print to json.")
flag.StringVar(&loader.Metadata, "metadata-string", "", "Metadata string to add to json-out-file. If -json-out-file is not set, will not use this option.")
flag.UintVar(&loader.maxTokenSizeMB, "max-token-size-mb", 1, "Maximum size of token to read from input file in MB. Minimum is 1MB.")
return loader
}

Expand Down Expand Up @@ -400,7 +402,7 @@ func (l *BenchmarkRunner) GetResetReaderFunc(b Benchmark) func() (*bufio.Reader,
return nil, nil
}
newReader := bufio.NewReaderSize(file, defaultReadSize)
return newReader, b.GetCmdDecoder(newReader)
return newReader, b.GetCmdDecoder(newReader, l.maxTokenSizeMB)
}
}

Expand Down Expand Up @@ -444,7 +446,7 @@ func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, start tim
}

resetFn := l.GetResetReaderFunc(b)
return scanWithTimeout(ctx, channels, 100, l.limit, l.Duration, l.br, b.GetCmdDecoder(l.br), b.GetBatchFactory(), b.GetCommandIndexer(uint(len(channels))), resetFn)
return scanWithTimeout(ctx, channels, 100, l.limit, l.Duration, l.br, b.GetCmdDecoder(l.br, l.maxTokenSizeMB), b.GetBatchFactory(), b.GetCommandIndexer(uint(len(channels))), resetFn)
}

// work is the processing function for each worker in the loader
Expand Down
4 changes: 2 additions & 2 deletions cmd/ftsb_redisearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func (i *RedisIndexer) GetIndex(itemsRead uint64, p *benchmark_runner.DocHolder)
return int(uint(itemsRead) % i.partitions)
}

func (b *benchmark) GetCmdDecoder(br *bufio.Reader) benchmark_runner.DocDecoder {
func (b *benchmark) GetCmdDecoder(br *bufio.Reader, maxTokenSizeMB uint) benchmark_runner.DocDecoder {
scanner := bufio.NewScanner(br)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
scanner.Buffer(buf, int(maxTokenSizeMB*1024*1024))
return &decoder{scanner: scanner}
}

Expand Down