diff --git a/integrity_test.go b/integrity_test.go index d4522fd..2afb068 100644 --- a/integrity_test.go +++ b/integrity_test.go @@ -28,6 +28,7 @@ import ( "os" "os/exec" "path/filepath" + "sort" "strconv" "strings" "testing" @@ -58,11 +59,13 @@ type Flog struct { // - Download parquet files from the store created by Parseable for the minute // - Compare the sent logs with the ones loaded from the downloaded parquet func TestIntegrity(t *testing.T) { - CreateStream(t, NewGlob.QueryClient, NewGlob.Stream) + t.Parallel() + stream := uniqueStream(t) + CreateStream(t, NewGlob.QueryClient, stream) iterations := 1 flogsPerIteration := 100 - parseableSyncWait := 3 * time.Minute // NOTE: This needs to be in sync with Parseable's. + parseableSyncWait := 2 * time.Minute // NOTE: This needs to be in sync with Parseable's. // - Generate log files using `flog` // - Load them into `Flog` structs @@ -70,8 +73,9 @@ func TestIntegrity(t *testing.T) { flogs := make([]Flog, 0, iterations*flogsPerIteration) - for i := 0; i < iterations; i++ { - flogsFile := fmt.Sprintf("%d.log", i) + tmpDir := t.TempDir() + for i := range iterations { + flogsFile := filepath.Join(tmpDir, fmt.Sprintf("%s_%d_%s.log", stream, i, randSuffix())) err := exec.Command("flog", "--number", strconv.Itoa(flogsPerIteration), @@ -85,7 +89,7 @@ func TestIntegrity(t *testing.T) { loadedFlogs := loadFlogsFromFile(flogsFile) - err = ingestFlogs(loadedFlogs, NewGlob.Stream) + err = ingestFlogs(loadedFlogs, stream) if err != nil { t.Fatal("error ingesting flogs", err) } @@ -101,18 +105,26 @@ func TestIntegrity(t *testing.T) { // XXX: We don't need to sleep for the entire minute, just until the next minute boundary. } - parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig) + parquetFiles := downloadParquetFiles(stream, NewGlob.MinIoConfig) actualFlogs := loadFlogsFromParquetFiles(parquetFiles) require.Equal(t, len(flogs), len(actualFlogs), "row count mismatch") - // Parseable now writes rows in ascending order, so we compare directly. + // Row order from parquet not deterministic under concurrent writes. + // Sort both slices on a stable composite key before compare. + sortKey := func(f Flog) string { + return fmt.Sprintf("%s|%s|%s|%s|%v|%v|%s", + f.Timestamp, f.Host, f.UserId, f.Request, f.Status, f.ByteCount, f.Referer) + } + sort.Slice(flogs, func(i, j int) bool { return sortKey(flogs[i]) < sortKey(flogs[j]) }) + sort.Slice(actualFlogs, func(i, j int) bool { return sortKey(actualFlogs[i]) < sortKey(actualFlogs[j]) }) + for i, expectedFlog := range flogs { actualFlog := actualFlogs[i] require.Equal(t, expectedFlog, actualFlog) } - DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream) + DeleteStream(t, NewGlob.QueryClient, stream) } func ingestFlogs(flogs []Flog, stream string) error { @@ -248,7 +260,7 @@ func loadFlogsFromParquetFile(path string) []Flog { refererCol := getStringCol("referer") flogs := make([]Flog, numRows) - for i := 0; i < numRows; i++ { + for i := range numRows { flogs[i] = Flog{ Host: hostCol.Value(i), UserId: userIdCol.Value(i), diff --git a/main.sh b/main.sh index 5ef99ae..76ed671 100755 --- a/main.sh +++ b/main.sh @@ -39,13 +39,8 @@ ingestor_username=${13} ingestor_password=${14} stream_name=$(head /dev/urandom | tr -dc a-z | head -c10) -parallel_flag="" -if [ "$mode" = "load-parallel" ]; then - parallel_flag="-test.parallel=10" -fi - run () { - ./quest.test -test.v $parallel_flag -mode="$mode" -query-url="$endpoint" -stream="$stream_name" -query-user="$username" -query-pass="$password" -minio-url="$minio_url" -minio-user="$minio_access_key" -minio-pass="$minio_secret_key" -minio-bucket="$minio_bucket" -ingestor-url="$ingestor_endpoint" -ingestor-user="$ingestor_username" -ingestor-pass="$ingestor_password" + ./quest.test -test.v -mode="$mode" -query-url="$endpoint" -stream="$stream_name" -query-user="$username" -query-pass="$password" -minio-url="$minio_url" -minio-user="$minio_access_key" -minio-pass="$minio_secret_key" -minio-bucket="$minio_bucket" -ingestor-url="$ingestor_endpoint" -ingestor-user="$ingestor_username" -ingestor-pass="$ingestor_password" return $? } diff --git a/quest_test.go b/quest_test.go index b1e141f..c9866aa 100644 --- a/quest_test.go +++ b/quest_test.go @@ -727,8 +727,3 @@ func TestLoadStreamNoBatchWithCustomPartitionWithK6(t *testing.T) { DeleteStream(t, client, stream) } - -func TestDeleteStream(t *testing.T) { - client := testClient(t) - DeleteStream(t, client, NewGlob.Stream) -}