Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions recipes/thought-enrichment/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ Classifies each thought using an LLM and writes structured metadata back to Supa
node enrich-thoughts.mjs --apply --retry-failed
```

**Flags:** `--provider` (openrouter or anthropic), `--concurrency`, `--limit`, `--skip`, `--model`.
**Flags:** `--provider` (openrouter or anthropic), `--concurrency`, `--limit`, `--skip`, `--model`, `--max-calls`, `--reset-state`.

The `--max-calls` flag is a hard ceiling on the number of LLM calls per run. The default is `10000`; pass `--max-calls 0` to disable the cap. When the limit is hit the script aborts cleanly, prints a summary, and leaves remaining rows with `enriched=false` so you can resume later. This protects against a shell typo (e.g. dropping `--limit`) burning unbounded spend against a large un-enriched table.

**Resume.** The script checkpoints `lastProcessedId` to `data/enrichment-state.json` after each concurrency chunk. On startup, if a checkpoint exists and neither `--skip` nor `--reset-state` was passed, the run resumes from `id > lastProcessedId`. The `enriched=false` filter is still applied as a second layer of defense. Pass `--reset-state` to ignore the checkpoint and start from scratch.

### backfill-type.mjs -- Type canonicalization

Expand Down Expand Up @@ -81,9 +85,9 @@ Scans thought content for patterns matching SSNs, credit cards, API keys, passwo

2. Apply:

```bash
node backfill-sensitivity.mjs --apply
```
```bash
node backfill-sensitivity.mjs --apply
```

## Recommended execution order

Expand All @@ -92,6 +96,11 @@ Scans thought content for patterns matching SSNs, credit cards, API keys, passwo
3. Run `enrich-thoughts.mjs --dry-run --limit 20` to preview LLM classifications.
4. Run `enrich-thoughts.mjs --apply` to enrich all remaining thoughts.

## Security notes

- **Prompt injection:** thought content is wrapped in `<thought_content>` tags and the system prompt instructs the model to treat everything inside as untrusted data. Any literal tag occurrences in content are escaped. Output fields (`summary`, `topics`, `tags`, `people`, `action_items`) are length-capped and control-char-stripped before they are written to `metadata`. Even so, enriching hostile third-party imports (shared chat exports, scraped feeds) can still influence classification labels — review before trusting them as ground truth.
- **Bearer token on the wire:** every request carries your Supabase service-role key. Double-check that `SUPABASE_URL` points at your own Supabase project, not a proxy or debug server.

## Cost expectations

The default OpenRouter model is `openai/gpt-4o-mini` at roughly $0.001--0.002 per thought. For 1,000 thoughts, expect approximately $1--2. The `backfill-type` and `backfill-sensitivity` scripts are free (no LLM calls -- they use local logic only).
Expand Down
34 changes: 26 additions & 8 deletions recipes/thought-enrichment/backfill-sensitivity.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
import fs from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";
import {
fetchWithTimeout,
resolveTimeoutMs,
DEFAULT_SUPABASE_TIMEOUT_MS,
} from "./lib/memory-core.mjs";

const __dirname = path.dirname(fileURLToPath(import.meta.url));

const SUPABASE_TIMEOUT_MS = resolveTimeoutMs(process.env.FETCH_TIMEOUT_MS, DEFAULT_SUPABASE_TIMEOUT_MS);

// Load env from .env.local
const envPath = path.resolve(__dirname, ".env.local");
const envVars = {};
Expand Down Expand Up @@ -94,18 +101,24 @@ console.log(`Mode: ${dryRun ? "DRY RUN (no changes)" : "APPLY (will update DB)"}
console.log();

const BATCH_SIZE = 500;
let offset = 0;
const PROGRESS_EVERY = 5000;
let afterId = 0;
let scanned = 0;
let scannedAtLastProgress = 0;
let upgradedPersonal = 0;
let upgradedRestricted = 0;
let errors = 0;

// Cursor-based pagination on id. Offset-pagination is unsafe here:
// successful PATCHes shift the "where sensitivity_tier in
// (null,standard,'')" result set, so `offset += BATCH_SIZE` would skip
// un-processed rows. Cursor on id ASC is stable under mutation.
while (true) {
const url = `${BASE_URL}/thoughts?select=id,content,sensitivity_tier&or=(sensitivity_tier.is.null,sensitivity_tier.eq.standard,sensitivity_tier.eq.)&order=id&offset=${offset}&limit=${BATCH_SIZE}`;
const res = await fetch(url, { headers });
const url = `${BASE_URL}/thoughts?select=id,content,sensitivity_tier&or=(sensitivity_tier.is.null,sensitivity_tier.eq.standard,sensitivity_tier.eq.)&id=gt.${afterId}&order=id.asc&limit=${BATCH_SIZE}`;
const res = await fetchWithTimeout(url, { headers }, SUPABASE_TIMEOUT_MS);

if (!res.ok) {
console.error(`Query error at offset ${offset}: ${res.status} ${await res.text()}`);
console.error(`Query error after id ${afterId}: ${res.status} ${await res.text()}`);
errors++;
break;
}
Expand All @@ -123,11 +136,11 @@ while (true) {

if (apply) {
const updateUrl = `${BASE_URL}/thoughts?id=eq.${row.id}`;
const updateRes = await fetch(updateUrl, {
const updateRes = await fetchWithTimeout(updateUrl, {
method: "PATCH",
headers,
body: JSON.stringify({ sensitivity_tier: result.tier }),
});
}, SUPABASE_TIMEOUT_MS);

if (!updateRes.ok) {
console.error(` Failed to update thought #${row.id}: ${updateRes.status}`);
Expand All @@ -143,11 +156,16 @@ while (true) {
}
}

offset += data.length;
// Advance the cursor past the last id seen, regardless of whether
// any rows in this page were upgraded.
afterId = data[data.length - 1].id;
if (data.length < BATCH_SIZE) break;

if (offset % 5000 === 0) {
// Progress reporter independent of a multiple-of-offset check, so
// partial batches do not silently stop emitting progress.
if (Math.floor(scanned / PROGRESS_EVERY) > Math.floor(scannedAtLastProgress / PROGRESS_EVERY)) {
console.log(` ... scanned ${scanned} thoughts so far (${upgradedPersonal} personal, ${upgradedRestricted} restricted)`);
scannedAtLastProgress = scanned;
}
}

Expand Down
83 changes: 67 additions & 16 deletions recipes/thought-enrichment/backfill-type.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@
import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, join } from "path";
import {
fetchWithTimeout,
resolveTimeoutMs,
DEFAULT_SUPABASE_TIMEOUT_MS,
} from "./lib/memory-core.mjs";

const __dirname = dirname(fileURLToPath(import.meta.url));

const SUPABASE_TIMEOUT_MS = resolveTimeoutMs(process.env.FETCH_TIMEOUT_MS, DEFAULT_SUPABASE_TIMEOUT_MS);

// Load env
function loadEnv() {
const envPath = join(__dirname, ".env.local");
Expand Down Expand Up @@ -69,10 +76,31 @@ async function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async function fetchBatch(offset, retries = 4) {
const url = `${BASE}/thoughts?select=id,metadata->>type&type=eq.reference&limit=${BATCH_SIZE}&offset=${offset}`;
// Cursor-based pagination on id. Offset pagination is unsafe here:
// every successful PATCH removes a row from the `type=eq.reference`
// filter, so `offset += rows.length` would skip unprocessed rows. The
// cursor pattern is id > afterId ORDER BY id ASC. `includeCount` is
// used once on the first call to populate the total for the progress
// bar — every subsequent call omits the count=exact header so
// PostgreSQL does not COUNT(*) the filtered set per page (LOW-7).
async function fetchBatch(afterId, { includeCount = false } = {}, retries = 4) {
const url = `${BASE}/thoughts?select=id,metadata->>type&type=eq.reference&id=gt.${afterId}&order=id.asc&limit=${BATCH_SIZE}`;
const batchHeaders = includeCount ? { ...headers, Prefer: "count=exact" } : { ...headers };
for (let attempt = 0; attempt <= retries; attempt++) {
const r = await fetch(url, { headers: { ...headers, Prefer: "count=exact" } });
let r;
try {
r = await fetchWithTimeout(url, { headers: batchHeaders }, SUPABASE_TIMEOUT_MS);
} catch (err) {
// Treat AbortError/timeouts and other network errors as transient.
const msg = err?.message || String(err);
if (attempt < retries) {
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
process.stderr.write(`\n[retry] fetch afterId ${afterId} ${msg.slice(0, 120)}, waiting ${delay}ms\n`);
await sleep(delay);
continue;
}
throw err;
}
if (r.ok) {
const contentRange = r.headers.get("content-range");
const total = contentRange ? parseInt(contentRange.split("/")[1], 10) : null;
Expand All @@ -83,22 +111,34 @@ async function fetchBatch(offset, retries = 4) {
const isTransient = r.status === 502 || r.status === 503 || r.status === 504 || r.status === 429;
if (isTransient && attempt < retries) {
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
process.stderr.write(`\n[retry] fetch offset ${offset} got ${r.status}, waiting ${delay}ms\n`);
process.stderr.write(`\n[retry] fetch afterId ${afterId} got ${r.status}, waiting ${delay}ms\n`);
await sleep(delay);
continue;
}
throw new Error(`Fetch failed at offset ${offset}: ${r.status} ${body.slice(0, 200)}`);
throw new Error(`Fetch failed after id ${afterId}: ${r.status} ${body.slice(0, 200)}`);
}
}

async function updateRow(id, newType, retries = 6) {
const url = `${BASE}/thoughts?id=eq.${id}`;
for (let attempt = 0; attempt <= retries; attempt++) {
const r = await fetch(url, {
method: "PATCH",
headers,
body: JSON.stringify({ type: newType }),
});
let r;
try {
r = await fetchWithTimeout(url, {
method: "PATCH",
headers,
body: JSON.stringify({ type: newType }),
}, SUPABASE_TIMEOUT_MS);
} catch (err) {
const msg = err?.message || String(err);
if (attempt < retries) {
const delay = Math.min(1000 * Math.pow(2, attempt), 16000);
process.stderr.write(`\n[retry] id ${id} ${msg.slice(0, 120)}, waiting ${delay}ms (attempt ${attempt + 1}/${retries})\n`);
await sleep(delay);
continue;
}
throw err;
}
if (r.ok) return;
const body = await r.text();
const isTransient = r.status === 502 || r.status === 503 || r.status === 504 || r.status === 429;
Expand Down Expand Up @@ -126,8 +166,14 @@ async function main() {
console.log(`Batch size: ${BATCH_SIZE}`);
console.log("");

let offset = 0;
// Cursor replaces offset. afterId starts at 0 (all thought ids are
// positive) and advances to the last id seen in each page, so a
// PATCH that removes rows from the `type=eq.reference` filter cannot
// cause the cursor to skip un-processed rows.
let afterId = 0;
let processedRows = 0;
let total = null;
let firstCountDone = false;
let totalUpdated = 0;
let totalSkippedInvalidType = 0;
let totalSkippedAlreadyCorrect = 0;
Expand All @@ -137,7 +183,10 @@ async function main() {
const typeDistribution = {};

while (true) {
const { rows, total: fetchedTotal } = await fetchBatch(offset);
const { rows, total: fetchedTotal } = await fetchBatch(afterId, {
includeCount: !firstCountDone,
});
firstCountDone = true;

if (total === null && fetchedTotal !== null) {
total = fetchedTotal;
Expand Down Expand Up @@ -179,18 +228,20 @@ async function main() {
totalUpdated += updates.length;
}

offset += rows.length;
processedRows += rows.length;
// Advance cursor past the highest id seen (rows are ordered by id ASC).
afterId = rows[rows.length - 1].id;

const pct = total ? ((offset / total) * 100).toFixed(1) : "?";
process.stdout.write(`\rProgress: ${offset}/${total ?? "?"} (${pct}%) — updated so far: ${totalUpdated}`);
const pct = total ? ((processedRows / total) * 100).toFixed(1) : "?";
process.stdout.write(`\rProgress: ${processedRows}/${total ?? "?"} (${pct}%) — updated so far: ${totalUpdated}`);

if (rows.length < BATCH_SIZE) break;
}

console.log("\n");
console.log("=== BACKFILL COMPLETE ===");
console.log("");
console.log(`Rows processed: ${offset}`);
console.log(`Rows processed: ${processedRows}`);
console.log(`Rows updated: ${totalUpdated}${DRY_RUN ? " (dry run, not written)" : ""}`);
console.log(`Skipped (already reference): ${totalSkippedAlreadyCorrect}`);
console.log(`Skipped (null/empty type): ${totalSkippedNullType}`);
Expand Down
Loading
Loading