From f0c6129ee185fa0779c44bf61d996ed07c90c679 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Wed, 24 Jun 2026 21:05:16 -0400 Subject: [PATCH] feat(parser): migrate codex provider Codex sessions have an append-only JSONL transcript plus a session_index.jsonl title sidecar. Moving Codex behind a concrete provider keeps that composite source identity and incremental append capability explicit at the provider boundary. The provider preserves dated and archived discovery, live-over-archived lookup, shallow index watch planning, index-event classification, index-aware mtimes, source hashing, full parse output, and append parsing with full-parse fallback signals. fix(parser): preserve codex provider sidecar semantics Codex index changes are part of source freshness, so the provider cannot treat unchanged transcript size as no new data when the index mtime drove the fingerprint. The provider also needs to keep legacy live-over-archived UUID behavior and classify removed transcript paths syntactically. Index events now conservatively refresh sibling Codex sources because this provider layer has no DB state for title diffing; the sync engine can still apply its DB-aware filtering before provider dispatch is fully authoritative. Validation: go test -tags "fts5" ./internal/parser -run TestCodexProvider -count=1; go vet ./...; git diff --check. go test -tags "fts5" ./internal/parser -count=1 currently fails on TestProviderMigrationModes because inherited lower provider branches such as claude still need their branch-local shadow opt-ins. fix(parser): make codex provider sidecars authoritative The Codex provider could not safely infer sidecar-only freshness from a single max mtime. Rather than advertise append-only parsing with incomplete sidecar state, keep provider-authoritative Codex parses on the full-parse path until the facade can model sidecar dirtiness explicitly. Also route persisted path lookup and changed-path classification through the same UUID canonicalization as discovery so archived duplicates do not win over live dated transcripts. Validation: go test -tags "fts5" ./internal/parser -run 'Test(CodexProvider|ProviderMigrationModes)' -count=1; go test -tags "fts5" ./internal/parser -count=1; go vet ./...; git diff --check test(sync): compare codex shadow parity Codex is shadow-compared on this branch, so add source-level migration coverage that compares provider observation with ParseCodexSession. The fixture uses the real sessions/YYYY/MM/DD layout plus sibling session_index.jsonl, proving the provider preserves title sidecar behavior, parser output, and data-version planning. Validation: go test -tags "fts5" ./internal/parser ./internal/sync -run 'TestObserveProviderSourceMatchesCodexLegacyParser|TestCodexProvider|TestParseCodex|TestProviderMigrationModes' -count=1; go test -tags "fts5" ./internal/parser ./internal/sync -count=1; go fmt ./...; go vet ./...; ./custom-gcl run --config .golangci.nilaway.yml ./internal/parser/... ./internal/sync/...; git diff --check fix(parser): accept codex legacy-shaped sources Provider-authoritative Codex sync still has to rediscover sessions that were stored by the legacy parser even when their rollout filename does not expose a UUID-shaped session id. Without that compatibility path, the later dispatch migration can drop or fail to reprocess valid Codex transcripts that ParseCodexSession can read from session metadata. Keep the UUID-aware source contract as the preferred path and fall back to root-scoped JSONL sources only when Codex path metadata does not apply, so normal duplicate canonicalization remains unchanged while legacy-shaped fixtures stay reachable. Validation: go test ./internal/parser -count=1; go fmt ./...; go vet ./...; go test -tags "fts5" ./internal/parser ./internal/sync -run 'TestCodexProvider|TestSyncEngineCodex|TestSyncSingleSessionHashCodex|TestSyncEngineSkipCache' -count=1; git diff --check refactor(parser): fold codex into provider Make the Codex provider own its source discovery, lookup, and parse behavior instead of shimming the package-level free functions. Delete DiscoverCodexSessions, FindCodexSourceFile, ParseCodexSession, and ParseCodexSessionFrom: discovery and find-source bodies move onto the codex source set (discoverSessionPaths, findSourceFile), and parse moves onto the provider (parseSession, parseSessionFrom). Drop the Codex AgentDef DiscoverFunc/FindSourceFunc hooks and make Codex provider-authoritative; ShallowWatchRootsFunc and the exec-source helpers (IsCodexExecSessionFile, ResolveCodexShallowWatchRoots, the one-time codex_exec skip migration) stay since only the four parser entrypoints must go. A provider has no database handle, so the engine reproduces the DB-aware and mtime-aware bookkeeping the legacy single-session JSONL path performed, scoped to Codex to preserve behavior exactly: - shouldSkipProviderSourceByDB folds the session_index.jsonl sidecar into a DB-stored fingerprint skip, so an unchanged transcript is not reparsed when only the shared index mtime advanced and this session's title did not change, and a resync still skips after the in-memory skip cache is cleared. - The provider Parse force-replaces stored rows because Codex emits a full parse (it does not advertise incremental append); a late token_count line appended to an existing turn rewrites the stored message instead of being dropped by an append-only write. - Index events keep flowing through the engine's DB-aware classifyCodexIndexPath rather than the provider's broad index fan-out: the engine fans out only to sessions whose stored title changed and pins the chosen on-disk copy (SourceRefForPath) so the provider's live-over-archived canonicalization cannot resurrect a stale duplicate over the stored copy. - SyncAllSince re-expands a UUID's live and archived duplicates (AllSourcePathsForUUID) before the mtime cutoff filter, restoring the legacy discover-then-filter order so a changed archived copy newer than the cutoff is not lost behind an older live copy. Route parse-diff, the token-use disk probe, and the SSH remote resolve script through provider Discover/FindSource for provider-authoritative agents that no longer carry a DiscoverFunc, so Codex sources stay discoverable, resolvable on disk, and transferable (including the session_index.jsonl sidecar). Replace the deleted shadow-baseline test with provider-API coverage (provider Discover/Parse through ObserveProviderSource) plus a guard that the four legacy entrypoints stay gone, route the package and engine tests through the provider methods, and remove codex_provider.go from the pending shim scan list. This also fixes the previously known-failing TestSyncPathsCodexIndexEventRefreshesStoredDuplicate, since the index event now honors the stored archived copy. test(sync): host shared shadow source helper at codex fold The per-provider shadow/parse tests share writeProviderShadowSourceFile to write source fixtures. The Codex fold is the lowest branch that calls it, so the canonical definition lives here; later provider folds inherit it instead of redeclaring their own copies. test(sync): remove unused codex stat assignments The pre-commit lint hook rejects two Codex appended-fixture tests because they assign os.Stat results back to info without using the value. The tests already assert the append and close operations that matter for setup. Removing the unused assignments keeps staticcheck clean for the Codex provider migration branch. fix(parser): pin codex duplicate sources Codex discovery and raw-ID lookup should still prefer the live dated transcript, but exact filesystem events and DB-stored source hints are different: the caller has already selected a concrete source path. Canonicalizing those paths back to a stale live duplicate can overwrite an updated archived transcript. Changed-path classification now returns the source pinned to the event path, and non-fresh stored path/fingerprint lookup returns the exact source so SyncSingleSession preserves the archived path already recorded in the database. Validation: go test -tags "fts5" ./internal/parser -run 'TestCodexProvider(FindSourcePinsExactArchivedDuplicate|ChangedPathPinsArchivedDuplicate|SourceMethods|DiscoverDedupesLiveAndArchivedByUUID)' -count=1; go test -tags "fts5" ./internal/sync -run 'TestSync(PathsCodexArchivedDuplicateEventPinsChangedFile|SingleSessionCodexPreservesStoredArchivedDuplicate|PathsCodexIndexEventRefreshesStoredDuplicate|AllSinceCodexKeepsChangedArchivedDuplicate)' -count=1; go test -tags "fts5" ./internal/parser -run 'TestCodexProvider|TestParseCodex|TestDiscoverCodex' -count=1; go test -tags "fts5" ./internal/sync -run 'Test.*Codex.*' -count=1; go vet ./...; git diff --check fix(sync): keep codex freshness skips out of cache Codex provider DB-fresh skips are successful freshness decisions, not parse failures or intentional no-session skips. Recording them in the persistent skip cache can hide a later parser data-version bump because the cache check runs before the DB freshness check.\n\nKeep DB-fresh provider skips non-cacheable and make existing skip-cache entries fall through when a stored row at that path has a stale data version. The same bypass helper still preserves the existing stale-project self-healing behavior.\n\nValidation: go test -tags "fts5" ./internal/sync -run 'TestProcessFile(SkipCacheReparsesStaleCodex(Project|DataVersion)|CodexDBFreshSkipIsNotCached)|Test.*Codex.*' -count=1; go test -tags "fts5" ./internal/parser -run 'TestCodexProvider|TestParseCodex|TestDiscoverCodex' -count=1; go test -tags "fts5" ./internal/parser ./internal/sync -count=1; go vet ./...; git diff --check fix(sync): surface codex provider discovery failures Provider-backed parse-diff should not report a clean or incomplete diff when provider discovery failed. Returning that error keeps requested provider-authoritative agents honest and matches the expectation that parse-diff is a verification surface, not a best-effort sync.\n\nAlso pin coverage for stale Codex index entries whose transcripts no longer resolve, so the existing empty-candidate guard cannot regress into an invalid empty work item.\n\nValidation: go test -tags "fts5" ./internal/sync -run 'Test(ParseDiffProviderDiscoveryErrorFails|ClassifyCodexIndexPathSkipsMissingTranscript|ProcessFile(SkipCacheReparsesStaleCodex(Project|DataVersion)|CodexDBFreshSkipIsNotCached))' -count=1; go test -tags "fts5" ./internal/parser ./internal/sync -count=1; go vet ./...; git diff --check fix(sync): drop duplicate shadowCallerProvider Discover in codex test --- cmd/agentsview/token_use.go | 67 +- internal/parser/codex.go | 24 +- internal/parser/codex_parser_test.go | 123 +++- internal/parser/codex_provider.go | 627 ++++++++++++++++++ internal/parser/codex_provider_test.go | 368 ++++++++++ internal/parser/discovery.go | 104 --- internal/parser/discovery_test.go | 4 +- internal/parser/provider.go | 2 + internal/parser/provider_migration.go | 2 +- internal/parser/provider_shim_scan_test.go | 6 +- internal/parser/provider_test.go | 22 +- internal/parser/skill_inference_test.go | 4 +- internal/parser/types.go | 2 - internal/sync/engine.go | 393 ++++++++--- internal/sync/engine_integration_test.go | 98 +++ internal/sync/engine_test.go | 191 +++++- internal/sync/parsediff.go | 21 +- internal/sync/provider_shadow_caller_test.go | 36 + internal/sync/provider_shadow_codex_test.go | 83 +++ internal/sync/provider_shadow_support_test.go | 19 + 20 files changed, 1904 insertions(+), 292 deletions(-) create mode 100644 internal/parser/codex_provider.go create mode 100644 internal/parser/codex_provider_test.go create mode 100644 internal/sync/provider_shadow_codex_test.go create mode 100644 internal/sync/provider_shadow_support_test.go diff --git a/cmd/agentsview/token_use.go b/cmd/agentsview/token_use.go index bccd5e55d..8d3799603 100644 --- a/cmd/agentsview/token_use.go +++ b/cmd/agentsview/token_use.go @@ -86,11 +86,11 @@ func resolveRawSessionID( // Canonical disk probe: if the input starts with a known // agent prefix, trust that interpretation first and strip - // before calling FindSourceFunc (which rejects IDs with + // before resolving the source (which rejects IDs with // colons via IsValidSessionID). for _, def := range parser.Registry { if def.IDPrefix == "" || !def.FileBased || - def.FindSourceFunc == nil { + !agentHasDiskSourceLookup(def) { continue } if !strings.HasPrefix(input, def.IDPrefix) { @@ -98,7 +98,7 @@ func resolveRawSessionID( } bareID := strings.TrimPrefix(input, def.IDPrefix) for _, dir := range agentDirs[def.Type] { - if def.FindSourceFunc(dir, bareID) != "" { + if findAgentSourceFile(def, dir, bareID) != "" { return input, true } } @@ -110,11 +110,11 @@ func resolveRawSessionID( // colon-bearing raw IDs (Kimi, OpenClaw, Kiro IDE) may // match. for _, def := range parser.Registry { - if !def.FileBased || def.FindSourceFunc == nil { + if !def.FileBased || !agentHasDiskSourceLookup(def) { continue } for _, dir := range agentDirs[def.Type] { - if def.FindSourceFunc(dir, input) != "" { + if findAgentSourceFile(def, dir, input) != "" { return def.IDPrefix + input, true } } @@ -123,6 +123,63 @@ func resolveRawSessionID( return input, false } +// agentHasDiskSourceLookup reports whether a session source can be located on +// disk by raw ID for the agent: via the legacy AgentDef FindSourceFunc hook, or +// via a provider-authoritative provider's FindSource for agents whose lookup was +// folded onto the provider (e.g. Codex). +func agentHasDiskSourceLookup(def parser.AgentDef) bool { + if def.FindSourceFunc != nil { + return true + } + if parser.ProviderMigrationModes()[def.Type] != + parser.ProviderMigrationProviderAuthoritative { + return false + } + _, ok := parser.ProviderFactoryByType(def.Type) + return ok +} + +// findAgentSourceFile resolves a raw agent session ID to an on-disk source path +// under dir, using the legacy FindSourceFunc when present and otherwise the +// provider's FindSource (RawSessionID lookup). Returns "" when no source +// resolves or the agent has no on-disk lookup. +func findAgentSourceFile(def parser.AgentDef, dir, rawID string) string { + if def.FindSourceFunc != nil { + return def.FindSourceFunc(dir, rawID) + } + factory, ok := parser.ProviderFactoryByType(def.Type) + if !ok { + return "" + } + provider := factory.NewProvider(parser.ProviderConfig{Roots: []string{dir}}) + source, found, err := provider.FindSource( + context.Background(), + parser.FindSourceRequest{RawSessionID: rawID}, + ) + if err != nil || !found { + return "" + } + if path, ok := providerSourcePath(source); ok { + return path + } + return "" +} + +// providerSourcePath extracts the on-disk path a provider SourceRef points to, +// preferring the display path and falling back to the fingerprint key or key. +func providerSourcePath(source parser.SourceRef) (string, bool) { + for _, candidate := range []string{ + source.DisplayPath, + source.FingerprintKey, + source.Key, + } { + if candidate != "" { + return candidate, true + } + } + return "", false +} + // usageExitCode classifies a SessionUsage into an exit code: 2 when // the session is not in the DB, 0 when token data OR cost is present, // 3 when the session exists but has neither. Cost-only sessions diff --git a/internal/parser/codex.go b/internal/parser/codex.go index e8e463e5c..47045462b 100644 --- a/internal/parser/codex.go +++ b/internal/parser/codex.go @@ -1299,11 +1299,12 @@ func IsCodexExecSessionFile(path string) bool { return false } -// ParseCodexSession parses a Codex JSONL session file. -// The includeExec parameter is retained for backward -// compatibility; exec-originated sessions are now always -// parsed and imported. -func ParseCodexSession( +// parseSession parses a Codex JSONL session file into a session and its +// messages. The includeExec parameter is retained for backward compatibility; +// exec-originated sessions are now always parsed and imported. This is the +// provider-owned parse entrypoint; the package-level free function was folded +// onto the provider. +func (p *codexProvider) parseSession( path, machine string, includeExec bool, ) (*ParsedSession, []ParsedMessage, error) { info, err := os.Stat(path) @@ -1650,12 +1651,13 @@ func CodexTranscriptConsumedSize(path string) (int64, error) { return readJSONLFrom(path, 0, func(line string) {}) } -// ParseCodexSessionFrom parses only new lines from a Codex -// JSONL file starting at the given byte offset. Returns only -// the newly parsed messages (with ordinals starting at -// startOrdinal) and the latest timestamp seen. Used for -// incremental re-parsing of large append-only session files. -func ParseCodexSessionFrom( +// parseSessionFrom parses only new lines from a Codex JSONL file starting at +// the given byte offset. Returns only the newly parsed messages (with ordinals +// starting at startOrdinal) and the latest timestamp seen. Used for incremental +// re-parsing of large append-only session files. This is the provider-owned +// incremental parse entrypoint; the package-level free function was folded onto +// the provider. +func (p *codexProvider) parseSessionFrom( path string, offset int64, startOrdinal int, diff --git a/internal/parser/codex_parser_test.go b/internal/parser/codex_parser_test.go index bda04ee29..c21ac6437 100644 --- a/internal/parser/codex_parser_test.go +++ b/internal/parser/codex_parser_test.go @@ -19,11 +19,74 @@ func runCodexParserTest(t *testing.T, fileName, content string, includeExec bool fileName = "test.jsonl" } path := createTestFile(t, fileName, content) - sess, msgs, err := ParseCodexSession(path, "local", includeExec) + sess, msgs, err := parseCodexTestSession(t, path, "local", includeExec) require.NoError(t, err) return sess, msgs } +// newCodexTestProvider builds a concrete codexProvider so package tests can +// exercise the folded parse, discovery, and source-lookup behavior directly +// through provider methods now that the package-level ParseCodexSession, +// ParseCodexSessionFrom, DiscoverCodexSessions, and FindCodexSourceFile free +// functions are gone. +func newCodexTestProvider(t *testing.T, roots ...string) *codexProvider { + t.Helper() + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: roots, + Machine: "local", + }) + require.True(t, ok) + cp, ok := provider.(*codexProvider) + require.True(t, ok) + return cp +} + +// parseCodexTestSession parses a Codex session through the provider-owned +// parseSession method, replacing the removed package-level ParseCodexSession. +func parseCodexTestSession( + t *testing.T, path, machine string, includeExec bool, +) (*ParsedSession, []ParsedMessage, error) { + t.Helper() + return newCodexTestProvider(t).parseSession(path, machine, includeExec) +} + +// parseCodexTestSessionFrom parses appended Codex lines through the +// provider-owned parseSessionFrom method, replacing the removed package-level +// ParseCodexSessionFrom. +func parseCodexTestSessionFrom( + t *testing.T, path string, offset int64, startOrdinal int, includeExec bool, +) ([]ParsedMessage, time.Time, int64, error) { + t.Helper() + return newCodexTestProvider(t).parseSessionFrom(path, offset, startOrdinal, includeExec) +} + +// discoverCodexTestSessions discovers Codex session paths under root through +// the provider source set, returning the legacy DiscoveredFile shape the tests +// assert against, replacing the removed DiscoverCodexSessions. +func discoverCodexTestSessions(t *testing.T, root string) []DiscoveredFile { + t.Helper() + provider := newCodexTestProvider(t, root) + paths := provider.sources.discoverSessionPaths(root) + if len(paths) == 0 { + return nil + } + files := make([]DiscoveredFile, 0, len(paths)) + for _, path := range paths { + files = append(files, DiscoveredFile{ + Path: path, + Agent: AgentCodex, + }) + } + return files +} + +// findCodexTestSourceFile resolves a Codex session UUID to a transcript path +// through the provider source set, replacing the removed FindCodexSourceFile. +func findCodexTestSourceFile(t *testing.T, root, sessionID string) string { + t.Helper() + return newCodexTestProvider(t, root).sources.findSourceFile(root, sessionID) +} + func assertToolResultEvents( t *testing.T, got []ParsedToolResultEvent, @@ -65,7 +128,7 @@ func TestParseCodexSession_UsesThreadNameFromSessionIndex(t *testing.T) { index := `{"id":"abc-123","thread_name":"Renamed from Codex","updated_at":"2026-06-11T17:34:20.3755243Z"}` + "\n" require.NoError(t, os.WriteFile(indexPath, []byte(index), 0o644)) - sess, msgs, err := ParseCodexSession(sessionPath, "local", false) + sess, msgs, err := parseCodexTestSession(t, sessionPath, "local", false) require.NoError(t, err) require.NotNil(t, sess) assert.Equal(t, "Renamed from Codex", sess.SessionName) @@ -86,7 +149,7 @@ func TestParseCodexSession_LeavesSessionNameEmptyWithoutThreadName(t *testing.T) index := `{"id":"abc-123","updated_at":"2026-06-11T17:34:20.3755243Z"}` + "\n" require.NoError(t, os.WriteFile(indexPath, []byte(index), 0o644)) - sess, _, err := ParseCodexSession(sessionPath, "local", false) + sess, _, err := parseCodexTestSession(t, sessionPath, "local", false) require.NoError(t, err) require.NotNil(t, sess) assert.Empty(t, sess.SessionName) @@ -106,7 +169,7 @@ func TestParseCodexSession_UsesThreadNameFromArchivedSessions(t *testing.T) { index := `{"id":"abc-123","thread_name":"Archived title","updated_at":"2026-06-11T17:34:20.3755243Z"}` + "\n" require.NoError(t, os.WriteFile(indexPath, []byte(index), 0o644)) - sess, _, err := ParseCodexSession(sessionPath, "local", false) + sess, _, err := parseCodexTestSession(t, sessionPath, "local", false) require.NoError(t, err) require.NotNil(t, sess) assert.Equal(t, "Archived title", sess.SessionName) @@ -125,7 +188,7 @@ func TestParseCodexSession_MtimeIncludesSessionIndex(t *testing.T) { index := `{"id":"abc-123","thread_name":"Original","updated_at":"2026-06-11T17:34:20Z"}` + "\n" require.NoError(t, os.WriteFile(indexPath, []byte(index), 0o644)) - sess1, _, err := ParseCodexSession(sessionPath, "local", false) + sess1, _, err := parseCodexTestSession(t, sessionPath, "local", false) require.NoError(t, err) mtime1 := sess1.File.Mtime @@ -135,7 +198,7 @@ func TestParseCodexSession_MtimeIncludesSessionIndex(t *testing.T) { require.NoError(t, os.WriteFile(indexPath, []byte(renamed), 0o644)) require.NoError(t, os.Chtimes(indexPath, future, future)) - sess2, _, err := ParseCodexSession(sessionPath, "local", false) + sess2, _, err := parseCodexTestSession(t, sessionPath, "local", false) require.NoError(t, err) assert.Greater(t, sess2.File.Mtime, mtime1, "mtime must advance when session_index.jsonl is updated") assert.Equal(t, "Renamed", sess2.SessionName) @@ -1324,7 +1387,7 @@ func TestParseCodexSessionFrom_ForkReplaySpansOffset(t *testing.T) { ) path := createTestFile(t, "fork-incremental.jsonl", initial) - sess, msgs, err := ParseCodexSession(path, "local", false) + sess, msgs, err := parseCodexTestSession(t, path, "local", false) require.NoError(t, err) require.NotNil(t, sess) assert.Equal(t, "codex:"+forkID, sess.ID) @@ -1350,7 +1413,7 @@ func TestParseCodexSessionFrom_ForkReplaySpansOffset(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - newMsgs, _, _, err := ParseCodexSessionFrom(path, offset, 0, false) + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, 0, false) require.NoError(t, err) // Only the genuine turn survives; the replayed assistant answer @@ -1621,7 +1684,7 @@ func TestParseCodexSessionFrom_Incremental(t *testing.T) { path := createTestFile(t, "incremental.jsonl", initial) // Full parse to get baseline. - sess, msgs, err := ParseCodexSession(path, "local", false) + sess, msgs, err := parseCodexTestSession(t, path, "local", false) require.NoError(t, err) require.NotNil(t, sess) assert.Equal(t, "codex:inc-1", sess.ID) @@ -1651,7 +1714,7 @@ func TestParseCodexSessionFrom_Incremental(t *testing.T) { require.NoError(t, f.Close()) // Incremental parse from the offset. - newMsgs, endedAt, _, err := ParseCodexSessionFrom( + newMsgs, endedAt, _, err := parseCodexTestSessionFrom(t, path, offset, 1, false, ) require.NoError(t, err) @@ -1703,7 +1766,7 @@ func TestParseCodexSessionFrom_LateTokenCountRequiresFullParse(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 2, false) require.Error(t, err) assert.True(t, IsIncrementalFullParseFallback(err)) } @@ -1738,7 +1801,7 @@ func TestParseCodexSessionFrom_FunctionCallOutputRequiresFullParse(t *testing.T) require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 2, false) require.Error(t, err) assert.True(t, IsIncrementalFullParseFallback(err)) } @@ -1768,7 +1831,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { testjsonl.CodexMsgJSON("assistant", "looking", tsEarlyS5), ) path := createTestFile(t, "incremental.jsonl", initial) - sess, msgs, err := ParseCodexSession(path, "local", false) + sess, msgs, err := parseCodexTestSession(t, path, "local", false) require.NoError(t, err) require.Equal(t, 1, sess.UserMessageCount) require.Len(t, msgs, 2) @@ -1782,7 +1845,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { testjsonl.CodexMsgJSON("assistant", "No issues found.", tsLateS5), )) - newMsgs, _, _, err := ParseCodexSessionFrom(path, offset, len(msgs), false) + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, len(msgs), false) require.NoError(t, err) require.Len(t, newMsgs, 2) assert.Equal(t, RoleUser, newMsgs[0].Role) @@ -1797,7 +1860,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { testjsonl.CodexMsgJSON("assistant", "looking", tsEarlyS5), ) path := createTestFile(t, "incremental.jsonl", initial) - sess, msgs, err := ParseCodexSession(path, "local", false) + sess, msgs, err := parseCodexTestSession(t, path, "local", false) require.NoError(t, err) require.Equal(t, 1, sess.UserMessageCount) require.Len(t, msgs, 2) @@ -1812,7 +1875,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { testjsonl.CodexMsgJSON("assistant", "No issues found.", tsLateS5), )) - newMsgs, _, _, err := ParseCodexSessionFrom(path, offset, len(msgs), false) + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, len(msgs), false) require.NoError(t, err) require.Len(t, newMsgs, 1) assert.Equal(t, RoleAssistant, newMsgs[0].Role) @@ -1829,7 +1892,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { testjsonl.CodexMsgJSON("user", "something else entirely", "2024-01-01T10:00:03Z"), ) path := createTestFile(t, "incremental.jsonl", initial) - sess, msgs, err := ParseCodexSession(path, "local", false) + sess, msgs, err := parseCodexTestSession(t, path, "local", false) require.NoError(t, err) require.Equal(t, 2, sess.UserMessageCount) @@ -1839,7 +1902,7 @@ func TestParseCodexSessionFrom_DedupsReemittedPrompt(t *testing.T) { appendLines(t, path, testjsonl.CodexMsgJSON("user", prompt, tsLate)) - newMsgs, _, _, err := ParseCodexSessionFrom(path, offset, len(msgs), false) + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, len(msgs), false) require.NoError(t, err) require.Len(t, newMsgs, 1) assert.Equal(t, RoleUser, newMsgs[0].Role) @@ -1878,7 +1941,7 @@ func TestParseCodexSessionFrom_SkipsSessionMeta(t *testing.T) { f.WriteString(extra) f.Close() - newMsgs, _, _, err := ParseCodexSessionFrom( + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, 5, false, ) require.NoError(t, err) @@ -1902,7 +1965,7 @@ func TestParseCodexSessionFrom_NoNewData(t *testing.T) { offset := info.Size() // Parse from end of file — no new data. - newMsgs, endedAt, _, err := ParseCodexSessionFrom( + newMsgs, endedAt, _, err := parseCodexTestSessionFrom(t, path, offset, 10, false, ) require.NoError(t, err) @@ -1935,7 +1998,7 @@ func TestParseCodexSessionFrom_SubagentOutputRequiresFullParse(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 2, false) require.Error(t, err) assert.Contains(t, err.Error(), "full parse") } @@ -1966,7 +2029,7 @@ func TestParseCodexSessionFrom_CollabAgentSpawnEndRequiresFullParse(t *testing.T require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 2, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 2, false) require.Error(t, err) assert.Contains(t, err.Error(), "full parse") } @@ -2004,7 +2067,7 @@ func TestParseCodexSessionFrom_WaitCallRequiresFullParse(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 4, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 4, false) require.Error(t, err) assert.Contains(t, err.Error(), "full parse") } @@ -2042,7 +2105,7 @@ func TestParseCodexSessionFrom_WaitAgentCallRequiresFullParse(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 4, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 4, false) require.Error(t, err) assert.Contains(t, err.Error(), "full parse") } @@ -2068,7 +2131,7 @@ func TestParseCodexSessionFrom_SystemMessageDoesNotRequireFullParse(t *testing.T require.NoError(t, err) require.NoError(t, f.Close()) - newMsgs, endedAt, _, err := ParseCodexSessionFrom(path, offset, 1, false) + newMsgs, endedAt, _, err := parseCodexTestSessionFrom(t, path, offset, 1, false) require.NoError(t, err) assert.Equal(t, 0, len(newMsgs)) assert.False(t, endedAt.IsZero()) @@ -2099,7 +2162,7 @@ func TestParseCodexSessionFrom_RunningNotificationRequiresFullParse(t *testing.T require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 1, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 1, false) require.Error(t, err) assert.Contains(t, err.Error(), "full parse") } @@ -2125,7 +2188,7 @@ func TestParseCodexSessionFrom_NonSubagentFunctionOutputRequiresFullParse(t *tes require.NoError(t, err) require.NoError(t, f.Close()) - _, _, _, err = ParseCodexSessionFrom(path, offset, 1, false) + _, _, _, err = parseCodexTestSessionFrom(t, path, offset, 1, false) require.Error(t, err) assert.True(t, IsIncrementalFullParseFallback(err)) } @@ -2166,7 +2229,7 @@ func TestParseCodexSessionFrom_SeedsModelFromTurnContext( require.NoError(t, err) require.NoError(t, f2.Close()) - newMsgs2, _, _, err := ParseCodexSessionFrom( + newMsgs2, _, _, err := parseCodexTestSessionFrom(t, path, offset, 2, false, ) require.NoError(t, err) @@ -2213,7 +2276,7 @@ func TestParseCodexSessionFrom_SeedsBoundaryAfterTurnContext( require.NoError(t, err) require.NoError(t, f.Close()) - newMsgs, _, _, err := ParseCodexSessionFrom( + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, 0, false, ) require.NoError(t, err) @@ -2262,7 +2325,7 @@ func TestParseCodexSessionFrom_EmptyModelReset( require.NoError(t, err) require.NoError(t, f.Close()) - newMsgs, _, _, err := ParseCodexSessionFrom( + newMsgs, _, _, err := parseCodexTestSessionFrom(t, path, offset, 2, false, ) require.NoError(t, err) diff --git a/internal/parser/codex_provider.go b/internal/parser/codex_provider.go new file mode 100644 index 000000000..9554d7db5 --- /dev/null +++ b/internal/parser/codex_provider.go @@ -0,0 +1,627 @@ +package parser + +import ( + "context" + "fmt" + "os" + "path/filepath" + "slices" + "strings" +) + +var _ Provider = (*codexProvider)(nil) + +type codexProviderFactory struct { + def AgentDef +} + +func newCodexProviderFactory(def AgentDef) ProviderFactory { + return codexProviderFactory{def: cloneAgentDef(def)} +} + +func (f codexProviderFactory) Definition() AgentDef { + return cloneAgentDef(f.def) +} + +func (f codexProviderFactory) Capabilities() Capabilities { + return codexProviderCapabilities() +} + +func (f codexProviderFactory) NewProvider(cfg ProviderConfig) Provider { + cfg = cfg.Clone() + return &codexProvider{ + ProviderBase: ProviderBase{ + Def: cloneAgentDef(f.def), + Caps: codexProviderCapabilities(), + Config: cfg, + }, + sources: newCodexSourceSet(cfg.Roots), + } +} + +type codexProvider struct { + ProviderBase + sources codexSourceSet +} + +func (p *codexProvider) Discover(ctx context.Context) ([]SourceRef, error) { + return p.sources.Discover(ctx) +} + +func (p *codexProvider) WatchPlan(ctx context.Context) (WatchPlan, error) { + return p.sources.WatchPlan(ctx) +} + +func (p *codexProvider) SourcesForChangedPath( + ctx context.Context, + req ChangedPathRequest, +) ([]SourceRef, error) { + return p.sources.SourcesForChangedPath(ctx, req) +} + +func (p *codexProvider) FindSource( + ctx context.Context, + req FindSourceRequest, +) (SourceRef, bool, error) { + req = providerFindRequestWithRawSessionID(p.Def, req) + return p.sources.FindSource(ctx, req) +} + +// AllSourcePathsForUUID returns every on-disk Codex transcript path under the +// provider's roots whose filename carries the given session UUID, without the +// live-over-archived deduplication Discover applies. A UUID can exist as both a +// live dated copy and a flat archived copy under the same root; the sync engine +// uses the full set so an mtime cutoff can judge each copy independently. +func (p *codexProvider) AllSourcePathsForUUID(uuid string) []string { + if uuid == "" { + return nil + } + seen := make(map[string]struct{}) + var paths []string + for _, root := range p.sources.roots { + for _, path := range p.sources.discoverSessionPaths(root) { + if CodexSessionUUIDFromFilename(filepath.Base(path)) != uuid { + continue + } + clean := filepath.Clean(path) + if _, ok := seen[clean]; ok { + continue + } + seen[clean] = struct{}{} + paths = append(paths, path) + } + } + return paths +} + +// SourceRefForPath builds a SourceRef pinned to the exact transcript path, +// without live-over-archived canonicalization. Discovery, raw-ID lookup, and +// fresh stored-source lookup still prefer the live dated transcript, but +// changed-path events and non-fresh stored paths preserve the already-selected +// on-disk copy. The sync engine uses this when its DB-aware or mtime-aware +// logic has chosen a duplicated Codex UUID source (e.g. a stored archived copy +// or the cutoff-newer copy), so that choice is honored instead of being flipped +// back to the preferred dated layout. Returns false when the path is not a +// recognizable Codex source. +func (p *codexProvider) SourceRefForPath(path string) (SourceRef, bool) { + for _, root := range p.sources.roots { + if source, ok := p.sources.sourceRef(root, path, true); ok { + return source, true + } + if source, ok := p.sources.directPathSource(root, path, true); ok { + return source, true + } + } + return SourceRef{}, false +} + +func (p *codexProvider) Fingerprint( + ctx context.Context, + source SourceRef, +) (SourceFingerprint, error) { + return p.sources.Fingerprint(ctx, source) +} + +func (p *codexProvider) Parse( + ctx context.Context, + req ParseRequest, +) (ParseOutcome, error) { + if err := ctx.Err(); err != nil { + return ParseOutcome{}, err + } + path, ok := p.sources.pathFromSource(req.Source) + if !ok { + return ParseOutcome{}, fmt.Errorf("codex source path unavailable") + } + machine := firstNonEmptyJSONLString(req.Machine, p.Config.Machine) + sess, msgs, err := p.parseSession(path, machine, false) + if err != nil { + return ParseOutcome{}, err + } + if sess == nil { + return ParseOutcome{ + ResultSetComplete: true, + SkipReason: SkipNoSession, + }, nil + } + if req.Fingerprint.Hash != "" { + sess.File.Hash = req.Fingerprint.Hash + } + return ParseOutcome{ + Results: []ParseResultOutcome{{ + Result: ParseResult{ + Session: *sess, + Messages: msgs, + }, + DataVersion: DataVersionCurrent, + }}, + ResultSetComplete: true, + // Codex transcripts are append-only and the provider always emits a full + // parse (it does not advertise incremental append). A full parse is the + // authoritative message set, so force-replace the stored rows; this + // preserves the legacy behavior where a late token_count line appended to + // an existing turn rewrites the stored message instead of being dropped by + // an append-only write. + ForceReplace: true, + }, nil +} + +func (p *codexProvider) ParseIncremental( + ctx context.Context, + _ IncrementalRequest, +) (IncrementalOutcome, IncrementalStatus, error) { + if err := ctx.Err(); err != nil { + return IncrementalOutcome{}, IncrementalUnsupported, err + } + return IncrementalOutcome{}, IncrementalUnsupported, nil +} + +type codexSource struct { + Root string + Path string + UUID string + Layout CodexLayout +} + +type codexSourceSet struct { + roots []string +} + +func newCodexSourceSet(roots []string) codexSourceSet { + return codexSourceSet{roots: cleanJSONLRoots(roots)} +} + +func (s codexSourceSet) Discover(ctx context.Context) ([]SourceRef, error) { + return s.discover(ctx, func(string) bool { return true }) +} + +func (s codexSourceSet) discover( + ctx context.Context, + includeRoot func(string) bool, +) ([]SourceRef, error) { + var sources []SourceRef + byKey := make(map[string]SourceRef) + for _, root := range s.roots { + if err := ctx.Err(); err != nil { + return nil, err + } + if !includeRoot(root) { + continue + } + for _, path := range s.discoverSessionPaths(root) { + source, ok := s.sourceRef(root, path, true) + if !ok { + source, ok = s.directPathSource(root, path, true) + } + if !ok { + continue + } + if current, ok := byKey[source.Key]; ok && + !preferCodexSource(source, current) { + continue + } + byKey[source.Key] = source + } + } + for _, source := range byKey { + sources = append(sources, source) + } + sortJSONLSources(sources) + return sources, nil +} + +// discoverSessionPaths finds all Codex JSONL session file paths under +// sessionsDir, covering both the standard year/month/day layout and a flat +// archived directory. Paths are returned sorted for deterministic discovery, +// matching the behavior the package-level entrypoint provided before the fold. +func (s codexSourceSet) discoverSessionPaths(sessionsDir string) []string { + var paths []string + + entries, err := os.ReadDir(sessionsDir) + if err != nil { + return nil + } + for _, entry := range entries { + if entry.IsDir() { + continue + } + if !isCodexSessionFilename(entry.Name()) { + continue + } + paths = append(paths, filepath.Join(sessionsDir, entry.Name())) + } + + walkCodexDayDirs(sessionsDir, func(dayPath string) bool { + dayEntries, err := os.ReadDir(dayPath) + if err != nil { + return true + } + for _, sf := range dayEntries { + if sf.IsDir() { + continue + } + if !isCodexSessionFilename(sf.Name()) { + continue + } + paths = append(paths, filepath.Join(dayPath, sf.Name())) + } + return true + }) + + slices.Sort(paths) + return paths +} + +// findSourceFile resolves a Codex session file by UUID under sessionsDir. +// It prefers the standard year/month/day live path when present, then falls +// back to a flat archived directory entry, matching the lookup precedence the +// package-level entrypoint provided before the fold. +func (s codexSourceSet) findSourceFile(sessionsDir, sessionID string) string { + if !IsValidSessionID(sessionID) { + return "" + } + + var archived string + entries, err := os.ReadDir(sessionsDir) + if err == nil { + for _, f := range entries { + if f.IsDir() { + continue + } + name := f.Name() + if !isCodexSessionFilename(name) { + continue + } + if extractUUIDFromRollout(name) == sessionID { + archived = filepath.Join(sessionsDir, name) + break + } + } + } + + var live string + walkCodexDayDirs(sessionsDir, func(dayPath string) bool { + if live != "" { + return false + } + dayEntries, err := os.ReadDir(dayPath) + if err != nil { + return true + } + for _, f := range dayEntries { + if f.IsDir() { + continue + } + name := f.Name() + if !isCodexSessionFilename(name) { + continue + } + if extractUUIDFromRollout(name) == sessionID { + live = filepath.Join(dayPath, name) + return false + } + } + return true + }) + if live != "" { + return live + } + return archived +} + +func (s codexSourceSet) WatchPlan(context.Context) (WatchPlan, error) { + roots := make([]WatchRoot, 0, len(s.roots)*2) + seenShallow := make(map[string]struct{}) + for _, root := range s.roots { + roots = append(roots, WatchRoot{ + Path: root, + Recursive: true, + IncludeGlobs: []string{"*.jsonl"}, + DebounceKey: string(AgentCodex) + ":sessions:" + root, + }) + for _, shallow := range ResolveCodexShallowWatchRoots(root) { + shallow = filepath.Clean(shallow) + if _, ok := seenShallow[shallow]; ok { + continue + } + seenShallow[shallow] = struct{}{} + roots = append(roots, WatchRoot{ + Path: shallow, + Recursive: false, + IncludeGlobs: []string{CodexSessionIndexFilename}, + DebounceKey: string(AgentCodex) + ":index:" + shallow, + }) + } + } + return WatchPlan{Roots: roots}, nil +} + +func (s codexSourceSet) SourcesForChangedPath( + ctx context.Context, + req ChangedPathRequest, +) ([]SourceRef, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + if filepath.Base(req.Path) == CodexSessionIndexFilename { + return s.sourcesForIndexPath(ctx, req.Path) + } + for _, root := range s.roots { + source, ok := s.sourceRef(root, req.Path, true) + if !ok { + source, ok = s.directPathSource(root, req.Path, true) + } + if ok { + return []SourceRef{source}, nil + } + if !jsonlMissingPathFallbackAllowed(req) { + continue + } + source, ok = s.sourceRef(root, req.Path, false) + if !ok { + source, ok = s.directPathSource(root, req.Path, false) + } + if ok { + return []SourceRef{source}, nil + } + } + return nil, nil +} + +func (s codexSourceSet) FindSource( + ctx context.Context, + req FindSourceRequest, +) (SourceRef, bool, error) { + if err := ctx.Err(); err != nil { + return SourceRef{}, false, err + } + for _, path := range []string{req.StoredFilePath, req.FingerprintKey} { + if path == "" { + continue + } + for _, root := range s.roots { + if source, ok := s.sourceRef(root, path, true); ok { + if !req.RequireFreshSource || req.PreferStoredSource { + return source, true, nil + } + return s.canonicalSource(ctx, source) + } + if source, ok := s.directPathSource(root, path, true); ok { + if !req.RequireFreshSource || req.PreferStoredSource { + return source, true, nil + } + return s.canonicalSource(ctx, source) + } + } + } + if req.RawSessionID == "" { + return SourceRef{}, false, nil + } + for _, root := range s.roots { + path := s.findSourceFile(root, req.RawSessionID) + if path == "" { + continue + } + if source, ok := s.sourceRef(root, path, true); ok { + return s.canonicalSource(ctx, source) + } + } + return SourceRef{}, false, nil +} + +func (s codexSourceSet) Fingerprint( + ctx context.Context, + source SourceRef, +) (SourceFingerprint, error) { + if err := ctx.Err(); err != nil { + return SourceFingerprint{}, err + } + path, ok := s.pathFromSource(source) + if !ok { + return SourceFingerprint{}, fmt.Errorf("codex source path unavailable") + } + info, err := os.Stat(path) + if err != nil { + return SourceFingerprint{}, fmt.Errorf("stat %s: %w", path, err) + } + if info.IsDir() { + return SourceFingerprint{}, fmt.Errorf("stat %s: source is a directory", path) + } + hash, err := hashJSONLSourceFile(path) + if err != nil { + return SourceFingerprint{}, err + } + return SourceFingerprint{ + Key: firstNonEmptyJSONLString(source.FingerprintKey, source.Key, path), + Size: info.Size(), + MTimeNS: CodexEffectiveMtime(path, info.ModTime().UnixNano()), + Hash: hash, + }, nil +} + +func (s codexSourceSet) pathFromSource(source SourceRef) (string, bool) { + switch src := source.Opaque.(type) { + case codexSource: + return src.Path, src.Path != "" + case *codexSource: + if src != nil && src.Path != "" { + return src.Path, true + } + } + for _, candidate := range []string{ + source.DisplayPath, + source.FingerprintKey, + source.Key, + } { + for _, root := range s.roots { + if ref, ok := s.sourceRef(root, candidate, true); ok { + src := ref.Opaque.(codexSource) + return src.Path, true + } + if ref, ok := s.directPathSource(root, candidate, true); ok { + src := ref.Opaque.(codexSource) + return src.Path, true + } + } + } + return "", false +} + +func (s codexSourceSet) sourcesForIndexPath( + ctx context.Context, + indexPath string, +) ([]SourceRef, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + indexDir := filepath.Dir(indexPath) + return s.discover(ctx, func(root string) bool { + return filepath.Dir(root) == indexDir + }) +} + +func (s codexSourceSet) sourceRef( + root string, + path string, + requireRegular bool, +) (SourceRef, bool) { + root = filepath.Clean(root) + path = filepath.Clean(path) + layout, uuid, ok := CodexSessionPathInfo(root, path) + if !ok || uuid == "" { + return SourceRef{}, false + } + if requireRegular && !IsRegularFile(path) { + return SourceRef{}, false + } + return SourceRef{ + Provider: AgentCodex, + Key: codexSourceKey(uuid), + DisplayPath: path, + FingerprintKey: path, + Opaque: codexSource{ + Root: root, + Path: path, + UUID: uuid, + Layout: layout, + }, + }, true +} + +func (s codexSourceSet) directPathSource( + root string, + path string, + requireRegular bool, +) (SourceRef, bool) { + root = filepath.Clean(root) + path = filepath.Clean(path) + if !strings.HasSuffix(path, ".jsonl") || !pathIsUnderRoot(path, root) { + return SourceRef{}, false + } + if requireRegular && !IsRegularFile(path) { + return SourceRef{}, false + } + return SourceRef{ + Provider: AgentCodex, + Key: path, + DisplayPath: path, + FingerprintKey: path, + Opaque: codexSource{ + Root: root, + Path: path, + }, + }, true +} + +func (s codexSourceSet) canonicalSource( + ctx context.Context, + source SourceRef, +) (SourceRef, bool, error) { + src, ok := source.Opaque.(codexSource) + if !ok || src.UUID == "" { + return source, true, nil + } + best := source + for _, root := range s.roots { + if err := ctx.Err(); err != nil { + return SourceRef{}, false, err + } + path := s.findSourceFile(root, src.UUID) + if path == "" { + continue + } + candidate, ok := s.sourceRef(root, path, true) + if !ok { + continue + } + if preferCodexSource(candidate, best) { + best = candidate + } + } + return best, true, nil +} + +func codexSourceKey(uuid string) string { + return string(AgentCodex) + ":" + uuid +} + +func preferCodexSource(candidate, current SourceRef) bool { + cand := candidate.Opaque.(codexSource) + curr := current.Opaque.(codexSource) + if cand.Layout != curr.Layout { + return cand.Layout == CodexLayoutDated + } + return candidate.DisplayPath < current.DisplayPath +} + +func codexProviderCapabilities() Capabilities { + return Capabilities{ + Source: SourceCapabilities{ + DiscoverSources: CapabilitySupported, + WatchSources: CapabilitySupported, + ClassifyChangedPath: CapabilitySupported, + FindSource: CapabilitySupported, + CompositeFingerprint: CapabilitySupported, + IncrementalAppend: CapabilityNotApplicable, + MultiSessionSource: CapabilityNotApplicable, + PerSessionErrors: CapabilityNotApplicable, + ExcludedSessions: CapabilityNotApplicable, + ForceReplaceOnParse: CapabilitySupported, + }, + Content: ContentCapabilities{ + FirstMessage: CapabilitySupported, + SessionName: CapabilitySupported, + Cwd: CapabilitySupported, + GitBranch: CapabilitySupported, + Relationships: CapabilitySupported, + Subagents: CapabilitySupported, + Thinking: CapabilitySupported, + ToolCalls: CapabilitySupported, + ToolResults: CapabilitySupported, + ToolResultEvents: CapabilitySupported, + PerMessageTokenUsage: CapabilitySupported, + TerminationStatus: CapabilitySupported, + Model: CapabilitySupported, + }, + } +} diff --git a/internal/parser/codex_provider_test.go b/internal/parser/codex_provider_test.go new file mode 100644 index 000000000..f86267e43 --- /dev/null +++ b/internal/parser/codex_provider_test.go @@ -0,0 +1,368 @@ +package parser + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.kenn.io/agentsview/internal/testjsonl" +) + +func TestCodexProviderFactoryReplacesLegacyAdapter(t *testing.T) { + factory, ok := ProviderFactoryByType(AgentCodex) + require.True(t, ok) + require.NotNil(t, factory) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{t.TempDir()}, + Machine: "devbox", + }) + require.True(t, ok) + require.NotNil(t, provider) +} + +func TestCodexProviderSourceMethods(t *testing.T) { + base := t.TempDir() + root := filepath.Join(base, "sessions") + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e1" + sourcePath := writeCodexProviderSession(t, root, uuid, "Rename me") + indexPath := filepath.Join(base, CodexSessionIndexFilename) + require.NoError(t, os.WriteFile(indexPath, []byte( + `{"id":"`+uuid+`","thread_name":"Renamed title","updated_at":"2026-06-11T17:34:20Z"}`+"\n", + ), 0o644)) + newer := time.Now().Add(time.Hour) + require.NoError(t, os.Chtimes(indexPath, newer, newer)) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + plan, err := provider.WatchPlan(context.Background()) + require.NoError(t, err) + require.Len(t, plan.Roots, 2) + assert.Equal(t, root, plan.Roots[0].Path) + assert.True(t, plan.Roots[0].Recursive) + assert.Equal(t, []string{"*.jsonl"}, plan.Roots[0].IncludeGlobs) + assert.Equal(t, base, plan.Roots[1].Path) + assert.False(t, plan.Roots[1].Recursive) + assert.Equal(t, []string{CodexSessionIndexFilename}, plan.Roots[1].IncludeGlobs) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 1) + source := discovered[0] + assert.Equal(t, AgentCodex, source.Provider) + assert.Equal(t, sourcePath, source.DisplayPath) + assert.Equal(t, sourcePath, source.FingerprintKey) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "host~codex:" + uuid, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, sourcePath, found.DisplayPath) + + for _, path := range []string{sourcePath, indexPath} { + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: path, EventKind: "write"}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) + } + + info, err := os.Stat(sourcePath) + require.NoError(t, err) + fingerprint, err := provider.Fingerprint(context.Background(), found) + require.NoError(t, err) + assert.Equal(t, sourcePath, fingerprint.Key) + assert.Equal(t, info.Size(), fingerprint.Size) + assert.Equal(t, newer.UnixNano(), fingerprint.MTimeNS) + assert.NotEmpty(t, fingerprint.Hash) + + outcome, err := provider.Parse(context.Background(), ParseRequest{ + Source: found, + Fingerprint: fingerprint, + }) + require.NoError(t, err) + require.True(t, outcome.ResultSetComplete) + require.Len(t, outcome.Results, 1) + result := outcome.Results[0] + assert.Equal(t, DataVersionCurrent, result.DataVersion) + assert.Equal(t, "codex:"+uuid, result.Result.Session.ID) + assert.Equal(t, AgentCodex, result.Result.Session.Agent) + assert.Equal(t, "devbox", result.Result.Session.Machine) + assert.Equal(t, "api", result.Result.Session.Project) + assert.Equal(t, "Renamed title", result.Result.Session.SessionName) + assert.Equal(t, fingerprint.Hash, result.Result.Session.File.Hash) + assert.Len(t, result.Result.Messages, 1) +} + +func TestCodexProviderDoesNotAdvertiseIncrementalAppend(t *testing.T) { + root := t.TempDir() + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e2" + writeCodexProviderSession(t, root, uuid, "hello") + + provider, ok := NewProvider(AgentCodex, ProviderConfig{Roots: []string{root}}) + require.True(t, ok) + assert.Equal(t, + CapabilityNotApplicable, + provider.Capabilities().Source.IncrementalAppend, + ) + source, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "codex:" + uuid, + }) + require.NoError(t, err) + require.True(t, ok) + + outcome, status, err := provider.ParseIncremental( + context.Background(), + IncrementalRequest{ + Source: source, + Fingerprint: SourceFingerprint{}, + SessionID: "codex:" + uuid, + Offset: 0, + StartOrdinal: 1, + }, + ) + require.NoError(t, err) + assert.Equal(t, IncrementalUnsupported, status) + assert.Empty(t, outcome.Messages) +} + +func TestCodexProviderDiscoverDedupesLiveAndArchivedByUUID(t *testing.T) { + base := t.TempDir() + liveRoot := filepath.Join(base, "sessions") + archivedRoot := filepath.Join(base, "archived_sessions") + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e5" + livePath := writeCodexProviderSession(t, liveRoot, uuid, "live") + archivedPath := writeCodexProviderArchivedSession( + t, archivedRoot, uuid, "archived", + ) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{archivedRoot, liveRoot}, + }) + require.True(t, ok) + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 1) + assert.Equal(t, livePath, discovered[0].DisplayPath) + assert.NotEqual(t, archivedPath, discovered[0].DisplayPath) +} + +func TestCodexProviderFindSourcePinsExactArchivedDuplicate(t *testing.T) { + base := t.TempDir() + liveRoot := filepath.Join(base, "sessions") + archivedRoot := filepath.Join(base, "archived_sessions") + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e6" + livePath := writeCodexProviderSession(t, liveRoot, uuid, "live") + archivedPath := writeCodexProviderArchivedSession( + t, archivedRoot, uuid, "archived", + ) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{archivedRoot, liveRoot}, + }) + require.True(t, ok) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + StoredFilePath: archivedPath, + FullSessionID: "codex:" + uuid, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, archivedPath, found.DisplayPath) + + found, ok, err = provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "codex:" + uuid, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, livePath, found.DisplayPath) +} + +func TestCodexProviderFindSourceAcceptsLegacyShapedStoredPath(t *testing.T) { + root := t.TempDir() + sessionID := "test-uuid" + sourcePath := filepath.Join( + root, + "2024", + "01", + "15", + "rollout-20240115-"+sessionID+".jsonl", + ) + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON( + sessionID, + "/home/user/code/api", + "codex_cli_rs", + tsEarly, + ), + testjsonl.CodexMsgJSON("user", "Add tests", tsEarlyS1), + ) + require.NoError(t, os.MkdirAll(filepath.Dir(sourcePath), 0o755)) + require.NoError(t, os.WriteFile(sourcePath, []byte(content), 0o644)) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 1) + assert.Equal(t, sourcePath, discovered[0].DisplayPath) + + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: sourcePath, EventKind: "write"}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) + + source, found, err := provider.FindSource(context.Background(), FindSourceRequest{ + StoredFilePath: sourcePath, + FingerprintKey: sourcePath, + }) + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, AgentCodex, source.Provider) + assert.Equal(t, sourcePath, source.DisplayPath) + assert.Equal(t, sourcePath, source.FingerprintKey) + + fingerprint, err := provider.Fingerprint(context.Background(), source) + require.NoError(t, err) + assert.Equal(t, sourcePath, fingerprint.Key) + assert.NotEmpty(t, fingerprint.Hash) + + outcome, err := provider.Parse(context.Background(), ParseRequest{ + Source: source, + Fingerprint: fingerprint, + Machine: "devbox", + }) + require.NoError(t, err) + require.True(t, outcome.ResultSetComplete) + require.Len(t, outcome.Results, 1) + result := outcome.Results[0] + assert.Equal(t, "codex:"+sessionID, result.Result.Session.ID) + assert.Equal(t, "api", result.Result.Session.Project) + assert.Equal(t, "devbox", result.Result.Session.Machine) + assert.Equal(t, fingerprint.Hash, result.Result.Session.File.Hash) + assert.Len(t, result.Result.Messages, 1) +} + +func TestCodexProviderChangedPathPinsArchivedDuplicate(t *testing.T) { + base := t.TempDir() + liveRoot := filepath.Join(base, "sessions") + archivedRoot := filepath.Join(base, "archived_sessions") + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e7" + _ = writeCodexProviderSession(t, liveRoot, uuid, "live") + archivedPath := writeCodexProviderArchivedSession( + t, archivedRoot, uuid, "archived", + ) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{ + Roots: []string{archivedRoot, liveRoot}, + }) + require.True(t, ok) + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: archivedPath, EventKind: "write"}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, archivedPath, changed[0].DisplayPath) +} + +func TestCodexProviderChangedPathClassifiesRemovedTranscript(t *testing.T) { + root := t.TempDir() + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e8" + sourcePath := writeCodexProviderSession(t, root, uuid, "remove") + provider, ok := NewProvider(AgentCodex, ProviderConfig{Roots: []string{root}}) + require.True(t, ok) + require.NoError(t, os.Remove(sourcePath)) + + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: sourcePath, EventKind: "remove"}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) +} + +func TestCodexProviderIndexPathClassifiesAllSiblingSources(t *testing.T) { + base := t.TempDir() + root := filepath.Join(base, "sessions") + firstUUID := "019eb791-cf7d-75c1-8439-9ed74c1229e9" + secondUUID := "019eb791-cf7d-75c1-8439-9ed74c1229ea" + firstPath := writeCodexProviderSession(t, root, firstUUID, "first") + secondPath := writeCodexProviderSession(t, root, secondUUID, "second") + indexPath := filepath.Join(base, CodexSessionIndexFilename) + require.NoError(t, os.WriteFile(indexPath, []byte( + `{"id":"`+firstUUID+`","thread_name":"Only first remains","updated_at":"2026-06-11T17:34:20Z"}`+"\n", + ), 0o644)) + + provider, ok := NewProvider(AgentCodex, ProviderConfig{Roots: []string{root}}) + require.True(t, ok) + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: indexPath, EventKind: "write"}, + ) + require.NoError(t, err) + assert.Equal(t, []string{firstPath, secondPath}, sourceDisplayPaths(changed)) +} + +func writeCodexProviderSession( + t *testing.T, + root, uuid, prompt string, +) string { + t.Helper() + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON(uuid, "/home/user/code/api", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", prompt, tsEarlyS1), + ) + return writeCodexProviderSessionContent(t, root, uuid, content) +} + +func writeCodexProviderArchivedSession( + t *testing.T, + root, uuid, prompt string, +) string { + t.Helper() + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON(uuid, "/home/user/code/archive", "codex_cli_rs", tsEarly), + testjsonl.CodexMsgJSON("user", prompt, tsEarlyS1), + ) + path := filepath.Join(root, "rollout-2026-06-11T12-44-06-"+uuid+".jsonl") + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + require.NoError(t, os.WriteFile(path, []byte(content), 0o644)) + return path +} + +func writeCodexProviderSessionContent( + t *testing.T, + root, uuid, content string, +) string { + t.Helper() + path := filepath.Join( + root, + "2026", + "06", + "11", + "rollout-2026-06-11T12-44-06-"+uuid+".jsonl", + ) + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + require.NoError(t, os.WriteFile(path, []byte(content), 0o644)) + return path +} diff --git a/internal/parser/discovery.go b/internal/parser/discovery.go index 42a329c97..a72d951f6 100644 --- a/internal/parser/discovery.go +++ b/internal/parser/discovery.go @@ -478,54 +478,6 @@ func ClaudeProjectSessionFiles(projectsDir string) []DiscoveredFile { return files } -// DiscoverCodexSessions finds all Codex JSONL session files under -// either the standard year/month/day layout or a flat archived dir. -func DiscoverCodexSessions(sessionsDir string) []DiscoveredFile { - var files []DiscoveredFile - - entries, err := os.ReadDir(sessionsDir) - if err != nil { - return nil - } - for _, entry := range entries { - if entry.IsDir() { - continue - } - if !isCodexSessionFilename(entry.Name()) { - continue - } - files = append(files, DiscoveredFile{ - Path: filepath.Join(sessionsDir, entry.Name()), - Agent: AgentCodex, - }) - } - - walkCodexDayDirs(sessionsDir, func(dayPath string) bool { - entries, err := os.ReadDir(dayPath) - if err != nil { - return true - } - for _, sf := range entries { - if sf.IsDir() { - continue - } - if !isCodexSessionFilename(sf.Name()) { - continue - } - files = append(files, DiscoveredFile{ - Path: filepath.Join(dayPath, sf.Name()), - Agent: AgentCodex, - }) - } - return true - }) - - sort.Slice(files, func(i, j int) bool { - return files[i].Path < files[j].Path - }) - return files -} - // claudeFindSourceFile finds the original JSONL file for a Claude // session ID by searching all project directories. It is the // provider-owned lookup body used by the Claude provider source set's @@ -598,62 +550,6 @@ func claudeFindSourceFile( return "" } -// FindCodexSourceFile finds a Codex session file by UUID. -// Prefers the standard year/month/day live path when present, -// then falls back to a flat archived dir entry. -func FindCodexSourceFile(sessionsDir, sessionID string) string { - if !IsValidSessionID(sessionID) { - return "" - } - - var archived string - entries, err := os.ReadDir(sessionsDir) - if err == nil { - for _, f := range entries { - if f.IsDir() { - continue - } - name := f.Name() - if !isCodexSessionFilename(name) { - continue - } - if extractUUIDFromRollout(name) == sessionID { - archived = filepath.Join(sessionsDir, name) - break - } - } - } - - var live string - walkCodexDayDirs(sessionsDir, func(dayPath string) bool { - if live != "" { - return false - } - entries, err := os.ReadDir(dayPath) - if err != nil { - return true - } - for _, f := range entries { - if f.IsDir() { - continue - } - name := f.Name() - if !isCodexSessionFilename(name) { - continue - } - if extractUUIDFromRollout(name) == sessionID { - live = filepath.Join(dayPath, name) - return false - } - } - return true - }) - if live != "" { - return live - } - return archived -} - func isCodexSessionFilename(name string) bool { return strings.HasPrefix(name, "rollout-") && strings.HasSuffix(name, ".jsonl") diff --git a/internal/parser/discovery_test.go b/internal/parser/discovery_test.go index b44bb3c4b..85c782fc6 100644 --- a/internal/parser/discovery_test.go +++ b/internal/parser/discovery_test.go @@ -199,7 +199,7 @@ func TestDiscoverCodexSessions(t *testing.T) { t.Run(tt.name, func(t *testing.T) { dir := t.TempDir() setupFileSystem(t, dir, tt.files) - files := DiscoverCodexSessions(dir) + files := discoverCodexTestSessions(t, dir) assertDiscoveredFiles(t, files, tt.wantFiles, AgentCodex) }) } @@ -439,7 +439,7 @@ func TestFindCodexSourceFile(t *testing.T) { dir := t.TempDir() setupFileSystem(t, dir, tt.files) - got := FindCodexSourceFile(dir, tt.targetID) + got := findCodexTestSourceFile(t, dir, tt.targetID) want := "" if tt.wantFile != "" { want = filepath.Join(dir, tt.wantFile) diff --git a/internal/parser/provider.go b/internal/parser/provider.go index f8705fc57..87d978fcf 100644 --- a/internal/parser/provider.go +++ b/internal/parser/provider.go @@ -358,6 +358,8 @@ func providerFactoryForDef(def AgentDef) ProviderFactory { return newClaudeProviderFactory(def) case AgentCommandCode: return newCommandCodeProviderFactory(def) + case AgentCodex: + return newCodexProviderFactory(def) case AgentCowork: return newCoworkProviderFactory(def) case AgentCortex: diff --git a/internal/parser/provider_migration.go b/internal/parser/provider_migration.go index 90e33269c..0bdc02698 100644 --- a/internal/parser/provider_migration.go +++ b/internal/parser/provider_migration.go @@ -19,7 +19,7 @@ const ( var providerMigrationModes = map[AgentType]ProviderMigrationMode{ AgentClaude: ProviderMigrationProviderAuthoritative, AgentCowork: ProviderMigrationProviderAuthoritative, - AgentCodex: ProviderMigrationLegacyOnly, + AgentCodex: ProviderMigrationProviderAuthoritative, AgentCopilot: ProviderMigrationLegacyOnly, AgentGemini: ProviderMigrationLegacyOnly, AgentMiMoCode: ProviderMigrationProviderAuthoritative, diff --git a/internal/parser/provider_shim_scan_test.go b/internal/parser/provider_shim_scan_test.go index 8e4aa92c4..28683f39d 100644 --- a/internal/parser/provider_shim_scan_test.go +++ b/internal/parser/provider_shim_scan_test.go @@ -49,14 +49,18 @@ var providerNeutralEntrypoints = map[string]bool{ var pendingShimProviderFiles = map[string]bool{ "antigravity_cli_provider.go": true, "antigravity_provider.go": true, - "codex_provider.go": true, + "claude_provider.go": true, "copilot_provider.go": true, + "cowork_provider.go": true, "db_backed_provider.go": true, "gemini_provider.go": true, + "hermes_provider.go": true, "kiro_ide_provider.go": true, "kiro_provider.go": true, + "opencode_provider.go": true, "positron_provider.go": true, "shelley_provider.go": true, + "vibe_provider.go": true, "visualstudio_copilot_provider.go": true, "vscode_copilot_provider.go": true, "zed_provider.go": true, diff --git a/internal/parser/provider_test.go b/internal/parser/provider_test.go index e1f73995b..d3b701e58 100644 --- a/internal/parser/provider_test.go +++ b/internal/parser/provider_test.go @@ -149,7 +149,7 @@ func TestProviderRegistryMirrorsAgentRegistry(t *testing.T) { } func TestLegacyProviderCapabilitiesMatchBaseDefaults(t *testing.T) { - provider, ok := NewProvider(AgentCodex, ProviderConfig{ + provider, ok := NewProvider(AgentGemini, ProviderConfig{ Roots: []string{t.TempDir()}, Machine: "devbox", }) @@ -177,7 +177,7 @@ func TestLegacyProviderCapabilitiesMatchBaseDefaults(t *testing.T) { source, found, err := provider.FindSource(ctx, FindSourceRequest{ RawSessionID: "session", - FullSessionID: "codex:session", + FullSessionID: "gemini:session", StoredFilePath: "/tmp/session.jsonl", FingerprintKey: "/tmp/session.jsonl", }) @@ -186,7 +186,7 @@ func TestLegacyProviderCapabilitiesMatchBaseDefaults(t *testing.T) { assert.Empty(t, source) _, err = provider.Fingerprint(ctx, SourceRef{ - Provider: AgentCodex, + Provider: AgentGemini, Key: "session", DisplayPath: "/tmp/session.jsonl", FingerprintKey: "/tmp/session.jsonl", @@ -195,9 +195,9 @@ func TestLegacyProviderCapabilitiesMatchBaseDefaults(t *testing.T) { assert.True(t, errors.Is(err, ErrUnsupportedProviderFeature)) incremental, status, err := provider.ParseIncremental(ctx, IncrementalRequest{ - Source: SourceRef{Provider: AgentCodex, Key: "session"}, + Source: SourceRef{Provider: AgentGemini, Key: "session"}, Fingerprint: SourceFingerprint{Key: "/tmp/session.jsonl"}, - SessionID: "codex:session", + SessionID: "gemini:session", StartOrdinal: 1, Machine: "devbox", }) @@ -212,11 +212,11 @@ func TestProviderFactoryLookupAndConfigSnapshot(t *testing.T) { Machine: "devbox", } - factory, ok := ProviderFactoryByType(AgentCodex) + factory, ok := ProviderFactoryByType(AgentGemini) require.True(t, ok) - assert.Equal(t, AgentCodex, factory.Definition().Type) + assert.Equal(t, AgentGemini, factory.Definition().Type) - provider, ok := NewProvider(AgentCodex, cfg) + provider, ok := NewProvider(AgentGemini, cfg) require.True(t, ok) require.NotNil(t, provider) @@ -233,7 +233,7 @@ func TestProviderFactoryLookupAndConfigSnapshot(t *testing.T) { } func TestLegacyProviderParseReturnsUnsupported(t *testing.T) { - provider, ok := NewProvider(AgentCodex, ProviderConfig{ + provider, ok := NewProvider(AgentGemini, ProviderConfig{ Roots: []string{t.TempDir()}, Machine: "devbox", }) @@ -241,7 +241,7 @@ func TestLegacyProviderParseReturnsUnsupported(t *testing.T) { outcome, err := provider.Parse(context.Background(), ParseRequest{ Source: SourceRef{ - Provider: AgentCodex, + Provider: AgentGemini, Key: "source", DisplayPath: "/tmp/source.jsonl", FingerprintKey: "/tmp/source.jsonl", @@ -257,7 +257,7 @@ func TestLegacyProviderParseReturnsUnsupported(t *testing.T) { assert.True(t, errors.Is(err, ErrUnsupportedProviderFeature)) var unsupported UnsupportedProviderFeatureError require.ErrorAs(t, err, &unsupported) - assert.Equal(t, AgentCodex, unsupported.Provider) + assert.Equal(t, AgentGemini, unsupported.Provider) assert.Equal(t, ProviderFeatureParse, unsupported.Feature) } diff --git a/internal/parser/skill_inference_test.go b/internal/parser/skill_inference_test.go index eef4df90e..50595396b 100644 --- a/internal/parser/skill_inference_test.go +++ b/internal/parser/skill_inference_test.go @@ -477,7 +477,7 @@ func TestParseCodexSessionFromInfersSkillNameFromSeededCwd(t *testing.T) { testjsonl.CodexMsgJSON("user", "use the dashboard skill", tsEarlyS1), ) file := createTestFile(t, "incremental-skill.jsonl", initial) - _, msgs, err := ParseCodexSession(file, "local", false) + _, msgs, err := parseCodexTestSession(t, file, "local", false) require.NoError(t, err) info, err := os.Stat(file) @@ -494,7 +494,7 @@ func TestParseCodexSessionFromInfersSkillNameFromSeededCwd(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - newMsgs, _, _, err := ParseCodexSessionFrom(file, offset, len(msgs), false) + newMsgs, _, _, err := parseCodexTestSessionFrom(t, file, offset, len(msgs), false) require.NoError(t, err) require.Len(t, newMsgs, 1) require.Len(t, newMsgs[0].ToolCalls, 1) diff --git a/internal/parser/types.go b/internal/parser/types.go index 3d261b97d..732b6b3d7 100644 --- a/internal/parser/types.go +++ b/internal/parser/types.go @@ -125,8 +125,6 @@ var Registry = []AgentDef{ }, IDPrefix: "codex:", FileBased: true, - DiscoverFunc: DiscoverCodexSessions, - FindSourceFunc: FindCodexSourceFile, ShallowWatchRootsFunc: ResolveCodexShallowWatchRoots, }, { diff --git a/internal/sync/engine.go b/internal/sync/engine.go index a6831962e..023053f6a 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -581,6 +581,17 @@ func (e *Engine) classifyProviderChangedPath( default: continue } + // Codex index (session_index.jsonl) events are owned by the engine's + // DB-aware classifyCodexIndexPath, which fans out only to sessions whose + // stored title changed and resolves a UUID's live/archived duplicate to + // the path the DB already tracks. The provider's broad index fan-out + // would re-add every sibling and prefer the live-over-archived layout, + // resurrecting a stale duplicate over the stored copy, so suppress it + // here and let the engine method classify the index event. + if agentType == parser.AgentCodex && + filepath.Base(path) == parser.CodexSessionIndexFilename { + continue + } roots := e.agentDirs[agentType] if len(roots) == 0 { continue @@ -762,6 +773,19 @@ func providerDeletedPhysicalSQLiteSource( func dedupeDiscoveredFiles( files []parser.DiscoveredFile, +) []parser.DiscoveredFile { + return dedupeDiscoveredFilesByPreference(files, preferDiscoveredFile) +} + +func dedupeDiscoveredFilesPreferNewestCodex( + files []parser.DiscoveredFile, +) []parser.DiscoveredFile { + return dedupeDiscoveredFilesByPreference(files, preferNewestCodexDiscoveredFile) +} + +func dedupeDiscoveredFilesByPreference( + files []parser.DiscoveredFile, + prefer func(candidate, current parser.DiscoveredFile) bool, ) []parser.DiscoveredFile { if len(files) < 2 { return files @@ -771,7 +795,7 @@ func dedupeDiscoveredFiles( for _, file := range files { key := discoveredFileKey(file) if current, ok := bestByKey[key]; ok { - if preferDiscoveredFile(file, current) { + if prefer(file, current) { bestByKey[key] = file } continue @@ -814,6 +838,27 @@ func preferDiscoveredFile( return false } +func preferNewestCodexDiscoveredFile( + candidate, current parser.DiscoveredFile, +) bool { + if candidate.Agent == parser.AgentCodex && current.Agent == parser.AgentCodex { + candMTime, candOK := discoveredFileMTime(candidate.Path) + currMTime, currOK := discoveredFileMTime(current.Path) + if candOK && currOK && candMTime != currMTime { + return candMTime > currMTime + } + } + return preferDiscoveredFile(candidate, current) +} + +func discoveredFileMTime(path string) (int64, bool) { + info, err := os.Stat(path) + if err != nil { + return 0, false + } + return info.ModTime().UnixNano(), true +} + func (e *Engine) expandClaudeDuplicateCandidates( files []parser.DiscoveredFile, ) []parser.DiscoveredFile { @@ -940,20 +985,6 @@ func (e *Engine) classifyOnePath( // shapes, so the legacy block was removed when Claude was folded // onto its provider. - // Codex: either ////.jsonl - // or /.jsonl for archived sessions. - for _, codexDir := range e.agentDirs[parser.AgentCodex] { - if codexDir == "" { - continue - } - if _, _, ok := parser.CodexSessionPathInfo(codexDir, path); ok { - return parser.DiscoveredFile{ - Path: path, - Agent: parser.AgentCodex, - }, true - } - } - // Copilot: /session-state/.jsonl // or: /session-state//events.jsonl for _, copilotDir := range e.agentDirs[parser.AgentCopilot] { @@ -2316,12 +2347,27 @@ func (e *Engine) syncAllLocked( } all = append(all, providerFound...) - if !since.IsZero() { + quickSyncCutoff := !since.IsZero() + if quickSyncCutoff { all = e.dedupeClaudeDiscoveredFiles(all) + // A Codex UUID can exist as both a live dated transcript and a flat + // archived copy. The provider's discovery deduplicates them to the + // preferred (live) layout, but the mtime cutoff filter runs before the + // engine's own dedup, so a changed archived copy that is newer than the + // cutoff would be lost behind an older live copy that the cutoff drops. + // Re-expand to every on-disk duplicate before filtering so the cutoff + // sees each copy's real mtime; the quick-sync dedupe below then keeps + // the newest surviving duplicate before falling back to normal layout + // preference. + all = e.expandCodexProviderDuplicates(all, scope) all = e.filterFilesByMtime(ctx, all, since) } - all = dedupeDiscoveredFiles(all) + if quickSyncCutoff { + all = dedupeDiscoveredFilesPreferNewestCodex(all) + } else { + all = dedupeDiscoveredFiles(all) + } all = e.dedupeClaudeDiscoveredFiles(all) all = e.filterShadowedLegacyKiroFiles(all) @@ -2774,6 +2820,85 @@ func (e *Engine) discoverProviderSources( return files, failures } +// expandCodexProviderDuplicates re-adds the on-disk duplicate paths of each +// discovered Codex source. The provider deduplicates a UUID's live and archived +// copies to the preferred layout at discovery time; this restores the dropped +// duplicates (scoped to the configured roots) so an mtime cutoff filter can +// judge each copy on its own mtime, matching the legacy discover-then-filter +// order. Non-Codex files and Codex files without a UUID-shaped name pass through +// unchanged. Duplicates are keyed by path so nothing is added twice. +func (e *Engine) expandCodexProviderDuplicates( + files []parser.DiscoveredFile, scope *rootSyncScope, +) []parser.DiscoveredFile { + pather := e.codexUUIDPathLister(scope) + if pather == nil { + return files + } + seen := make(map[string]struct{}, len(files)) + for _, f := range files { + seen[string(f.Agent)+"\x00"+filepath.Clean(f.Path)] = struct{}{} + } + out := files + for _, f := range files { + if f.Agent != parser.AgentCodex { + continue + } + uuid := parser.CodexSessionUUIDFromFilename(filepath.Base(f.Path)) + if uuid == "" { + continue + } + for _, dup := range pather(uuid) { + key := string(parser.AgentCodex) + "\x00" + filepath.Clean(dup) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, parser.DiscoveredFile{ + Path: dup, + Agent: parser.AgentCodex, + ProviderProcess: true, + ProviderSource: e.codexPinnedProviderSource(dup), + }) + } + } + return out +} + +// codexUUIDPathLister returns a function that lists every on-disk Codex +// transcript path for a UUID under the in-scope roots, or nil when the Codex +// provider is unavailable. It scopes a single provider to the in-scope roots so +// the returned paths cover both the live dated and flat archived copies of a +// duplicated UUID, including duplicates that share one root. +func (e *Engine) codexUUIDPathLister( + scope *rootSyncScope, +) func(string) []string { + factory, ok := e.providerFactories[parser.AgentCodex] + if !ok || factory == nil { + return nil + } + roots := make([]string, 0, len(e.agentDirs[parser.AgentCodex])) + for _, root := range e.agentDirs[parser.AgentCodex] { + if root == "" || !scope.includes(root) { + continue + } + roots = append(roots, root) + } + if len(roots) == 0 { + return nil + } + provider := factory.NewProvider(parser.ProviderConfig{ + Roots: roots, + Machine: e.machine, + }) + lister, ok := provider.(interface { + AllSourcePathsForUUID(string) []string + }) + if !ok { + return nil + } + return lister.AllSourcePathsForUUID +} + // recordSyncStarted persists the start time of a sync run // into pg_sync_state. Callers use this to compute mtime // cutoffs for future quick incremental syncs. @@ -3741,7 +3866,7 @@ func (e *Engine) processFile( cachedMtime, cached := e.skipCache[file.Path] e.skipMu.RUnlock() if cached && cachedMtime == mtime { - if e.pathNeedsProjectReparse(file.Path) { + if e.pathNeedsCachedSkipBypass(file.Path) { e.clearSkip(file.Path) } else { res := processResult{ @@ -3757,8 +3882,6 @@ func (e *Engine) processFile( var res processResult switch file.Agent { - case parser.AgentCodex: - res = e.processCodex(file, info) case parser.AgentCopilot: res = e.processCopilot(file, info) case parser.AgentReasonix: @@ -3810,6 +3933,25 @@ func (e *Engine) pathNeedsProjectReparse(path string) bool { return ok && parser.NeedsProjectReparse(project) } +func (e *Engine) pathNeedsCachedSkipBypass(path string) bool { + return e.pathNeedsProjectReparse(path) || + e.pathNeedsDataVersionReparse(path) +} + +func (e *Engine) pathNeedsDataVersionReparse(path string) bool { + if e == nil || e.db == nil { + return false + } + lookupPath := path + if e.pathRewriter != nil { + lookupPath = e.pathRewriter(path) + } + if _, _, ok := e.db.GetFileInfoByPath(lookupPath); !ok { + return false + } + return e.db.GetDataVersionByPath(lookupPath) < db.CurrentDataVersion() +} + func (e *Engine) processProviderFile( ctx context.Context, file parser.DiscoveredFile, @@ -3888,12 +4030,20 @@ func (e *Engine) processProviderFile( cachedMtime, cached := e.skipCache[cacheKey] e.skipMu.RUnlock() if cached && cachedMtime == fingerprint.MTimeNS { - return processResult{ - skip: true, - mtime: fingerprint.MTimeNS, - cacheSkip: true, - cacheKey: cacheKey, - }, true + // A cached skip must not hide a session whose stored row needs + // self-healing (e.g. a parser data-version bump or generated + // roborev CI worktree project): clear the entry and fall through + // to a full reparse, mirroring the legacy process arm. + if e.pathNeedsCachedSkipBypass(file.Path) { + e.clearSkip(cacheKey) + } else { + return processResult{ + skip: true, + mtime: fingerprint.MTimeNS, + cacheSkip: true, + cacheKey: cacheKey, + }, true + } } } @@ -3912,6 +4062,25 @@ func (e *Engine) processProviderFile( } incForceReplace := incRes.forceReplace + // DB-stored fingerprint skip. The provider has no database handle, so the + // engine reproduces the legacy DB-aware skip that single-session JSONL + // providers relied on: an unchanged source whose stored size and effective + // mtime already match is not reparsed, even when the in-memory skip cache + // was cleared (e.g. by SyncSingleSession) or never populated (a fresh + // engine). For Codex this also folds in the session_index.jsonl sidecar: + // a shared index mtime bump that did not change this session's title must + // not trigger a reparse. + if !e.forceParse && !file.ForceParse && + e.shouldSkipProviderSourceByDB(file, fingerprint) { + return processResult{ + skip: true, + mtime: fingerprint.MTimeNS, + cacheSkip: cacheSkip, + cacheKey: cacheKey, + noCacheSkip: true, + }, true + } + outcome, err := provider.Parse(ctx, parser.ParseRequest{ Source: source, Fingerprint: fingerprint, @@ -4935,18 +5104,41 @@ func (e *Engine) tryIncrementalJSONL( }, true } -func (e *Engine) shouldSkipCodex( - path string, info os.FileInfo, +// shouldSkipProviderSourceByDB reports whether a provider-dispatched source is +// already stored at the parsed fingerprint and can be skipped without a reparse. +// It is the engine-side replacement for the DB-aware skip the legacy +// single-session JSONL processors performed, since a provider has no database +// handle. It is scoped to Codex: Codex's effective mtime folds in the shared +// session_index.jsonl sidecar, so a size-and-effective-mtime match plus a +// per-session title check preserves the legacy "skip when only the global index +// advanced but this session's name did not" semantics. Other providers keep +// their existing in-memory skip-cache behavior unchanged. +func (e *Engine) shouldSkipProviderSourceByDB( + file parser.DiscoveredFile, fingerprint parser.SourceFingerprint, ) bool { - if e.forceParse { // parse-diff: always re-parse + if file.Agent != parser.AgentCodex { return false } + return e.shouldSkipCodexFingerprint(file.Path, fingerprint) +} + +// shouldSkipCodexFingerprint reproduces the legacy shouldSkipCodex decision in +// terms of a provider SourceFingerprint. The fingerprint MTimeNS already folds +// in session_index.jsonl via CodexEffectiveMtime, so: +// - a stored size mismatch or stale data version forces a reparse; +// - an exact effective-mtime match skips; +// - an effective mtime ahead of the stored mtime driven only by the index +// (the raw transcript mtime is still at or below the stored mtime) skips +// unless this session's stored title differs from the current index title. +func (e *Engine) shouldSkipCodexFingerprint( + path string, fingerprint parser.SourceFingerprint, +) bool { lookupPath := path if e.pathRewriter != nil { lookupPath = e.pathRewriter(path) } storedSize, storedMtime, ok := e.db.GetFileInfoByPath(lookupPath) - if !ok || storedSize != info.Size() { + if !ok || storedSize != fingerprint.Size { return false } if project, ok := e.db.GetProjectByPath(lookupPath); ok && @@ -4957,11 +5149,14 @@ func (e *Engine) shouldSkipCodex( db.CurrentDataVersion() { return false } - fileMtime := info.ModTime().UnixNano() - effectiveMtime := parser.CodexEffectiveMtime(path, fileMtime) + effectiveMtime := fingerprint.MTimeNS if storedMtime == effectiveMtime { return true } + fileMtime := effectiveMtime + if info, err := os.Stat(path); err == nil { + fileMtime = info.ModTime().UnixNano() + } return effectiveMtime > storedMtime && fileMtime <= storedMtime && !e.codexIndexSessionNameChanged(path) @@ -5044,7 +5239,7 @@ func (e *Engine) classifyCodexIndexPath( } var candidates []parser.DiscoveredFile for _, root := range sessionRoots { - if src := parser.FindCodexSourceFile(root, uuid); src != "" { + if src := e.codexSourceFileForUUID(root, uuid); src != "" { candidates = append(candidates, parser.DiscoveredFile{ Path: src, Agent: parser.AgentCodex, @@ -5057,11 +5252,72 @@ func (e *Engine) classifyCodexIndexPath( // A UUID can exist in both sessions/ and archived_sessions/. // Prefer the path the DB already tracks so a title rename does // not reparse a stale duplicate over the stored copy. - out = append(out, pickPreferredCodexDiscoveredFile(e.db, candidates)) + chosen := pickPreferredCodexDiscoveredFile(e.db, candidates) + // Pin the provider source to the chosen path and route it through the + // provider so processProviderFile parses exactly this copy instead of + // re-canonicalizing the UUID to the preferred dated layout, which would + // undo the DB-aware selection above. + chosen.ProviderProcess = true + chosen.ProviderSource = e.codexPinnedProviderSource(chosen.Path) + out = append(out, chosen) } return out } +// codexSourceFileForUUID resolves a Codex session UUID to its on-disk +// transcript path under a single sessions root, preferring the live dated +// layout over a flat archived entry. It scopes a Codex provider to that one +// root so the provider's cross-root live-over-archived canonicalization does +// not collapse a per-root duplicate; classifyCodexIndexPath then applies its +// own DB-aware preference across the per-root candidates. Returns "" when the +// provider, source lookup, or path resolution fails. +func (e *Engine) codexSourceFileForUUID(root, uuid string) string { + factory, ok := e.providerFactories[parser.AgentCodex] + if !ok || factory == nil { + return "" + } + provider := factory.NewProvider(parser.ProviderConfig{ + Roots: []string{root}, + Machine: e.machine, + }) + source, found, err := provider.FindSource( + context.Background(), + parser.FindSourceRequest{RawSessionID: uuid}, + ) + if err != nil || !found { + return "" + } + return providerDiscoveredPath(source) +} + +// codexPinnedProviderSource builds a Codex provider SourceRef pinned to the +// exact path, bypassing the provider's live-over-archived canonicalization. It +// is used when the engine's DB-aware or mtime-aware logic has already chosen +// which on-disk copy of a duplicated UUID to parse, so processProviderFile +// parses that copy instead of the provider's preferred dated layout. Returns +// nil when the Codex provider or the path's source shape is unavailable. +func (e *Engine) codexPinnedProviderSource(path string) *parser.SourceRef { + factory, ok := e.providerFactories[parser.AgentCodex] + if !ok || factory == nil { + return nil + } + provider := factory.NewProvider(parser.ProviderConfig{ + Roots: e.agentDirs[parser.AgentCodex], + Machine: e.machine, + }) + pinner, ok := provider.(interface { + SourceRefForPath(string) (parser.SourceRef, bool) + }) + if !ok { + return nil + } + source, ok := pinner.SourceRefForPath(path) + if !ok { + return nil + } + return &source +} + // codexStoredNameDiffers reports whether the stored session_name for a Codex // session differs from the given index title. Unknown sessions return false: // a brand-new session is synced through its own transcript event, not the @@ -5107,75 +5363,6 @@ func pickPreferredCodexDiscoveredFile( return chosen } -func (e *Engine) processCodex( - file parser.DiscoveredFile, info os.FileInfo, -) processResult { - - // Fast path: skip by file_path + effective mtime (includes session_index.jsonl). - if e.shouldSkipCodex(file.Path, info) { - return processResult{skip: true} - } - - projectNeedsReparse := e.pathNeedsProjectReparse(file.Path) - forceReplace := false - - codexParseFn := func( - path string, offset int64, startOrd int, _ string, - ) ([]parser.ParsedMessage, time.Time, int64, error) { - return parser.ParseCodexSessionFrom( - path, offset, startOrd, false, - ) - } - if res, ok := e.tryIncrementalJSONL( - file, info, parser.AgentCodex, codexParseFn, - ); ok { - if !projectNeedsReparse { - // Force a full parse whenever the index title differs from the - // stored session_name. A mtime gate (indexMtime > storedMtime) is - // not enough here: the incremental write folds the index mtime into - // the stored file_mtime, so a later rename whose index mtime is <= - // that stored value slips past the gate. shouldSkipCodex's - // storedMtime==effectiveMtime fast path would then skip the refresh - // forever, stranding the stale title. Comparing the name directly - // closes that window. - if !e.codexIndexSessionNameChanged(file.Path) { - return res - } - // The index title changed, so a full parse still needs to refresh - // session metadata. Keep any fallback signal discovered while probing - // appended bytes so existing rows rewritten by the full parse are not - // dropped by the append-only write path. - forceReplace = res.forceReplace - } - } else { - forceReplace = res.forceReplace - } - - sess, msgs, err := parser.ParseCodexSession( - file.Path, e.machine, false, - ) - if err != nil { - return processResult{err: err} - } - if sess == nil { - return processResult{skip: true} - } - - sess.File.Inode, sess.File.Device = getFileIdentity(info) - - hash, err := ComputeFileHash(file.Path) - if err == nil && sess.File.Hash == "" { - sess.File.Hash = hash - } - - return processResult{ - results: []parser.ParseResult{ - {Session: *sess, Messages: msgs}, - }, - forceReplace: forceReplace, - } -} - func (e *Engine) processCopilot( file parser.DiscoveredFile, info os.FileInfo, ) processResult { diff --git a/internal/sync/engine_integration_test.go b/internal/sync/engine_integration_test.go index e791f2839..ff496138a 100644 --- a/internal/sync/engine_integration_test.go +++ b/internal/sync/engine_integration_test.go @@ -2652,6 +2652,104 @@ func TestSyncPathsCodexIndexEventRefreshesStoredDuplicate(t *testing.T) { assertSessionMessageCount(t, env.db, "codex:"+uuid, 2) } +func TestSyncPathsCodexArchivedDuplicateEventPinsChangedFile(t *testing.T) { + root := t.TempDir() + codexDir := filepath.Join(root, "sessions") + archivedDir := filepath.Join(root, "archived_sessions") + require.NoError(t, os.MkdirAll(codexDir, 0o755)) + require.NoError(t, os.MkdirAll(archivedDir, 0o755)) + env := setupTestEnv(t, WithCodexDirs([]string{codexDir, archivedDir})) + + uuid := "f7a8b9ca-7890-1234-ef01-456789012346" + staleLiveContent := testjsonl.NewSessionBuilder(). + AddCodexMeta(tsEarly, uuid, "/home/user/code/api", "user"). + AddCodexMessage(tsEarlyS1, "user", "Stale live copy"). + String() + archivedContent := testjsonl.NewSessionBuilder(). + AddCodexMeta(tsEarly, uuid, "/home/user/code/api", "user"). + AddCodexMessage(tsEarlyS1, "user", "Archived copy"). + String() + updatedArchivedContent := testjsonl.NewSessionBuilder(). + AddCodexMeta(tsEarly, uuid, "/home/user/code/api", "user"). + AddCodexMessage(tsEarlyS1, "user", "Archived copy"). + AddCodexMessage(tsEarlyS5, "assistant", "Updated archived reply"). + String() + + livePath := env.writeCodexSession( + t, + filepath.Join("2026", "05", "04"), + "rollout-2026-05-04T02-10-04-"+uuid+".jsonl", + staleLiveContent, + ) + archivedPath := env.writeSession( + t, archivedDir, + "rollout-2026-05-04T14-31-58-"+uuid+".jsonl", + archivedContent, + ) + initialTime := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(livePath, initialTime, initialTime), "chtimes live") + require.NoError(t, os.Chtimes(archivedPath, initialTime, initialTime), "chtimes archived") + + env.engine.SyncAll(context.Background(), nil) + assert.Equal(t, livePath, env.db.GetSessionFilePath("codex:"+uuid)) + + newTime := time.Now().Add(-30 * time.Minute) + require.NoError(t, os.WriteFile(archivedPath, []byte(updatedArchivedContent), 0o644)) + require.NoError(t, os.Chtimes(archivedPath, newTime, newTime), "chtimes archived update") + + env.engine.SyncPaths([]string{archivedPath}) + + assert.Equal(t, archivedPath, env.db.GetSessionFilePath("codex:"+uuid), + "archived transcript event must parse the changed file, not the stale live duplicate") + assertSessionMessageCount(t, env.db, "codex:"+uuid, 2) +} + +func TestSyncSingleSessionCodexPreservesStoredArchivedDuplicate(t *testing.T) { + root := t.TempDir() + codexDir := filepath.Join(root, "sessions") + archivedDir := filepath.Join(root, "archived_sessions") + require.NoError(t, os.MkdirAll(codexDir, 0o755)) + require.NoError(t, os.MkdirAll(archivedDir, 0o755)) + env := setupTestEnv(t, WithCodexDirs([]string{codexDir, archivedDir})) + + uuid := "f7a8b9ca-7890-1234-ef01-456789012347" + archivedContent := testjsonl.NewSessionBuilder(). + AddCodexMeta(tsEarly, uuid, "/home/user/code/api", "user"). + AddCodexMessage(tsEarlyS1, "user", "Archived copy"). + AddCodexMessage(tsEarlyS5, "assistant", "Archived reply"). + String() + staleLiveContent := testjsonl.NewSessionBuilder(). + AddCodexMeta(tsEarly, uuid, "/home/user/code/api", "user"). + AddCodexMessage(tsEarlyS1, "user", "Stale live copy"). + String() + + archivedPath := env.writeSession( + t, archivedDir, + "rollout-2026-05-04T14-31-58-"+uuid+".jsonl", + archivedContent, + ) + initialTime := time.Now().Add(-2 * time.Hour) + require.NoError(t, os.Chtimes(archivedPath, initialTime, initialTime), "chtimes archived") + + env.engine.SyncAll(context.Background(), nil) + require.Equal(t, archivedPath, env.db.GetSessionFilePath("codex:"+uuid), + "DB must track the archived copy before a stale live duplicate appears") + + livePath := env.writeCodexSession( + t, + filepath.Join("2026", "05", "04"), + "rollout-2026-05-04T02-10-04-"+uuid+".jsonl", + staleLiveContent, + ) + require.NoError(t, os.Chtimes(livePath, initialTime, initialTime), "chtimes live") + + require.NoError(t, env.engine.SyncSingleSession("codex:"+uuid)) + + assert.Equal(t, archivedPath, env.db.GetSessionFilePath("codex:"+uuid), + "single-session resync must preserve the stored archived source") + assertSessionMessageCount(t, env.db, "codex:"+uuid, 2) +} + func TestSyncPathsGeminiRejectsWrongStructure(t *testing.T) { env := setupTestEnv(t) diff --git a/internal/sync/engine_test.go b/internal/sync/engine_test.go index c505c3796..181672c71 100644 --- a/internal/sync/engine_test.go +++ b/internal/sync/engine_test.go @@ -1420,8 +1420,10 @@ func TestShouldSkipCodexReparsesStaleProject(t *testing.T) { }, } - assert.False(t, e.shouldSkipCodex(path, info), - "stale generated roborev CI projects must be reparsed") + assert.False(t, e.shouldSkipCodexFingerprint(path, parser.SourceFingerprint{ + Size: info.Size(), + MTimeNS: info.ModTime().UnixNano(), + }), "stale generated roborev CI projects must be reparsed") } func TestProcessFileSkipCacheReparsesStaleCodexProject(t *testing.T) { @@ -1459,6 +1461,13 @@ func TestProcessFileSkipCacheReparsesStaleCodexProject(t *testing.T) { db: database, idPrefix: "host~", skipCache: map[string]int64{path: info.ModTime().UnixNano()}, + agentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, pathRewriter: func(path string) string { return "host:" + path }, @@ -1475,6 +1484,156 @@ func TestProcessFileSkipCacheReparsesStaleCodexProject(t *testing.T) { assert.Equal(t, "agentsview", res.results[0].Session.Project) } +func TestProcessFileSkipCacheReparsesStaleCodexDataVersion(t *testing.T) { + database := openTestDB(t) + root := t.TempDir() + path := filepath.Join(root, "rollout-2026-06-21T18-59-38-abc.jsonl") + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON( + "abc", + "/home/user/code/agentsview", + "user", + "2024-01-01T10:00:00Z", + ), + testjsonl.CodexMsgJSON("user", "review this", "2024-01-01T10:00:01Z"), + ) + require.NoError(t, os.WriteFile(path, []byte(content), 0o600)) + info, err := os.Stat(path) + require.NoError(t, err, "stat codex fixture") + + sess := db.Session{ + ID: "host~codex:abc", + Project: "agentsview", + Machine: "host", + Agent: "codex", + FilePath: strPtr("host:" + path), + FileSize: int64Ptr(info.Size()), + FileMtime: int64Ptr(info.ModTime().UnixNano()), + } + require.NoError(t, database.UpsertSession(sess)) + require.NoError(t, database.SetSessionDataVersion( + sess.ID, db.CurrentDataVersion()-1, + )) + + e := &Engine{ + db: database, + idPrefix: "host~", + skipCache: map[string]int64{path: info.ModTime().UnixNano()}, + agentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, + pathRewriter: func(path string) string { + return "host:" + path + }, + } + + res := e.processFile(context.Background(), parser.DiscoveredFile{ + Agent: parser.AgentCodex, + Path: path, + }) + require.NoError(t, res.err) + require.False(t, res.skip, + "skip cache must not hide stale parser data versions") + require.Len(t, res.results, 1) +} + +func TestProcessFileCodexDBFreshSkipIsNotCached(t *testing.T) { + database := openTestDB(t) + root := t.TempDir() + path := filepath.Join(root, "rollout-2026-06-21T18-59-38-abc.jsonl") + content := testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON( + "abc", + "/home/user/code/agentsview", + "user", + "2024-01-01T10:00:00Z", + ), + testjsonl.CodexMsgJSON("user", "review this", "2024-01-01T10:00:01Z"), + ) + require.NoError(t, os.WriteFile(path, []byte(content), 0o600)) + info, err := os.Stat(path) + require.NoError(t, err, "stat codex fixture") + + sess := db.Session{ + ID: "host~codex:abc", + Project: "agentsview", + Machine: "host", + Agent: "codex", + FilePath: strPtr("host:" + path), + FileSize: int64Ptr(info.Size()), + FileMtime: int64Ptr(info.ModTime().UnixNano()), + } + require.NoError(t, database.UpsertSession(sess)) + require.NoError(t, database.SetSessionDataVersion( + sess.ID, db.CurrentDataVersion(), + )) + + e := &Engine{ + db: database, + idPrefix: "host~", + skipCache: map[string]int64{}, + agentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, + pathRewriter: func(path string) string { + return "host:" + path + }, + } + + res := e.processFile(context.Background(), parser.DiscoveredFile{ + Agent: parser.AgentCodex, + Path: path, + }) + require.NoError(t, res.err) + require.True(t, res.skip) + assert.True(t, res.noCacheSkip) + assert.Empty(t, e.SnapshotSkipCache()) +} + +func TestClassifyCodexIndexPathSkipsMissingTranscript(t *testing.T) { + database := openTestDB(t) + root := t.TempDir() + codexDir := filepath.Join(root, "sessions") + require.NoError(t, os.MkdirAll(codexDir, 0o755)) + indexPath := filepath.Join(root, parser.CodexSessionIndexFilename) + uuid := "019eb791-cf7d-75c1-8439-9ed74c1229e7" + missingPath := filepath.Join( + codexDir, + "2026", "06", "11", + "rollout-2026-06-11T12-44-06-"+uuid+".jsonl", + ) + require.NoError(t, database.UpsertSession(db.Session{ + ID: "codex:" + uuid, + Project: "agentsview", + Machine: "local", + Agent: string(parser.AgentCodex), + SessionName: strPtr("Old title"), + FilePath: &missingPath, + })) + require.NoError(t, os.WriteFile(indexPath, []byte( + `{"id":"`+uuid+`","thread_name":"New title",`+ + `"updated_at":"2026-06-11T17:34:20Z"}`+"\n", + ), 0o644)) + engine := NewEngine(database, EngineConfig{ + AgentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {codexDir}, + }, + Machine: "local", + }) + + files := engine.classifyCodexIndexPath(indexPath) + + assert.Empty(t, files) +} + func TestProcessCodexAppendedStaleProjectDoesFullReparse(t *testing.T) { database := openTestDB(t) root := t.TempDir() @@ -1525,21 +1684,25 @@ func TestProcessCodexAppendedStaleProjectDoesFullReparse(t *testing.T) { ) + "\n") require.NoError(t, err, "append codex fixture") require.NoError(t, f.Close(), "close codex fixture") - info, err = os.Stat(path) - require.NoError(t, err, "stat appended codex fixture") - e := &Engine{ db: database, idPrefix: "host~", + agentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, pathRewriter: func(path string) string { return "host:" + path }, } - res := e.processCodex(parser.DiscoveredFile{ + res := e.processFile(context.Background(), parser.DiscoveredFile{ Agent: parser.AgentCodex, Path: path, - }, info) + }) require.NoError(t, res.err) require.Nil(t, res.incremental, "stale project metadata must force full parse even when file appended") @@ -1615,21 +1778,25 @@ func TestProcessCodexAppendedStaleProjectCarriesForceReplace(t *testing.T) { ) + "\n") require.NoError(t, err, "append codex fixture") require.NoError(t, f.Close(), "close codex fixture") - info, err = os.Stat(path) - require.NoError(t, err, "stat appended codex fixture") - e := &Engine{ db: database, idPrefix: "host~", + agentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, pathRewriter: func(path string) string { return "host:" + path }, } - res := e.processCodex(parser.DiscoveredFile{ + res := e.processFile(context.Background(), parser.DiscoveredFile{ Agent: parser.AgentCodex, Path: path, - }, info) + }) require.NoError(t, res.err) require.Nil(t, res.incremental, "stale project metadata must force full parse even when file appended") diff --git a/internal/sync/parsediff.go b/internal/sync/parsediff.go index 580a82c41..a3e5300e7 100644 --- a/internal/sync/parsediff.go +++ b/internal/sync/parsediff.go @@ -3,7 +3,6 @@ package sync import ( "context" "fmt" - "log" "os" "path/filepath" "sort" @@ -80,7 +79,11 @@ func (e *Engine) ParseDiff(ctx context.Context, opts ParseDiffOptions) (*ParseDi } continue } - files = append(files, e.parseDiffProviderSources(ctx, def.Type)...) + providerFiles, err := e.parseDiffProviderSources(ctx, def.Type) + if err != nil { + return nil, err + } + files = append(files, providerFiles...) } // DiscoverFunc does not emit the shared-SQLite source for Kiro // (data.sqlite3) or db-mode OpenCode (opencode.db) — normal sync @@ -215,14 +218,14 @@ func (e *Engine) ParseDiff(ctx context.Context, opts ParseDiffOptions) (*ParseDi func (e *Engine) parseDiffProviderSources( ctx context.Context, agentType parser.AgentType, -) []parser.DiscoveredFile { +) ([]parser.DiscoveredFile, error) { factory, ok := e.providerFactories[agentType] if !ok || factory == nil { - return nil + return nil, nil } roots := e.agentDirs[agentType] if len(roots) == 0 { - return nil + return nil, nil } provider := factory.NewProvider(parser.ProviderConfig{ Roots: roots, @@ -230,8 +233,10 @@ func (e *Engine) parseDiffProviderSources( }) sources, err := provider.Discover(ctx) if err != nil { - log.Printf("parse-diff %s provider discovery: %v", agentType, err) - return nil + return nil, fmt.Errorf( + "parse-diff %s provider discovery: %w", + agentType, err, + ) } def := provider.Definition() var files []parser.DiscoveredFile @@ -253,7 +258,7 @@ func (e *Engine) parseDiffProviderSources( ProviderProcess: true, }) } - return files + return files, nil } func (e *Engine) parseDiffAgentDiscoverable(def parser.AgentDef) bool { diff --git a/internal/sync/provider_shadow_caller_test.go b/internal/sync/provider_shadow_caller_test.go index a389eac76..024e27f13 100644 --- a/internal/sync/provider_shadow_caller_test.go +++ b/internal/sync/provider_shadow_caller_test.go @@ -349,6 +349,42 @@ func TestProviderVirtualSourceBackedByEventPreservesHashInDBPath(t *testing.T) { assert.False(t, providerVirtualSourceBackedByEvent(sourcePath, filepath.Dir(dbPath))) } +func TestParseDiffProviderDiscoveryErrorFails(t *testing.T) { + root := t.TempDir() + discoverErr := errors.New("discover failed") + provider := &shadowCallerProvider{ + shadowTestProvider: shadowTestProvider{ + ProviderBase: parser.ProviderBase{ + Def: parser.AgentDef{ + Type: parser.AgentCodex, + DisplayName: "Codex", + }, + }, + }, + discoverErr: discoverErr, + } + engine := NewDiffEngine(dbtest.OpenTestDB(t), EngineConfig{ + AgentDirs: map[parser.AgentType][]string{ + parser.AgentCodex: {root}, + }, + ProviderFactories: []parser.ProviderFactory{ + shadowCallerFactory{provider: provider}, + }, + ProviderMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCodex: parser.ProviderMigrationProviderAuthoritative, + }, + }) + + report, err := engine.ParseDiff(context.Background(), ParseDiffOptions{ + Agents: []parser.AgentType{parser.AgentCodex}, + }) + + require.Error(t, err) + assert.Nil(t, report) + assert.ErrorContains(t, err, "parse-diff codex provider discovery") + assert.ErrorIs(t, err, discoverErr) +} + func TestProcessFileShadowRecordsCachedSkipAsNotComparable(t *testing.T) { root := t.TempDir() sourcePath := filepath.Join(root, "-Users-dev-code-demo", "shadow-skip.jsonl") diff --git a/internal/sync/provider_shadow_codex_test.go b/internal/sync/provider_shadow_codex_test.go new file mode 100644 index 000000000..0ef157e65 --- /dev/null +++ b/internal/sync/provider_shadow_codex_test.go @@ -0,0 +1,83 @@ +package sync + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.kenn.io/agentsview/internal/parser" + "go.kenn.io/agentsview/internal/testjsonl" +) + +// TestObserveProviderSourceParsesCodexSourceWithIndexTitle exercises the folded +// Codex provider end to end through ObserveProviderSource. The legacy +// ParseCodexSession entrypoint was deleted in the fold, so this replaces the +// shadow-baseline comparison with provider-API coverage that pins the parsed +// session shape: discovery finds the dated transcript, the sibling +// session_index.jsonl supplies the thread title as session_name, and the +// observed parse output and data-version planning match the source. +func TestObserveProviderSourceParsesCodexSourceWithIndexTitle(t *testing.T) { + base := t.TempDir() + root := filepath.Join(base, "sessions") + uuid := "019eb791-cf7d-75c1-8439-9ed74c12abcd" + sourcePath := filepath.Join( + root, + "2026", + "06", + "11", + "rollout-2026-06-11T12-44-06-"+uuid+".jsonl", + ) + writeProviderShadowSourceFile( + t, + sourcePath, + testjsonl.JoinJSONL( + testjsonl.CodexSessionMetaJSON( + uuid, + "/home/user/code/api", + "codex_cli_rs", + "2026-06-11T12:44:06Z", + ), + testjsonl.CodexMsgJSON("user", "provider question", "2026-06-11T12:44:07Z"), + ), + ) + writeProviderShadowSourceFile( + t, + filepath.Join(base, parser.CodexSessionIndexFilename), + `{"id":"`+uuid+`","thread_name":"Provider title","updated_at":"2026-06-11T17:34:20Z"}`+"\n", + ) + + provider, ok := parser.NewProvider(parser.AgentCodex, parser.ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, sources, 1) + + observation, err := ObserveProviderSource(context.Background(), provider, ProviderObserveRequest{ + Source: sources[0], + Machine: "devbox", + }) + require.NoError(t, err) + require.Len(t, observation.Results, 1) + + session := observation.Results[0].Session + assert.Equal(t, "codex:"+uuid, session.ID) + assert.Equal(t, parser.AgentCodex, session.Agent) + assert.Equal(t, "devbox", session.Machine) + assert.Equal(t, "/home/user/code/api", session.Cwd) + assert.Equal(t, "Provider title", session.SessionName) + assert.Equal(t, "provider question", session.FirstMessage) + assert.Equal(t, sourcePath, session.File.Path) + assert.Equal(t, observation.Fingerprint.Hash, session.File.Hash) + + require.Len(t, observation.Results[0].Messages, 1) + assert.Equal(t, parser.RoleUser, observation.Results[0].Messages[0].Role) + + assert.Equal(t, []string{session.ID}, observation.Planned.DataVersionSessionIDs()) + assert.Empty(t, observation.Planned.Diagnostics) +} diff --git a/internal/sync/provider_shadow_support_test.go b/internal/sync/provider_shadow_support_test.go new file mode 100644 index 000000000..03714305b --- /dev/null +++ b/internal/sync/provider_shadow_support_test.go @@ -0,0 +1,19 @@ +package sync + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +// writeProviderShadowSourceFile writes a provider source fixture, creating the +// parent directory. It is the shared helper for the per-provider shadow/parse +// tests (the Codex fold is the lowest caller; later provider folds reuse it). +func writeProviderShadowSourceFile(t *testing.T, path, content string) { + t.Helper() + + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + require.NoError(t, os.WriteFile(path, []byte(content), 0o644)) +}