diff --git a/.gitignore b/.gitignore index 835cd07..9fb5c90 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ out/ # Temp files x.* +*.out +*.test diagnose-*/ diff --git a/internal/cmd/root.go b/internal/cmd/root.go index e591b43..bf91bbf 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -5,6 +5,8 @@ import ( "context" "os" "os/signal" + "runtime" + "runtime/pprof" "syscall" "charm.land/fang/v2" @@ -40,6 +42,8 @@ func NewRootCommand(runnerOpts hooks.RunOptions) *cobra.Command { rootCmd.PersistentFlags(). Bool("ai-output", !term.IsTerminal(os.Stdout.Fd()), "Use sparse output for agent tooling (and robotic humans)") + rootCmd.PersistentFlags().String("memprofile", "", "write memory profile to this file") + _ = rootCmd.PersistentFlags().MarkHidden("memprofile") hooks.RegisterPersistentFlags(rootCmd.PersistentFlags()) gotestsumCmd := newGotestsumCmd(runnerOpts) @@ -88,5 +92,16 @@ func runExecute(opts ...hooks.Option) error { // Root forwards unknown flags (including go test -v) to go test; fang's -v is version. fang.WithoutVersion(), } - return fang.Execute(ctx, rootCmd, fangOpts...) + err := fang.Execute(ctx, rootCmd, fangOpts...) + + if memprofile, _ := rootCmd.Flags().GetString("memprofile"); memprofile != "" { + f, createErr := os.Create(memprofile) //nolint:gosec // G304: memprofile path requested by user + if createErr == nil { + runtime.GC() // Get up-to-date statistics + _ = pprof.WriteHeapProfile(f) + _ = f.Close() + } + } + + return err } diff --git a/internal/runner/analyze.go b/internal/runner/analyze.go index a3414b0..f5c6b10 100644 --- a/internal/runner/analyze.go +++ b/internal/runner/analyze.go @@ -2,6 +2,7 @@ package runner import ( "bufio" + "bytes" "cmp" "crypto/sha256" "encoding/csv" @@ -36,7 +37,8 @@ type TestEvent struct { Package string Test string Elapsed float64 - Output string + Output string `json:"Output"` + OutputBytes []byte `json:"-"` FailedBuild string } @@ -88,7 +90,8 @@ type aggregate struct { timeoutIters []int skipIters []int slowIters []int - outputs map[int]*strings.Builder + outputs map[int]*bytes.Buffer + logPaths map[int]string elapseds []time.Duration } @@ -244,23 +247,38 @@ var readerPool = sync.Pool{ // Analyze reads per-iteration test2json streams and classifies tests. // Malformed lines are silently skipped (go test can interleave non-JSON). -func Analyze(iterations []io.Reader, slowThreshold time.Duration) (*Report, LogMap, error) { +func Analyze(iterations []io.Reader, slowThreshold time.Duration) (*Report, LogMap, func(), error) { + tmpDir, err := os.MkdirTemp("", "testrig-analyze-*") + if err != nil { + return nil, nil, nil, err + } + cleanup := func() { + _ = os.RemoveAll(tmpDir) + } + aggs := make(map[testKey]*aggregate) interner := newStringInterner() for i, r := range iterations { - if err := scanIterationJSONL(r, i, aggs, nil, slowThreshold, interner); err != nil { - return nil, nil, err + if err := scanIterationJSONL(r, i, aggs, nil, slowThreshold, interner, tmpDir); err != nil { + cleanup() + return nil, nil, nil, err + } + if err := reattributeTimeoutsIter(aggs, i, tmpDir); err != nil { + cleanup() + return nil, nil, nil, err + } + if err := flushOutputsToDisk(i, aggs, tmpDir); err != nil { + cleanup() + return nil, nil, nil, err } } - reattributeTimeouts(aggs, newAggregate) rep, logs := buildReportFromAggs(aggs, len(iterations), slowThreshold) - return rep, logs, nil + return rep, logs, cleanup, nil } func newAggregate() *aggregate { return &aggregate{ lastRunIter: -1, - outputs: map[int]*strings.Builder{}, } } @@ -273,13 +291,13 @@ func (a *aggregate) recordElapsed(d time.Duration) { // parseTestEvent parses a single JSONL test event into ev. If interner is not nil, // it deduplicates Action, Package, and Test strings. -func parseTestEvent(line []byte, ev *TestEvent, interner *stringInterner) error { +func parseTestEvent(line []byte, ev *TestEvent, interner *stringInterner, parseTime bool, customBuf []byte) error { return jsonparser.ObjectEach( line, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error { switch string(key) { case "Time": - if dataType == jsonparser.String { + if parseTime && dataType == jsonparser.String { s, _ := jsonparser.ParseString(value) ev.Time, _ = time.Parse(time.RFC3339Nano, s) } @@ -307,8 +325,10 @@ func parseTestEvent(line []byte, ev *TestEvent, interner *stringInterner) error } case "Output": if dataType == jsonparser.String { - s, _ := jsonparser.ParseString(value) - ev.Output = s + // We only unescape here to get a flat byte slice to write. + // Often this does not allocate if there are no escapes. + b, _ := jsonparser.Unescape(value, customBuf) + ev.OutputBytes = b } case "FailedBuild": if dataType == jsonparser.String { @@ -330,6 +350,7 @@ func scanIterationJSONL( meta *iterationScanMeta, slowThreshold time.Duration, interner *stringInterner, + tmpDir string, ) error { reader := readerPool.Get().(*bufio.Reader) reader.Reset(r) @@ -338,6 +359,9 @@ func scanIterationJSONL( readerPool.Put(reader) }() + customBuf := make([]byte, 0, 8192) + var totalBuffered int + for { line, err := reader.ReadSlice('\n') if err == bufio.ErrBufferFull { @@ -348,9 +372,16 @@ func scanIterationJSONL( if len(line) > 0 && line[0] == '{' { var ev TestEvent - err := parseTestEvent(line, &ev, interner) + customBuf = customBuf[:0] + err := parseTestEvent(line, &ev, interner, false, customBuf) if err == nil { - applyTestEvent(aggs, iterIdx, &ev, meta, slowThreshold) + applyTestEvent(aggs, iterIdx, &ev, meta, slowThreshold, &totalBuffered) + if tmpDir != "" && totalBuffered > 32*1024*1024 { + if err := flushOutputsToDisk(iterIdx, aggs, tmpDir); err != nil { + return fmt.Errorf("iteration %d: flush output: %w", iterIdx, err) + } + totalBuffered = 0 + } } } @@ -369,6 +400,7 @@ func applyTestEvent( ev *TestEvent, meta *iterationScanMeta, slowThreshold time.Duration, + totalBuffered *int, ) { key := testKey{Package: ev.Package, Test: ev.Test} a := aggs[key] @@ -415,18 +447,24 @@ func applyTestEvent( delete(a.outputs, iterIdx) } case "output": - if strings.Contains(ev.Output, timeoutPanic) { + if bytes.Contains(ev.OutputBytes, []byte(timeoutPanic)) { a.timedOut = true if len(a.timeoutIters) == 0 || a.timeoutIters[len(a.timeoutIters)-1] != iterIdx { a.timeoutIters = append(a.timeoutIters, iterIdx) } } + if a.outputs == nil { + a.outputs = make(map[int]*bytes.Buffer) + } buf := a.outputs[iterIdx] if buf == nil { - buf = &strings.Builder{} + buf = &bytes.Buffer{} a.outputs[iterIdx] = buf } - buf.WriteString(ev.Output) + buf.Write(ev.OutputBytes) + if totalBuffered != nil { + *totalBuffered += len(ev.OutputBytes) + } } } @@ -704,10 +742,12 @@ func DigestIterationJSONL(r io.Reader, slowThreshold time.Duration) (IterationDi aggs := make(map[testKey]*aggregate) var meta iterationScanMeta interner := newStringInterner() - if err := scanIterationJSONL(r, 0, aggs, &meta, slowThreshold, interner); err != nil { + if err := scanIterationJSONL(r, 0, aggs, &meta, slowThreshold, interner, ""); err != nil { + return IterationDigest{}, err + } + if err := reattributeTimeoutsIter(aggs, 0, ""); err != nil { return IterationDigest{}, err } - reattributeTimeouts(aggs, newAggregate) ran := countNamedTestsRanInAggs(aggs) rep, _ := buildReportFromAggs(aggs, 1, slowThreshold) d := iterationDigestFromReport(rep) @@ -755,10 +795,19 @@ func timedOutTestNamesFromReport(rep *Report) []string { // AnalyzeResults opens every `iteration-*.log.jsonl` file in resultsDir, in // numeric-iteration order, and delegates to Analyze. -func AnalyzeResults(resultsDir string, slowThreshold time.Duration) (*Report, LogMap, error) { +func AnalyzeResults(resultsDir string, slowThreshold time.Duration) (*Report, LogMap, func(), error) { + tmpDir, err := os.MkdirTemp("", "testrig-analyze-*") + if err != nil { + return nil, nil, nil, err + } + cleanup := func() { + _ = os.RemoveAll(tmpDir) + } + matches, err := filepath.Glob(filepath.Join(resultsDir, "iteration-*.log.jsonl")) if err != nil { - return nil, nil, err + cleanup() + return nil, nil, nil, err } sort.Slice(matches, func(i, j int) bool { return iterNumber(matches[i]) < iterNumber(matches[j]) @@ -772,14 +821,22 @@ func AnalyzeResults(resultsDir string, slowThreshold time.Duration) (*Report, Lo return err } defer func() { _ = f.Close() }() - return scanIterationJSONL(f, i, aggs, nil, slowThreshold, interner) + return scanIterationJSONL(f, i, aggs, nil, slowThreshold, interner, tmpDir) }(); err != nil { - return nil, nil, err + cleanup() + return nil, nil, nil, err + } + if err := reattributeTimeoutsIter(aggs, i, tmpDir); err != nil { + cleanup() + return nil, nil, nil, err + } + if err := flushOutputsToDisk(i, aggs, tmpDir); err != nil { + cleanup() + return nil, nil, nil, err } } - reattributeTimeouts(aggs, newAggregate) rep, logs := buildReportFromAggs(aggs, len(matches), slowThreshold) - return rep, logs, nil + return rep, logs, cleanup, nil } // WriteReport writes the report as pretty JSON to /report.json. @@ -791,7 +848,7 @@ func WriteReport(resultsDir string, rep *Report) error { return os.WriteFile(filepath.Join(resultsDir, "report.json"), b, 0600) } -// WriteLogFiles writes per-test per-iteration log files under /logs/ +// WriteLogFiles moves per-test per-iteration temporary log files into /logs/ // for flagged tests and populates each flagged TestEntry's Logs slice with a // compact problem-kind, iteration-range, and path pattern. func WriteLogFiles(resultsDir string, rep *Report, logs LogMap) error { @@ -813,8 +870,8 @@ func WriteLogFiles(resultsDir string, rep *Report, logs LogMap) error { budgetIteration := longestIterationString(iterations) written := make([]int, 0, len(iterations)) for _, it := range iterations { - out := m[it] - if out == "" { + tmpPath := m[it] + if tmpPath == "" { continue } name := diagnoseLogFilenameForIterWithBudget( @@ -824,9 +881,31 @@ func WriteLogFiles(resultsDir string, rep *Report, logs LogMap) error { budgetIteration, ) abs := filepath.Join(logsDir, name) - if err := os.WriteFile(abs, []byte(out), 0600); err != nil { - return err + + // Rename the temporary file to the final destination + if err := os.Rename(tmpPath, abs); err != nil { + if os.IsNotExist(err) { + continue + } + // Fallback to copy if rename fails across filesystems + src, readErr := os.Open(tmpPath) //nolint:gosec // G304: path is securely generated temp file + if readErr != nil { + return readErr + } + dst, createErr := os.Create(abs) //nolint:gosec // G304: path from filepath.Join + if createErr != nil { + _ = src.Close() + return createErr + } + _, copyErr := io.Copy(dst, src) + _ = src.Close() + _ = dst.Close() + if copyErr != nil { + return copyErr + } + _ = os.Remove(tmpPath) } + written = append(written, it) } if len(written) > 0 { @@ -1201,64 +1280,207 @@ func flakeFailRatio(e TestEntry) float64 { return float64(e.Fails) / float64(runs) } -// reattributeTimeouts fixes the go-test-json quirk where a `panic: test timed out` -// is attached to whichever test most recently emitted events rather than the -// actually-stuck one. The real culprits are listed in the panic's -// "running tests:" block — move the timeout mark (and the captured stack -// trace) onto those tests. -func reattributeTimeouts(aggs map[testKey]*aggregate, newAgg func() *aggregate) { +func reattributeTimeoutsIter(aggs map[testKey]*aggregate, i int, tmpDir string) error { keys := make([]testKey, 0, len(aggs)) for k := range aggs { keys = append(keys, k) } for _, key := range keys { a := aggs[key] - if !a.timedOut { + if !a.timedOut || len(a.timeoutIters) == 0 || a.timeoutIters[len(a.timeoutIters)-1] != i { continue } - var keptTimeouts []int - for _, i := range a.timeoutIters { - buf := a.outputs[i] - if buf == nil { - keptTimeouts = append(keptTimeouts, i) - continue + + names, r, f, err := getTimeoutOutputs(a, i, key, tmpDir) + if err != nil { + return err + } + if r == nil { + continue + } + + if len(names) == 0 || slices.Contains(names, key.Test) { + if f != nil { + _ = f.Close() } - output := buf.String() - names := parseRunningTests(output) - if len(names) == 0 { - keptTimeouts = append(keptTimeouts, i) - continue + continue + } + + // Remove the timeout from the parent + a.timeoutIters = a.timeoutIters[:len(a.timeoutIters)-1] + if len(a.timeoutIters) == 0 { + a.timedOut = false + } + + for _, name := range names { + nk := testKey{Package: key.Package, Test: name} + na := aggs[nk] + if na == nil { + na = newAggregate() + aggs[nk] = na } - if slices.Contains(names, key.Test) { - keptTimeouts = append(keptTimeouts, i) - continue + na.timedOut = true + if len(na.timeoutIters) == 0 || na.timeoutIters[len(na.timeoutIters)-1] != i { + na.timeoutIters = append(na.timeoutIters, i) } - for _, name := range names { - nk := testKey{Package: key.Package, Test: name} - na := aggs[nk] - if na == nil { - na = newAgg() - aggs[nk] = na - } - na.timedOut = true - if len(na.timeoutIters) == 0 || na.timeoutIters[len(na.timeoutIters)-1] != i { - na.timeoutIters = append(na.timeoutIters, i) - } - if na.lastRunIter != i { - na.runs++ - na.lastRunIter = i + if na.lastRunIter != i { + na.runs++ + na.lastRunIter = i + } + + if err := copyTimeoutToTarget(i, na, f, r, tmpDir); err != nil { + if f != nil { + _ = f.Close() } - if na.outputs[i] == nil { - na.outputs[i] = &strings.Builder{} + return err + } + } + + if f != nil { + _ = f.Close() + } + } + return nil +} + +func getTimeoutOutputs(a *aggregate, i int, key testKey, tmpDir string) ([]string, io.Reader, *os.File, error) { + var r io.Reader + var f *os.File + var err error + + if tmpDir != "" { + if err := flushOutputForBuffer(i, a, tmpDir); err != nil { + return nil, nil, nil, fmt.Errorf( + "reattribute timeouts iter %d pkg %s test %s: flush: %w", + i, + key.Package, + key.Test, + err, + ) + } + if a.logPaths != nil && a.logPaths[i] != "" { + f, err = os.Open(a.logPaths[i]) //nolint:gosec + if err != nil { + return nil, nil, nil, fmt.Errorf( + "reattribute timeouts iter %d pkg %s test %s: open: %w", + i, + key.Package, + key.Test, + err, + ) + } + r = f + } + } else { + if buf := a.outputs[i]; buf != nil && buf.Len() > 0 { + r = bytes.NewReader(buf.Bytes()) + } + } + + if r == nil { + return nil, nil, nil, nil + } + + names, err := parseRunningTests(r) + if err != nil { + if f != nil { + _ = f.Close() + } + return nil, nil, nil, fmt.Errorf( + "reattribute timeouts iter %d pkg %s test %s: parse: %w", + i, + key.Package, + key.Test, + err, + ) + } + return names, r, f, nil +} + +func copyTimeoutToTarget(i int, na *aggregate, f *os.File, r io.Reader, tmpDir string) error { + if tmpDir != "" { + if _, err := f.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek: %w", err) + } + if err := flushOutputForBuffer(i, na, tmpDir); err != nil { + return fmt.Errorf("flush target: %w", err) + } + var dst *os.File + var err error + if na.logPaths != nil && na.logPaths[i] != "" { + dst, err = os.OpenFile(na.logPaths[i], os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + } else { + dst, err = os.CreateTemp(tmpDir, "log-*") + if err == nil { + if na.logPaths == nil { + na.logPaths = make(map[int]string) } - _, _ = na.outputs[i].WriteString(output) + na.logPaths[i] = dst.Name() } } - a.timeoutIters = keptTimeouts - if len(a.timeoutIters) == 0 { - a.timedOut = false + if err != nil { + return fmt.Errorf("open target log: %w", err) + } + _, copyErr := io.Copy(dst, f) + _ = dst.Close() + if copyErr != nil { + return fmt.Errorf("copy log: %w", copyErr) + } + } else { + if na.outputs == nil { + na.outputs = make(map[int]*bytes.Buffer) + } + if na.outputs[i] == nil { + na.outputs[i] = &bytes.Buffer{} + } + if br, ok := r.(*bytes.Reader); ok { + _, _ = br.Seek(0, io.SeekStart) + _, _ = io.Copy(na.outputs[i], br) + } + } + return nil +} + +func flushOutputForBuffer(iterIdx int, a *aggregate, tmpDir string) error { + buf := a.outputs[iterIdx] + if buf == nil || buf.Len() == 0 { + return nil + } + + var f *os.File + var err error + if a.logPaths != nil && a.logPaths[iterIdx] != "" { + f, err = os.OpenFile(a.logPaths[iterIdx], os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + } else { + f, err = os.CreateTemp(tmpDir, "log-*") + if err == nil { + if a.logPaths == nil { + a.logPaths = make(map[int]string) + } + a.logPaths[iterIdx] = f.Name() + } + } + if err != nil { + return err + } + _, err = buf.WriteTo(f) + if err != nil { + _ = f.Close() + return err + } + _ = f.Close() + + delete(a.outputs, iterIdx) + return nil +} + +func flushOutputsToDisk(iterIdx int, aggs map[testKey]*aggregate, tmpDir string) error { + for _, a := range aggs { + if err := flushOutputForBuffer(iterIdx, a, tmpDir); err != nil { + return err } } + return nil } // parseRunningTests extracts test names from a `panic: test timed out` block: @@ -1273,14 +1495,35 @@ func reattributeTimeouts(aggs map[testKey]*aggregate, newAgg func() *aggregate) // aren't test names: before any name has been seen they are tolerated as // preamble (e.g. extra header text); after a name has been seen they mark the // end of the names section (e.g. trailing prose before the stack trace). -func parseRunningTests(output string) []string { - const marker = "running tests:" - _, tail, found := strings.Cut(output, marker) - if !found { - return nil - } +func parseRunningTests(r io.Reader) ([]string, error) { + reader := bufio.NewReader(r) var names []string - for line := range strings.SplitSeq(tail, "\n") { + found := false + for { + lineBytes, isPrefix, err := reader.ReadLine() + if err != nil { + if err == io.EOF { + break + } + return names, err + } + + line := string(lineBytes) + + // Consume the rest of the line if it was too long for ReadLine buffer + for isPrefix { + _, isPrefix, err = reader.ReadLine() + if err != nil { + break + } + } + + if !found { + if strings.Contains(line, "running tests:") { + found = true + } + continue + } if strings.HasPrefix(line, "goroutine ") { break } @@ -1299,7 +1542,7 @@ func parseRunningTests(output string) []string { } names = append(names, trim) } - return names + return names, nil } // buildLogMap returns the raw per-iteration output for every (pkg, test) that @@ -1307,13 +1550,13 @@ func parseRunningTests(output string) []string { func buildLogMap(aggs map[testKey]*aggregate) LogMap { out := LogMap{} for k, a := range aggs { - if len(a.outputs) == 0 { + if len(a.logPaths) == 0 { continue } m := map[int]string{} - for i, buf := range a.outputs { - if buf != nil && buf.Len() > 0 { - m[i] = buf.String() + for i, p := range a.logPaths { + if p != "" { + m[i] = p } } if len(m) > 0 { diff --git a/internal/runner/analyze_bench_test.go b/internal/runner/analyze_bench_test.go index 70891c6..ce47441 100644 --- a/internal/runner/analyze_bench_test.go +++ b/internal/runner/analyze_bench_test.go @@ -41,7 +41,7 @@ func BenchmarkAnalyze_RealThreeIterations(b *testing.B) { for i, p := range payloads { rs[i] = bytes.NewReader(p) } - _, _, err := Analyze(rs, benchSlowThreshold) + _, _, _, err := Analyze(rs, benchSlowThreshold) if err != nil { b.Fatal(err) } @@ -76,13 +76,13 @@ func BenchmarkAnalyzeResults_RealDir(b *testing.B) { for _, p := range payloads { total += int64(len(p)) } - if _, _, err := AnalyzeResults(dir, benchSlowThreshold); err != nil { + if _, _, _, err := AnalyzeResults(dir, benchSlowThreshold); err != nil { b.Fatalf("warm-up AnalyzeResults: %v", err) } b.ReportAllocs() b.SetBytes(total) for b.Loop() { - _, _, err := AnalyzeResults(dir, benchSlowThreshold) + _, _, _, err := AnalyzeResults(dir, benchSlowThreshold) if err != nil { b.Fatal(err) } diff --git a/internal/runner/analyze_files_test.go b/internal/runner/analyze_files_test.go index ea433af..4bb257c 100644 --- a/internal/runner/analyze_files_test.go +++ b/internal/runner/analyze_files_test.go @@ -19,7 +19,7 @@ func TestWriteLogFiles(t *testing.T) { {"Action":"fail","Package":"github.com/foo/bar","Test":"TestFail","Elapsed":0.1} ` dir := t.TempDir() - rep, logs, err := Analyze(readers(iter), 30*time.Second) + rep, logs, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Failures, 1) @@ -48,7 +48,7 @@ func TestWriteLogFilesWritesOnlyProblemIterations(t *testing.T) { `, } dir := t.TempDir() - rep, logs, err := Analyze(readers(iters...), 30*time.Second) + rep, logs, _, err := Analyze(readers(iters...), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Flakes, 1) @@ -84,7 +84,7 @@ func TestWriteLogFilesCompressesSlowIterations(t *testing.T) { `, } dir := t.TempDir() - rep, logs, err := Analyze(readers(iters...), 30*time.Second) + rep, logs, _, err := Analyze(readers(iters...), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Slow, 1) @@ -103,7 +103,7 @@ func TestWriteLogFilesTruncatesLongFilenames(t *testing.T) { {"Action":"fail","Package":"github.com/foo/bar","Test":"` + longTest + `","Elapsed":0.1} ` dir := t.TempDir() - rep, logs, err := Analyze(readers(iter), 30*time.Second) + rep, logs, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Failures, 1) @@ -155,7 +155,7 @@ func TestWriteLogFilesNoLogsForNonFlaggedTests(t *testing.T) { {"Action":"pass","Package":"p","Test":"T","Elapsed":0.01} ` dir := t.TempDir() - rep, logs, err := Analyze(readers(iter), 30*time.Second) + rep, logs, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) assert.Empty(t, rep.Flakes) assert.Empty(t, rep.Failures) @@ -167,6 +167,29 @@ func TestWriteLogFilesNoLogsForNonFlaggedTests(t *testing.T) { assert.Empty(t, entries, "no log files should be written for a clean-pass test") } +func TestWriteLogFilesSkipsMissingTempPath(t *testing.T) { + t.Parallel() + + iter := `{"Action":"output","Package":"p","Test":"T","Output":"fail-log\n"} +{"Action":"fail","Package":"p","Test":"T","Elapsed":0.01} +` + dir := t.TempDir() + rep, logs, _, err := Analyze(readers(iter), 30*time.Second) + require.NoError(t, err) + require.Len(t, rep.Failures, 1) + + key := testKey{Package: "p", Test: "T"} + logs[key][0] = filepath.Join(dir, "missing.log") + + require.NoError(t, WriteLogFiles(dir, rep, logs)) + + assert.Empty(t, rep.Failures[0].Logs, "missing temp must not produce ProblemLog entry") + + entries, err := os.ReadDir(filepath.Join(dir, "logs")) + require.NoError(t, err) + assert.Empty(t, entries) +} + func TestWriteCSV(t *testing.T) { t.Parallel() @@ -185,7 +208,7 @@ func TestWriteCSV(t *testing.T) { `, } dir := t.TempDir() - rep, _, err := Analyze(readers(iters...), 30*time.Second) + rep, _, _, err := Analyze(readers(iters...), 30*time.Second) require.NoError(t, err) require.NoError(t, WriteCSV(dir, rep)) @@ -226,7 +249,7 @@ func TestWriteCSVRenamesSlowWhenAlsoTimeout(t *testing.T) { {"Action":"fail","Package":"p","Test":"T","Elapsed":600.0} ` dir := t.TempDir() - rep, _, err := Analyze(readers(iter), 30*time.Second) + rep, _, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.NoError(t, WriteCSV(dir, rep)) diff --git a/internal/runner/analyze_mem_test.go b/internal/runner/analyze_mem_test.go index b00a354..bb66ae3 100644 --- a/internal/runner/analyze_mem_test.go +++ b/internal/runner/analyze_mem_test.go @@ -40,7 +40,7 @@ func TestAnalyzeMemory_Limit(t *testing.T) { var before, after runtime.MemStats runtime.ReadMemStats(&before) - rep, _, err := Analyze(readers, 30*time.Second) + rep, _, _, err := Analyze(readers, 30*time.Second) require.NoError(t, err) runtime.ReadMemStats(&after) diff --git a/internal/runner/analyze_test.go b/internal/runner/analyze_test.go index e896dd2..c39bdaf 100644 --- a/internal/runner/analyze_test.go +++ b/internal/runner/analyze_test.go @@ -2,6 +2,7 @@ package runner import ( "bufio" + "bytes" "encoding/json" "fmt" "io" @@ -17,6 +18,16 @@ import ( "github.com/smartcontractkit/testrig/internal/termstyle" ) +func readLogMapContent(t *testing.T, path string) string { + t.Helper() + if path == "" { + return "" + } + b, err := os.ReadFile(path) //nolint:gosec + require.NoError(t, err) + return string(b) +} + func readers(iters ...string) []io.Reader { rs := make([]io.Reader, len(iters)) for i, s := range iters { @@ -32,7 +43,7 @@ func TestAnalyzePackageLevelTimeoutIterationSummary(t *testing.T) { {"Action":"fail","Package":"pkg/hang","Elapsed":120.0} `, } - rep, _, err := Analyze(readers(iterations...), 30*time.Second) + rep, _, _, err := Analyze(readers(iterations...), 30*time.Second) require.NoError(t, err) require.Len(t, rep.IterationSummaries, 1) assert.Equal(t, "timeout", rep.IterationSummaries[0].Result) @@ -44,7 +55,7 @@ func TestAnalyzeHandlesLongLines(t *testing.T) { over := strings.Repeat("x", bufio.MaxScanTokenSize+1) + "\n" iter := `{"Action":"pass","Package":"p","Test":"T","Elapsed":0.01}` + "\n" + over + `{"Action":"pass","Package":"p","Test":"T2","Elapsed":0.01}` + "\n" - rep, _, err := Analyze(readers(iter), 30*time.Second) + rep, _, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.NotNil(t, rep) require.Len(t, rep.IterationSummaries, 1) @@ -61,7 +72,7 @@ badpkg.go:1:2: undefined: MissingType ` + `{"Action":"output","Package":"example.com/badpkg","Output":"# example.com/badpkg\n"} {"Action":"fail","Package":"example.com/badpkg","Elapsed":0.0} ` - rep, _, err := Analyze(readers(iter), 30*time.Second) + rep, _, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Failures, 1) assert.Equal(t, "example.com/badpkg", rep.Failures[0].Package) @@ -115,7 +126,7 @@ func TestAnalyzeTestdataFiles(t *testing.T) { require.NoError(t, err) defer func() { _ = f.Close() }() - rep, _, err := Analyze([]io.Reader{f}, 30*time.Second) + rep, _, _, err := Analyze([]io.Reader{f}, 30*time.Second) require.NoError(t, err) require.Len(t, rep.IterationSummaries, 1) @@ -156,7 +167,7 @@ func TestAnalyzePackageLevelFailureIterationSummary(t *testing.T) { iterations := []string{ `{"Action":"fail","Package":"pkg/build","Elapsed":0.0}` + "\n", } - rep, _, err := Analyze(readers(iterations...), 30*time.Second) + rep, _, _, err := Analyze(readers(iterations...), 30*time.Second) require.NoError(t, err) require.Len(t, rep.IterationSummaries, 1) assert.Equal(t, "fail", rep.IterationSummaries[0].Result) @@ -443,7 +454,7 @@ func TestAnalyze(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers(tc.iterations...), tc.slowThreshold) + rep, _, _, err := Analyze(readers(tc.iterations...), tc.slowThreshold) require.NoError(t, err) assert.Equal(t, len(tc.iterations), rep.Iterations) assert.Equal(t, tc.wantFlakes, publicTestEntries(rep.Flakes), "flakes") @@ -618,7 +629,7 @@ func TestReportSummary(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers(tc.iterations...), tc.slowThreshold) + rep, _, _, err := Analyze(readers(tc.iterations...), tc.slowThreshold) require.NoError(t, err) tc.check(t, rep.Summary) }) @@ -635,7 +646,7 @@ func TestPrintSummaryOverallContains(t *testing.T) { { name: "flake_rates_and_slow_line", prep: func(t *testing.T) *Report { - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -655,7 +666,7 @@ func TestPrintSummaryOverallContains(t *testing.T) { { name: "iteration_wall_clock_runtimes", prep: func(t *testing.T) *Report { - rep, _, err := Analyze( + rep, _, _, err := Analyze( readers(`{"Action":"pass","Package":"p","Test":"T","Elapsed":0.01}`), 30*time.Second, ) @@ -688,7 +699,7 @@ func TestPrintSummaryOverallContains(t *testing.T) { func TestPrintSummaryOverall_usesSeverityColors(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -787,7 +798,7 @@ func TestAnalyzeCapturesLogsForFailures(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() - rep, logs, err := Analyze(readers(tc.iterations...), 30*time.Second) + rep, logs, _, err := Analyze(readers(tc.iterations...), 30*time.Second) require.NoError(t, err) var entries []TestEntry switch tc.category { @@ -800,7 +811,7 @@ func TestAnalyzeCapturesLogsForFailures(t *testing.T) { } require.Len(t, entries, 1, "expected exactly one %s entry", tc.category) require.Contains(t, logs, tc.wantKey, "log map should contain the flagged test") - assert.Equal(t, tc.wantOutput, logs[tc.wantKey][tc.wantIter]) + assert.Equal(t, tc.wantOutput, readLogMapContent(t, logs[tc.wantKey][tc.wantIter])) }) } } @@ -817,7 +828,7 @@ func TestAnalyzeReattributesTimeoutToRunningTests(t *testing.T) { {"Action":"output","Package":"p","Test":"TestFast","Output":"goroutine 1 [chan receive]:\n"} {"Action":"fail","Package":"p","Elapsed":5.01} ` - rep, logs, err := Analyze(readers(iter), 30*time.Second) + rep, logs, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) names := make([]string, 0, len(rep.Timeouts)) @@ -831,7 +842,7 @@ func TestAnalyzeReattributesTimeoutToRunningTests(t *testing.T) { for _, e := range rep.Timeouts { k := testKey{Package: e.Package, Test: e.Test} require.Contains(t, logs, k) - assert.Contains(t, logs[k][0], "panic: test timed out after 5s") + assert.Contains(t, readLogMapContent(t, logs[k][0]), "panic: test timed out after 5s") } } @@ -842,12 +853,96 @@ func TestAnalyzeKeepsTimeoutOnCulpritWhenItWasTheReportedTest(t *testing.T) { {"Action":"output","Package":"p","Test":"TestSlow","Output":"\t\tTestSlow (5s)\n"} {"Action":"fail","Package":"p","Elapsed":5.01} ` - rep, _, err := Analyze(readers(iter), 30*time.Second) + rep, _, _, err := Analyze(readers(iter), 30*time.Second) require.NoError(t, err) require.Len(t, rep.Timeouts, 1) assert.Equal(t, "TestSlow", rep.Timeouts[0].Test) } +func TestReattributeTimeoutsIterUnreadableLogPath(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + badPath := filepath.Join(dir, "unreadable.log") + require.NoError(t, os.WriteFile( + badPath, + []byte("panic: test timed out\n\trunning tests:\n\t\tTestSlow (5s)\n"), + 0o600, + )) + require.NoError(t, os.Chmod(badPath, 0o000)) + t.Cleanup(func() { _ = os.Chmod(badPath, 0o600) }) + + aggs := map[testKey]*aggregate{ + {Package: "p", Test: "TestFast"}: { + timedOut: true, + timeoutIters: []int{0}, + logPaths: map[int]string{0: badPath}, + }, + } + err := reattributeTimeoutsIter(aggs, 0, dir) + require.Error(t, err) + assert.Contains(t, err.Error(), "reattribute timeouts iter 0") +} + +func TestReattributeTimeoutsIterLargeOutput(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + + // Create an output larger than 64KB (bufio.Scanner max token size) + hugeLine := strings.Repeat("a", 100*1024) + output := hugeLine + "\npanic: test timed out\n\trunning tests:\n\t\tTestSlow (5s)\n" + + aggs := map[testKey]*aggregate{ + {Package: "p", Test: "TestFast"}: { + timedOut: true, + timeoutIters: []int{0}, + outputs: map[int]*bytes.Buffer{ + 0: bytes.NewBufferString(output), + }, + }, + } + err := reattributeTimeoutsIter(aggs, 0, dir) + require.NoError(t, err) + + require.NotNil(t, aggs[testKey{Package: "p", Test: "TestSlow"}]) + na := aggs[testKey{Package: "p", Test: "TestSlow"}] + assert.True(t, na.timedOut) + assert.Contains(t, na.timeoutIters, 0) + assert.NotNil(t, na.logPaths[0]) + + b, err := os.ReadFile(na.logPaths[0]) + require.NoError(t, err) + assert.Equal(t, output, string(b)) +} + +func TestScanIterationJSONLFlushOutputFailure(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + notADir := filepath.Join(dir, "notadir") + require.NoError(t, os.WriteFile(notADir, []byte("x"), 0o600)) + + chunk := strings.Repeat("x", 400*1024) + line, err := json.Marshal(map[string]string{ + "Action": "output", + "Package": "p", + "Test": "T", + "Output": chunk, + }) + require.NoError(t, err) + var b strings.Builder + for range 100 { + b.Write(line) + b.WriteByte('\n') + } + + aggs := make(map[testKey]*aggregate) + err = scanIterationJSONL(strings.NewReader(b.String()), 0, aggs, nil, 0, newStringInterner(), notADir) + require.Error(t, err) + assert.Contains(t, err.Error(), "flush output") +} + func TestPrintSummaryTimeoutShowsTestNotPassCounts(t *testing.T) { t.Parallel() rep := &Report{ @@ -901,7 +996,7 @@ func TestAnalyzeResultsRoundtrip(t *testing.T) { must(t, os.WriteFile(filepath.Join(dir, "iteration-1.log.jsonl"), []byte(`{"Action":"pass","Package":"pkg/z","Test":"TestFlaky","Elapsed":0.1}`+"\n"), 0600)) - rep, _, err := AnalyzeResults(dir, 30*time.Second) + rep, _, _, err := AnalyzeResults(dir, 30*time.Second) require.NoError(t, err) require.Len(t, rep.Flakes, 1) assert.Equal(t, "TestFlaky", rep.Flakes[0].Test) @@ -981,7 +1076,7 @@ func TestAnalyzeIterationSummaries(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers(tc.iterations...), 30*time.Second) + rep, _, _, err := Analyze(readers(tc.iterations...), 30*time.Second) require.NoError(t, err) require.Len(t, rep.IterationSummaries, len(tc.want)) // Strip Duration/ShuffleSeed — set by runner, not Analyze. @@ -999,7 +1094,7 @@ func TestAnalyzeSkipsMalformedLines(t *testing.T) { input := `not json at all {"Action":"pass","Package":"p","Test":"T","Elapsed":0.01} ` - rep, _, err := Analyze(readers(input), 30*time.Second) + rep, _, _, err := Analyze(readers(input), 30*time.Second) require.NoError(t, err) assert.Empty(t, rep.Flakes) assert.Empty(t, rep.Failures) @@ -1119,7 +1214,7 @@ func TestFillIterationRuntimeSummaryTable(t *testing.T) { func TestMarshalAIDiagnoseComplete_fromAnalyze(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"p","Test":"T","Elapsed":0.1}`, `{"Action":"pass","Package":"p","Test":"T","Elapsed":0.1}`, ), 30*time.Second) @@ -1142,7 +1237,7 @@ func TestAnalyzeSlowTestsNoDuplication(t *testing.T) { iter := `{"Action":"pass","Package":"pkg/slow","Test":"TestSlow","Elapsed":10.0} {"Action":"pass","Package":"pkg/slow","Elapsed":10.0} ` - rep, _, err := Analyze([]io.Reader{strings.NewReader(iter)}, 1*time.Second) + rep, _, _, err := Analyze([]io.Reader{strings.NewReader(iter)}, 1*time.Second) require.NoError(t, err) require.Len(t, rep.Slow, 1) diff --git a/internal/runner/diagnose_output_test.go b/internal/runner/diagnose_output_test.go index 32c8491..d0e4454 100644 --- a/internal/runner/diagnose_output_test.go +++ b/internal/runner/diagnose_output_test.go @@ -17,7 +17,7 @@ import ( func TestMarshalAIDiagnoseComplete(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -93,7 +93,7 @@ func TestFormatSummaryFlatLine(t *testing.T) { func TestPrintSummaryVerdict_noIssues(t *testing.T) { t.Parallel() - rep, _, err := Analyze( + rep, _, _, err := Analyze( readers(`{"Action":"pass","Package":"p","Test":"T","Elapsed":0.01}`), 30*time.Second, ) diff --git a/internal/runner/diagnose_overall_table_test.go b/internal/runner/diagnose_overall_table_test.go index 0faed2c..94e5504 100644 --- a/internal/runner/diagnose_overall_table_test.go +++ b/internal/runner/diagnose_overall_table_test.go @@ -14,7 +14,7 @@ import ( func TestRenderOverallRatesTable_allClear(t *testing.T) { t.Parallel() - rep, _, err := Analyze( + rep, _, _, err := Analyze( readers(`{"Action":"pass","Package":"p","Test":"T","Elapsed":0.01}`), 30*time.Second, ) @@ -28,7 +28,7 @@ func TestRenderOverallRatesTable_allClear(t *testing.T) { func TestRenderOverallRatesTable_flakyRow(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -44,7 +44,7 @@ func TestRenderOverallRatesTable_flakyRow(t *testing.T) { func TestOverallScopeAndWallLine(t *testing.T) { t.Parallel() - rep, _, err := Analyze( + rep, _, _, err := Analyze( readers(`{"Action":"pass","Package":"p","Test":"T","Elapsed":0.01}`), 30*time.Second, ) @@ -104,7 +104,7 @@ func TestBuildOverallRateRows_flakyIterationsLast(t *testing.T) { func TestFormatOverallFlakyIterRate_CIColoredByGap(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index e122a98..5fcbb79 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -236,7 +236,11 @@ func FinishDiagnoseAnalysis( var report *Report var logs LogMap var analyzeErr error - report, logs, analyzeErr = AnalyzeResults(resultsDir, conf.SlowThreshold) + var cleanup func() + report, logs, cleanup, analyzeErr = AnalyzeResults(resultsDir, conf.SlowThreshold) + if cleanup != nil { + defer cleanup() + } stopAnalyzing(analyzeErr) if analyzeErr != nil { out.Stderrf("analyze results: %v\n", analyzeErr) diff --git a/internal/runner/stats_test.go b/internal/runner/stats_test.go index 65ac3eb..37d2566 100644 --- a/internal/runner/stats_test.go +++ b/internal/runner/stats_test.go @@ -117,7 +117,7 @@ func TestFormatFlakyTestLine_packageLevel_includesCI(t *testing.T) { func TestReportSummary_hasCI(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -140,7 +140,7 @@ func TestReportSummary_hasCI(t *testing.T) { func TestPrintOverallStats_includesCI(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"fail","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -155,7 +155,7 @@ func TestPrintOverallStats_includesCI(t *testing.T) { func TestReportSummary_hasCI_noFlakes(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) @@ -171,7 +171,7 @@ func TestReportSummary_hasCI_noFlakes(t *testing.T) { func TestPrintOverallStats_includesCI_noFlakes(t *testing.T) { t.Parallel() - rep, _, err := Analyze(readers( + rep, _, _, err := Analyze(readers( `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.5}`, `{"Action":"pass","Package":"pkg/foo","Test":"TestX","Elapsed":0.4}`, ), 30*time.Second) diff --git a/internal/runner/trace.go b/internal/runner/trace.go index 92744ab..d28bac3 100644 --- a/internal/runner/trace.go +++ b/internal/runner/trace.go @@ -72,8 +72,8 @@ type testState struct { elapsed float64 } -func (ts *testState) addOutput(s string) { - ts.output = append(ts.output, s...) +func (ts *testState) addOutput(b []byte) { + ts.output = append(ts.output, b...) if len(ts.output) > maxOutputSize { ts.output = ts.output[len(ts.output)-maxOutputSize:] ts.truncated = true @@ -135,7 +135,7 @@ func scanTraceEvents( continue } var ev TestEvent - err := parseTestEvent(line, &ev, nil) + err := parseTestEvent(line, &ev, nil, true, nil) if err != nil { if out != nil { out.Stderrf("trace: skip malformed jsonl (iteration %d): %v\n", iter, err) @@ -168,7 +168,7 @@ func scanTraceEvents( case "cont": ts.contTime = ev.Time case "output": - ts.addOutput(ev.Output) + ts.addOutput(ev.OutputBytes) case "pass", "fail", "skip": ts.endTime = ev.Time ts.status = ev.Action