From e47ff7d3b1944c008455d0cb0ec5e5be53977ca9 Mon Sep 17 00:00:00 2001 From: Marius van Niekerk Date: Wed, 24 Jun 2026 21:33:45 -0400 Subject: [PATCH] feat(parser): migrate cortex provider Cortex has a shallow metadata-file source shape, with optional companion history JSONL handled inside the existing parser. Moving it behind a concrete provider keeps source discovery and lookup explicit without adding a new source abstraction. The provider preserves the legacy Cortex session-file predicate, backup/history companion exclusions, symlinked file behavior, deleted-path classification, source fingerprinting, and parse normalization for session names, cwd, and tool content. Validation: go fmt ./...; go test -tags "fts5" ./internal/parser -run TestCortexProvider -count=1; go test -tags "fts5" ./internal/parser -count=1; go vet ./...; make test-short; git diff --check fix(parser): include cortex history companions Cortex split-history sessions parse messages from a sibling .history.jsonl file, so provider-backed live sync has to treat that companion as part of the same source. Otherwise a history-only append can be watched but never mapped back to the metadata session, or can keep the same freshness identity and skip reparsing. This keeps the persisted source key on the .json metadata file while adding companion watch classification and a composite fingerprint over the metadata and history files when the companion exists. Validation: go fmt ./...; go test -tags "fts5" ./internal/parser -run TestCortexProviderClassifiesAndFingerprintsHistoryCompanion -count=1; go test -tags "fts5" ./internal/parser -run TestCortexProvider -count=1; go test -tags "fts5" ./internal/parser -count=1; go vet ./...; make test-short; git diff --check; make nilaway test(parser): opt cortex into provider shadow Cortex now has a concrete facade provider on this branch, so its migration mode should enter shadow comparison instead of staying legacy-only and additive. Lower provider opt-ins stay inherited and later provider branches own their own manifest changes. Validation: go test -tags "fts5" ./internal/parser -run TestProviderMigrationModes -count=1; go test -tags "fts5" ./internal/parser -count=1; go vet ./...; git diff --check test(sync): compare cortex shadow parity Cortex is shadow-compared on this branch, so add source-level migration coverage that compares provider observation with ParseCortexSession. The fixture uses Cortex's split metadata/history format so the test proves the provider path preserves companion-history parse behavior while still planning the primary session ID. Validation: go test -tags "fts5" ./internal/parser ./internal/sync -run 'TestObserveProviderSourceMatchesCortexLegacyParser|TestCortexProvider|TestParseCortex' -count=1; go test -tags "fts5" ./internal/parser ./internal/sync -count=1; go fmt ./...; go vet ./...; git diff --check; ./custom-gcl run --config .golangci.nilaway.yml ./internal/parser/... ./internal/sync/... refactor(parser): fold cortex into provider Move Cortex parse ownership onto the concrete provider and remove the package-level discover/find/parse entrypoints. Route Cortex sync classification and processing through provider changed-path handling so this branch migrates the provider instead of adding another shim. fix(sync): include cortex companion mtimes in quick sync Provider-authoritative Cortex discovery emits the metadata JSON as the source, but its freshness identity also includes the split .history.jsonl companion. SyncAllSince was still filtering on the metadata file mtime before provider fingerprinting, so history-only updates could be dropped during quick sync.\n\nUse provider fingerprint mtimes for provider-process discovered files before applying the since cutoff, falling back to the existing per-agent stat logic when the provider has no mtime. Cortex full parses now replace messages as well, because split history rewrites can change existing ordinals rather than only append.\n\nValidation: go test -tags "fts5" ./internal/sync -run TestSyncAllSinceCortexHistoryUpdateTriggersResync -count=1; go test -tags "fts5" ./internal/parser -run Cortex -count=1; go test -tags "fts5" ./internal/sync -run 'Cortex|TestClassifyOnePath_Cortex|TestSyncAllSinceCortexHistoryUpdateTriggersResync' -count=1; go test -tags "fts5" ./internal/sync -count=1; go fmt ./...; go vet ./...; git diff --check fix(parser): thread ctx through cortex source lookups --- internal/parser/cortex.go | 67 +---- internal/parser/cortex_provider.go | 301 +++++++++++++++++++++++ internal/parser/cortex_provider_test.go | 218 ++++++++++++++++ internal/parser/cortex_test.go | 69 ++++-- internal/parser/provider.go | 2 + internal/parser/provider_migration.go | 2 +- internal/parser/types.go | 6 +- internal/sync/classify_cortex_test.go | 17 +- internal/sync/cortex_integration_test.go | 70 ++++++ internal/sync/engine.go | 124 ++++------ 10 files changed, 717 insertions(+), 159 deletions(-) create mode 100644 internal/parser/cortex_provider.go create mode 100644 internal/parser/cortex_provider_test.go create mode 100644 internal/sync/cortex_integration_test.go diff --git a/internal/parser/cortex.go b/internal/parser/cortex.go index 03ddf537d..c8dce0331 100644 --- a/internal/parser/cortex.go +++ b/internal/parser/cortex.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "os" - "path/filepath" "regexp" "strings" "time" @@ -335,11 +334,11 @@ func parseCortexTimestamps(_ string) map[string]time.Time { return make(map[string]time.Time) } -// ParseCortexSession parses a Cortex session from its .json metadata -// file. If the file contains an embedded "history" array, it is used -// directly. If no history is embedded (the split-file format), the -// companion .history.jsonl file is read instead. -func ParseCortexSession( +// parseSession parses a Cortex session from its .json metadata file. If the +// file contains an embedded "history" array, it is used directly. If no history +// is embedded (the split-file format), the companion .history.jsonl file is +// read instead. +func (p *cortexProvider) parseSession( path, machine string, ) (*ParsedSession, []ParsedMessage, error) { info, err := os.Stat(path) @@ -511,59 +510,3 @@ func IsCortexSessionFile(name string) bool { stem := strings.TrimSuffix(name, ".json") return IsValidSessionID(stem) } - -// DiscoverCortexSessions finds all primary session metadata files -// in the Cortex conversations directory (~/.snowflake/cortex/conversations). -// Backup files (*.back.*.json) are silently skipped. Both embedded-history -// sessions (.json with a "history" key) and split sessions -// (.json + .history.jsonl) are returned as a single entry -// pointing to the .json metadata file. -func DiscoverCortexSessions( - conversationsDir string, -) []DiscoveredFile { - if conversationsDir == "" { - return nil - } - - entries, err := os.ReadDir(conversationsDir) - if err != nil { - return nil - } - - var files []DiscoveredFile - for _, entry := range entries { - if entry.IsDir() { - continue - } - name := entry.Name() - if !IsCortexSessionFile(name) { - continue - } - files = append(files, DiscoveredFile{ - Path: filepath.Join(conversationsDir, name), - Agent: AgentCortex, - }) - } - - return files -} - -// FindCortexSourceFile locates a Cortex session file by UUID. Accepts -// both the raw UUID and the prefixed "cortex:" form. Returns the -// path to the .json metadata file if found, otherwise "". -func FindCortexSourceFile( - conversationsDir, sessionID string, -) string { - // Strip "cortex:" prefix before validation — callers may - // pass the full prefixed ID. - sessionID = strings.TrimPrefix(sessionID, "cortex:") - if conversationsDir == "" || !IsValidSessionID(sessionID) { - return "" - } - - candidate := filepath.Join(conversationsDir, sessionID+".json") - if _, err := os.Stat(candidate); err == nil { - return candidate - } - return "" -} diff --git a/internal/parser/cortex_provider.go b/internal/parser/cortex_provider.go new file mode 100644 index 000000000..d201f2761 --- /dev/null +++ b/internal/parser/cortex_provider.go @@ -0,0 +1,301 @@ +package parser + +import ( + "context" + "crypto/sha256" + "fmt" + "os" + "path/filepath" + "strings" +) + +var _ Provider = (*cortexProvider)(nil) + +type cortexProviderFactory struct { + def AgentDef +} + +func newCortexProviderFactory(def AgentDef) ProviderFactory { + return cortexProviderFactory{def: cloneAgentDef(def)} +} + +func (f cortexProviderFactory) Definition() AgentDef { + return cloneAgentDef(f.def) +} + +func (f cortexProviderFactory) Capabilities() Capabilities { + return cortexProviderCapabilities() +} + +func (f cortexProviderFactory) NewProvider(cfg ProviderConfig) Provider { + cfg = cfg.Clone() + return &cortexProvider{ + ProviderBase: ProviderBase{ + Def: cloneAgentDef(f.def), + Caps: cortexProviderCapabilities(), + Config: cfg, + }, + sources: newCortexSourceSet(cfg.Roots), + } +} + +type cortexProvider struct { + ProviderBase + sources JSONLSourceSet +} + +func (p *cortexProvider) Discover(ctx context.Context) ([]SourceRef, error) { + return p.sources.Discover(ctx) +} + +func (p *cortexProvider) WatchPlan(ctx context.Context) (WatchPlan, error) { + plan, err := p.sources.WatchPlan(ctx) + if err != nil { + return WatchPlan{}, err + } + for i := range plan.Roots { + plan.Roots[i].IncludeGlobs = append( + plan.Roots[i].IncludeGlobs, + "*.history.jsonl", + ) + } + return plan, nil +} + +func (p *cortexProvider) SourcesForChangedPath( + ctx context.Context, + req ChangedPathRequest, +) ([]SourceRef, error) { + sources, err := p.sources.SourcesForChangedPath(ctx, req) + if err != nil || len(sources) > 0 { + return sources, err + } + if source, ok, err := p.sourceForHistoryCompanion(ctx, req); err != nil { + return nil, err + } else if ok { + return []SourceRef{source}, nil + } + return nil, nil +} + +func (p *cortexProvider) FindSource( + ctx context.Context, + req FindSourceRequest, +) (SourceRef, bool, error) { + return p.sources.FindSource(ctx, providerFindRequestWithRawSessionID(p.Def, req)) +} + +func (p *cortexProvider) Fingerprint( + ctx context.Context, + source SourceRef, +) (SourceFingerprint, error) { + if err := ctx.Err(); err != nil { + return SourceFingerprint{}, err + } + path, ok, err := p.sources.pathFromSource(ctx, source) + if err != nil { + return SourceFingerprint{}, err + } + if !ok { + return SourceFingerprint{}, fmt.Errorf("cortex source path unavailable") + } + info, err := os.Stat(path) + if err != nil { + return SourceFingerprint{}, fmt.Errorf("stat %s: %w", path, err) + } + if info.IsDir() { + return SourceFingerprint{}, fmt.Errorf("stat %s: source is a directory", path) + } + fingerprint := SourceFingerprint{ + Key: firstNonEmptyJSONLString( + source.FingerprintKey, + source.Key, + path, + ), + Size: info.Size(), + MTimeNS: info.ModTime().UnixNano(), + } + + h := sha256.New() + if err := addCortexFingerprintPart(h, "metadata", path, info); err != nil { + return SourceFingerprint{}, err + } + historyPath := cortexHistoryCompanionPath(path) + if historyInfo, ok, err := cortexCompanionInfo(historyPath); err != nil { + return SourceFingerprint{}, err + } else if ok && historyInfo != nil { + fingerprint.Size += historyInfo.Size() + mtime := historyInfo.ModTime().UnixNano() + if mtime > fingerprint.MTimeNS { + fingerprint.MTimeNS = mtime + } + if err := addCortexFingerprintPart(h, "history", historyPath, historyInfo); err != nil { + return SourceFingerprint{}, err + } + } + fingerprint.Hash = fmt.Sprintf("%x", h.Sum(nil)) + return fingerprint, nil +} + +func (p *cortexProvider) Parse( + ctx context.Context, + req ParseRequest, +) (ParseOutcome, error) { + if err := ctx.Err(); err != nil { + return ParseOutcome{}, err + } + path, ok, err := p.sources.pathFromSource(ctx, req.Source) + if err != nil { + return ParseOutcome{}, err + } + if !ok { + return ParseOutcome{}, fmt.Errorf("cortex source path unavailable") + } + machine := firstNonEmptyJSONLString(req.Machine, p.Config.Machine) + sess, msgs, err := p.parseSession(path, machine) + if err != nil { + return ParseOutcome{}, err + } + if sess == nil { + return ParseOutcome{ + ResultSetComplete: true, + SkipReason: SkipNoSession, + }, nil + } + if req.Fingerprint.Hash != "" { + sess.File.Hash = req.Fingerprint.Hash + } + return ParseOutcome{ + Results: []ParseResultOutcome{{ + Result: ParseResult{ + Session: *sess, + Messages: msgs, + }, + DataVersion: DataVersionCurrent, + }}, + ResultSetComplete: true, + }, nil +} + +func newCortexSourceSet(roots []string) JSONLSourceSet { + return newJSONLSourceSet(AgentCortex, roots, + withExtensions(".json"), + withFollowSymlinkFiles(), + withIncludePath(isCortexSourcePath), + withSessionIDFromPath(cortexSessionIDFromPath), + withProjectHint(func(root, path string) string { return "" }), + ) +} + +func (p *cortexProvider) sourceForHistoryCompanion( + ctx context.Context, + req ChangedPathRequest, +) (SourceRef, bool, error) { + if req.Path == "" { + return SourceRef{}, false, nil + } + path := filepath.Clean(req.Path) + for _, root := range p.sources.roots { + if req.WatchRoot != "" && !samePath(req.WatchRoot, root) { + continue + } + source, ok, err := cortexSourceForHistoryCompanion(ctx, p.sources, root, path) + if err != nil { + return SourceRef{}, false, err + } + if ok { + return source, true, nil + } + } + return SourceRef{}, false, nil +} + +func cortexSourceForHistoryCompanion( + ctx context.Context, + sources JSONLSourceSet, + root string, + path string, +) (SourceRef, bool, error) { + root = filepath.Clean(root) + if !samePath(filepath.Dir(path), root) { + return SourceRef{}, false, nil + } + stem, ok := strings.CutSuffix(filepath.Base(path), ".history.jsonl") + if !ok || !IsCortexSessionFile(stem+".json") { + return SourceRef{}, false, nil + } + metadataPath := filepath.Join(root, stem+".json") + if source, ok, err := sources.sourceForPath(ctx, metadataPath); err != nil { + return SourceRef{}, false, err + } else if ok { + return source, true, nil + } + return SourceRef{}, false, nil +} + +func isCortexSourcePath(root, path string) bool { + if !samePath(filepath.Dir(path), filepath.Clean(root)) { + return false + } + return IsCortexSessionFile(filepath.Base(path)) +} + +func cortexSessionIDFromPath(root, path string) string { + if !isCortexSourcePath(root, path) { + return "" + } + return strings.TrimSuffix(filepath.Base(path), ".json") +} + +func cortexHistoryCompanionPath(path string) string { + return strings.TrimSuffix(path, ".json") + ".history.jsonl" +} + +func cortexCompanionInfo(path string) (os.FileInfo, bool, error) { + info, err := os.Stat(path) + if os.IsNotExist(err) { + return nil, false, nil + } + if err != nil { + return nil, false, fmt.Errorf("stat %s: %w", path, err) + } + if info.IsDir() { + return nil, false, nil + } + return info, true, nil +} + +func addCortexFingerprintPart( + h interface{ Write([]byte) (int, error) }, + label string, + path string, + info os.FileInfo, +) error { + hash, err := hashJSONLSourceFile(path) + if err != nil { + return err + } + _, _ = fmt.Fprintf( + h, + "%s:%s:%d:%d:%s\n", + label, + filepath.Base(path), + info.Size(), + info.ModTime().UnixNano(), + hash, + ) + return nil +} + +func cortexProviderCapabilities() Capabilities { + return Capabilities{ + Source: jsonlFileProviderSourceCapabilities(), + Content: ContentCapabilities{ + FirstMessage: CapabilitySupported, + SessionName: CapabilitySupported, + Cwd: CapabilitySupported, + ToolCalls: CapabilitySupported, + ToolResults: CapabilitySupported, + }, + } +} diff --git a/internal/parser/cortex_provider_test.go b/internal/parser/cortex_provider_test.go new file mode 100644 index 000000000..3d89c137c --- /dev/null +++ b/internal/parser/cortex_provider_test.go @@ -0,0 +1,218 @@ +package parser + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCortexProviderFactoryReplacesLegacyAdapter(t *testing.T) { + factory, ok := ProviderFactoryByType(AgentCortex) + require.True(t, ok) + require.NotNil(t, factory) + + caps := factory.Capabilities() + assert.Equal(t, CapabilitySupported, caps.Source.DiscoverSources) + assert.Equal(t, CapabilitySupported, caps.Source.WatchSources) + assert.Equal(t, CapabilitySupported, caps.Source.ClassifyChangedPath) + assert.Equal(t, CapabilitySupported, caps.Source.FindSource) + assert.Equal(t, CapabilitySupported, caps.Source.CompositeFingerprint) + assert.Equal(t, CapabilitySupported, caps.Content.FirstMessage) + assert.Equal(t, CapabilitySupported, caps.Content.SessionName) + assert.Equal(t, CapabilitySupported, caps.Content.Cwd) + assert.Equal(t, CapabilitySupported, caps.Content.ToolCalls) + assert.Equal(t, CapabilitySupported, caps.Content.ToolResults) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{t.TempDir()}, + Machine: "devbox", + }) + require.True(t, ok) + require.NotNil(t, provider) +} + +func TestCortexProviderSourceMethods(t *testing.T) { + root := t.TempDir() + otherID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + sourcePath := filepath.Join(root, cortexTestUUID+".json") + otherPath := filepath.Join(root, otherID+".json") + writeSourceFile(t, sourcePath, minimalCortexSession(cortexTestUUID)) + writeSourceFile(t, otherPath, minimalCortexSession(otherID)) + writeSourceFile(t, filepath.Join(root, cortexTestUUID+".history.jsonl"), "{}\n") + writeSourceFile(t, filepath.Join(root, cortexTestUUID+".back.123.json"), "{}\n") + writeSourceFile(t, filepath.Join(root, "has spaces.json"), "{}\n") + writeSourceFile(t, filepath.Join(root, "nested", cortexTestUUID+".json"), "{}\n") + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 2) + assert.Equal(t, []string{sourcePath, otherPath}, sourceDisplayPaths(discovered)) + assert.Equal(t, []string{"", ""}, sourceProjects(discovered)) + + plan, err := provider.WatchPlan(context.Background()) + require.NoError(t, err) + require.Len(t, plan.Roots, 1) + assert.Equal(t, root, plan.Roots[0].Path) + assert.False(t, plan.Roots[0].Recursive) + assert.Equal(t, []string{"*.json", "*.history.jsonl"}, plan.Roots[0].IncludeGlobs) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "host~cortex:" + cortexTestUUID, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, sourcePath, found.DisplayPath) + + fingerprint, err := provider.Fingerprint(context.Background(), found) + require.NoError(t, err) + assert.Equal(t, sourcePath, fingerprint.Key) + assert.NotZero(t, fingerprint.Size) + assert.NotZero(t, fingerprint.MTimeNS) + + found, ok, err = provider.FindSource(context.Background(), FindSourceRequest{ + StoredFilePath: otherPath, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, otherPath, found.DisplayPath) + + _, ok, err = provider.FindSource(context.Background(), FindSourceRequest{ + RawSessionID: "../" + cortexTestUUID, + }) + require.NoError(t, err) + assert.False(t, ok) + + require.NoError(t, os.Remove(sourcePath)) + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: sourcePath, EventKind: "remove", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) +} + +func TestCortexProviderClassifiesAndFingerprintsHistoryCompanion(t *testing.T) { + root := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + historyPath := filepath.Join(root, cortexTestUUID+".history.jsonl") + writeSourceFile(t, sourcePath, `{ + "session_id":"`+cortexTestUUID+`", + "working_directory":"/home/user/project" + }`) + writeSourceFile( + t, + historyPath, + `{"role":"user","id":"m1","content":[{"type":"text","text":"from history"}]}`+"\n", + ) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + changed, err := provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: historyPath, EventKind: "write", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) + assert.Equal(t, sourcePath, changed[0].FingerprintKey) + + before, err := provider.Fingerprint(context.Background(), changed[0]) + require.NoError(t, err) + assert.Equal(t, sourcePath, before.Key) + assert.NotEmpty(t, before.Hash) + + writeSourceFile( + t, + historyPath, + `{"role":"user","id":"m1","content":[{"type":"text","text":"updated history"}]}`+"\n", + ) + after, err := provider.Fingerprint(context.Background(), changed[0]) + require.NoError(t, err) + assert.Equal(t, sourcePath, after.Key) + assert.NotEqual(t, before.Hash, after.Hash) + assert.NotEqual(t, before.Size, after.Size) + + require.NoError(t, os.Remove(historyPath)) + changed, err = provider.SourcesForChangedPath( + context.Background(), + ChangedPathRequest{Path: historyPath, EventKind: "remove", WatchRoot: root}, + ) + require.NoError(t, err) + require.Len(t, changed, 1) + assert.Equal(t, sourcePath, changed[0].DisplayPath) +} + +func TestCortexProviderSourceMethodsFollowSymlinkedSessionFile(t *testing.T) { + root := t.TempDir() + targetRoot := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + targetPath := filepath.Join(targetRoot, cortexTestUUID+".json") + writeSourceFile(t, targetPath, minimalCortexSession(cortexTestUUID)) + if err := os.Symlink(targetPath, sourcePath); err != nil { + t.Skipf("symlink not supported: %v", err) + } + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + + discovered, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, discovered, 1) + assert.Equal(t, sourcePath, discovered[0].DisplayPath) + + found, ok, err := provider.FindSource(context.Background(), FindSourceRequest{ + FullSessionID: "host~cortex:" + cortexTestUUID, + }) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, sourcePath, found.DisplayPath) +} + +func TestCortexProviderParse(t *testing.T) { + root := t.TempDir() + sourcePath := filepath.Join(root, cortexTestUUID+".json") + writeSourceFile(t, sourcePath, minimalCortexSession(cortexTestUUID)) + + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{root}, + Machine: "devbox", + }) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, sources, 1) + + outcome, err := provider.Parse(context.Background(), ParseRequest{ + Source: sources[0], + Fingerprint: SourceFingerprint{Key: sourcePath, Hash: "abc123"}, + }) + require.NoError(t, err) + require.True(t, outcome.ResultSetComplete) + require.Len(t, outcome.Results, 1) + result := outcome.Results[0] + assert.Equal(t, DataVersionCurrent, result.DataVersion) + assert.Equal(t, "cortex:"+cortexTestUUID, result.Result.Session.ID) + assert.Equal(t, "project", result.Result.Session.Project) + assert.Equal(t, "devbox", result.Result.Session.Machine) + assert.Equal(t, "abc123", result.Result.Session.File.Hash) + assert.Equal(t, "Test session", result.Result.Session.SessionName) + assert.Len(t, result.Result.Messages, 2) +} diff --git a/internal/parser/cortex_test.go b/internal/parser/cortex_test.go index a1191619f..061038a61 100644 --- a/internal/parser/cortex_test.go +++ b/internal/parser/cortex_test.go @@ -1,6 +1,7 @@ package parser import ( + "context" "os" "path/filepath" "runtime" @@ -37,11 +38,23 @@ func minimalCortexSession(sessionID string) string { }` } +func parseCortexSessionForTest( + t *testing.T, + path, machine string, +) (*ParsedSession, []ParsedMessage, error) { + t.Helper() + provider, ok := NewProvider(AgentCortex, ProviderConfig{Machine: machine}) + require.True(t, ok) + cortex, ok := provider.(*cortexProvider) + require.True(t, ok) + return cortex.parseSession(path, machine) +} + func TestParseCortexSession_Basic(t *testing.T) { content := minimalCortexSession(cortexTestUUID) path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -61,7 +74,7 @@ func TestParseCortexSession_EmptySessionID(t *testing.T) { content := `{"session_id": "", "history": []}` path := createTestFile(t, "empty.json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) assert.Nil(t, sess) assert.Nil(t, msgs) @@ -92,7 +105,7 @@ func TestParseCortexSession_SkipsInternalBlocks(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -137,7 +150,7 @@ func TestParseCortexSession_ToolUse(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -177,7 +190,7 @@ func TestParseCortexSession_SplitHistoryJSONL(t *testing.T) { histPath := filepath.Join(dir, uuid+".history.jsonl") require.NoError(t, os.WriteFile(histPath, []byte(lines), 0o644)) - sess, msgs, err := ParseCortexSession(metaPath, "local") + sess, msgs, err := parseCortexSessionForTest(t, metaPath, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -210,7 +223,7 @@ func TestParseCortexSession_SplitHistoryReadError(t *testing.T) { require.NoError(t, os.Chmod(histPath, 0o000)) t.Cleanup(func() { os.Chmod(histPath, 0o644) }) - _, _, err := ParseCortexSession(metaPath, "local") + _, _, err := parseCortexSessionForTest(t, metaPath, "local") require.Error(t, err, "non-ENOENT read error should propagate") assert.Contains(t, err.Error(), "read history") } @@ -229,7 +242,7 @@ func TestParseCortexSession_SplitHistoryMissing(t *testing.T) { metaPath := filepath.Join(dir, uuid+".json") require.NoError(t, os.WriteFile(metaPath, []byte(meta), 0o644)) - sess, msgs, err := ParseCortexSession(metaPath, "local") + sess, msgs, err := parseCortexSessionForTest(t, metaPath, "local") require.NoError(t, err) assert.Nil(t, sess, "missing JSONL should silently skip") assert.Nil(t, msgs) @@ -260,7 +273,7 @@ func TestParseCortexSession_FirstUserTurnSystemOnly(t *testing.T) { }` path := createTestFile(t, cortexTestUUID+".json", content) - sess, msgs, err := ParseCortexSession(path, "local") + sess, msgs, err := parseCortexSessionForTest(t, path, "local") require.NoError(t, err) require.NotNil(t, sess) @@ -318,16 +331,25 @@ func TestDiscoverCortexSessions(t *testing.T) { filepath.Join(dir, name), []byte(""), 0o644)) } - files := DiscoverCortexSessions(dir) - require.Len(t, files, 2) - for _, f := range files { - assert.Equal(t, AgentCortex, f.Agent) - } + provider, ok := NewProvider(AgentCortex, ProviderConfig{Roots: []string{dir}}) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + require.Len(t, sources, 2) + assert.Equal(t, []string{ + filepath.Join(dir, cortexTestUUID+".json"), + filepath.Join(dir, uuid2+".json"), + }, sourceDisplayPaths(sources)) } func TestDiscoverCortexSessions_EmptyDir(t *testing.T) { - assert.Nil(t, DiscoverCortexSessions("")) - assert.Nil(t, DiscoverCortexSessions("/nonexistent")) + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{"", "/nonexistent"}, + }) + require.True(t, ok) + sources, err := provider.Discover(context.Background()) + require.NoError(t, err) + assert.Empty(t, sources) } func TestFindCortexSourceFile(t *testing.T) { @@ -349,8 +371,21 @@ func TestFindCortexSourceFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := FindCortexSourceFile(tt.dir, tt.sessionID) - assert.Equal(t, tt.want, got) + provider, ok := NewProvider(AgentCortex, ProviderConfig{ + Roots: []string{tt.dir}, + }) + require.True(t, ok) + source, ok, err := provider.FindSource( + context.Background(), + FindSourceRequest{RawSessionID: tt.sessionID}, + ) + require.NoError(t, err) + if tt.want == "" { + assert.False(t, ok) + return + } + require.True(t, ok) + assert.Equal(t, tt.want, source.DisplayPath) }) } } diff --git a/internal/parser/provider.go b/internal/parser/provider.go index c2a56bd04..4b2d7e76a 100644 --- a/internal/parser/provider.go +++ b/internal/parser/provider.go @@ -351,6 +351,8 @@ func providerFactoryForDef(def AgentDef) ProviderFactory { return newAmpProviderFactory(def) case AgentCommandCode: return newCommandCodeProviderFactory(def) + case AgentCortex: + return newCortexProviderFactory(def) case AgentDeepSeekTUI: return newDeepSeekTUIProviderFactory(def) case AgentIflow: diff --git a/internal/parser/provider_migration.go b/internal/parser/provider_migration.go index aa1b8e323..4a1164d78 100644 --- a/internal/parser/provider_migration.go +++ b/internal/parser/provider_migration.go @@ -43,7 +43,7 @@ var providerMigrationModes = map[AgentType]ProviderMigrationMode{ AgentChatGPT: ProviderMigrationLegacyOnly, AgentKiro: ProviderMigrationLegacyOnly, AgentKiroIDE: ProviderMigrationLegacyOnly, - AgentCortex: ProviderMigrationLegacyOnly, + AgentCortex: ProviderMigrationProviderAuthoritative, AgentHermes: ProviderMigrationLegacyOnly, AgentWorkBuddy: ProviderMigrationProviderAuthoritative, AgentForge: ProviderMigrationLegacyOnly, diff --git a/internal/parser/types.go b/internal/parser/types.go index 3b4fc8997..888bfba7e 100644 --- a/internal/parser/types.go +++ b/internal/parser/types.go @@ -439,10 +439,8 @@ var Registry = []AgentDef{ DefaultDirs: []string{ ".snowflake/cortex/conversations", }, - IDPrefix: "cortex:", - FileBased: true, - DiscoverFunc: DiscoverCortexSessions, - FindSourceFunc: FindCortexSourceFile, + IDPrefix: "cortex:", + FileBased: true, }, { Type: AgentHermes, diff --git a/internal/sync/classify_cortex_test.go b/internal/sync/classify_cortex_test.go index 68e2f64d7..6f3ddc9da 100644 --- a/internal/sync/classify_cortex_test.go +++ b/internal/sync/classify_cortex_test.go @@ -21,11 +21,15 @@ func TestClassifyOnePath_Cortex(t *testing.T) { require.NoError(t, os.WriteFile(jsonlPath, []byte("{}"), 0o644)) eng := &Engine{ + db: openTestDB(t), agentDirs: map[parser.AgentType][]string{ parser.AgentCortex: {dir}, }, + providerFactories: providerFactoryMap(parser.ProviderFactories()), + providerMigrationModes: map[parser.AgentType]parser.ProviderMigrationMode{ + parser.AgentCortex: parser.ProviderMigrationProviderAuthoritative, + }, } - geminiMap := make(map[string]map[string]string) tests := []struct { name string @@ -71,9 +75,14 @@ func TestClassifyOnePath_Cortex(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, ok := eng.classifyOnePath(tt.path, geminiMap) - assert.Equal(t, tt.want, ok) - if ok { + files := eng.classifyPaths([]string{tt.path}) + if !tt.want { + assert.Empty(t, files) + return + } + require.Len(t, files, 1) + got := files[0] + if tt.want { assert.Equal(t, tt.agent, got.Agent) assert.Equal(t, tt.retPath, got.Path) } diff --git a/internal/sync/cortex_integration_test.go b/internal/sync/cortex_integration_test.go new file mode 100644 index 000000000..abbe36ee4 --- /dev/null +++ b/internal/sync/cortex_integration_test.go @@ -0,0 +1,70 @@ +package sync_test + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.kenn.io/agentsview/internal/dbtest" + "go.kenn.io/agentsview/internal/parser" + "go.kenn.io/agentsview/internal/sync" +) + +func TestSyncAllSinceCortexHistoryUpdateTriggersResync(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + cortexDir := t.TempDir() + testDB := dbtest.OpenTestDB(t) + engine := sync.NewEngine(testDB, sync.EngineConfig{ + AgentDirs: map[parser.AgentType][]string{ + parser.AgentCortex: {cortexDir}, + }, + Machine: "local", + }) + + uuid := "11111111-2222-3333-4444-555555555555" + metaPath := filepath.Join(cortexDir, uuid+".json") + historyPath := filepath.Join(cortexDir, uuid+".history.jsonl") + require.NoError(t, os.WriteFile(metaPath, []byte(cortexSyncMeta(uuid)), 0o644)) + require.NoError(t, os.WriteFile(historyPath, []byte(cortexSyncHistory("Before cutoff")), 0o644)) + + baseTime := time.Unix(1_781_475_200, 0) + require.NoError(t, os.Chtimes(metaPath, baseTime, baseTime)) + require.NoError(t, os.Chtimes(historyPath, baseTime, baseTime)) + + engine.SyncPaths([]string{metaPath}) + assertMessageContent(t, testDB, "cortex:"+uuid, "Before cutoff", "ack") + + cutoff := baseTime.Add(500 * time.Millisecond) + historyTime := baseTime.Add(time.Second) + require.NoError(t, os.WriteFile(historyPath, []byte(cortexSyncHistory("After cutoff")), 0o644)) + require.NoError(t, os.Chtimes(historyPath, historyTime, historyTime)) + + stats := engine.SyncAllSince(context.Background(), cutoff, nil) + require.Equal(t, 1, stats.Synced, "synced = %d, want 1", stats.Synced) + assertMessageContent(t, testDB, "cortex:"+uuid, "After cutoff", "ack") +} + +func cortexSyncMeta(uuid string) string { + return `{ + "session_id": "` + uuid + `", + "title": "Cortex split history", + "working_directory": "/home/user/cortex-project", + "created_at": "2024-06-01T10:00:00Z", + "last_updated": "2024-06-01T10:05:00Z" +}` +} + +func cortexSyncHistory(prompt string) string { + return strings.Join([]string{ + `{"role":"user","id":"m1","content":[{"type":"text","text":"` + prompt + `"}]}`, + `{"role":"assistant","id":"m2","content":[{"type":"text","text":"ack"}]}`, + }, "\n") + "\n" +} diff --git a/internal/sync/engine.go b/internal/sync/engine.go index e6f4830bb..6ed17725b 100644 --- a/internal/sync/engine.go +++ b/internal/sync/engine.go @@ -1416,43 +1416,6 @@ func (e *Engine) classifyOnePath( } } - // Cortex: /.json - // or: /.history.jsonl → remap to .json - for _, cortexDir := range e.agentDirs[parser.AgentCortex] { - if cortexDir == "" { - continue - } - if rel, ok := isUnder(cortexDir, path); ok { - if strings.Count(rel, sep) != 0 { - continue - } - name := filepath.Base(rel) - - // .history.jsonl companion → remap to .json metadata. - if stem, ok := strings.CutSuffix( - name, ".history.jsonl", - ); ok { - jsonPath := filepath.Join( - cortexDir, stem+".json", - ) - if parser.IsCortexSessionFile(stem + ".json") { - return parser.DiscoveredFile{ - Path: jsonPath, - Agent: parser.AgentCortex, - }, true - } - continue - } - - if parser.IsCortexSessionFile(name) { - return parser.DiscoveredFile{ - Path: path, - Agent: parser.AgentCortex, - }, true - } - } - } - // Antigravity IDE: /conversations/.db (+ -wal, -shm). // annotations/.pbtxt and brain//* sidecar events are // handled in classifyPaths via classifyAntigravitySidecarPath, @@ -2829,7 +2792,7 @@ func (e *Engine) syncAllLocked( if !since.IsZero() { all = e.dedupeClaudeDiscoveredFiles(all) - all = e.filterFilesByMtime(all, since) + all = e.filterFilesByMtime(ctx, all, since) } all = dedupeDiscoveredFiles(all) @@ -3336,13 +3299,15 @@ func (e *Engine) recordSyncFinished() { // dropped). The cost is one stat per file — acceptable for // polling use cases where most files will be skipped. func (e *Engine) filterFilesByMtime( - files []parser.DiscoveredFile, cutoff time.Time, + ctx context.Context, + files []parser.DiscoveredFile, + cutoff time.Time, ) []parser.DiscoveredFile { cutoffNs := cutoff.UnixNano() out := files[:0] codexIndexRefresh := make(map[string][]parser.DiscoveredFile) for _, f := range files { - mtime, err := discoveredFileMtime(f) + mtime, err := e.discoveredFileEffectiveMtime(ctx, f) if err != nil { out = append(out, f) continue @@ -3374,6 +3339,53 @@ func (e *Engine) filterFilesByMtime( return out } +func (e *Engine) discoveredFileEffectiveMtime( + ctx context.Context, + file parser.DiscoveredFile, +) (int64, error) { + if file.ProviderSource != nil && file.ProviderProcess { + if mtime, ok, err := e.providerFingerprintMtime(ctx, file); err != nil { + return 0, err + } else if ok { + return mtime, nil + } + } + return discoveredFileMtime(file) +} + +func (e *Engine) providerFingerprintMtime( + ctx context.Context, + file parser.DiscoveredFile, +) (int64, bool, error) { + if file.ProviderSource == nil { + return 0, false, nil + } + factory, ok := e.providerFactories[file.Agent] + if !ok || factory == nil { + return 0, false, nil + } + source := *file.ProviderSource + if source.Provider != "" && source.Provider != file.Agent { + return 0, false, fmt.Errorf( + "provider source mismatch for %s: %s", + file.Agent, + source.Provider, + ) + } + provider := factory.NewProvider(parser.ProviderConfig{ + Roots: e.agentDirs[file.Agent], + Machine: e.machine, + }) + fingerprint, err := provider.Fingerprint(ctx, source) + if err != nil { + return 0, false, err + } + if fingerprint.MTimeNS == 0 { + return 0, false, nil + } + return fingerprint.MTimeNS, true, nil +} + func discoveredFileMtime( file parser.DiscoveredFile, ) (int64, error) { @@ -4460,8 +4472,6 @@ func (e *Engine) processFile( res = e.processKiro(file, info) case parser.AgentKiroIDE: res = e.processKiroIDE(file, info) - case parser.AgentCortex: - res = e.processCortex(file, info) case parser.AgentHermes: res = e.processHermes(file, info) case parser.AgentVibe: @@ -6505,35 +6515,6 @@ func (e *Engine) processKiroIDE( } } -func (e *Engine) processCortex( - file parser.DiscoveredFile, info os.FileInfo, -) processResult { - if e.shouldSkipByPath(file.Path, info) { - return processResult{skip: true} - } - - sess, msgs, err := parser.ParseCortexSession( - file.Path, e.machine, - ) - if err != nil { - return processResult{err: err} - } - if sess == nil { - return processResult{} - } - - hash, err := ComputeFileHash(file.Path) - if err == nil { - sess.File.Hash = hash - } - - return processResult{ - results: []parser.ParseResult{ - {Session: *sess, Messages: msgs}, - }, - } -} - func (e *Engine) processHermes( file parser.DiscoveredFile, info os.FileInfo, ) processResult { @@ -8096,6 +8077,7 @@ func shouldReplaceFullParseMessages( pw.sess.Agent == parser.AgentAntigravity || pw.sess.Agent == parser.AgentAntigravityCLI || pw.sess.Agent == parser.AgentQwenPaw || + pw.sess.Agent == parser.AgentCortex || // Vibe pairs later tool-result carrier records back to an // earlier assistant tool call. An incremental append would // only add the new ordinals and leave the existing tool call's