Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -58,20 +59,23 @@ type Flog struct {
// - Download parquet files from the store created by Parseable for the minute
// - Compare the sent logs with the ones loaded from the downloaded parquet
func TestIntegrity(t *testing.T) {
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
t.Parallel()
stream := uniqueStream(t)
CreateStream(t, NewGlob.QueryClient, stream)
iterations := 1
flogsPerIteration := 100

parseableSyncWait := 3 * time.Minute // NOTE: This needs to be in sync with Parseable's.
parseableSyncWait := 2 * time.Minute // NOTE: This needs to be in sync with Parseable's.

// - Generate log files using `flog`
// - Load them into `Flog` structs
// - Ingest them into Parseable

flogs := make([]Flog, 0, iterations*flogsPerIteration)

for i := 0; i < iterations; i++ {
flogsFile := fmt.Sprintf("%d.log", i)
tmpDir := t.TempDir()
for i := range iterations {
flogsFile := filepath.Join(tmpDir, fmt.Sprintf("%s_%d_%s.log", stream, i, randSuffix()))

err := exec.Command("flog",
"--number", strconv.Itoa(flogsPerIteration),
Expand All @@ -85,7 +89,7 @@ func TestIntegrity(t *testing.T) {

loadedFlogs := loadFlogsFromFile(flogsFile)

err = ingestFlogs(loadedFlogs, NewGlob.Stream)
err = ingestFlogs(loadedFlogs, stream)
if err != nil {
t.Fatal("error ingesting flogs", err)
}
Expand All @@ -101,18 +105,26 @@ func TestIntegrity(t *testing.T) {
// XXX: We don't need to sleep for the entire minute, just until the next minute boundary.
}

parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
parquetFiles := downloadParquetFiles(stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

require.Equal(t, len(flogs), len(actualFlogs), "row count mismatch")

// Parseable now writes rows in ascending order, so we compare directly.
// Row order from parquet not deterministic under concurrent writes.
// Sort both slices on a stable composite key before compare.
sortKey := func(f Flog) string {
return fmt.Sprintf("%s|%s|%s|%s|%v|%v|%s",
f.Timestamp, f.Host, f.UserId, f.Request, f.Status, f.ByteCount, f.Referer)
}
sort.Slice(flogs, func(i, j int) bool { return sortKey(flogs[i]) < sortKey(flogs[j]) })
sort.Slice(actualFlogs, func(i, j int) bool { return sortKey(actualFlogs[i]) < sortKey(actualFlogs[j]) })

for i, expectedFlog := range flogs {
actualFlog := actualFlogs[i]
require.Equal(t, expectedFlog, actualFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
DeleteStream(t, NewGlob.QueryClient, stream)
}

func ingestFlogs(flogs []Flog, stream string) error {
Expand Down Expand Up @@ -248,7 +260,7 @@ func loadFlogsFromParquetFile(path string) []Flog {
refererCol := getStringCol("referer")

flogs := make([]Flog, numRows)
for i := 0; i < numRows; i++ {
for i := range numRows {
flogs[i] = Flog{
Host: hostCol.Value(i),
UserId: userIdCol.Value(i),
Expand Down
7 changes: 1 addition & 6 deletions main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,8 @@ ingestor_username=${13}
ingestor_password=${14}
stream_name=$(head /dev/urandom | tr -dc a-z | head -c10)

parallel_flag=""
if [ "$mode" = "load-parallel" ]; then
parallel_flag="-test.parallel=10"
fi

run () {
./quest.test -test.v $parallel_flag -mode="$mode" -query-url="$endpoint" -stream="$stream_name" -query-user="$username" -query-pass="$password" -minio-url="$minio_url" -minio-user="$minio_access_key" -minio-pass="$minio_secret_key" -minio-bucket="$minio_bucket" -ingestor-url="$ingestor_endpoint" -ingestor-user="$ingestor_username" -ingestor-pass="$ingestor_password"
./quest.test -test.v -mode="$mode" -query-url="$endpoint" -stream="$stream_name" -query-user="$username" -query-pass="$password" -minio-url="$minio_url" -minio-user="$minio_access_key" -minio-pass="$minio_secret_key" -minio-bucket="$minio_bucket" -ingestor-url="$ingestor_endpoint" -ingestor-user="$ingestor_username" -ingestor-pass="$ingestor_password"
return $?
}

Expand Down
5 changes: 0 additions & 5 deletions quest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,8 +727,3 @@ func TestLoadStreamNoBatchWithCustomPartitionWithK6(t *testing.T) {

DeleteStream(t, client, stream)
}

func TestDeleteStream(t *testing.T) {
client := testClient(t)
DeleteStream(t, client, NewGlob.Stream)
}
Loading