[schemas] Thought work claims for parallel workers #361

Open
alanshurafa wants to merge 2 commits into NateBJones-Projects:main from alanshurafa:contrib/alanshurafa/thought-work-claims

@alanshurafa

Collaborator

What this adds

A small coordination layer so several workers can process the same pool of thoughts at once without stepping on each other. New folder: schemas/thought-work-claims/.

It's the missing piece for any "fan this batch out across N processes" job — enrichment passes, embedding backfills, scoring, consolidation. Run one worker and it's slow; run five and, without coordination, they pick overlapping rows and do the same work twice.

How it works

  • public.thought_work_claims — one row per (thought_id, work_type). A row means "a worker owns this thought for this kind of work right now."
  • claim_thoughts(ids, work_type, worker_id, ttl) — a worker hands in candidate ids and gets back only the subset it actually won.
  • release_thought(...) — mark one claim succeeded/failed when done.
  • release_claims_for_worker(...) — on clean shutdown, drop a worker's open claims so others reclaim them immediately.
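A worker would normally reach these functions through PostgREST's RPC endpoint. The sketch below only builds the request for claim_thoughts and does not send it. The JSON argument names (p_ids and so on), the TTL unit, and the example URL are assumptions rather than details from the schema; PostgREST matches JSON keys to the SQL function's parameter names, so they need to be checked against the applied function.

```javascript
// Sketch: build (but do not send) a PostgREST RPC request for claim_thoughts.
// ASSUMPTION: argument names and the seconds-based TTL are hypothetical.
function buildClaimRequest({ restUrl, serviceKey, ids, workType, workerId, ttlSeconds }) {
  return {
    // restUrl is the PostgREST root; on Supabase that is "<project-url>/rest/v1".
    url: `${restUrl.replace(/\/+$/, "")}/rpc/claim_thoughts`,
    options: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        apikey: serviceKey,                    // Supabase-style API key header
        Authorization: `Bearer ${serviceKey}`, // service_role only, per the grants
      },
      body: JSON.stringify({
        p_ids: ids,                // candidate thought UUIDs
        p_work_type: workType,
        p_worker_id: workerId,
        p_ttl_seconds: ttlSeconds, // hypothetical unit; the PR only says "ttl"
      }),
    },
  };
}

const req = buildClaimRequest({
  restUrl: "https://example.invalid/rest/v1/",
  serviceKey: "service-role-key",
  ids: ["00000000-0000-0000-0000-000000000001"],
  workType: "embedding",
  workerId: "host-1234-ab12cd34",
  ttlSeconds: 300,
});
console.log(req.url); // https://example.invalid/rest/v1/rpc/claim_thoughts
```

A real worker would pass req.url and req.options to fetch and treat the JSON response as the subset of ids it won.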

Why no row locks are needed. Exclusivity comes from the (thought_id, work_type) primary key plus INSERT ... ON CONFLICT DO NOTHING, returning only inserted rows. When two workers submit the same id at the same moment, Postgres lets exactly one insert win and the other conflicts out silently — the loser just sees that id missing from its result. This is the right primitive when workers supply specific ids (as opposed to draining an unknown queue, where FOR UPDATE SKIP LOCKED is the tool). No advisory locks, no SELECT ... FOR UPDATE.
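The insert-or-conflict behaviour can be illustrated with a small in-memory model. This is only a sketch of the semantics: the ClaimTable class and its row shape are hypothetical, and in Postgres it is the primary key that makes the operation atomic across concurrent workers.

```javascript
// In-memory model of INSERT ... ON CONFLICT DO NOTHING that returns only
// the rows it inserted. Illustrative only; not the PR's SQL.
class ClaimTable {
  constructor() {
    // Key "thoughtId|workType" plays the role of the composite primary key.
    this.rows = new Map();
  }

  claim(ids, workType, workerId) {
    const won = [];
    for (const id of ids) {
      const key = `${id}|${workType}`;
      if (this.rows.has(key)) continue; // conflict: do nothing, return nothing
      this.rows.set(key, { workerId, status: "claimed" });
      won.push(id);                     // only inserted rows are returned
    }
    return won;
  }
}

const table = new ClaimTable();
const ids = ["t1", "t2", "t3", "t4", "t5"];
const wonByA = table.claim(ids, "embedding", "worker-a");
const wonByB = table.claim(ids, "embedding", "worker-b"); // same ids, same work_type
const wonByC = table.claim(ids, "scoring", "worker-c");   // same ids, other work_type
console.log(wonByA.length, wonByB.length, wonByC.length); // 5 0 5
```

The loser gets an empty result rather than an error, which is why a worker only needs to process whatever ids come back.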

Crash recovery via TTL. A worker can die mid-batch and never release. Each claim carries ttl_expires_at; claim_thoughts reaps inline on every call, deleting still-claimed rows whose TTL has passed before it inserts, so a dead worker's thoughts become claimable again. Terminal succeeded/failed rows are kept as a record and are not reaped.
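The reaping rule can be sketched as a pure filter. The row shape, the camelCase field names, and the inclusive boundary (expired when ttlExpiresAt <= now) are assumptions; the PR text only says that expired, still-claimed rows are deleted and terminal rows are kept.

```javascript
// Model of the inline reaper: drop expired rows that are still "claimed",
// keep terminal ("succeeded" / "failed") rows as a record.
function reapExpired(rows, now) {
  return rows.filter(
    (row) => !(row.status === "claimed" && row.ttlExpiresAt <= now)
  );
}

const now = Date.parse("2026-06-14T12:00:00Z");
const rows = [
  { thoughtId: "t1", status: "claimed", ttlExpiresAt: now - 1000 },   // dead worker
  { thoughtId: "t2", status: "claimed", ttlExpiresAt: now + 60000 },  // still live
  { thoughtId: "t3", status: "succeeded", ttlExpiresAt: now - 1000 }, // terminal, kept
  { thoughtId: "t4", status: "failed", ttlExpiresAt: now - 1000 },    // terminal, kept
];
const remaining = reapExpired(rows, now).map((row) => row.thoughtId);
console.log(remaining); // [ 't2', 't3', 't4' ]
```

After the reap, t1 has no row for this work type, so the next insert for it succeeds and another worker takes it over.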

Notes for review

  • UUID id contract. thought_id UUID REFERENCES public.thoughts(id) ON DELETE CASCADE, and the RPC arrays are UUID[], matching the canonical Open Brain thoughts.id. The README documents the one-line change for a non-canonical BIGINT install.
  • Additive and idempotent. CREATE TABLE IF NOT EXISTS, CREATE INDEX IF NOT EXISTS, CREATE OR REPLACE FUNCTION. Nothing on public.thoughts is altered or dropped. Every DELETE is WHERE-qualified.
  • Locked down. RLS on, table and RPCs granted to service_role only — no anon/authenticated access. Functions are SECURITY INVOKER.
  • README has Prerequisites, numbered apply steps, Expected outcome, a complete dependency-free Node worker example (config entirely via env vars), and inspect/clear/rollback SQL.

Testing

Applied the schema to a throwaway Postgres instance and exercised it: two workers claiming the same five ids produce a 5/0 split with zero overlap; a different work_type co-claims the same thoughts; release_thought succeeds for the holder and returns false for a non-holder; expiring the TTLs lets a third worker reclaim; bulk release frees open claims; an invalid release status raises; re-applying the schema is a no-op; and deleting a thought cascades its claims away.

alanshurafa and others added 2 commits June 13, 2026 18:53
Parallel workers need a way to divide a pool of thoughts without two of
them grabbing the same one. This adds a claim table whose (thought_id,
work_type) primary key makes each claim exclusive via INSERT ON CONFLICT
DO NOTHING, plus a per-claim TTL that reaps stale claims inline so a
crashed worker's work is recoverable. Thought ids are UUID to match
public.thoughts.id; the RPCs are SECURITY INVOKER and service-role only.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Fix the bundled worker example and harden grants per the fork PR #39
Codex findings:

- findCandidates now excludes thoughts already in thought_work_claims
  for the work_type (PostgREST embedding filtered to thought_id is null),
  so the loop pages past terminal rows instead of backing off forever.
- Default WORKER_ID is now globally unique (hostname + pid + random
  suffix); a bare PID collides across containers and lets one worker's
  shutdown delete another's in-flight claims.
- Document TTL-vs-batch sizing: a whole batch shares one ttl_expires_at
  but is processed serially, so set TTL >= batch hold time or shrink
  CLAIM_SIZE to avoid mid-batch reaping and duplicate processing.
- Explicitly REVOKE ALL on the table and all three functions from anon
  and authenticated, matching the hardened siblings; validated on an
  ephemeral Postgres 18 that neither role retains table or EXECUTE
  access and the script still applies cleanly and idempotently.

Also add the "More from Nate" provenance CTA per AGENTS.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
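
The WORKER_ID and TTL-sizing points from the second commit can be sketched as two small helpers. The id format (separator, suffix length) and the secondsPerThought and safetyFactor inputs are hypothetical; the commit only states that the default id combines hostname, pid, and a random suffix, and that the TTL should cover the time a batch is held.

```javascript
const os = require("node:os");
const crypto = require("node:crypto");

// Hostname + pid + random suffix. A bare pid can repeat across containers,
// so the random suffix keeps ids distinct. The exact format is an assumption.
function defaultWorkerId() {
  const suffix = crypto.randomBytes(4).toString("hex"); // 8 hex characters
  return `${os.hostname()}-${process.pid}-${suffix}`;
}

// A batch shares one ttl_expires_at but is processed serially, so the TTL
// should cover the whole batch, with some headroom.
function minTtlSeconds(claimSize, secondsPerThought, safetyFactor = 2) {
  return Math.ceil(claimSize * secondsPerThought * safetyFactor);
}

const WORKER_ID = process.env.WORKER_ID || defaultWorkerId();
console.log(WORKER_ID);
console.log(minTtlSeconds(25, 4)); // 200
```

If the computed TTL is longer than is acceptable for crash recovery, shrinking CLAIM_SIZE is the other lever the commit mentions.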
github-actions bot added the schema (Contribution: database extension) label Jun 14, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bc733673b6

const params = new URLSearchParams({
  select: "id,claims:thought_work_claims(thought_id)",
  "claims.work_type": `eq.${WORK_TYPE}`,
  "claims.thought_id": "is.null", // keep only thoughts with NO claim row

[P2] Use a resource-level anti-join for candidates

PostgREST treats claims.thought_id=is.null as a filter on rows inside the embedded claims array, not as a top-level anti-join; because thought_work_claims.thought_id is NOT NULL, the embed is empty while the parent thought rows still come back. In the copied worker, once the newest page has active or terminal claims, it keeps submitting the same ids, claim_thoughts returns nothing, and the loop backs off forever instead of reaching older unclaimed thoughts. Use the embedded resource filter itself, e.g. an empty claims:thought_work_claims() embed plus claims=is.null with the claims.work_type filter, or an equivalent NOT EXISTS query.
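
A minimal sketch of the suggested query parameters, assuming the installed PostgREST version supports null filtering on embedded resources and empty embeds. The candidateParams helper and the limit parameter are hypothetical additions for illustration; only the select, claims.work_type, and claims filters come from the suggestion above.

```javascript
// Sketch of the reviewer's suggestion: filter on the embedded resource itself
// so that thoughts with a matching claim row are excluded at the top level.
function candidateParams(workType, limit) {
  return new URLSearchParams({
    select: "id,claims:thought_work_claims()", // empty embed, used only for filtering
    "claims.work_type": `eq.${workType}`,      // restrict which claim rows count
    claims: "is.null",                         // keep thoughts with no such claim row
    limit: String(limit),
  });
}

const params = candidateParams("embedding", 50);
console.log(decodeURIComponent(params.toString()));
```

The difference from the original query is that the is.null test applies to the claims embed as a whole rather than to a column inside it.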


while (!shuttingDown) {
  const candidates = await findCandidates(CLAIM_SIZE);
  if (candidates.length === 0) break;

[P2] Reap stale claims before exiting an empty batch

With the documented candidate query excluding any row that already has a thought_work_claims record, expired claimed rows from a crashed worker are not returned as candidates. If those stale rows are the only remaining work, this early break exits before claim_thoughts can run its inline TTL reaper, so the crashed worker's thoughts stay stranded until someone manually calls the RPC. Trigger claim_thoughts with an empty batch (or otherwise run the reaper) before deciding there is no work left.
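
One way the loop could apply this is sketched below. The injected findCandidates, claimThoughts, processThought, and shouldStop functions are stand-ins for the README worker's own helpers, and their signatures are assumptions. The sketch also assumes claim_thoughts accepts an empty id array and still runs its reaper. Backoff for batches that win nothing is omitted for brevity.

```javascript
// Sketch: run the TTL reaper once before concluding there is no work left.
async function runWorker({
  findCandidates,
  claimThoughts,
  processThought,
  claimSize,
  shouldStop = () => false,
}) {
  let reapedOnEmpty = false;
  while (!shouldStop()) {
    const candidates = await findCandidates(claimSize);
    if (candidates.length === 0) {
      if (reapedOnEmpty) break; // still nothing after a reap: really done
      await claimThoughts([]);  // empty batch, issued only to trigger the reaper
      reapedOnEmpty = true;
      continue;                 // re-query: reaped thoughts may be candidates now
    }
    reapedOnEmpty = false;
    const won = await claimThoughts(candidates);
    for (const id of won) {
      await processThought(id);
    }
  }
}
```

The extra call costs one round trip per empty page, and the worker exits only after two consecutive empty candidate queries with a reap in between.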
