Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 126 additions & 24 deletions internal/query/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"strings"
Expand Down Expand Up @@ -46,7 +47,14 @@ type DuckDBEngine struct {
// Used to gracefully handle stale cache files that lack newer columns
// (e.g. phone_number, attachment_count, sender_id, message_type added in PR #160).
// Map: table_name -> column_name -> exists_in_parquet
//
// Guarded by optColsMu because a long-running server (e.g. mcp-http) may
// have the analytics cache rebuilt underneath it by build-cache/sync. When
// that happens the column set can change, so optionalCols is re-probed on
// demand (see ensureFreshOptionalCols) rather than only at construction.
optColsMu sync.RWMutex
optionalCols map[string]map[string]bool
cacheFP string // fingerprint of the Parquet cache at last probe

// Search result cache: keeps the materialized temp table alive across
// pagination calls for the same search query, avoiding repeated Parquet scans.
Expand Down Expand Up @@ -141,12 +149,9 @@ func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB, o

// Probe Parquet schemas for optional columns added in PR #160 (WhatsApp import).
// Old cache files may lack these columns; we'll supply defaults in parquetCTEs().
engine.optionalCols = map[string]map[string]bool{
datasetParticipants: engine.probeParquetColumns(engine.parquetPath(datasetParticipants), false),
datasetMessages: engine.probeParquetColumns(engine.parquetGlob(), true),
datasetConversations: engine.probeParquetColumns(engine.parquetPath(datasetConversations), false),
"sources": engine.probeParquetColumns(engine.parquetPath("sources"), false),
}
engine.optionalCols, engine.cacheFP = stableOptionalColumns(engine.cacheFingerprint, func() map[string]map[string]bool {
return probeAllOptionalColumns(db, analyticsDir)
})
var missing []string
for _, col := range []struct{ table, col string }{
{datasetParticipants, "phone_number"},
Expand All @@ -164,7 +169,6 @@ func NewDuckDBEngine(analyticsDir string, sqlitePath string, sqliteDB *sql.DB, o
if len(missing) > 0 {
log.Printf("[warn] Parquet cache missing columns %v — run 'msgvault build-cache --full-rebuild' to update", missing)
}

// Register SQL views over Parquet files for raw SQL access.
// Pass the already-probed optionalCols to avoid a redundant schema probe.
if err := RegisterViewsWithColumns(db, analyticsDir, engine.optionalCols); err != nil {
Expand All @@ -189,6 +193,10 @@ func (e *DuckDBEngine) Close() error {
func (e *DuckDBEngine) QuerySQL(
ctx context.Context, sqlStr string,
) (*QueryResult, error) {
// Refresh views if the cache changed since the last schema probe, so the
// registered views match the current Parquet schema.
e.ensureFreshOptionalCols()

rows, err := e.db.QueryContext(ctx, sqlStr)
if err != nil {
return nil, fmt.Errorf("execute query: %w", err)
Expand Down Expand Up @@ -240,16 +248,10 @@ func (e *DuckDBEngine) parquetPath(table string) string {
return filepath.Join(e.analyticsDir, table, "*.parquet")
}

// probeParquetColumns reports which columns are present in the Parquet files
// matched by pathPattern. It is a thin method wrapper that forwards the
// engine's DuckDB handle, the pattern, and the hivePartitioning flag to the
// standalone probeColumns helper in views.go.
func (e *DuckDBEngine) probeParquetColumns(pathPattern string, hivePartitioning bool) map[string]bool {
	present := probeColumns(e.db, pathPattern, hivePartitioning)
	return present
}

// hasCol returns true if the named column exists in the Parquet schema for the given table.
func (e *DuckDBEngine) hasCol(table, col string) bool {
e.optColsMu.RLock()
defer e.optColsMu.RUnlock()
if e.optionalCols == nil {
return true // no probe data — assume present (backwards compatible)
}
Expand All @@ -260,6 +262,94 @@ func (e *DuckDBEngine) hasCol(table, col string) bool {
return tbl[col]
}

// cacheFingerprint returns a cheap, stat-based signature of the analytics
// Parquet cache. For each pattern from cacheFingerprintGlobs it records the
// pattern and the number of matching files, followed by every file's path,
// size, and modification time (in nanoseconds). A rewrite of the cache by
// build-cache or sync therefore changes the signature, which lets a
// long-running engine notice that its probed column set and materialized
// search cache are stale. Only filesystem calls are made; DuckDB is not
// touched.
func (e *DuckDBEngine) cacheFingerprint() string {
	var sig strings.Builder
	for _, pattern := range e.cacheFingerprintGlobs() {
		// Glob only fails on a malformed pattern; in that case the pattern is
		// recorded with zero matches.
		files, _ := filepath.Glob(pattern)
		fmt.Fprintf(&sig, "%s#%d|", pattern, len(files))
		for _, file := range files {
			info, err := os.Stat(file)
			if err != nil {
				// The file could not be stat'ed (e.g. removed since the glob);
				// leave it out of the signature.
				continue
			}
			fmt.Fprintf(&sig, "%s=%d,%d;", file, info.Size(), info.ModTime().UnixNano())
		}
	}
	return sig.String()
}

// cacheFingerprintGlobs returns one glob pattern per entry of
// RequiredParquetDirs, in the same order. The messages dataset is matched one
// directory level deeper than the others (presumably because its files live
// in per-partition subdirectories — confirm against the cache writer); every
// other dataset uses the pattern produced by parquetPath.
func (e *DuckDBEngine) cacheFingerprintGlobs() []string {
	patterns := make([]string, 0, len(RequiredParquetDirs))
	for _, table := range RequiredParquetDirs {
		switch table {
		case datasetMessages:
			patterns = append(patterns, filepath.Join(e.analyticsDir, table, "*", "*.parquet"))
		default:
			patterns = append(patterns, e.parquetPath(table))
		}
	}
	return patterns
}

// stableOptionalColumns runs probe and returns its result together with the
// cache fingerprint the result corresponds to.
//
// The fingerprint is sampled before and after each probe. When both samples
// agree, the cache did not change underneath the probe and the pair
// (columns, fingerprint) is returned. When they differ, the probe is retried.
//
// Retries are bounded: if the cache is still changing after the last attempt
// (for example during a long-running cache rebuild), the most recent probe
// result is returned with an empty fingerprint. An empty fingerprint is not
// expected to equal any real fingerprint, so a caller that compares
// fingerprints will re-probe on its next check instead of this function
// spinning forever (possibly while the caller holds a lock).
func stableOptionalColumns(
	cacheFingerprint func() string,
	probe func() map[string]map[string]bool,
) (map[string]map[string]bool, string) {
	// Upper bound on probe attempts before giving up on a stable snapshot.
	const maxAttempts = 5

	var cols map[string]map[string]bool
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		before := cacheFingerprint()
		cols = probe()
		after := cacheFingerprint()
		if before == after {
			return cols, after
		}
		if attempt < maxAttempts {
			log.Printf("[info] analytics cache changed during Parquet schema probe — retrying")
		}
	}

	log.Printf("[warn] analytics cache still changing after %d Parquet schema probes — using latest probe result", maxAttempts)
	return cols, ""
}

// ensureFreshOptionalCols brings the engine's view of the Parquet schema up
// to date when the analytics cache has changed since the last probe.
//
// It computes the current cache fingerprint and compares it with the stored
// one under the read lock; if they match it returns immediately. Otherwise it
// takes the write lock, checks again (another goroutine may have refreshed in
// the meantime), re-probes the optional columns, stores the new column set and
// fingerprint, and re-registers the SQL views with the new columns. A failure
// to re-register the views is logged, not returned.
//
// This protects long-running engines (e.g. the mcp-http server) from a binder
// error after build-cache rewrites the cache with a different column set: a
// stale "column present" verdict would put a now-absent column into a
// SELECT * REPLACE list, which DuckDB rejects with
// "Column ... in REPLACE list not found in FROM clause".
func (e *DuckDBEngine) ensureFreshOptionalCols() {
	current := e.cacheFingerprint()

	// Fast path: fingerprint matches the one recorded at the last probe.
	e.optColsMu.RLock()
	stale := e.cacheFP != current
	e.optColsMu.RUnlock()
	if !stale {
		return
	}

	e.optColsMu.Lock()
	defer e.optColsMu.Unlock()
	// Re-check under the write lock: a concurrent caller may already have
	// refreshed to this fingerprint while we were waiting.
	if e.cacheFP == current {
		return
	}

	probe := func() map[string]map[string]bool {
		return probeAllOptionalColumns(e.db, e.analyticsDir)
	}
	cols, stableFP := stableOptionalColumns(e.cacheFingerprint, probe)
	e.optionalCols, e.cacheFP = cols, stableFP

	if err := RegisterViewsWithColumns(e.db, e.analyticsDir, cols); err != nil {
		log.Printf("[warn] re-register views after analytics cache change: %v", err)
	}
	log.Printf("[info] analytics cache changed — re-probed Parquet optional columns")
}

// currentCacheFingerprint returns the cache fingerprint recorded at the last
// schema probe, read under the optColsMu read lock. It does not touch the
// filesystem.
func (e *DuckDBEngine) currentCacheFingerprint() string {
	e.optColsMu.RLock()
	recorded := e.cacheFP
	e.optColsMu.RUnlock()
	return recorded
}

// parquetCTEs returns common CTEs for reading all Parquet tables.
// This is used by aggregate queries that need to join across tables.
// parquetCTEs returns the WITH clause body that defines CTEs for all Parquet
Expand All @@ -272,6 +362,10 @@ func (e *DuckDBEngine) hasCol(table, col string) bool {
// are handled gracefully: if the Parquet file predates their addition, they
// are synthesised with sensible defaults instead of causing a binder error.
func (e *DuckDBEngine) parquetCTEs() string {
// Re-probe if build-cache/sync rewrote the cache underneath us, so the
// REPLACE list below never references a column the current Parquet lacks.
e.ensureFreshOptionalCols()

// --- messages CTE ---
msgReplace := []string{
"CAST(id AS BIGINT) AS id",
Expand Down Expand Up @@ -2158,20 +2252,22 @@ func (e *DuckDBEngine) SearchFastCount(ctx context.Context, q *search.Query, fil
return count, nil
}

// searchCacheKeyFor builds a deterministic cache key from search conditions,
// args, and the Parquet cache fingerprint. The same query+filter over the same
// analytics cache always produces the same key; a different fingerprint
// produces a different key.
//
// The key is the JSON encoding of the three inputs, which avoids ambiguity
// from delimiter collisions (e.g. args containing commas or pipes) because
// each element is quoted/escaped independently.
func searchCacheKeyFor(conditions []string, args []any, cacheFP string) string {
	key := struct {
		C  []string `json:"c"`
		A  []any    `json:"a"`
		FP string   `json:"fp"`
	}{conditions, args, cacheFP}
	b, err := json.Marshal(key)
	if err != nil {
		// Marshal fails only for values JSON cannot represent (e.g. NaN/Inf
		// floats or unsupported types). Fall back to fmt formatting so a key
		// is still produced.
		return fmt.Sprintf("%v#%v#%s", conditions, args, cacheFP)
	}
	return string(b)
}
Expand Down Expand Up @@ -2362,8 +2458,14 @@ func (e *DuckDBEngine) SearchFastWithStats(ctx context.Context, q *search.Query,
e.searchCacheMu.Lock()
defer e.searchCacheMu.Unlock()

// Check cache: same conditions+args means same search, serve from cached table.
cacheKey := searchCacheKeyFor(conditions, args)
// Refresh before checking the search cache. A cache-hit page query may call
// parquetCTEs(), but that is too late to decide whether the already
// materialized temp table still represents the current Parquet data.
e.ensureFreshOptionalCols()

// Check cache: same conditions+args+Parquet fingerprint means same search,
// serve from cached table.
cacheKey := searchCacheKeyFor(conditions, args, e.currentCacheFingerprint())
if cacheKey == e.searchCacheKey && e.searchCacheTable != "" {
// Retry stats if a previous attempt failed (transient error).
if e.searchCacheStats == nil {
Expand Down
Loading