From 393f6328f51eeb1a6202fac9e35b9e25c1919e9a Mon Sep 17 00:00:00 2001 From: Lazare-42 Date: Thu, 25 Jun 2026 19:12:39 +0200 Subject: [PATCH 1/4] fix(query): re-probe Parquet schema when cache changes underneath engine The DuckDB engine probed each Parquet table's optional columns once at construction and trusted that snapshot for the process lifetime. A long-running mcp-http server therefore went stale when build-cache/sync rewrote the analytics cache with a different column set: the cached "column present" verdict put a now-absent column into a SELECT * REPLACE list, which DuckDB rejects with Binder Error: Column "message_type" in REPLACE list not found in FROM clause crashing every search_messages and aggregate call until restart. Fingerprint the cache (per-table file count + size + mtime) and re-probe optional columns (and re-register views) on demand when it changes. Guard optionalCols with a RWMutex since refresh now happens concurrently with reads. Add a regression test that rebuilds the cache underneath a live engine and asserts the query recovers instead of crashing. Co-Authored-By: Claude Opus 4.8 (1M context) --- internal/query/duckdb.go | 79 ++++++++++++++++++++ internal/query/duckdb_cache_drift_test.go | 87 +++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 internal/query/duckdb_cache_drift_test.go diff --git a/internal/query/duckdb.go b/internal/query/duckdb.go index e4a31059..644f36d6 100644 --- a/internal/query/duckdb.go +++ b/internal/query/duckdb.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log" + "os" "path/filepath" "runtime" "strings" @@ -46,7 +47,14 @@ type DuckDBEngine struct { // Used to gracefully handle stale cache files that lack newer columns // (e.g. phone_number, attachment_count, sender_id, message_type added in PR #160). // Map: table_name -> column_name -> exists_in_parquet + // + // Guarded by optColsMu because a long-running server (e.g. 
mcp-http) may + // have the analytics cache rebuilt underneath it by build-cache/sync. When + // that happens the column set can change, so optionalCols is re-probed on + // demand (see ensureFreshOptionalCols) rather than only at construction. + optColsMu sync.RWMutex optionalCols map[string]map[string]bool + cacheFP string // fingerprint of the Parquet cache at last probe // Search result cache: keeps the materialized temp table alive across // pagination calls for the same search query, avoiding repeated Parquet scans. @@ -164,6 +172,9 @@ func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB, o if len(missing) > 0 { log.Printf("[warn] Parquet cache missing columns %v — run 'msgvault build-cache --full-rebuild' to update", missing) } + // Record the cache fingerprint so ensureFreshOptionalCols can detect a + // later rebuild and re-probe instead of trusting this snapshot forever. + engine.cacheFP = engine.cacheFingerprint() // Register SQL views over Parquet files for raw SQL access. // Pass the already-probed optionalCols to avoid a redundant schema probe. @@ -189,6 +200,10 @@ func (e *DuckDBEngine) Close() error { func (e *DuckDBEngine) QuerySQL( ctx context.Context, sqlStr string, ) (*QueryResult, error) { + // Refresh views if the cache changed since startup, so the registered + // views match the current Parquet schema. + e.ensureFreshOptionalCols() + rows, err := e.db.QueryContext(ctx, sqlStr) if err != nil { return nil, fmt.Errorf("execute query: %w", err) @@ -250,6 +265,8 @@ func (e *DuckDBEngine) probeParquetColumns( // hasCol returns true if the named column exists in the Parquet schema for the given table. 
func (e *DuckDBEngine) hasCol(table, col string) bool { + e.optColsMu.RLock() + defer e.optColsMu.RUnlock() if e.optionalCols == nil { return true // no probe data — assume present (backwards compatible) } @@ -260,6 +277,64 @@ func (e *DuckDBEngine) hasCol(table, col string) bool { return tbl[col] } +// cacheFingerprint computes a cheap signature of the analytics Parquet cache. +// It combines, per probed table, the file count and each file's size and +// modification time. When build-cache or sync rewrites the cache underneath a +// long-running process the fingerprint changes, letting the engine detect that +// its probed column set is stale. Pure filesystem stats — no DuckDB access. +func (e *DuckDBEngine) cacheFingerprint() string { + var b strings.Builder + globs := []string{ + filepath.Join(e.analyticsDir, datasetMessages, "*", "*.parquet"), + e.parquetPath(datasetParticipants), + e.parquetPath(datasetConversations), + e.parquetPath("sources"), + } + for _, g := range globs { + matches, _ := filepath.Glob(g) + fmt.Fprintf(&b, "%s#%d|", g, len(matches)) + for _, m := range matches { + if fi, err := os.Stat(m); err == nil { + fmt.Fprintf(&b, "%s=%d,%d;", m, fi.Size(), fi.ModTime().UnixNano()) + } + } + } + return b.String() +} + +// ensureFreshOptionalCols re-probes the Parquet schema (and re-registers the +// SQL views) when the analytics cache has changed since the last probe. This +// guards long-running engines (e.g. the mcp-http server) against a binder +// error when build-cache rewrites the cache with a different column set: +// without it, a stale "column present" verdict puts a now-absent column into a +// SELECT * REPLACE list, which DuckDB rejects with +// "Column ... in REPLACE list not found in FROM clause". Cheap on the common +// no-change path (a handful of os.Stat calls). 
+func (e *DuckDBEngine) ensureFreshOptionalCols() { + fp := e.cacheFingerprint() + + e.optColsMu.RLock() + unchanged := fp == e.cacheFP + e.optColsMu.RUnlock() + if unchanged { + return + } + + e.optColsMu.Lock() + defer e.optColsMu.Unlock() + if fp == e.cacheFP { // another goroutine refreshed while we waited + return + } + + newCols := probeAllOptionalColumns(e.db, e.analyticsDir) + e.optionalCols = newCols + e.cacheFP = fp + if err := RegisterViewsWithColumns(e.db, e.analyticsDir, newCols); err != nil { + log.Printf("[warn] re-register views after analytics cache change: %v", err) + } + log.Printf("[info] analytics cache changed — re-probed Parquet optional columns") +} + // parquetCTEs returns common CTEs for reading all Parquet tables. // This is used by aggregate queries that need to join across tables. // parquetCTEs returns the WITH clause body that defines CTEs for all Parquet @@ -272,6 +347,10 @@ func (e *DuckDBEngine) hasCol(table, col string) bool { // are handled gracefully: if the Parquet file predates their addition, they // are synthesised with sensible defaults instead of causing a binder error. func (e *DuckDBEngine) parquetCTEs() string { + // Re-probe if build-cache/sync rewrote the cache underneath us, so the + // REPLACE list below never references a column the current Parquet lacks. 
+ e.ensureFreshOptionalCols() + // --- messages CTE --- msgReplace := []string{ "CAST(id AS BIGINT) AS id", diff --git a/internal/query/duckdb_cache_drift_test.go b/internal/query/duckdb_cache_drift_test.go new file mode 100644 index 00000000..21503afa --- /dev/null +++ b/internal/query/duckdb_cache_drift_test.go @@ -0,0 +1,87 @@ +package query + +import ( + "context" + "database/sql" + "path/filepath" + "testing" + + _ "github.com/duckdb/duckdb-go/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.kenn.io/msgvault/internal/search" +) + +// TestDuckDBEngine_CacheRebuiltUnderneath reproduces the production crash where +// a long-running engine (the mcp-http server) probed the Parquet schema once at +// startup, then build-cache/sync rewrote the cache with a different column set. +// The stale "message_type present" verdict put the column into a SELECT * +// REPLACE list that the new Parquet lacked, yielding: +// +// Binder Error: Column "message_type" in REPLACE list not found in FROM clause +// +// The engine must detect the cache change and re-probe instead of crashing. +func TestDuckDBEngine_CacheRebuiltUnderneath(t *testing.T) { + // Pre-rebuild: current schema (message_type, sender_id, attachment_count present). + const newMessagesCols = messagesCols + // Post-rebuild: old schema written by a stale cache builder (no new columns). + const oldMessagesCols = "id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, deleted_from_source_at, year, month" + + pb := newParquetBuilder(t). + addTable("messages", "messages/year=2024", "data.parquet", newMessagesCols, ` + (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) + `). + addTable("sources", "sources", "sources.parquet", sourcesCols, `(1::BIGINT, 'test@gmail.com', 'gmail')`). 
+ addTable("participants", "participants", "participants.parquet", participantsCols, `(1::BIGINT, 'alice@test.com', 'test.com', 'Alice', '')`). + addTable("message_recipients", "message_recipients", "message_recipients.parquet", messageRecipientsCols, `(1::BIGINT, 1::BIGINT, 'from', 'Alice')`). + addEmptyTable("labels", "labels", "labels.parquet", labelsCols, `(1::BIGINT, 'x')`). + addEmptyTable("message_labels", "message_labels", "message_labels.parquet", messageLabelsCols, `(1::BIGINT, 1::BIGINT)`). + addEmptyTable("attachments", "attachments", "attachments.parquet", attachmentsCols, `(1::BIGINT, 100::BIGINT, 'x')`). + addTable("conversations", "conversations", "conversations.parquet", conversationsCols, `(100::BIGINT, 'thread100', '')`) + + analyticsDir, cleanup := pb.build() + t.Cleanup(cleanup) + + engine, err := NewDuckDBEngine(analyticsDir, "", nil) + require.NoError(t, err, "NewDuckDBEngine") + t.Cleanup(func() { _ = engine.Close() }) + + ctx := context.Background() + + // Startup probe sees the new schema. + require.True(t, engine.hasCol("messages", "message_type"), + "message_type should be detected as present in the initial schema") + res, err := engine.SearchFast(ctx, search.Parse("SOFRA"), MessageFilter{}, 10, 0) + require.NoError(t, err, "SearchFast before rebuild") + require.Len(t, res, 1) + + // build-cache rewrites the messages Parquet with the OLD schema underneath + // the running engine — message_type/sender_id/attachment_count disappear. + msgPath := filepath.Join(analyticsDir, "messages", "year=2024", "data.parquet") + rewriteParquetForTest(t, msgPath, oldMessagesCols, ` + (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, NULL::TIMESTAMP, 2024, 1) + `) + + // Must re-probe and succeed rather than fail with the REPLACE binder error. 
+ res, err = engine.SearchFast(ctx, search.Parse("SOFRA"), MessageFilter{}, 10, 0) + require.NoError(t, err, "SearchFast after cache rebuilt underneath engine") + require.Len(t, res, 1) + assert.Equal(t, "Hello SOFRA", res[0].Subject) + assert.False(t, engine.hasCol("messages", "message_type"), + "message_type should be re-probed as absent after the rebuild") + + // Aggregate (the other reported-broken path) must also recover. + agg, err := engine.Aggregate(ctx, ViewSenders, DefaultAggregateOptions()) + require.NoError(t, err, "Aggregate after cache rebuilt underneath engine") + require.Len(t, agg, 1) +} + +// rewriteParquetForTest overwrites an existing Parquet file with a new schema +// and rows, simulating an out-of-band cache rebuild. +func rewriteParquetForTest(t *testing.T, path, columns, values string) { + t.Helper() + db, err := sql.Open("duckdb", "") + require.NoError(t, err, "open duckdb") + defer func() { _ = db.Close() }() + writeTableParquet(t, db, escapePath(path), columns, values, false) +} From 1dd0bb73701fd06e489885f8dfc3b807e21f49af Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 26 Jun 2026 18:01:42 -0500 Subject: [PATCH 2/4] fix(query): invalidate search cache after Parquet rebuild SearchFastWithStats kept a materialized temp table alive for pagination and keyed it only by search predicates. When build-cache or sync rewrote the Parquet analytics files under a long-running engine, the optional-column probe could refresh while the search path still served rows, counts, and stats from the pre-rebuild temp table.

Include the observed Parquet cache fingerprint in the search cache key and refresh that fingerprint before considering a cache hit, so repeated searches rematerialize against current analytics data after an out-of-band cache rebuild. 
The regression rewrites Parquet beneath a live engine and repeats the same fast search to prove counts, stats, and row ordering come from the rebuilt cache.

Generated with Codex (GPT-5)
Co-authored-by: Codex --- internal/query/duckdb.go | 32 +++++++++++----- internal/query/duckdb_cache_drift_test.go | 46 +++++++++++++++++++++++ internal/query/duckdb_test.go | 24 +++++++++++- 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/internal/query/duckdb.go b/internal/query/duckdb.go index 644f36d6..f9fa2772 100644 --- a/internal/query/duckdb.go +++ b/internal/query/duckdb.go @@ -335,6 +335,12 @@ func (e *DuckDBEngine) ensureFreshOptionalCols() { log.Printf("[info] analytics cache changed — re-probed Parquet optional columns") } +func (e *DuckDBEngine) currentCacheFingerprint() string { + e.optColsMu.RLock() + defer e.optColsMu.RUnlock() + return e.cacheFP +} + // parquetCTEs returns common CTEs for reading all Parquet tables. // This is used by aggregate queries that need to join across tables. // parquetCTEs returns the WITH clause body that defines CTEs for all Parquet @@ -2237,20 +2243,22 @@ func (e *DuckDBEngine) SearchFastCount(ctx context.Context, q *search.Query, fil return count, nil } -// searchCacheKeyFor builds a deterministic cache key from search conditions and args. -// Same query+filter always produces the same key. Uses JSON encoding to avoid +// searchCacheKeyFor builds a deterministic cache key from search conditions, +// args, and the Parquet cache fingerprint. Same query+filter over the same +// analytics cache always produces the same key. Uses JSON encoding to avoid // ambiguity from delimiter collisions (e.g. args containing commas or pipes). -func searchCacheKeyFor(conditions []string, args []any) string { +func searchCacheKeyFor(conditions []string, args []any, cacheFP string) string { // JSON marshaling is unambiguous: each element is quoted/escaped independently. 
// Errors are impossible for string/int/float/bool args, but fall back to fmt. key := struct { - C []string `json:"c"` - A []any `json:"a"` - }{conditions, args} + C []string `json:"c"` + A []any `json:"a"` + FP string `json:"fp"` + }{conditions, args, cacheFP} b, err := json.Marshal(key) if err != nil { // Fallback: should never happen with the types buildSearchConditions produces. - return fmt.Sprintf("%v#%v", conditions, args) + return fmt.Sprintf("%v#%v#%s", conditions, args, cacheFP) } return string(b) } @@ -2441,8 +2449,14 @@ func (e *DuckDBEngine) SearchFastWithStats(ctx context.Context, q *search.Query, e.searchCacheMu.Lock() defer e.searchCacheMu.Unlock() - // Check cache: same conditions+args means same search, serve from cached table. - cacheKey := searchCacheKeyFor(conditions, args) + // Refresh before checking the search cache. A cache-hit page query may call + // parquetCTEs(), but that is too late to decide whether the already + // materialized temp table still represents the current Parquet data. + e.ensureFreshOptionalCols() + + // Check cache: same conditions+args+Parquet fingerprint means same search, + // serve from cached table. + cacheKey := searchCacheKeyFor(conditions, args, e.currentCacheFingerprint()) if cacheKey == e.searchCacheKey && e.searchCacheTable != "" { // Retry stats if a previous attempt failed (transient error). if e.searchCacheStats == nil { diff --git a/internal/query/duckdb_cache_drift_test.go b/internal/query/duckdb_cache_drift_test.go index 21503afa..c07fc23a 100644 --- a/internal/query/duckdb_cache_drift_test.go +++ b/internal/query/duckdb_cache_drift_test.go @@ -76,6 +76,52 @@ func TestDuckDBEngine_CacheRebuiltUnderneath(t *testing.T) { require.Len(t, agg, 1) } +func TestDuckDBEngine_SearchFastWithStatsRebuildsCacheWhenParquetChanges(t *testing.T) { + pb := newParquetBuilder(t). 
+ addTable("messages", "messages/year=2024", "data.parquet", messagesCols, ` + (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) + `). + addTable("sources", "sources", "sources.parquet", sourcesCols, `(1::BIGINT, 'test@gmail.com', 'gmail')`). + addTable("participants", "participants", "participants.parquet", participantsCols, `(1::BIGINT, 'alice@test.com', 'test.com', 'Alice', '')`). + addTable("message_recipients", "message_recipients", "message_recipients.parquet", messageRecipientsCols, `(1::BIGINT, 1::BIGINT, 'from', 'Alice')`). + addEmptyTable("labels", "labels", "labels.parquet", labelsCols, `(1::BIGINT, 'x')`). + addEmptyTable("message_labels", "message_labels", "message_labels.parquet", messageLabelsCols, `(1::BIGINT, 1::BIGINT)`). + addEmptyTable("attachments", "attachments", "attachments.parquet", attachmentsCols, `(1::BIGINT, 100::BIGINT, 'x')`). + addTable("conversations", "conversations", "conversations.parquet", conversationsCols, `(100::BIGINT, 'thread100', '')`) + + analyticsDir, cleanup := pb.build() + t.Cleanup(cleanup) + + engine, err := NewDuckDBEngine(analyticsDir, "", nil) + require.NoError(t, err, "NewDuckDBEngine") + t.Cleanup(func() { _ = engine.Close() }) + + ctx := context.Background() + q := search.Parse("SOFRA") + + first, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) + require.NoError(t, err, "SearchFastWithStats before rebuild") + require.Len(t, first.Messages, 1) + require.Equal(t, int64(1), first.TotalCount) + require.NotNil(t, first.Stats) + require.Equal(t, int64(1), first.Stats.MessageCount) + + msgPath := filepath.Join(analyticsDir, "messages", "year=2024", "data.parquet") + rewriteParquetForTest(t, msgPath, messagesCols, ` + (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, 
NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1), + (2::BIGINT, 1::BIGINT, 'm2', 101::BIGINT, 'Another SOFRA', 'snip', TIMESTAMP '2024-01-16 10:00:00', 2000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) + `) + + second, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) + require.NoError(t, err, "SearchFastWithStats after rebuild") + require.Len(t, second.Messages, 2) + assert.Equal(t, int64(2), second.TotalCount) + require.NotNil(t, second.Stats) + assert.Equal(t, int64(2), second.Stats.MessageCount) + assert.Equal(t, "Another SOFRA", second.Messages[0].Subject) + assert.Equal(t, "Hello SOFRA", second.Messages[1].Subject) +} + // rewriteParquetForTest overwrites an existing Parquet file with a new schema // and rows, simulating an out-of-band cache rebuild. func rewriteParquetForTest(t *testing.T, path, columns, values string) { diff --git a/internal/query/duckdb_test.go b/internal/query/duckdb_test.go index a10e428c..50c744a0 100644 --- a/internal/query/duckdb_test.go +++ b/internal/query/duckdb_test.go @@ -2987,6 +2987,8 @@ func TestSearchCacheKeyFor(t *testing.T) { args1 []any conds2 []string args2 []any + fp1 string + fp2 string wantEqual bool }{ { @@ -3045,12 +3047,30 @@ func TestSearchCacheKeyFor(t *testing.T) { args2: []any{`%"quoted"%`}, wantEqual: true, }, + { + name: "different fingerprints produce different keys", + conds1: []string{"x = ?"}, + args1: []any{"foo"}, + conds2: []string{"x = ?"}, + args2: []any{"foo"}, + fp1: "fp-before", + fp2: "fp-after", + wantEqual: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - key1 := searchCacheKeyFor(tt.conds1, tt.args1) - key2 := searchCacheKeyFor(tt.conds2, tt.args2) + fp1 := tt.fp1 + if fp1 == "" { + fp1 = "same-fingerprint" + } + fp2 := tt.fp2 + if fp2 == "" { + fp2 = "same-fingerprint" + } + key1 := searchCacheKeyFor(tt.conds1, tt.args1, fp1) + key2 := searchCacheKeyFor(tt.conds2, tt.args2, fp2) if tt.wantEqual 
{ assertpkg.Equal(t, key1, key2, "expected equal keys") } else { From e673436173c890171e5f395bf4310f99ad78a6d0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 26 Jun 2026 18:44:05 -0500 Subject: [PATCH 3/4] fix(query): stabilize Parquet cache fingerprints Search cache invalidation now depends on the same Parquet directory set required for a complete analytics cache. Without the auxiliary tables in the fingerprint, attachment, label, recipient, or relationship rebuilds could leave SearchFastWithStats serving stale cached stats or rows for repeated searches.

Schema probing now also records optional columns only from a stable fingerprint window. If build-cache rewrites files while a long-running engine is probing schemas, the probe retries instead of storing columns from one cache generation with the fingerprint from another.

Generated with Codex (GPT-5)
Co-authored-by: Codex --- internal/query/duckdb.go | 65 +++++++------ internal/query/duckdb_cache_drift_test.go | 111 ++++++++++++++++++++++ 2 files changed, 148 insertions(+), 28 deletions(-) diff --git a/internal/query/duckdb.go b/internal/query/duckdb.go index f9fa2772..c04469e2 100644 --- a/internal/query/duckdb.go +++ b/internal/query/duckdb.go @@ -149,12 +149,9 @@ func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB, o // Probe Parquet schemas for optional columns added in PR #160 (WhatsApp import). // Old cache files may lack these columns; we'll supply defaults in parquetCTEs(). 
- engine.optionalCols = map[string]map[string]bool{ - datasetParticipants: engine.probeParquetColumns(engine.parquetPath(datasetParticipants), false), - datasetMessages: engine.probeParquetColumns(engine.parquetGlob(), true), - datasetConversations: engine.probeParquetColumns(engine.parquetPath(datasetConversations), false), - "sources": engine.probeParquetColumns(engine.parquetPath("sources"), false), - } + engine.optionalCols, engine.cacheFP = stableOptionalColumns(engine.cacheFingerprint, func() map[string]map[string]bool { + return probeAllOptionalColumns(db, analyticsDir) + }) var missing []string for _, col := range []struct{ table, col string }{ {datasetParticipants, "phone_number"}, @@ -172,10 +169,6 @@ func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB, o if len(missing) > 0 { log.Printf("[warn] Parquet cache missing columns %v — run 'msgvault build-cache --full-rebuild' to update", missing) } - // Record the cache fingerprint so ensureFreshOptionalCols can detect a - // later rebuild and re-probe instead of trusting this snapshot forever. - engine.cacheFP = engine.cacheFingerprint() - // Register SQL views over Parquet files for raw SQL access. // Pass the already-probed optionalCols to avoid a redundant schema probe. if err := RegisterViewsWithColumns(db, analyticsDir, engine.optionalCols); err != nil { @@ -255,14 +248,6 @@ func (e *DuckDBEngine) parquetPath(table string) string { return filepath.Join(e.analyticsDir, table, "*.parquet") } -// probeParquetColumns checks which columns exist in a Parquet table's files. -// Delegates to the standalone probeColumns in views.go. -func (e *DuckDBEngine) probeParquetColumns( - pathPattern string, hivePartitioning bool, -) map[string]bool { - return probeColumns(e.db, pathPattern, hivePartitioning) -} - // hasCol returns true if the named column exists in the Parquet schema for the given table. 
func (e *DuckDBEngine) hasCol(table, col string) bool { e.optColsMu.RLock() @@ -278,19 +263,14 @@ func (e *DuckDBEngine) hasCol(table, col string) bool { } // cacheFingerprint computes a cheap signature of the analytics Parquet cache. -// It combines, per probed table, the file count and each file's size and +// It combines, per required table, the file count and each file's size and // modification time. When build-cache or sync rewrites the cache underneath a // long-running process the fingerprint changes, letting the engine detect that -// its probed column set is stale. Pure filesystem stats — no DuckDB access. +// its probed column set and materialized search cache are stale. Pure +// filesystem stats — no DuckDB access. func (e *DuckDBEngine) cacheFingerprint() string { var b strings.Builder - globs := []string{ - filepath.Join(e.analyticsDir, datasetMessages, "*", "*.parquet"), - e.parquetPath(datasetParticipants), - e.parquetPath(datasetConversations), - e.parquetPath("sources"), - } - for _, g := range globs { + for _, g := range e.cacheFingerprintGlobs() { matches, _ := filepath.Glob(g) fmt.Fprintf(&b, "%s#%d|", g, len(matches)) for _, m := range matches { @@ -302,6 +282,33 @@ func (e *DuckDBEngine) cacheFingerprint() string { return b.String() } +func (e *DuckDBEngine) cacheFingerprintGlobs() []string { + globs := make([]string, 0, len(RequiredParquetDirs)) + for _, dir := range RequiredParquetDirs { + if dir == datasetMessages { + globs = append(globs, filepath.Join(e.analyticsDir, dir, "*", "*.parquet")) + continue + } + globs = append(globs, e.parquetPath(dir)) + } + return globs +} + +func stableOptionalColumns( + cacheFingerprint func() string, + probe func() map[string]map[string]bool, +) (map[string]map[string]bool, string) { + for { + before := cacheFingerprint() + cols := probe() + after := cacheFingerprint() + if before == after { + return cols, after + } + log.Printf("[info] analytics cache changed during Parquet schema probe — retrying") + } 
+} + // ensureFreshOptionalCols re-probes the Parquet schema (and re-registers the // SQL views) when the analytics cache has changed since the last probe. This // guards long-running engines (e.g. the mcp-http server) against a binder @@ -326,7 +333,9 @@ func (e *DuckDBEngine) ensureFreshOptionalCols() { return } - newCols := probeAllOptionalColumns(e.db, e.analyticsDir) + newCols, fp := stableOptionalColumns(e.cacheFingerprint, func() map[string]map[string]bool { + return probeAllOptionalColumns(e.db, e.analyticsDir) + }) e.optionalCols = newCols e.cacheFP = fp if err := RegisterViewsWithColumns(e.db, e.analyticsDir, newCols); err != nil { diff --git a/internal/query/duckdb_cache_drift_test.go b/internal/query/duckdb_cache_drift_test.go index c07fc23a..f4aa3ed0 100644 --- a/internal/query/duckdb_cache_drift_test.go +++ b/internal/query/duckdb_cache_drift_test.go @@ -3,8 +3,10 @@ package query import ( "context" "database/sql" + "os" "path/filepath" "testing" + "time" _ "github.com/duckdb/duckdb-go/v2" "github.com/stretchr/testify/assert" @@ -122,6 +124,90 @@ func TestDuckDBEngine_SearchFastWithStatsRebuildsCacheWhenParquetChanges(t *test assert.Equal(t, "Hello SOFRA", second.Messages[1].Subject) } +func TestDuckDBEngine_SearchFastWithStatsRebuildsStatsWhenAttachmentsChange(t *testing.T) { + pb := newParquetBuilder(t). + addTable("messages", "messages/year=2024", "data.parquet", messagesCols, ` + (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) + `). + addTable("sources", "sources", "sources.parquet", sourcesCols, `(1::BIGINT, 'test@gmail.com', 'gmail')`). + addTable("participants", "participants", "participants.parquet", participantsCols, `(1::BIGINT, 'alice@test.com', 'test.com', 'Alice', '')`). 
+ addTable("message_recipients", "message_recipients", "message_recipients.parquet", messageRecipientsCols, `(1::BIGINT, 1::BIGINT, 'from', 'Alice')`). + addEmptyTable("labels", "labels", "labels.parquet", labelsCols, `(1::BIGINT, 'x')`). + addEmptyTable("message_labels", "message_labels", "message_labels.parquet", messageLabelsCols, `(1::BIGINT, 1::BIGINT)`). + addEmptyTable("attachments", "attachments", "attachments.parquet", attachmentsCols, `(1::BIGINT, 100::BIGINT, 'x')`). + addTable("conversations", "conversations", "conversations.parquet", conversationsCols, `(100::BIGINT, 'thread100', '')`) + + analyticsDir, cleanup := pb.build() + t.Cleanup(cleanup) + + engine, err := NewDuckDBEngine(analyticsDir, "", nil) + require.NoError(t, err, "NewDuckDBEngine") + t.Cleanup(func() { _ = engine.Close() }) + + ctx := context.Background() + q := search.Parse("SOFRA") + + first, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) + require.NoError(t, err, "SearchFastWithStats before attachments rebuild") + require.NotNil(t, first.Stats) + require.Len(t, first.Messages, 1) + require.Equal(t, int64(0), first.Stats.AttachmentCount) + require.Equal(t, 0, first.Messages[0].AttachmentCount) + + attPath := filepath.Join(analyticsDir, "attachments", "attachments.parquet") + rewriteParquetForTest(t, attPath, attachmentsCols, `(1::BIGINT, 123::BIGINT, 'file.pdf')`) + + second, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) + require.NoError(t, err, "SearchFastWithStats after attachments rebuild") + require.NotNil(t, second.Stats) + require.Len(t, second.Messages, 1) + assert.Equal(t, int64(1), second.Stats.AttachmentCount) + assert.Equal(t, int64(123), second.Stats.AttachmentSize) + assert.Equal(t, 1, second.Messages[0].AttachmentCount) +} + +func TestDuckDBEngine_CacheFingerprintCoversRequiredParquetDirs(t *testing.T) { + analyticsDir, cleanup := buildStandardTestData(t).Build() + t.Cleanup(cleanup) + + 
engine, err := NewDuckDBEngine(analyticsDir, "", nil) + require.NoError(t, err, "NewDuckDBEngine") + t.Cleanup(func() { _ = engine.Close() }) + + for _, dir := range RequiredParquetDirs { + t.Run(dir, func(t *testing.T) { + before := engine.cacheFingerprint() + touchParquetForTest(t, firstRequiredParquetForTest(t, analyticsDir, dir)) + after := engine.cacheFingerprint() + assert.NotEqual(t, before, after, "fingerprint should include %s", dir) + }) + } +} + +func TestStableOptionalColumnsRetriesWhenFingerprintChanges(t *testing.T) { + staleCols := map[string]map[string]bool{datasetMessages: map[string]bool{"message_type": true}} + freshCols := map[string]map[string]bool{datasetMessages: map[string]bool{"message_type": false}} + fingerprints := []string{"before", "after", "after", "after"} + probeCalls := 0 + + cols, fp := stableOptionalColumns(func() string { + require.NotEmpty(t, fingerprints, "unexpected fingerprint call") + fp := fingerprints[0] + fingerprints = fingerprints[1:] + return fp + }, func() map[string]map[string]bool { + probeCalls++ + if probeCalls == 1 { + return staleCols + } + return freshCols + }) + + assert.Equal(t, 2, probeCalls) + assert.Equal(t, freshCols, cols) + assert.Equal(t, "after", fp) +} + // rewriteParquetForTest overwrites an existing Parquet file with a new schema // and rows, simulating an out-of-band cache rebuild. func rewriteParquetForTest(t *testing.T, path, columns, values string) { @@ -131,3 +217,28 @@ func rewriteParquetForTest(t *testing.T, path, columns, values string) { defer func() { _ = db.Close() }() writeTableParquet(t, db, escapePath(path), columns, values, false) } + +func firstRequiredParquetForTest(t *testing.T, analyticsDir, dir string) string { + t.Helper() + patterns := []string{filepath.Join(analyticsDir, dir, "*.parquet")} + if dir == datasetMessages { + patterns = append([]string{filepath.Join(analyticsDir, dir, "*", "*.parquet")}, patterns...) 
+ } + for _, pattern := range patterns { + matches, err := filepath.Glob(pattern) + require.NoError(t, err, "glob parquet files") + if len(matches) > 0 { + return matches[0] + } + } + require.FailNow(t, "required parquet file not found", "dir %s", dir) + return "" +} + +func touchParquetForTest(t *testing.T, path string) { + t.Helper() + info, err := os.Stat(path) + require.NoError(t, err, "stat parquet file") + modTime := info.ModTime().Add(time.Second) + require.NoError(t, os.Chtimes(path, modTime, modTime), "touch parquet file") +} From d52c39e19a2fa5572bf145584cd4e71f6e87ca6e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 27 Jun 2026 13:17:06 -0500 Subject: [PATCH 4/4] test(query): use testify helpers in cache drift tests CI runs the custom testify-helper-check analyzer as part of lint-ci, while the local lint target only runs golangci-lint. The cache drift regression tests crossed the direct-package-call threshold, so the CI-only analyzer rejected them even though the tests themselves passed.

Use local assert and require helpers in the assertion-heavy tests so the drift coverage follows the repository's testify style rule.

Generated with Codex (GPT-5)
Co-authored-by: Codex --- internal/query/duckdb_cache_drift_test.go | 97 +++++++++++++---------- 1 file changed, 56 insertions(+), 41 deletions(-) diff --git a/internal/query/duckdb_cache_drift_test.go b/internal/query/duckdb_cache_drift_test.go index f4aa3ed0..62576be6 100644 --- a/internal/query/duckdb_cache_drift_test.go +++ b/internal/query/duckdb_cache_drift_test.go @@ -24,6 +24,9 @@ import ( // // The engine must detect the cache change and re-probe instead of crashing. func TestDuckDBEngine_CacheRebuiltUnderneath(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + // Pre-rebuild: current schema (message_type, sender_id, attachment_count present). const newMessagesCols = messagesCols // Post-rebuild: old schema written by a stale cache builder (no new columns). 
@@ -45,17 +48,17 @@ func TestDuckDBEngine_CacheRebuiltUnderneath(t *testing.T) { t.Cleanup(cleanup) engine, err := NewDuckDBEngine(analyticsDir, "", nil) - require.NoError(t, err, "NewDuckDBEngine") + require.NoError(err, "NewDuckDBEngine") t.Cleanup(func() { _ = engine.Close() }) ctx := context.Background() // Startup probe sees the new schema. - require.True(t, engine.hasCol("messages", "message_type"), + require.True(engine.hasCol("messages", "message_type"), "message_type should be detected as present in the initial schema") res, err := engine.SearchFast(ctx, search.Parse("SOFRA"), MessageFilter{}, 10, 0) - require.NoError(t, err, "SearchFast before rebuild") - require.Len(t, res, 1) + require.NoError(err, "SearchFast before rebuild") + require.Len(res, 1) // build-cache rewrites the messages Parquet with the OLD schema underneath // the running engine — message_type/sender_id/attachment_count disappear. @@ -66,19 +69,22 @@ func TestDuckDBEngine_CacheRebuiltUnderneath(t *testing.T) { // Must re-probe and succeed rather than fail with the REPLACE binder error. res, err = engine.SearchFast(ctx, search.Parse("SOFRA"), MessageFilter{}, 10, 0) - require.NoError(t, err, "SearchFast after cache rebuilt underneath engine") - require.Len(t, res, 1) - assert.Equal(t, "Hello SOFRA", res[0].Subject) - assert.False(t, engine.hasCol("messages", "message_type"), + require.NoError(err, "SearchFast after cache rebuilt underneath engine") + require.Len(res, 1) + assert.Equal("Hello SOFRA", res[0].Subject) + assert.False(engine.hasCol("messages", "message_type"), "message_type should be re-probed as absent after the rebuild") // Aggregate (the other reported-broken path) must also recover. 
agg, err := engine.Aggregate(ctx, ViewSenders, DefaultAggregateOptions()) - require.NoError(t, err, "Aggregate after cache rebuilt underneath engine") - require.Len(t, agg, 1) + require.NoError(err, "Aggregate after cache rebuilt underneath engine") + require.Len(agg, 1) } func TestDuckDBEngine_SearchFastWithStatsRebuildsCacheWhenParquetChanges(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + pb := newParquetBuilder(t). addTable("messages", "messages/year=2024", "data.parquet", messagesCols, ` (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) @@ -95,18 +101,18 @@ func TestDuckDBEngine_SearchFastWithStatsRebuildsCacheWhenParquetChanges(t *test t.Cleanup(cleanup) engine, err := NewDuckDBEngine(analyticsDir, "", nil) - require.NoError(t, err, "NewDuckDBEngine") + require.NoError(err, "NewDuckDBEngine") t.Cleanup(func() { _ = engine.Close() }) ctx := context.Background() q := search.Parse("SOFRA") first, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) - require.NoError(t, err, "SearchFastWithStats before rebuild") - require.Len(t, first.Messages, 1) - require.Equal(t, int64(1), first.TotalCount) - require.NotNil(t, first.Stats) - require.Equal(t, int64(1), first.Stats.MessageCount) + require.NoError(err, "SearchFastWithStats before rebuild") + require.Len(first.Messages, 1) + require.Equal(int64(1), first.TotalCount) + require.NotNil(first.Stats) + require.Equal(int64(1), first.Stats.MessageCount) msgPath := filepath.Join(analyticsDir, "messages", "year=2024", "data.parquet") rewriteParquetForTest(t, msgPath, messagesCols, ` @@ -115,16 +121,19 @@ func TestDuckDBEngine_SearchFastWithStatsRebuildsCacheWhenParquetChanges(t *test `) second, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) - require.NoError(t, err, "SearchFastWithStats after 
rebuild") - require.Len(t, second.Messages, 2) - assert.Equal(t, int64(2), second.TotalCount) - require.NotNil(t, second.Stats) - assert.Equal(t, int64(2), second.Stats.MessageCount) - assert.Equal(t, "Another SOFRA", second.Messages[0].Subject) - assert.Equal(t, "Hello SOFRA", second.Messages[1].Subject) + require.NoError(err, "SearchFastWithStats after rebuild") + require.Len(second.Messages, 2) + assert.Equal(int64(2), second.TotalCount) + require.NotNil(second.Stats) + assert.Equal(int64(2), second.Stats.MessageCount) + assert.Equal("Another SOFRA", second.Messages[0].Subject) + assert.Equal("Hello SOFRA", second.Messages[1].Subject) } func TestDuckDBEngine_SearchFastWithStatsRebuildsStatsWhenAttachmentsChange(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + pb := newParquetBuilder(t). addTable("messages", "messages/year=2024", "data.parquet", messagesCols, ` (1::BIGINT, 1::BIGINT, 'm1', 100::BIGINT, 'Hello SOFRA', 'snip', TIMESTAMP '2024-01-15 10:00:00', 1000::BIGINT, false, 0::INTEGER, NULL::TIMESTAMP, NULL::BIGINT, 'email', 2024, 1) @@ -141,37 +150,40 @@ func TestDuckDBEngine_SearchFastWithStatsRebuildsStatsWhenAttachmentsChange(t *t t.Cleanup(cleanup) engine, err := NewDuckDBEngine(analyticsDir, "", nil) - require.NoError(t, err, "NewDuckDBEngine") + require.NoError(err, "NewDuckDBEngine") t.Cleanup(func() { _ = engine.Close() }) ctx := context.Background() q := search.Parse("SOFRA") first, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) - require.NoError(t, err, "SearchFastWithStats before attachments rebuild") - require.NotNil(t, first.Stats) - require.Len(t, first.Messages, 1) - require.Equal(t, int64(0), first.Stats.AttachmentCount) - require.Equal(t, 0, first.Messages[0].AttachmentCount) + require.NoError(err, "SearchFastWithStats before attachments rebuild") + require.NotNil(first.Stats) + require.Len(first.Messages, 1) + require.Equal(int64(0), first.Stats.AttachmentCount) + 
require.Equal(0, first.Messages[0].AttachmentCount) attPath := filepath.Join(analyticsDir, "attachments", "attachments.parquet") rewriteParquetForTest(t, attPath, attachmentsCols, `(1::BIGINT, 123::BIGINT, 'file.pdf')`) second, err := engine.SearchFastWithStats(ctx, q, "SOFRA", MessageFilter{}, ViewSenders, 10, 0) - require.NoError(t, err, "SearchFastWithStats after attachments rebuild") - require.NotNil(t, second.Stats) - require.Len(t, second.Messages, 1) - assert.Equal(t, int64(1), second.Stats.AttachmentCount) - assert.Equal(t, int64(123), second.Stats.AttachmentSize) - assert.Equal(t, 1, second.Messages[0].AttachmentCount) + require.NoError(err, "SearchFastWithStats after attachments rebuild") + require.NotNil(second.Stats) + require.Len(second.Messages, 1) + assert.Equal(int64(1), second.Stats.AttachmentCount) + assert.Equal(int64(123), second.Stats.AttachmentSize) + assert.Equal(1, second.Messages[0].AttachmentCount) } func TestDuckDBEngine_CacheFingerprintCoversRequiredParquetDirs(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + analyticsDir, cleanup := buildStandardTestData(t).Build() t.Cleanup(cleanup) engine, err := NewDuckDBEngine(analyticsDir, "", nil) - require.NoError(t, err, "NewDuckDBEngine") + require.NoError(err, "NewDuckDBEngine") t.Cleanup(func() { _ = engine.Close() }) for _, dir := range RequiredParquetDirs { @@ -179,19 +191,22 @@ func TestDuckDBEngine_CacheFingerprintCoversRequiredParquetDirs(t *testing.T) { before := engine.cacheFingerprint() touchParquetForTest(t, firstRequiredParquetForTest(t, analyticsDir, dir)) after := engine.cacheFingerprint() - assert.NotEqual(t, before, after, "fingerprint should include %s", dir) + assert.NotEqual(before, after, "fingerprint should include %s", dir) }) } } func TestStableOptionalColumnsRetriesWhenFingerprintChanges(t *testing.T) { + require := require.New(t) + assert := assert.New(t) + staleCols := map[string]map[string]bool{datasetMessages: map[string]bool{"message_type": 
true}} freshCols := map[string]map[string]bool{datasetMessages: map[string]bool{"message_type": false}} fingerprints := []string{"before", "after", "after", "after"} probeCalls := 0 cols, fp := stableOptionalColumns(func() string { - require.NotEmpty(t, fingerprints, "unexpected fingerprint call") + require.NotEmpty(fingerprints, "unexpected fingerprint call") fp := fingerprints[0] fingerprints = fingerprints[1:] return fp @@ -203,9 +218,9 @@ func TestStableOptionalColumnsRetriesWhenFingerprintChanges(t *testing.T) { return freshCols }) - assert.Equal(t, 2, probeCalls) - assert.Equal(t, freshCols, cols) - assert.Equal(t, "after", fp) + assert.Equal(2, probeCalls) + assert.Equal(freshCols, cols) + assert.Equal("after", fp) } // rewriteParquetForTest overwrites an existing Parquet file with a new schema