Fix duplicated extract jobs on repeated fanout runs #358
matheus1lva wants to merge 3 commits into main
Conversation
Fanout runs that overlap with in-flight extract jobs create duplicate work because strides/outputs are only updated after load completes. Adding deterministic jobIds (based on chainId, address, block range) causes BullMQ to skip adding a job if one with the same ID already exists in the queue.
murderteeth left a comment:
The first commit (5ea968a) correctly adds deterministic jobIds at the call site in events.ts and timeseries.ts. However, b10e347 ("add to add()") moves the jobId generation into the shared mq.add() function, which introduces two high-severity regressions:
1. Silent job dropping across the entire system
packages/lib/mq.ts:98 now synthesizes a jobId for every mq.add() call, but most producers don't supply the fields used in the template (chainId, address, from, to, blockTime). This turns a targeted extract-job dedupe into repo-wide silent dropping of unrelated jobs. For example, packages/ingest/extract/timeseries.ts:33 enqueues load.output with only { batch }, so every one of those jobs gets the same ID (output-undefined-undefined-...) and BullMQ keeps only one. Similarly, packages/ingest/abis/yearn/3/vault/event/StrategyChanged/hook.ts enqueues distinct load.thing jobs (strategy vs vault) for the same address, but the default ID ignores label, so one will be skipped.
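The hazard in (1) can be reproduced in a few lines. The field names below come from this review; the template is a simplified stand-in for the actual code in mq.ts, not a copy of it:

```typescript
// Simplified stand-in for a generic jobId template synthesized inside mq.add().
// Producers may or may not supply these fields:
type JobData = { chainId?: number; address?: string; from?: number; to?: number; blockTime?: number }

function defaultJobId(queueName: string, data: JobData): string {
  return `${queueName}-${data.chainId}-${data.address}-${data.from}-${data.to}-${data.blockTime}`
}

// Two distinct load.output batches that carry only { batch } collapse to one id:
const idA = defaultJobId('output', {})
const idB = defaultJobId('output', {})
console.log(idA)         // "output-undefined-undefined-undefined-undefined-undefined"
console.log(idA === idB) // true: BullMQ would keep only the first job
```

Any two payloads missing the template's fields are indistinguishable to the dedupe check, which is why the dropping is silent and system-wide.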
2. Timeseries fix regression
b10e347 also regresses the timeseries fix from 5ea968a. The fanout path at packages/ingest/fanout/timeseries.ts:44 no longer passes an explicit jobId containing outputLabel, and the generic fallback in mq.ts:98 omits it too. Multiple timeseries hooks for the same (chainId, address, blockTime) — e.g. apy and tvl — will collide, and only one will be enqueued per cycle.
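The regression in (2) follows the same pattern. A hypothetical fallback id that omits outputLabel collides for distinct hooks on the same vault:

```typescript
// Hypothetical fallback id omitting outputLabel (the discriminator 5ea968a included).
const fallbackId = (chainId: number, address: string, blockTime: number) =>
  `timeseries-${chainId}-${address}-${blockTime}`

// Two different hooks for the same vault and blockTime:
const apyId = fallbackId(1, '0xabc', 1700000000)
const tvlId = fallbackId(1, '0xabc', 1700000000)
console.log(apyId === tvlId) // true: only one of the two jobs is enqueued per cycle
```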
Suggested fix: Revert b10e347 (git revert b10e347). The first commit 5ea968a already had the correct approach — targeted jobIds at the call site with all relevant discriminators included.
This reverts commit b10e347.
reverted.
Summary
Extract jobs (evmlog and timeseries) were being duplicated every fanout cycle because `mq.add()` used auto-generated BullMQ job IDs. When fanout runs before previous extract jobs complete and update strides/outputs, it sees the same gaps and enqueues identical work. This adds deterministic `jobId`s based on the job's unique key (chainId + address + block range or blockTime), so BullMQ skips adding a job if one with the same ID is already in the queue. See https://docs.bullmq.io/guide/jobs/job-ids for how and when to use custom job IDs.
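The deterministic ids described above can be sketched as small helpers. The names follow the templates in this PR description; the repo's exact code may differ:

```typescript
// Builds the dedupe key for an extract.evmlog job from its unique fields.
function evmlogJobId(chainId: number, address: string, from: number, to: number): string {
  return `evmlog-${chainId}-${address}-${from}-${to}`
}

// Builds the dedupe key for an extract.timeseries job, including the output label.
function timeseriesJobId(chainId: number, address: string, outputLabel: string, blockTime: number): string {
  return `timeseries-${chainId}-${address}-${outputLabel}-${blockTime}`
}

// Passed to BullMQ via the options argument, e.g. queue.add(name, data, { jobId: ... })
console.log(evmlogJobId(1, '0xabc', 100, 200)) // "evmlog-1-0xabc-100-200"
```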
How to review
Two small changes in `packages/ingest/fanout/`:
- `events.ts`: adds `jobId: evmlog-{chainId}-{address}-{from}-{to}` to `extract.evmlog` jobs
- `timeseries.ts`: adds `jobId: timeseries-{chainId}-{address}-{outputLabel}-{blockTime}` to `extract.timeseries` jobs

Key design decision: parent fanout jobs in `abis.ts` intentionally do not get jobIds, because they must re-run each cycle to discover new work. Only leaf extract jobs are deduplicated.

BullMQ behavior: when a job with the same `jobId` exists in any state (waiting/active/completed/failed), `add()` returns the existing job without creating a duplicate. Jobs are cleaned up via `removeOnComplete: { count: 100, age: 3600 }`.

Test plan
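What the test plan checks can be sketched with an in-memory stand-in for BullMQ's `add()`. This models the documented dedupe semantics only; it is not BullMQ itself:

```typescript
// In-memory stand-in mimicking BullMQ's jobId dedupe: add() with an existing
// jobId returns the existing job instead of creating a duplicate.
class FakeQueue {
  private jobs = new Map<string, { id: string; name: string; data: unknown }>()

  add(name: string, data: unknown, opts: { jobId: string }) {
    const existing = this.jobs.get(opts.jobId)
    if (existing) return existing // duplicate: skipped
    const job = { id: opts.jobId, name, data }
    this.jobs.set(opts.jobId, job)
    return job
  }

  count() { return this.jobs.size }
}

// Two consecutive fanout runs planning the same block range:
const q = new FakeQueue()
q.add('extract.evmlog', { from: 100, to: 200 }, { jobId: 'evmlog-1-0xabc-100-200' })
q.add('extract.evmlog', { from: 100, to: 200 }, { jobId: 'evmlog-1-0xabc-100-200' })
console.log(q.count()) // 1: the second run did not create a duplicate
```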
- Expected: 0 errors (warnings are pre-existing)
- Expected: jobs dispatched without errors
- Expected: a second run should NOT create duplicate extract jobs for the same block ranges. Verify via the Sentry metrics dashboard: `mq.job_added` counts should not double for identical `abiPath:chainId:address:fromBlock:toBlock` tuples.
- Expected: no more duplicated entries with the same abi:chain_id:address:block across consecutive fanout runs
Risk / impact
- The change only adds a `jobId` option to existing `mq.add()` calls. No changes to job data, processing logic, or queue configuration.
- While a completed job is still retained (per `removeOnComplete`), a re-planned duplicate of the same block range is silently skipped. This is safe because strides/outputs update on successful load, so the range won't be re-planned.