Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions .circleci/config.yml

This file was deleted.

28 changes: 28 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# GitHub Actions workflow: builds the project and runs `make integration-test`
# on pushes/PRs targeting main, master, or any v* branch.
name: Integration Test

on:
  push:
    branches:
      - main
      - master
      - 'v*'
  pull_request:
    branches:
      - main
      - master
      - 'v*'

jobs:
  integration:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.21'

      - name: Run integration tests
        run: make integration-test
15 changes: 8 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ MODULE=ftsb_redisearch
DISTDIR = ./dist

.PHONY: ftsb_redisearch
all: get test ftsb_redisearch
all: get ftsb_redisearch integration-test

# Build-time GIT variables
ifeq ($(GIT_SHA),)
Expand All @@ -32,22 +32,23 @@ build:
fmt:
$(GOFMT) ./...

ftsb_redisearch: test
ftsb_redisearch:
$(GOBUILD) \
-ldflags=$(LDFLAGS) \
-o bin/$@ ./cmd/$@
-o bin/ftsb_redisearch ./cmd/ftsb_redisearch

get:
$(GOGET) ./...

test: get
integration-test: get ftsb_redisearch
$(GOTEST) -v $(shell go list ./... | grep -v '/cmd/')

release:
$(GOGET) github.com/mitchellh/gox
$(GOGET) github.com/tcnksm/ghr
GO111MODULE=on gox -osarch ${OS_ARCHs} \
-ldflags="-X 'main.GitSHA1=$(GIT_SHA)' -X 'main.GitDirty=$(GIT_DIRTY)'" \
-output "${DISTDIR}/${BIN_NAME}_{{.OS}}_{{.Arch}}" ./cmd/ftsb_redisearch
GO111MODULE=on gox -osarch "linux/amd64 darwin/amd64 linux/arm64 darwin/arm64" \
-ldflags=$(LDFLAGS) \
-output "${DISTDIR}/${BIN_NAME}_{{.OS}}_{{.Arch}}" ./cmd/ftsb_redisearch

publish:
@for f in $(shell ls ${DISTDIR}); \
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ https://github.com/RediSearch/ftsb/releases/latest

| OS | Arch | Link |
| :--- | :---: | ---: |
| Linux | amd64 (64-bit X86) | [ftsb_redisearch-linux-amd64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch-linux-amd64.tar.gz) |
| Linux | arm64 (64-bit ARM) | [ftsb_redisearch-linux-arm64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch-linux-arm64.tar.gz) |
| Darwin | amd64 (64-bit X86) | [ftsb_redisearch-darwin-amd64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch-darwin-amd64.tar.gz) |
| Darwin | arm64 (64-bit ARM) | [ftsb_redisearch-darwin-arm64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch-darwin-arm64.tar.gz) |
| Linux | amd64 (64-bit X86) | [ftsb_redisearch_linux_amd64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch_linux_amd64.tar.gz) |
| Linux | arm64 (64-bit ARM) | [ftsb_redisearch_linux_arm64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch_linux_arm64.tar.gz) |
| Darwin | amd64 (64-bit X86) | [ftsb_redisearch_darwin_amd64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch_darwin_amd64.tar.gz) |
| Darwin | arm64 (64-bit ARM) | [ftsb_redisearch_darwin_arm64](https://github.com/RediSearch/ftsb/releases/latest/download/ftsb_redisearch_darwin_arm64.tar.gz) |

Here's a bash script to download and try it:

Expand Down
53 changes: 49 additions & 4 deletions benchmark_runner/benchmark_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"golang.org/x/time/rate"

"context"

"code.cloudfoundry.org/bytefmt"
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
)
Expand Down Expand Up @@ -52,6 +54,9 @@ type BenchmarkRunner struct {
fileName string
start time.Time
end time.Time
file *os.File
// time-based run support
Duration time.Duration

br *bufio.Reader
detailedMapHistogramsMutex sync.RWMutex
Expand Down Expand Up @@ -285,6 +290,7 @@ func GetBenchmarkRunnerWithBatchSize(batchSize uint) *BenchmarkRunner {
flag.Uint64Var(&loader.limit, "requests", 0, "Number of total requests to issue (0 = all of the present in input file).")
flag.BoolVar(&loader.doLoad, "do-benchmark", true, "Whether to write databuild. Set this flag to false to check input read speed.")
flag.DurationVar(&loader.reportingPeriod, "reporting-period", 1*time.Second, "Period to report write stats")
flag.DurationVar(&loader.Duration, "duration", 0*time.Second, "Max duration for benchmark run (0 to disable)")
flag.StringVar(&loader.fileName, "input", "", "File name to read databuild from")
flag.Uint64Var(&loader.maxRPS, "max-rps", 0, "enable limiting the rate of queries per second, 0 = no limit. By default no limit is specified and the binaries will stress the DB up to the maximum. A normal \"modus operandi\" would be to initially stress the system ( no limit on RPS) and afterwards that we know the limit vary with lower rps configurations.")
flag.StringVar(&loader.JsonOutFile, "json-out-file", "", "Name of json output file to output benchmark results. If not set, will not print to json.")
Expand All @@ -307,6 +313,9 @@ func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint) {
requestBurst = int(l.workers) //int(b.workers)
}
var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
if l.Duration > 0 && l.limit > 0 {
log.Printf("Warning! You've specified both --duration %d and --requests %d limits. --duration %d takes precedence over --requests", l.Duration, l.limit, l.Duration)
}

var wg sync.WaitGroup
for i := 0; i < int(l.workers); i++ {
Expand Down Expand Up @@ -354,6 +363,7 @@ func (l *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
log.Fatalf("cannot open file for read %s: %v", l.fileName, err)
return nil
}
l.file = file // store raw file for resetting
l.br = bufio.NewReaderSize(file, defaultReadSize)
} else {
// Read from STDIN
Expand All @@ -363,6 +373,37 @@ func (l *BenchmarkRunner) GetBufferedReader() *bufio.Reader {
return l.br
}

// GetRawFile returns the raw *os.File backing the buffered input reader,
// or nil when input comes from STDIN (set only by GetBufferedReader when
// a -input file was given). Callers use it to rewind the input.
func (l *BenchmarkRunner) GetRawFile() *os.File {
	return l.file
}

// GetResetReaderFunc returns a function that rewinds the input file and
// builds a fresh reader + decoder so the same input can be replayed
// (used by duration-based runs that outlast the input file). When input
// is STDIN (no raw file), the returned function always yields (nil, nil)
// to signal that the input cannot be reused.
func (l *BenchmarkRunner) GetResetReaderFunc(b Benchmark) func() (*bufio.Reader, DocDecoder) {
	file := l.GetRawFile()
	if file == nil {
		return func() (*bufio.Reader, DocDecoder) {
			return nil, nil
		}
	}
	rewindCount := 0
	// Backdate lastLog so the very first rewind is logged immediately.
	lastLog := time.Now().Add(-15 * time.Second)
	return func() (*bufio.Reader, DocDecoder) {
		// Check the seek error before counting/logging the rewind;
		// otherwise a failed seek would be reported as a successful rewind.
		if _, err := file.Seek(0, 0); err != nil { // 0 == io.SeekStart
			log.Printf("Failed to rewind input file: %v", err)
			return nil, nil
		}
		rewindCount++
		// Throttle rewind logging to at most once every 10 seconds.
		if time.Since(lastLog) > 10*time.Second {
			log.Printf("Rewinding input file: %s (rewinds so far: %d)", file.Name(), rewindCount)
			lastLog = time.Now()
		}
		newReader := bufio.NewReaderSize(file, defaultReadSize)
		return newReader, b.GetCmdDecoder(newReader)
	}
}

// createChannels create channels from which workers would receive tasks
// Number of workers may be different from number of channels, thus we may have
// multiple workers per channel
Expand Down Expand Up @@ -392,14 +433,18 @@ func (l *BenchmarkRunner) createChannels(workQueues uint) []*duplexChannel {
// scan launches any needed reporting mechanism and proceeds to scan input databuild
// to distribute to workers. Returns the number of commands issued.
func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, start time.Time, w *tabwriter.Writer) uint64 {
	// Start background reporting process
	// TODO why it is here? May be it could be moved one level up?
	if l.reportingPeriod.Nanoseconds() > 0 {
		go l.report(l.reportingPeriod, start, w)
	}

	// Bound the run by --duration when set; otherwise the context never expires.
	ctx := context.Background()
	cancel := context.CancelFunc(func() {}) // no-op when no deadline is applied
	if l.Duration > 0 {
		ctx, cancel = context.WithTimeout(context.Background(), l.Duration)
	}
	// Deferring unconditionally releases the timeout's resources on every path.
	defer cancel()

	// Scan incoming databuild; resetFn lets the scanner rewind the input file
	// if it is exhausted before the deadline.
	resetFn := l.GetResetReaderFunc(b)
	return scanWithTimeout(ctx, channels, 100, l.limit, l.Duration, l.br, b.GetCmdDecoder(l.br), b.GetBatchFactory(), b.GetCommandIndexer(uint(len(channels))), resetFn)
}

// work is the processing function for each worker in the loader
Expand Down
102 changes: 102 additions & 0 deletions benchmark_runner/redisearch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package benchmark_runner

import (
"encoding/json"
"os"
"os/exec"
"strings"
"testing"
"time"
)

// TestFTSBWithDuration runs the prebuilt ../bin/ftsb_redisearch binary against
// a throwaway Redis container with --duration=5s and verifies the benchmark
// honors the time-based limit. Requires docker and a prior `make ftsb_redisearch`.
func TestFTSBWithDuration(t *testing.T) {
	entries, err := os.ReadDir("../bin")
	if err != nil {
		t.Fatalf("Failed to read bin/ directory: %v", err)
	}

	t.Log("Listing bin/ contents:")
	for _, entry := range entries {
		t.Logf(" - %s", entry.Name())
	}

	t.Log("Starting Redis container...")
	dockerRun := exec.Command("docker", "run", "--rm", "-d", "-p", "6379:6379", "redis:8.0-M04-bookworm")
	containerIDRaw, err := dockerRun.Output()
	if err != nil {
		t.Fatalf("Failed to start Redis container: %v", err)
	}
	containerID := strings.TrimSpace(string(containerIDRaw))

	t.Cleanup(func() {
		t.Log("Stopping Redis container...")
		// Best-effort teardown; the container was started with --rm.
		_ = exec.Command("docker", "stop", containerID).Run()
	})

	// Poll for readiness instead of a fixed sleep to reduce flakiness.
	t.Log("Waiting for Redis to be ready...")
	ready := false
	for i := 0; i < 30; i++ {
		out, pingErr := exec.Command("docker", "exec", containerID, "redis-cli", "ping").Output()
		if pingErr == nil && strings.TrimSpace(string(out)) == "PONG" {
			ready = true
			break
		}
		time.Sleep(500 * time.Millisecond)
	}
	if !ready {
		t.Fatal("Redis did not become ready in time")
	}

	t.Log("Running ftsb_redisearch with --duration=5s")
	start := time.Now()
	cmd := exec.Command("../bin/ftsb_redisearch",
		"--input", "../testdata/minimal.csv",
		"--duration=5s",
	)
	cmd.Env = append(os.Environ(), "REDIS_URL=redis://localhost:6379")
	output, err := cmd.CombinedOutput()
	duration := time.Since(start)

	if err != nil {
		t.Fatalf("Benchmark failed: %v\nOutput: %s", err, string(output))
	}

	// The run must last at least as long as the requested --duration.
	if duration < 5*time.Second {
		t.Errorf("Benchmark exited too early: ran for %v", duration)
	}
	if !strings.Contains(string(output), "Issued") {
		t.Errorf("Expected benchmark output to contain 'Issued', got: %s", string(output))
	}
}

// TestFTSBWithRequests runs the prebuilt ../bin/ftsb_redisearch binary against
// a throwaway Redis container with --requests=50000 and checks the JSON
// results file records the request limit. Requires docker and a prior build.
func TestFTSBWithRequests(t *testing.T) {
	t.Log("Starting Redis container...")
	dockerRun := exec.Command("docker", "run", "--rm", "-d", "-p", "6379:6379", "redis:8.0-M04-bookworm")
	containerIDRaw, err := dockerRun.Output()
	if err != nil {
		t.Fatalf("Failed to start Redis container: %v", err)
	}
	containerID := strings.TrimSpace(string(containerIDRaw))
	t.Cleanup(func() {
		t.Log("Stopping Redis container...")
		// Best-effort teardown; the container was started with --rm.
		_ = exec.Command("docker", "stop", containerID).Run()
	})

	// Poll for readiness instead of a fixed sleep to reduce flakiness.
	t.Log("Waiting for Redis to be ready...")
	ready := false
	for i := 0; i < 30; i++ {
		out, pingErr := exec.Command("docker", "exec", containerID, "redis-cli", "ping").Output()
		if pingErr == nil && strings.TrimSpace(string(out)) == "PONG" {
			ready = true
			break
		}
		time.Sleep(500 * time.Millisecond)
	}
	if !ready {
		t.Fatal("Redis did not become ready in time")
	}

	t.Log("Running ftsb_redisearch with --requests=50000")
	jsonPath := "../testdata/results.requests.json"
	// Don't leave test artifacts behind in testdata/.
	t.Cleanup(func() { _ = os.Remove(jsonPath) })
	cmd := exec.Command("../bin/ftsb_redisearch",
		"--input", "../testdata/minimal.csv",
		"--requests=50000",
		"--json-out-file", jsonPath,
	)
	cmd.Env = append(os.Environ(), "REDIS_URL=redis://localhost:6379")
	output, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("Benchmark failed: %v\nOutput: %s", err, string(output))
	}

	data, err := os.ReadFile(jsonPath)
	if err != nil {
		t.Fatalf("Failed to read json output file: %v", err)
	}

	var parsed map[string]interface{}
	if err := json.Unmarshal(data, &parsed); err != nil {
		t.Fatalf("Failed to parse JSON output: %v", err)
	}

	if parsed["Limit"] != float64(50000) { // json.Unmarshal converts numbers to float64
		t.Errorf("Expected Limit to be 50000, got %v", parsed["Limit"])
	}
}
Loading