feat(ledger): PgLedger sqlx impl + engine wiring + bounded writer (PR-1 of #95)#119
Pablosinyores
left a comment
Review — feat(ledger): PgLedger sqlx impl + engine wiring + bounded writer (PR-1 of #95)
Solid PR-1. Hot-path non-blocking design is right (try_send → drop with metric, never await), the metric surface is appropriate (counter for outcome + drops, gauge for queue depth, histogram for write latency), ledger_from_env falls back to NoopLedger cleanly so dev/CI behaviour is preserved, and the existing engine tests still pass through the new_with_metrics → new_with_metrics_and_ledger constructor chain. Verified locally: cargo build --workspace clean, cargo test -p aether-common --lib db:: 5/5 pass (incl. new u256_to_decimal_max, protocol_label_matches_serde_tag, ledger_metrics_register_round_trips), cargo clippy -p aether-common -p aether-grpc-server --all-targets -- -D warnings clean. CI all green; mergeable CLEAN.
Three real cross-cutting issues warrant a fix on this PR rather than slipping to PR-2/PR-3, plus a handful of forward-looking concerns.
Should Fix Before Merge (WARNING)
1. arbs.protocols JSONB written via format!("{:?}", h.protocol) while pool_registry.protocol uses the pinned serde label — crates/grpc-server/src/engine.rs:199
The two paths agree by coincidence today: Rust's Debug derive and the serde tag both emit UniswapV2. They will silently diverge the moment anyone:
- adds #[serde(rename = "...")] to ProtocolType,
- hand-impls Debug (new fields, new variants),
- renames a variant.
The test at crates/common/src/db.rs:551-567 pins protocol_label to the serde tag, so the divergence would land in arbs.protocols only — pool_registry.protocol and arbs.protocols would disagree on the same protocol's name, breaking any cross-table join or aggregation. Fix: replace format!("{:?}", h.protocol) with protocol_label(h.protocol) (export it from aether_common::db or make a public helper). Single source of truth for the on-disk name.
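To make the suggested fix concrete, here is a minimal std-only sketch of routing both writers through one label function. The enum below is a stand-in for the real ProtocolType: only UniswapV2 appears in this review, the SushiSwap variant and the protocols_json helper are hypothetical, and the real code would presumably build the JSONB value with serde_json rather than by hand.

```rust
// Hypothetical stand-in for the real `ProtocolType`; only `UniswapV2` is
// named in the review, the second variant is illustrative.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolType {
    UniswapV2,
    SushiSwap,
}

/// Single source of truth for the on-disk protocol name.
/// Adding a variant forces a decision here because the match is exhaustive.
pub fn protocol_label(p: ProtocolType) -> &'static str {
    match p {
        ProtocolType::UniswapV2 => "UniswapV2",
        ProtocolType::SushiSwap => "SushiSwap",
    }
}

/// Hypothetical helper: builds the JSON array for `arbs.protocols` from the
/// pinned labels instead of `format!("{:?}", ...)`.
/// The labels contain no characters that need JSON escaping.
pub fn protocols_json(hops: &[ProtocolType]) -> String {
    let quoted: Vec<String> = hops
        .iter()
        .map(|p| format!("\"{}\"", protocol_label(*p)))
        .collect();
    format!("[{}]", quoted.join(","))
}
```

With this shape, a change to the Debug impl cannot alter what lands in either column, because neither writer calls Debug.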
2. Clock-authority policy violation — crates/common/src/db.rs:381-419 and :461-485
The schema header at migrations/0001_trade_ledger.sql:11-22 (policy you wrote in #115) declares:
Event-time columns (arbs.ts, bundles.submitted_at, inclusion_results.resolved_at) are CLIENT-SET. Writers MUST populate the value at the moment the event occurs in code; DEFAULT now() exists only as a safety net for ad-hoc inserts via psql / migrations and must not be relied on by application paths.
But:
- insert_arb_inner at db.rs:381-419 does NOT bind ts. The INSERT column list omits it, so every row gets DB clock.
- update_inclusion_inner at db.rs:461-485 doesn't bind resolved_at on insert (DB clock) and explicitly sets resolved_at = now() on conflict (line 473), so both branches use DB clock.
- The data model itself has no carrier: NewArb (db.rs:51-69) has no ts field; InclusionUpdate (db.rs:92-100) has no resolved_at field.
The skew is bounded by channel depth + pool acquire timeout (≤5 s worst case), so this isn't immediately catastrophic, but it directly contradicts a policy this PR's author wrote one day ago. PR-2 ships the Go side of the same inclusion_results table; if Rust writes DB-clock and Go writes client-clock, the same column has inconsistent authority across producers — exactly the integration hazard the policy was meant to prevent. Fix on this PR: add ts: DateTime<Utc> to NewArb (capture in build_new_arb before enqueue), add resolved_at: DateTime<Utc> to InclusionUpdate, bind in both *_inner queries.
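A minimal std-only sketch of the capture-before-enqueue idea, assuming a trimmed-down NewArb. The real struct has more fields and the fix proposes DateTime<Utc>; SystemTime stands in here so the example needs no external crates, and the dequeue helper is hypothetical.

```rust
use std::sync::mpsc::Receiver;
use std::time::SystemTime;

/// Hypothetical, trimmed-down `NewArb`: only the event-time carrier is shown.
#[derive(Debug, Clone)]
pub struct NewArb {
    pub id: String,
    /// Client-set event time, captured when the arb is built on the hot path.
    pub ts: SystemTime,
}

/// Captures the event time before the row enters the queue, so queue lag
/// and pool-acquire waits never leak into the stored timestamp.
pub fn build_new_arb(id: &str) -> NewArb {
    NewArb {
        id: id.to_string(),
        ts: SystemTime::now(),
    }
}

/// Stand-in for the writer side: returns the row together with the time it
/// was dequeued. The writer would bind `arb.ts`, not the dequeue time.
pub fn dequeue(rx: &Receiver<NewArb>) -> Option<(NewArb, SystemTime)> {
    rx.try_recv().ok().map(|arb| (arb, SystemTime::now()))
}
```

The gap between arb.ts and the dequeue time is the skew that a DB-side now() would silently bake into the row.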
3. arb_id (fresh Uuid::new_v4()) is disconnected from the log line — crates/grpc-server/src/engine.rs:182, 1440
engine.rs:1440 logs id = %input.opp.id (a String). engine.rs:215 then mints arb_id: Uuid::new_v4() for the DB row. Different ID spaces. An operator who sees Published validated arb id=foo-123 in logs cannot grep SELECT * FROM arbs WHERE arb_id = ... to find the row.
This was raised on the #115 review and the author noted it would be addressed when wiring landed — this is the wiring PR. Two clean options:
- (a) Derive arb_id from opp.id (parse if UUID-shaped, else hash to bytes and Uuid::from_bytes),
- (b) Emit arb_id as a structured log field on the ARB PUBLISHED line so a single grep links the two ID spaces.
(b) is the smaller change and keeps Uuid::new_v4() as the source of truth for the row.
Performance / Forward-looking (SUGGESTION)
4. Sequential writer under-utilizes the 8-connection pool — crates/common/src/db.rs:319-363
PgPoolOptions::new().max_connections(8) configures 8 connections, but the writer task does while let Some(op) = rx.recv().await { ... .execute(&pool).await } — one in-flight query at a time. Effective concurrency = 1, not 8. At the PR's claimed 200 arbs/s peak, each insert must complete in <5 ms; on a co-located Postgres that's plausible but on any network hop it leaves no headroom. A FuturesUnordered drainer or tokio::spawn-per-op bounded by a semaphore matching pool size brings effective concurrency to 8 with no ordering loss (every op is independent / ON CONFLICT-idempotent). Fail-safe today (drop on Full + metric), but the 8-conn pool is currently dead weight.
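To illustrate the bounded-concurrency shape without pulling in tokio, here is a std-only sketch that swaps the suggested FuturesUnordered or semaphore approach for a fixed pool of OS threads draining one shared receiver. It is not the async design proposed above, only the same idea (at most N writes in flight, N matching the pool size) in a form that runs with the standard library. drain_with_workers and its return value are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread;

/// Drains `rx` with `workers` threads, calling `write` for each op.
/// Returns (ops handled, peak number of writes in flight at once).
pub fn drain_with_workers<T, F>(rx: Receiver<T>, workers: usize, write: F) -> (usize, usize)
where
    T: Send + 'static,
    F: Fn(T) + Send + Sync + 'static,
{
    let rx = Arc::new(Mutex::new(rx));
    let write = Arc::new(write);
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));

    let mut handles = Vec::with_capacity(workers);
    for _ in 0..workers {
        let rx = Arc::clone(&rx);
        let write = Arc::clone(&write);
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        let done = Arc::clone(&done);
        handles.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement, so the
            // lock is held only while receiving, never while writing.
            let next = rx.lock().unwrap().recv();
            let op = match next {
                Ok(op) => op,
                Err(_) => break, // all senders dropped and queue drained
            };
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            write(op);
            in_flight.fetch_sub(1, Ordering::SeqCst);
            done.fetch_add(1, Ordering::SeqCst);
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}
```

Because every op is independent and idempotent, losing strict ordering between workers costs nothing here. The peak counter is only there to show the in-flight cap holds.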
5. Channel capacity vs claimed peak — minor numeric drift
LEDGER_CHANNEL_CAPACITY = 1024 (db.rs:44); PR body says "~3 s of bursty writes at 200 arbs/s peak". 1024/200 = 5.12 s, not 3 s. Either fix the comment or the constant. Also: 200 arbs/s is asserted but not derived from anything in the codebase — a footnote pointing at the source (CLAUDE.md targets ~500 candidates/block but post-sim emission is much smaller) would help future tuning.
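The arithmetic is small enough to pin in code. A sketch with two hypothetical helpers, assuming the worst case where the writer drains nothing during the burst:

```rust
/// Milliseconds of burst a bounded queue absorbs at a given arrival rate,
/// assuming the writer drains nothing in the meantime (worst case).
pub const fn buffer_millis(capacity: u64, rate_per_sec: u64) -> u64 {
    capacity * 1000 / rate_per_sec
}

/// Capacity needed to absorb `seconds` of burst at `rate_per_sec`.
pub const fn capacity_for(seconds: u64, rate_per_sec: u64) -> u64 {
    seconds * rate_per_sec
}
```

At 200 arbs/s, a capacity of 1024 gives 5120 ms of buffer, while a 3 s target would need a capacity of 600.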
6. update_inclusion is on the trait but never called from this engine — db.rs:311-313
Dead code path at the end of PR-1. PR body says it's "for future Rust-side reconciliation backfill (Go owns inclusion writes)". Fine, but a future maintainer reading engine.rs will rg update_inclusion( and find zero call sites — leave a doc comment on the trait method noting that the engine doesn't drive it today.
7. u256_to_decimal silent-zero fallback — db.rs:509-511
BigDecimal::from_str(&v.to_string()).unwrap_or_else(|_| BigDecimal::from(0)) — U256::to_string() always produces a valid decimal so the branch is unreachable, but if it ever fires the writer silently records zero profit. Use .expect("U256::to_string always produces valid decimal") so the impossible case is loud, not silent.
8. Uuid::new_v4() per ARB PUBLISHED — small but worth noting
getrandom syscall on Linux per call. At 200/s this is irrelevant, but if the v4 RNG ever switches to OsRng ceremony (entropy pool init per call) the cost shifts. Worth pinning the uuid crate features explicitly; today the workspace Cargo.toml should set uuid = { version = "...", features = ["v4", "serde"] } and not rely on defaults.
9. Boot-time connect can stall startup for 5 s — db.rs:249-257
If Postgres is reachable but slow at boot, connect() blocks the main thread up to acquire_timeout(5s) before ledger_from_env falls back. Operator who set DATABASE_URL to a wrong host pays 5 s of stall on every restart. Either reduce the connect timeout to ~1 s and rely on the writer task's later acquire timeout, or background the connect and start with NoopLedger immediately, swap once ready.
What's Good
- Hot path on insert_arb is Box::new(arb.clone()) + try_send. No await, no panic, predictable cost.
- enqueue bumps the right metric in all three branches (Ok, Full, Closed) and logs Closed at debug only (shutdown, expected).
- LedgerMetrics::register registers against EngineMetrics::registry() (metrics.rs:148): single /metrics endpoint, no second scrape, exactly the right move.
- Histogram buckets [0.5, 1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0] are reasonable for write-latency tail visibility.
- ON CONFLICT clauses (db.rs:398, 436-437, 468-474) are idempotent: arbs DO NOTHING, pool_registry upserts last_seen, inclusion_results upserts every column. Re-runs of the same op don't produce duplicate state.
- i64::try_from(...).unwrap_or(i64::MAX) saturating instead of panicking is the right tradeoff for a writer that must never take down the engine. Same for u8::try_from on hops.
- ledger_from_env covers all three failure modes (var unset, var empty, connect error) with NoopLedger fallback. Engine genuinely runnable without Postgres.
- 3 new unit tests cover the boundaries that matter most: U256 round-trip on MAX, protocol_label pinned to serde tag, all four metric families register and accept the expected label combinations.
- uuid_compat::Uuid retired; the byte-order contract documented in #115 holds (uuid::Uuid::as_bytes is RFC 4122 network byte order, drop-in for the placeholder's spec).
- Scope discipline: gh pr diff 119 --name-only returns exactly 7 files, all coherent.
- Both new commits follow conventional format, no AI attribution.
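For readers who have not opened the diff, the three-branch enqueue praised above can be sketched with the standard library's bounded channel. This is an assumption-laden stand-in: the PR uses tokio's mpsc and prometheus counters, whereas the Counters struct and this enqueue signature are hypothetical and use plain atomics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical stand-in for the ledger metrics: plain atomic counters.
#[derive(Default)]
pub struct Counters {
    pub enqueued: AtomicU64,
    pub dropped: AtomicU64,
    pub closed: AtomicU64,
}

/// Non-blocking enqueue: never waits on the writer.
/// A full queue drops the op and bumps `dropped`; a closed queue (writer
/// gone, e.g. during shutdown) bumps `closed`.
pub fn enqueue<T>(tx: &SyncSender<T>, op: T, m: &Counters) {
    match tx.try_send(op) {
        Ok(()) => {
            m.enqueued.fetch_add(1, Ordering::Relaxed);
        }
        Err(TrySendError::Full(_)) => {
            m.dropped.fetch_add(1, Ordering::Relaxed);
        }
        Err(TrySendError::Disconnected(_)) => {
            m.closed.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

The caller pays one try_send and one atomic increment regardless of how slow the consumer is, which is the property that keeps the hot path predictable.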
Verdict
REQUEST CHANGES — gated on items 1, 2, 3 (Debug-vs-serde divergence, clock-authority policy violation, and the log↔DB ID correlation gap). All three are cheap to fix on this PR and addressing them now prevents either silent data drift (1, 2) or a forensic gap (3) from being baked into a column shape that ships to production. The performance/forward-looking items can land in PR-2 or PR-3.
…relation
Three reviewer-blockers from PR #119 round 1:
1. arbs.protocols was bound via format!("{:?}", h.protocol) while pool_registry.protocol used the pinned protocol_label. Renaming a ProtocolType variant would silently rewrite the JSONB array but not the TEXT column, breaking joins. protocol_label is now pub and used on both writers: single source of truth for on-disk protocol names.
2. Clock-authority policy. The migration declares event-time columns (arbs.ts, inclusion_results.resolved_at) CLIENT-SET; the previous PgLedger queries omitted both bindings, so the schema's DEFAULT now() silently fired and back-tests skewed by the dequeue lag. Adds ts: DateTime<Utc> to NewArb and resolved_at: DateTime<Utc> to InclusionUpdate, binds both, and updates the inclusion upsert branch to set resolved_at = EXCLUDED.resolved_at instead of now().
3. arb_id was Uuid::new_v4() per row, so log lines (which print ArbOpportunity::id) had no join key into the trade ledger. Derives arb_id deterministically from opp.id via UUIDv5 against a stable namespace, and emits arb_id as a structured field on the ARB PUBLISHED log line so grep <id> logs/* | psql works.
Adds chrono to aether-grpc-server deps and the v5 feature to the uuid workspace dep.
Refs PR #119 review.
…e + dead-code doc
Forward-looking items from PR #119 review:
- Writer was sequential on a pool of 8 connections: every conn but one sat idle while INSERTs serialised on the dispatcher's await. Adds a Semaphore(LEDGER_MAX_INFLIGHT=8) gate around tokio::spawn fan-out so the pool actually runs at capacity. Pool size and inflight cap are named constants tuned in lockstep.
- Channel-capacity comment math was wrong (1024 / 200 ≈ 5.12 s, not ~3 s). Also surfaces the constants in the connect log line so operators can see the tuning at startup.
- Connect acquire_timeout 5 s → 2 s. Misconfigured DATABASE_URL should fail fast and degrade to NoopLedger via ledger_from_env, not stall engine boot. Logs already cover the reason.
- u256_to_decimal: replace .unwrap_or(BigDecimal::from(0)) with .expect. U256::to_string is a base-10 digit sequence which BigDecimal::from_str accepts by definition; a failure here means the alloy/bigdecimal contract changed. Failing loudly beats a silent zero in arb economics.
- Documents update_inclusion as engine-side dead code (Go owns inclusion writes today) so future readers do not look for a Rust caller.
Refs PR #119 review.
Re-requesting review. All 3 blockers + 5 forward-looking items addressed across 2 atomic commits ( Blockers
Forward-looking
Cargo bumps
Local checks
Adds PgLedger backed by sqlx::PgPool with fire-and-forget tokio spawns on the engine hot path. Wires insert_arb on every ARB PUBLISHED and insert_pool on every register_pool. Bootstrap from DATABASE_URL falls back to NoopLedger when unset.
Replaces fire-and-forget tokio::spawn per write with a single dedicated
writer task draining a bounded mpsc::channel(1024). Every Ledger trait
method becomes a non-awaiting try_send; saturation drops the row and
bumps a metric instead of fanning out unbounded background tasks while
Postgres is slow.
Adds LedgerMetrics, registered on the engine's existing prometheus
Registry via the new EngineMetrics::registry() accessor so a single
/metrics endpoint emits both engine and ledger families:
aether_ledger_writes_total{op, result} Counter
aether_ledger_drops_total{op} Counter
aether_ledger_queue_depth Gauge
aether_ledger_write_latency_ms{op} Histogram
The writer task observes per-op latency from dequeue to query
completion, decrements queue_depth on dequeue, and tags every write
ok/err. A failing write logs and drops — never propagates to the
engine.
Channel size sized for ~3 s of bursty inserts at 200 arbs/s peak, so
saturation is the alert signal that Postgres is the bottleneck. The
sqlx pool stays bounded at 8 connections, so even if every slot stalls
the channel still drops cleanly rather than blocking.
Refs PR #115 follow-up; PR-1 of the 3-PR plan to close issue #95.
Force-pushed from 0f273f0 to e2f7422.
Summary
PR-1 of the 3-PR plan to close #95. Adds the Rust trade-ledger writer surface: PgLedger sqlx impl, engine wiring on register_pool and ARB PUBLISHED, bounded mpsc + dedicated writer task, and four ledger metrics on the existing /metrics endpoint. Hot path stays non-blocking; saturation drops the row and bumps a counter rather than fanning out unbounded background tasks.
This does not close #95: the Go side (criterion 6), CI Postgres service (criterion 9), and counter-vs-row reconciliation (criterion 8) land in PR-2 and PR-3.
Commits
- 741c53a feat(ledger): PgLedger sqlx impl + engine wiring
- a0d0341 feat(ledger): bounded mpsc + LedgerMetrics for hot-path safety
What landed
Persistence
- PgLedger on sqlx::PgPool (max 8 conns, 5 s acquire timeout).
- insert_arb_inner / insert_pool_inner / update_inclusion_inner with ON CONFLICT clauses (idempotent).
- ledger_from_env() bootstrap → falls back to NoopLedger when DATABASE_URL is unset / connect fails. Engine behaviour identical to today on dev / CI.
- uuid_compat placeholder retired in favour of uuid::Uuid.
- NewPool.protocol: ProtocolType binds via protocol_label() so the TEXT column gets the stable serde tag, not format!("{:?}", …).
Engine wiring
- register_pool → insert_pool per discovered pool.
- insert_arb per validated arb.
- update_inclusion impl exists on Ledger for future Rust-side reconciliation backfill; not currently called from the engine (Go owns inclusion writes).
Hot-path safety
- mpsc::channel(1024) between hot path and a single dedicated writer task.
- try_send non-blocking; Full → drop + aether_ledger_drops_total{op}. Closed → debug log only (shutdown path).
Observability
LedgerMetrics registered on the engine's existing prometheus::Registry via the new EngineMetrics::registry() accessor (single /metrics endpoint, no second scrape):
- aether_ledger_writes_total{op, result} Counter (ok/err)
- aether_ledger_drops_total{op} Counter
- aether_ledger_queue_depth Gauge
- aether_ledger_write_latency_ms{op} Histogram
Tests
- noop_ledger_silently_accepts_writes, noop_ledger_is_object_safe (preserved).
- u256_to_decimal_max: NUMERIC(78,0) round-trip on U256::MAX.
- protocol_label_matches_serde_tag: pins the static label to the serde tag so a future serde-driven query path stays compatible.
- ledger_metrics_register_round_trips: exercises every metric path so a typo surfaces in CI.
Local checks
- cargo build --workspace clean.
- cargo clippy --workspace --all-targets -- -D warnings clean.
- cargo test --workspace 420 passed, 0 failed.
Acceptance criteria progress (#95)
- DATABASE_URL, no-op when unset
- TIMESTAMPTZ + NUMERIC(78,0)
Out of scope
- aether_ledger_drops_total sustained growth → PR-3.