diff --git a/benchmark_runner/benchmark.go b/benchmark_runner/benchmark.go index fbcae48..db29268 100644 --- a/benchmark_runner/benchmark.go +++ b/benchmark_runner/benchmark.go @@ -6,7 +6,7 @@ import "bufio" // needed to run an insert or benchmark benchmark. type Benchmark interface { // GetCmdDecoder returns the DocDecoder to use for this Benchmark - GetCmdDecoder(br *bufio.Reader) DocDecoder + GetCmdDecoder(br *bufio.Reader, maxTokenSizeMB uint) DocDecoder // GetBatchFactory returns the BatchFactory to use for this Benchmark GetBatchFactory() BatchFactory diff --git a/benchmark_runner/benchmark_runner.go b/benchmark_runner/benchmark_runner.go index 2f3e650..ab5a0c2 100644 --- a/benchmark_runner/benchmark_runner.go +++ b/benchmark_runner/benchmark_runner.go @@ -55,6 +55,7 @@ type BenchmarkRunner struct { start time.Time end time.Time file *os.File + maxTokenSizeMB uint // time-based run support Duration time.Duration @@ -295,6 +296,7 @@ func GetBenchmarkRunnerWithBatchSize(batchSize uint) *BenchmarkRunner { flag.Uint64Var(&loader.maxRPS, "max-rps", 0, "enable limiting the rate of queries per second, 0 = no limit. By default no limit is specified and the binaries will stress the DB up to the maximum. A normal \"modus operandi\" would be to initially stress the system ( no limit on RPS) and afterwards that we know the limit vary with lower rps configurations.") flag.StringVar(&loader.JsonOutFile, "json-out-file", "", "Name of json output file to output benchmark results. If not set, will not print to json.") flag.StringVar(&loader.Metadata, "metadata-string", "", "Metadata string to add to json-out-file. If -json-out-file is not set, will not use this option.") + flag.UintVar(&loader.maxTokenSizeMB, "max-token-size-mb", 1, "Maximum size of token to read from input file in MB. Minimum is 1MB.") return loader } @@ -400,7 +402,7 @@ func (l *BenchmarkRunner) GetResetReaderFunc(b Benchmark) func() (*bufio.Reader, return nil, nil } newReader := bufio.NewReaderSize(file, defaultReadSize) - return newReader, b.GetCmdDecoder(newReader) + return newReader, b.GetCmdDecoder(newReader, l.maxTokenSizeMB) } } @@ -444,7 +446,7 @@ func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, start tim } resetFn := l.GetResetReaderFunc(b) - return scanWithTimeout(ctx, channels, 100, l.limit, l.Duration, l.br, b.GetCmdDecoder(l.br), b.GetBatchFactory(), b.GetCommandIndexer(uint(len(channels))), resetFn) + return scanWithTimeout(ctx, channels, 100, l.limit, l.Duration, l.br, b.GetCmdDecoder(l.br, l.maxTokenSizeMB), b.GetBatchFactory(), b.GetCommandIndexer(uint(len(channels))), resetFn) } // work is the processing function for each worker in the loader diff --git a/cmd/ftsb_redisearch/main.go b/cmd/ftsb_redisearch/main.go index dd173e7..53ff691 100644 --- a/cmd/ftsb_redisearch/main.go +++ b/cmd/ftsb_redisearch/main.go @@ -68,10 +68,10 @@ func (i *RedisIndexer) GetIndex(itemsRead uint64, p *benchmark_runner.DocHolder) return int(uint(itemsRead) % i.partitions) } -func (b *benchmark) GetCmdDecoder(br *bufio.Reader) benchmark_runner.DocDecoder { +func (b *benchmark) GetCmdDecoder(br *bufio.Reader, maxTokenSizeMB uint) benchmark_runner.DocDecoder { scanner := bufio.NewScanner(br) buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 1024*1024) + scanner.Buffer(buf, int(maxTokenSizeMB*1024*1024)) return &decoder{scanner: scanner} }