From f7789b2c4b03824d4f4dc913549ec85fd18bd926 Mon Sep 17 00:00:00 2001 From: 0xfandom Date: Tue, 28 Apr 2026 17:24:22 +0530 Subject: [PATCH 1/4] feat(ledger): PgLedger sqlx impl + engine wiring Adds PgLedger backed by sqlx::PgPool with fire-and-forget tokio spawns on the engine hot path. Wires insert_arb on every ARB PUBLISHED and insert_pool on every register_pool. Bootstrap from DATABASE_URL falls back to NoopLedger when unset. --- Cargo.toml | 5 + crates/common/Cargo.toml | 5 + crates/common/src/db.rs | 359 ++++++++++++++++++++++++++----- crates/grpc-server/Cargo.toml | 2 + crates/grpc-server/src/engine.rs | 92 ++++++++ crates/grpc-server/src/main.rs | 4 +- 6 files changed, 410 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 53337cb..71e2aa5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,3 +56,8 @@ config = "0.14" serde_yml = "0.0.12" # Benchmarking criterion = { version = "0.5", features = ["html_reports"] } +# Database (trade ledger) +sqlx = { version = "0.8", default-features = false, features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "json", "macros", "bigdecimal"] } +uuid = { version = "1", features = ["v4", "serde"] } +chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } +bigdecimal = "0.4" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 85880b1..57482ed 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -11,6 +11,11 @@ thiserror = { workspace = true } tracing = { workspace = true } ruint = { workspace = true } prost = { workspace = true } +tokio = { workspace = true } +sqlx = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +bigdecimal = { workspace = true } [dev-dependencies] serde_json = { workspace = true } diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index c6c1a40..fd78306 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -1,35 +1,34 @@ //! 
Trade-ledger access layer (Rust side). //! -//! Ships the [`Ledger`] trait and a [`NoopLedger`] default. The real -//! `sqlx::PgPool`-backed impl plus engine wiring on the `ARB PUBLISHED` path -//! land in a follow-up. Today the engine can depend on the trait without -//! pulling `sqlx`, keeping the default build and `DATABASE_URL`-unset -//! behaviour identical to current `main`. +//! Two impls of [`Ledger`]: //! -//! See `migrations/0001_trade_ledger.sql` for the schema this trait targets. +//! - [`NoopLedger`] — default, used when `DATABASE_URL` is unset. Discards +//! every write so engine behaviour is identical to runs without Postgres. +//! - [`PgLedger`] — `sqlx::PgPool`-backed. Public methods are sync; each call +//! spawns a detached tokio task so the engine hot path never blocks on I/O. +//! A connection blip logs and drops; the engine never crashes on ledger I/O. //! -//! ```ignore -//! use aether_common::db::{Ledger, NoopLedger, NewArb}; -//! -//! let ledger: Box = Box::new(NoopLedger); -//! ledger.insert_arb(&NewArb::default()); // no-op, no panic -//! ``` +//! See `migrations/0001_trade_ledger.sql` for the schema. -use std::sync::OnceLock; +use std::str::FromStr; +use std::sync::{Arc, OnceLock}; use alloy::primitives::{Address, B256, U256}; +use bigdecimal::BigDecimal; use serde::{Deserialize, Serialize}; +use sqlx::postgres::{PgPool, PgPoolOptions}; +use uuid::Uuid; use crate::types::ProtocolType; /// Insert payload for the `arbs` table. /// -/// Field shapes mirror the SQL schema 1:1 so a Postgres-backed `Ledger` impl -/// can map without extra conversion. `Default` exists so callers can build -/// the struct field by field without filling in every column. +/// Field shapes mirror the SQL schema 1:1 so the [`PgLedger`] impl maps +/// without extra conversion. `Default` exists so callers can build the struct +/// field by field without filling every column. 
#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NewArb { - pub arb_id: uuid_compat::Uuid, + pub arb_id: Uuid, pub target_block: u64, pub path_hash: B256, pub hops: u8, @@ -51,8 +50,9 @@ pub struct NewArb { /// /// `protocol` is bound to [`ProtocolType`] (not `String`) so callers cannot /// invent values the rest of the system does not understand. The Postgres -/// column stays `TEXT`; the impl serialises via `ProtocolType`'s serde -/// representation, giving a stable on-disk name without losing type safety. +/// column stays `TEXT`; [`PgLedger::insert_pool_inner`] serialises via +/// `protocol_label` (matching the serde tag), giving a stable on-disk name +/// without losing type safety. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NewPool { pub address: Address, @@ -69,7 +69,7 @@ pub struct NewPool { /// reconciliation job can backfill from Rust if needed. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct InclusionUpdate { - pub bundle_id: uuid_compat::Uuid, + pub bundle_id: Uuid, pub builder: String, pub included: bool, pub included_block: Option, @@ -118,45 +118,267 @@ impl Ledger for NoopLedger { fn update_inclusion(&self, _update: &InclusionUpdate) {} } -/// Minimal UUID stand-in so this module does not pull a new workspace dep -/// today. A follow-up swaps this for `uuid::Uuid` once the `sqlx` / `uuid` -/// features land. -/// -/// **Byte-order contract.** The 16-byte payload is stored in **RFC 4122 -/// network byte order** (big-endian for `time_low`, `time_mid`, -/// `time_hi_and_version`). This matches: -/// -/// * Postgres `uuid` binary wire format, -/// * `uuid::Uuid::as_bytes` and `uuid::Uuid::from_bytes`, -/// * the canonical `8-4-4-4-12` hex string read left-to-right. +/// Postgres-backed [`Ledger`] using `sqlx`. 
/// -/// Future swap to `uuid::Uuid` is therefore a straight `from_bytes` / -/// `as_bytes` round-trip with no byte reversal — call sites that build a -/// `uuid_compat::Uuid` from raw bytes today must already be feeding network -/// order. Anything mixed-endian (`uuid::Uuid::from_bytes_le`, -/// little-endian Windows GUIDs) must be converted before construction. -pub mod uuid_compat { - use serde::{Deserialize, Serialize}; - - #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] - pub struct Uuid(pub [u8; 16]); - - impl Uuid { - pub const fn nil() -> Self { - Self([0u8; 16]) - } +/// Each public call spawns a detached tokio task so the caller (typically the +/// engine on the hot path after `ARB PUBLISHED`) never awaits I/O. The pool +/// is bounded so a slow Postgres cannot fan out unbounded backpressure. +#[derive(Clone)] +pub struct PgLedger { + pool: PgPool, +} - /// Build a UUID from 16 bytes in **RFC 4122 network byte order**. - /// See module docs for the byte-order contract. - pub const fn from_bytes(b: [u8; 16]) -> Self { - Self(b) - } +impl PgLedger { + /// Connect to Postgres at `database_url` and return a ready ledger. + /// + /// Pool sizing matches the engine: a few simultaneous inserts are common + /// but most blocks publish 0–5 arbs, so a small pool with idle timeout is + /// enough. Callers should construct this once at startup and clone the + /// `Arc` everywhere. + pub async fn connect(database_url: &str) -> Result { + let pool = PgPoolOptions::new() + .max_connections(8) + .acquire_timeout(std::time::Duration::from_secs(5)) + .connect(database_url) + .await?; + tracing::info!( + target: "aether::ledger", + "PgLedger connected — trade ledger writes enabled" + ); + Ok(Self { pool }) + } - /// Returns the 16-byte payload in **RFC 4122 network byte order**. - /// See module docs for the byte-order contract. 
- pub const fn as_bytes(&self) -> &[u8; 16] { - &self.0 - } + /// Fallible variant of [`Ledger::insert_arb`] used by the spawned task. + async fn insert_arb_inner(pool: &PgPool, arb: &NewArb) -> Result<(), sqlx::Error> { + let arb_id = arb.arb_id; + let target_block = i64::try_from(arb.target_block).unwrap_or(i64::MAX); + let path_hash = arb.path_hash.as_slice(); + let hops = i16::from(arb.hops); + let flashloan_token = arb.flashloan_token.as_slice(); + let flashloan_amount = u256_to_decimal(arb.flashloan_amount); + let gross_profit = u256_to_decimal(arb.gross_profit_wei); + let net_profit = u256_to_decimal(arb.net_profit_wei); + let gas_estimate = i64::try_from(arb.gas_estimate).unwrap_or(i64::MAX); + let tip_bps = i32::try_from(arb.tip_bps).unwrap_or(i32::MAX); + let detection_us = arb + .detection_us + .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + let sim_us = arb.sim_us.map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + + sqlx::query( + r#" + INSERT INTO arbs ( + arb_id, target_block, path_hash, hops, + path, protocols, pool_addresses, + flashloan_token, flashloan_amount, + gross_profit_wei, net_profit_wei, + gas_estimate, tip_bps, + detection_us, sim_us, git_sha + ) VALUES ( + $1, $2, $3, $4, + $5, $6, $7, + $8, $9, + $10, $11, + $12, $13, + $14, $15, $16 + ) + ON CONFLICT (arb_id) DO NOTHING + "#, + ) + .bind(arb_id) + .bind(target_block) + .bind(path_hash) + .bind(hops) + .bind(&arb.path) + .bind(&arb.protocols) + .bind(&arb.pool_addresses) + .bind(flashloan_token) + .bind(&flashloan_amount) + .bind(&gross_profit) + .bind(&net_profit) + .bind(gas_estimate) + .bind(tip_bps) + .bind(detection_us) + .bind(sim_us) + .bind(arb.git_sha.as_deref()) + .execute(pool) + .await?; + Ok(()) + } + + async fn insert_pool_inner(pool: &PgPool, np: &NewPool) -> Result<(), sqlx::Error> { + let address = np.address.as_slice(); + let protocol = protocol_label(np.protocol); + let token0 = np.token0.as_slice(); + let token1 = np.token1.as_slice(); + let fee_bps = 
np.fee_bps.map(|v| i32::try_from(v).unwrap_or(i32::MAX)); + + sqlx::query( + r#" + INSERT INTO pool_registry ( + address, protocol, token0, token1, fee_bps, tier, source + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7 + ) + ON CONFLICT (address) DO UPDATE + SET last_seen = now() + "#, + ) + .bind(address) + .bind(protocol) + .bind(token0) + .bind(token1) + .bind(fee_bps) + .bind(np.tier.as_deref()) + .bind(&np.source) + .execute(pool) + .await?; + Ok(()) + } + + async fn update_inclusion_inner( + pool: &PgPool, + u: &InclusionUpdate, + ) -> Result<(), sqlx::Error> { + let included_block = u + .included_block + .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + let landed = u.landed_tx_hash.as_ref().map(|h| h.as_slice()); + + sqlx::query( + r#" + INSERT INTO inclusion_results ( + bundle_id, builder, included, included_block, landed_tx_hash, error + ) VALUES ( + $1, $2, $3, $4, $5, $6 + ) + ON CONFLICT (bundle_id, builder) DO UPDATE SET + included = EXCLUDED.included, + included_block = EXCLUDED.included_block, + landed_tx_hash = EXCLUDED.landed_tx_hash, + error = EXCLUDED.error, + resolved_at = now() + "#, + ) + .bind(u.bundle_id) + .bind(&u.builder) + .bind(u.included) + .bind(included_block) + .bind(landed) + .bind(u.error.as_deref()) + .execute(pool) + .await?; + Ok(()) + } + + /// Borrow the underlying pool. Useful for read-only queries (reporters, + /// integration tests) that want to share the connection budget. 
+ pub fn pool(&self) -> &PgPool { + &self.pool + } +} + +impl Ledger for PgLedger { + fn insert_arb(&self, arb: &NewArb) { + let pool = self.pool.clone(); + let arb = arb.clone(); + spawn_detached(async move { + if let Err(e) = PgLedger::insert_arb_inner(&pool, &arb).await { + tracing::warn!( + target: "aether::ledger", + error = %e, + arb_id = %arb.arb_id, + "insert_arb failed; dropping row" + ); + } + }); + } + + fn insert_pool(&self, pool_row: &NewPool) { + let pool = self.pool.clone(); + let row = pool_row.clone(); + spawn_detached(async move { + if let Err(e) = PgLedger::insert_pool_inner(&pool, &row).await { + tracing::warn!( + target: "aether::ledger", + error = %e, + pool = %row.address, + "insert_pool failed; dropping row" + ); + } + }); + } + + fn update_inclusion(&self, update: &InclusionUpdate) { + let pool = self.pool.clone(); + let upd = update.clone(); + spawn_detached(async move { + if let Err(e) = PgLedger::update_inclusion_inner(&pool, &upd).await { + tracing::warn!( + target: "aether::ledger", + error = %e, + bundle_id = %upd.bundle_id, + "update_inclusion failed; dropping row" + ); + } + }); + } +} + +/// Build a [`Ledger`] from `DATABASE_URL`. Returns [`NoopLedger`] when the var +/// is unset or empty so the engine stays runnable in dev / CI without +/// Postgres. +pub async fn ledger_from_env() -> Arc { + match std::env::var("DATABASE_URL") { + Ok(url) if !url.is_empty() => match PgLedger::connect(&url).await { + Ok(p) => Arc::new(p) as Arc, + Err(e) => { + tracing::error!( + target: "aether::ledger", + error = %e, + "PgLedger connect failed; falling back to NoopLedger" + ); + Arc::new(NoopLedger::new()) + } + }, + _ => Arc::new(NoopLedger::new()), + } +} + +/// Map a U256 to the `NUMERIC(78,0)` representation sqlx accepts via +/// [`BigDecimal`]. U256::MAX has 78 decimal digits, which fits. 
+fn u256_to_decimal(v: U256) -> BigDecimal { + BigDecimal::from_str(&v.to_string()).unwrap_or_else(|_| BigDecimal::from(0)) +} + +/// Stable on-disk name for a [`ProtocolType`]. Matches the serde enum tag so +/// rows written today and rows written by a future serde-driven impl stay +/// comparable. +fn protocol_label(p: ProtocolType) -> &'static str { + match p { + ProtocolType::UniswapV2 => "UniswapV2", + ProtocolType::UniswapV3 => "UniswapV3", + ProtocolType::SushiSwap => "SushiSwap", + ProtocolType::Curve => "Curve", + ProtocolType::BalancerV2 => "BalancerV2", + ProtocolType::BancorV3 => "BancorV3", + } +} + +/// Spawn a future on the current tokio runtime if one exists; otherwise log +/// and drop. The engine always runs under tokio so the drop branch is dev / +/// test only. +fn spawn_detached(fut: F) +where + F: std::future::Future + Send + 'static, +{ + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(fut); + } else { + tracing::debug!( + target: "aether::ledger", + "no tokio runtime; dropping ledger write" + ); } } @@ -176,4 +398,29 @@ mod tests { fn noop_ledger_is_object_safe() { let _: Box = Box::new(NoopLedger::new()); } + + #[test] + fn u256_to_decimal_max() { + let max = U256::MAX; + let d = u256_to_decimal(max); + assert_eq!(d.to_string(), max.to_string()); + } + + #[test] + fn protocol_label_matches_serde_tag() { + for (p, expected) in [ + (ProtocolType::UniswapV2, "UniswapV2"), + (ProtocolType::UniswapV3, "UniswapV3"), + (ProtocolType::SushiSwap, "SushiSwap"), + (ProtocolType::Curve, "Curve"), + (ProtocolType::BalancerV2, "BalancerV2"), + (ProtocolType::BancorV3, "BancorV3"), + ] { + assert_eq!(protocol_label(p), expected); + // Pin the static label to the serde tag so a future serde-driven + // query path produces the same on-disk value. 
+ let serde_repr = serde_json::to_string(&p).expect("serde"); + assert_eq!(serde_repr, format!("\"{expected}\"")); + } + } } diff --git a/crates/grpc-server/Cargo.toml b/crates/grpc-server/Cargo.toml index 93a706a..01781e3 100644 --- a/crates/grpc-server/Cargo.toml +++ b/crates/grpc-server/Cargo.toml @@ -45,6 +45,8 @@ prometheus = "0.13.4" clap = { version = "4", features = ["derive"] } anyhow = { workspace = true } serde_json = { workspace = true } +uuid = { workspace = true } +sha2 = "0.10" [build-dependencies] tonic-build = { workspace = true } prost-build = { workspace = true } diff --git a/crates/grpc-server/src/engine.rs b/crates/grpc-server/src/engine.rs index 31c9dfe..71039ab 100644 --- a/crates/grpc-server/src/engine.rs +++ b/crates/grpc-server/src/engine.rs @@ -11,7 +11,10 @@ use alloy::primitives::{Address, U256}; use alloy::providers::{DynProvider, Provider}; use alloy::sol_types::SolCall; +use aether_common::db::{Ledger, NewArb, NewPool, NoopLedger}; use aether_common::types::{ArbHop, ArbOpportunity, PoolId, ProtocolType, SwapStep}; +use sha2::{Digest, Sha256}; +use uuid::Uuid; use aether_detector::bellman_ford::BellmanFord; use aether_detector::gas::{estimate_total_gas, gas_cost_wei}; use aether_detector::optimizer::ternary_search_optimal_input; @@ -126,6 +129,9 @@ pub struct AetherEngine { rpc_provider: Option>, /// Prometheus metrics for engine operations. metrics: Arc, + /// Persistent trade ledger. NoopLedger by default; PgLedger when + /// `DATABASE_URL` is set at startup. + ledger: Arc, } /// Lightweight snapshot of the current block's key fields. @@ -168,6 +174,59 @@ fn token_label(addr: &Address) -> String { } } +/// Build a [`NewArb`] row from a published opportunity. +/// +/// `arb_id` is generated server-side because `ArbOpportunity::id` is a free-form +/// String not guaranteed to be UUID-shaped. `path_hash` is sha256 of the pool +/// address sequence so equivalent paths collapse to the same key for grouping. 
+fn build_new_arb( + opp: &ArbOpportunity, + flashloan_token: Address, + flashloan_amount: U256, + net_profit_u128: u128, + tip_bps: u64, + sim_us: u128, + path_label: &str, +) -> NewArb { + let pool_addrs: Vec = opp + .hops + .iter() + .map(|h| format!("{:#x}", h.pool_address)) + .collect(); + let protocols: Vec = opp + .hops + .iter() + .map(|h| format!("{:?}", h.protocol)) + .collect(); + + let mut hasher = Sha256::new(); + for h in &opp.hops { + hasher.update(h.pool_address.as_slice()); + } + let digest = hasher.finalize(); + let mut path_hash = [0u8; 32]; + path_hash.copy_from_slice(&digest); + + NewArb { + arb_id: Uuid::new_v4(), + target_block: opp.block_number, + path_hash: path_hash.into(), + hops: u8::try_from(opp.hops.len()).unwrap_or(u8::MAX), + path: serde_json::Value::String(path_label.to_string()), + protocols: serde_json::json!(protocols), + pool_addresses: serde_json::json!(pool_addrs), + flashloan_token, + flashloan_amount, + gross_profit_wei: opp.total_profit_wei, + net_profit_wei: U256::from(net_profit_u128), + gas_estimate: opp.total_gas, + tip_bps: u32::try_from(tip_bps).unwrap_or(u32::MAX), + detection_us: None, + sim_us: Some(u64::try_from(sim_us).unwrap_or(u64::MAX)), + git_sha: option_env!("GIT_SHA").map(|s| s.to_string()), + } +} + /// Build a path like "WETH -> AAVE -> WETH" from an `ArbOpportunity`'s hop list. fn arb_path_labels(opp: &ArbOpportunity) -> String { if opp.hops.is_empty() { @@ -219,6 +278,18 @@ impl AetherEngine { config: EngineConfig, arb_tx: broadcast::Sender, metrics: Arc, + ) -> Self { + Self::new_with_metrics_and_ledger(config, arb_tx, metrics, Arc::new(NoopLedger::new())) + } + + /// Build an engine with an explicit ledger backend. Production callers + /// pass a `PgLedger` constructed from `DATABASE_URL`; tests and dev mode + /// use `NoopLedger`. 
+ pub fn new_with_metrics_and_ledger( + config: EngineConfig, + arb_tx: broadcast::Sender, + metrics: Arc, + ledger: Arc, ) -> Self { let event_channels = Arc::new(EventChannels::new()); let detector = BellmanFord::new(config.max_hops, config.detection_time_budget_us); @@ -256,6 +327,7 @@ impl AetherEngine { pool_registry: Arc::new(ArcSwap::from_pointee(HashMap::new())), rpc_provider, metrics, + ledger, } } @@ -347,6 +419,16 @@ impl AetherEngine { %pool_addr, %token0, %token1, ?protocol, fee_bps, "Pool registered (t0={}, t1={})", t0_idx, t1_idx ); + + self.ledger.insert_pool(&NewPool { + address: pool_addr, + protocol, + token0, + token1, + fee_bps: Some(fee_bps), + tier: None, + source: "register_pool".to_string(), + }); } /// Bootstrap pools from a TOML config file (e.g. `config/pools.toml`). @@ -1374,6 +1456,16 @@ impl AetherEngine { sim_us, "ARB PUBLISHED" ); + + self.ledger.insert_arb(&build_new_arb( + &input.opp, + input.flashloan_token, + input.input_amount, + input.net_profit, + self.config.tip_bps, + sim_us, + &path, + )); } }); } diff --git a/crates/grpc-server/src/main.rs b/crates/grpc-server/src/main.rs index 656a56b..d0c0e81 100644 --- a/crates/grpc-server/src/main.rs +++ b/crates/grpc-server/src/main.rs @@ -61,10 +61,12 @@ async fn main() -> Result<(), Box> { } else { info!("ETH_RPC_URL not set — engine will use empty-state simulation"); } - let engine = Arc::new(AetherEngine::new_with_metrics( + let ledger = aether_common::db::ledger_from_env().await; + let engine = Arc::new(AetherEngine::new_with_metrics_and_ledger( engine_config, arb_tx, Arc::clone(&metrics), + ledger, )); // ControlService needs a handle to the engine for hot-reload support. 
From f03d5bc1215dce3a0dc7cda4c124dde74b161ceb Mon Sep 17 00:00:00 2001 From: 0xfandom Date: Tue, 5 May 2026 13:57:10 +0530 Subject: [PATCH 2/4] feat(ledger): bounded mpsc + LedgerMetrics for hot-path safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces fire-and-forget tokio::spawn per write with a single dedicated writer task draining a bounded mpsc::channel(1024). Every Ledger trait method becomes a non-awaiting try_send; saturation drops the row and bumps a metric instead of fanning out unbounded background tasks while Postgres is slow. Adds LedgerMetrics, registered on the engine's existing prometheus Registry via the new EngineMetrics::registry() accessor so a single /metrics endpoint emits both engine and ledger families: aether_ledger_writes_total{op, result} Counter aether_ledger_drops_total{op} Counter aether_ledger_queue_depth Gauge aether_ledger_write_latency_ms{op} Histogram The writer task observes per-op latency from dequeue to query completion, decrements queue_depth on dequeue, and tags every write ok/err. A failing write is logged and dropped — it never propagates to the engine. The channel capacity is sized for ~3 s of bursty inserts at 200 arbs/s peak, so saturation is the alert signal that Postgres is the bottleneck. The sqlx pool stays bounded at 8 connections, so even if every slot stalls the channel still drops cleanly rather than blocking. Refs PR #115 follow-up; PR-1 of the 3-PR plan to close issue #95. 
--- crates/common/Cargo.toml | 1 + crates/common/src/db.rs | 560 +++++++++++++++++++----------- crates/grpc-server/src/main.rs | 3 +- crates/grpc-server/src/metrics.rs | 7 + 4 files changed, 375 insertions(+), 196 deletions(-) diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 57482ed..7ef4326 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -16,6 +16,7 @@ sqlx = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } bigdecimal = { workspace = true } +prometheus = "0.13.4" [dev-dependencies] serde_json = { workspace = true } diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index fd78306..bd1d4fc 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -5,22 +5,44 @@ //! - [`NoopLedger`] — default, used when `DATABASE_URL` is unset. Discards //! every write so engine behaviour is identical to runs without Postgres. //! - [`PgLedger`] — `sqlx::PgPool`-backed. Public methods are sync; each call -//! spawns a detached tokio task so the engine hot path never blocks on I/O. -//! A connection blip logs and drops; the engine never crashes on ledger I/O. +//! enqueues onto a **bounded** mpsc and a single dedicated writer task drains +//! it. The hot path never awaits I/O. Channel saturation drops the row and +//! bumps `aether_ledger_drops_total{op}`; a slow Postgres can never exert +//! unbounded backpressure on the engine. +//! +//! Observability surface (registered against a shared `prometheus::Registry`): +//! +//! | Metric | Type | Labels | +//! |---|---|---| +//! | `aether_ledger_writes_total` | Counter | `op`, `result` (`ok`/`err`) | +//! | `aether_ledger_drops_total` | Counter | `op` | +//! | `aether_ledger_queue_depth` | Gauge | — | +//! | `aether_ledger_write_latency_ms` | Histogram | `op` | //! //! See `migrations/0001_trade_ledger.sql` for the schema. 
use std::str::FromStr; use std::sync::{Arc, OnceLock}; +use std::time::Instant; use alloy::primitives::{Address, B256, U256}; use bigdecimal::BigDecimal; +use prometheus::{ + HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry, +}; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgPool, PgPoolOptions}; +use tokio::sync::mpsc; use uuid::Uuid; use crate::types::ProtocolType; +/// Channel depth between the engine hot path and the PgLedger writer task. +/// Sized for ~3 s of bursty inserts (engine tops out around 200 arbs/s under +/// peak load) before saturating and dropping. Breached only when Postgres +/// stalls; the drops counter is the alert signal. +const LEDGER_CHANNEL_CAPACITY: usize = 1024; + /// Insert payload for the `arbs` table. /// /// Field shapes mirror the SQL schema 1:1 so the [`PgLedger`] impl maps @@ -91,6 +113,75 @@ pub trait Ledger: Send + Sync { fn update_inclusion(&self, update: &InclusionUpdate); } +/// Prometheus handles shared with [`PgLedger`]. Constructed once at startup +/// against the engine's existing `Registry` so a single `/metrics` endpoint +/// emits both engine and ledger families. +pub struct LedgerMetrics { + writes_total: IntCounterVec, + drops_total: IntCounterVec, + queue_depth: IntGauge, + write_latency_ms: HistogramVec, +} + +impl LedgerMetrics { + /// Register all four ledger metrics on the provided `Registry`. + /// + /// Panics on register failure; this is startup code and a duplicate + /// registration indicates a programmer error, not a runtime condition. 
+ pub fn register(registry: &Registry) -> Arc { + let writes_total = IntCounterVec::new( + Opts::new( + "aether_ledger_writes_total", + "Trade-ledger writes attempted by the writer task, by op and outcome", + ), + &["op", "result"], + ) + .expect("aether_ledger_writes_total counter vec"); + let drops_total = IntCounterVec::new( + Opts::new( + "aether_ledger_drops_total", + "Trade-ledger writes dropped because the bounded channel was full", + ), + &["op"], + ) + .expect("aether_ledger_drops_total counter vec"); + let queue_depth = IntGauge::new( + "aether_ledger_queue_depth", + "Pending trade-ledger writes sitting in the writer-task channel", + ) + .expect("aether_ledger_queue_depth gauge"); + let write_latency_ms = HistogramVec::new( + HistogramOpts::new( + "aether_ledger_write_latency_ms", + "Per-op latency of trade-ledger writes from dequeue to query completion", + ) + .buckets(vec![0.5, 1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0]), + &["op"], + ) + .expect("aether_ledger_write_latency_ms histogram vec"); + + registry + .register(Box::new(writes_total.clone())) + .expect("register aether_ledger_writes_total"); + registry + .register(Box::new(drops_total.clone())) + .expect("register aether_ledger_drops_total"); + registry + .register(Box::new(queue_depth.clone())) + .expect("register aether_ledger_queue_depth"); + registry + .register(Box::new(write_latency_ms.clone())) + .expect("register aether_ledger_write_latency_ms"); + + Arc::new(Self { + writes_total, + drops_total, + queue_depth, + write_latency_ms, + }) + } +} + /// Default ledger: discards every write. /// /// Logs once on construction so operators can grep for "ledger disabled" in @@ -118,219 +209,287 @@ impl Ledger for NoopLedger { fn update_inclusion(&self, _update: &InclusionUpdate) {} } +/// One unit of ledger work shipped over the writer-task channel. Owns its +/// payload so the hot path can drop the original immediately. 
+enum LedgerOp { + InsertArb(Box), + InsertPool(Box), + UpdateInclusion(Box), +} + +impl LedgerOp { + fn label(&self) -> &'static str { + match self { + LedgerOp::InsertArb(_) => "insert_arb", + LedgerOp::InsertPool(_) => "insert_pool", + LedgerOp::UpdateInclusion(_) => "update_inclusion", + } + } +} + /// Postgres-backed [`Ledger`] using `sqlx`. /// -/// Each public call spawns a detached tokio task so the caller (typically the -/// engine on the hot path after `ARB PUBLISHED`) never awaits I/O. The pool -/// is bounded so a slow Postgres cannot fan out unbounded backpressure. +/// The hot path enqueues onto a bounded channel; a single dedicated writer +/// task drains and executes. Channel saturation drops the row (with metric) +/// rather than blocking the engine. The connection pool is bounded so a slow +/// Postgres still cannot fan out unbounded backpressure even if it acquires +/// every slot. #[derive(Clone)] pub struct PgLedger { - pool: PgPool, + tx: mpsc::Sender, + metrics: Arc, } impl PgLedger { - /// Connect to Postgres at `database_url` and return a ready ledger. + /// Connect to Postgres and spawn the dedicated writer task. /// - /// Pool sizing matches the engine: a few simultaneous inserts are common - /// but most blocks publish 0–5 arbs, so a small pool with idle timeout is - /// enough. Callers should construct this once at startup and clone the - /// `Arc` everywhere. - pub async fn connect(database_url: &str) -> Result { + /// Returns once the pool is ready and the writer is live. The writer task + /// runs until the channel closes (i.e. every clone of the `Sender` is + /// dropped — typically at process shutdown). 
+ pub async fn connect( + database_url: &str, + metrics: Arc, + ) -> Result { let pool = PgPoolOptions::new() .max_connections(8) .acquire_timeout(std::time::Duration::from_secs(5)) .connect(database_url) .await?; + + let (tx, rx) = mpsc::channel::(LEDGER_CHANNEL_CAPACITY); + spawn_writer(pool, rx, Arc::clone(&metrics)); + tracing::info!( target: "aether::ledger", + channel_capacity = LEDGER_CHANNEL_CAPACITY, "PgLedger connected — trade ledger writes enabled" ); - Ok(Self { pool }) - } - - /// Fallible variant of [`Ledger::insert_arb`] used by the spawned task. - async fn insert_arb_inner(pool: &PgPool, arb: &NewArb) -> Result<(), sqlx::Error> { - let arb_id = arb.arb_id; - let target_block = i64::try_from(arb.target_block).unwrap_or(i64::MAX); - let path_hash = arb.path_hash.as_slice(); - let hops = i16::from(arb.hops); - let flashloan_token = arb.flashloan_token.as_slice(); - let flashloan_amount = u256_to_decimal(arb.flashloan_amount); - let gross_profit = u256_to_decimal(arb.gross_profit_wei); - let net_profit = u256_to_decimal(arb.net_profit_wei); - let gas_estimate = i64::try_from(arb.gas_estimate).unwrap_or(i64::MAX); - let tip_bps = i32::try_from(arb.tip_bps).unwrap_or(i32::MAX); - let detection_us = arb - .detection_us - .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); - let sim_us = arb.sim_us.map(|v| i64::try_from(v).unwrap_or(i64::MAX)); - - sqlx::query( - r#" - INSERT INTO arbs ( - arb_id, target_block, path_hash, hops, - path, protocols, pool_addresses, - flashloan_token, flashloan_amount, - gross_profit_wei, net_profit_wei, - gas_estimate, tip_bps, - detection_us, sim_us, git_sha - ) VALUES ( - $1, $2, $3, $4, - $5, $6, $7, - $8, $9, - $10, $11, - $12, $13, - $14, $15, $16 - ) - ON CONFLICT (arb_id) DO NOTHING - "#, - ) - .bind(arb_id) - .bind(target_block) - .bind(path_hash) - .bind(hops) - .bind(&arb.path) - .bind(&arb.protocols) - .bind(&arb.pool_addresses) - .bind(flashloan_token) - .bind(&flashloan_amount) - .bind(&gross_profit) - 
.bind(&net_profit) - .bind(gas_estimate) - .bind(tip_bps) - .bind(detection_us) - .bind(sim_us) - .bind(arb.git_sha.as_deref()) - .execute(pool) - .await?; - Ok(()) - } - - async fn insert_pool_inner(pool: &PgPool, np: &NewPool) -> Result<(), sqlx::Error> { - let address = np.address.as_slice(); - let protocol = protocol_label(np.protocol); - let token0 = np.token0.as_slice(); - let token1 = np.token1.as_slice(); - let fee_bps = np.fee_bps.map(|v| i32::try_from(v).unwrap_or(i32::MAX)); - - sqlx::query( - r#" - INSERT INTO pool_registry ( - address, protocol, token0, token1, fee_bps, tier, source - ) VALUES ( - $1, $2, $3, $4, $5, $6, $7 - ) - ON CONFLICT (address) DO UPDATE - SET last_seen = now() - "#, - ) - .bind(address) - .bind(protocol) - .bind(token0) - .bind(token1) - .bind(fee_bps) - .bind(np.tier.as_deref()) - .bind(&np.source) - .execute(pool) - .await?; - Ok(()) - } - - async fn update_inclusion_inner( - pool: &PgPool, - u: &InclusionUpdate, - ) -> Result<(), sqlx::Error> { - let included_block = u - .included_block - .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); - let landed = u.landed_tx_hash.as_ref().map(|h| h.as_slice()); - - sqlx::query( - r#" - INSERT INTO inclusion_results ( - bundle_id, builder, included, included_block, landed_tx_hash, error - ) VALUES ( - $1, $2, $3, $4, $5, $6 - ) - ON CONFLICT (bundle_id, builder) DO UPDATE SET - included = EXCLUDED.included, - included_block = EXCLUDED.included_block, - landed_tx_hash = EXCLUDED.landed_tx_hash, - error = EXCLUDED.error, - resolved_at = now() - "#, - ) - .bind(u.bundle_id) - .bind(&u.builder) - .bind(u.included) - .bind(included_block) - .bind(landed) - .bind(u.error.as_deref()) - .execute(pool) - .await?; - Ok(()) + Ok(Self { tx, metrics }) } - /// Borrow the underlying pool. Useful for read-only queries (reporters, - /// integration tests) that want to share the connection budget. 
- pub fn pool(&self) -> &PgPool { - &self.pool + /// Common enqueue path: try_send, bump the right metric on the result. + /// Never awaits — the hot path stays non-blocking. + fn enqueue(&self, op: LedgerOp) { + let label = op.label(); + match self.tx.try_send(op) { + Ok(()) => { + self.metrics.queue_depth.inc(); + } + Err(mpsc::error::TrySendError::Full(_)) => { + self.metrics + .drops_total + .with_label_values(&[label]) + .inc(); + tracing::warn!( + target: "aether::ledger", + op = label, + capacity = LEDGER_CHANNEL_CAPACITY, + "ledger channel full — dropping row" + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + // Writer task has exited; this happens only at shutdown. + tracing::debug!( + target: "aether::ledger", + op = label, + "ledger channel closed; dropping row" + ); + } + } } } impl Ledger for PgLedger { fn insert_arb(&self, arb: &NewArb) { - let pool = self.pool.clone(); - let arb = arb.clone(); - spawn_detached(async move { - if let Err(e) = PgLedger::insert_arb_inner(&pool, &arb).await { - tracing::warn!( - target: "aether::ledger", - error = %e, - arb_id = %arb.arb_id, - "insert_arb failed; dropping row" - ); - } - }); + self.enqueue(LedgerOp::InsertArb(Box::new(arb.clone()))); } fn insert_pool(&self, pool_row: &NewPool) { - let pool = self.pool.clone(); - let row = pool_row.clone(); - spawn_detached(async move { - if let Err(e) = PgLedger::insert_pool_inner(&pool, &row).await { - tracing::warn!( - target: "aether::ledger", - error = %e, - pool = %row.address, - "insert_pool failed; dropping row" - ); - } - }); + self.enqueue(LedgerOp::InsertPool(Box::new(pool_row.clone()))); } fn update_inclusion(&self, update: &InclusionUpdate) { - let pool = self.pool.clone(); - let upd = update.clone(); - spawn_detached(async move { - if let Err(e) = PgLedger::update_inclusion_inner(&pool, &upd).await { - tracing::warn!( - target: "aether::ledger", - error = %e, - bundle_id = %upd.bundle_id, - "update_inclusion failed; dropping row" - ); - } - }); + 
self.enqueue(LedgerOp::UpdateInclusion(Box::new(update.clone()))); } } +/// Spawn the dedicated writer task. The task owns the `PgPool` so callers +/// hand off ownership at construction; queries run sequentially within the +/// task and concurrently across the pool's connections. +fn spawn_writer( + pool: PgPool, + mut rx: mpsc::Receiver, + metrics: Arc, +) { + tokio::spawn(async move { + while let Some(op) = rx.recv().await { + metrics.queue_depth.dec(); + let label = op.label(); + let timer = Instant::now(); + let result = match op { + LedgerOp::InsertArb(arb) => insert_arb_inner(&pool, &arb).await, + LedgerOp::InsertPool(p) => insert_pool_inner(&pool, &p).await, + LedgerOp::UpdateInclusion(u) => update_inclusion_inner(&pool, &u).await, + }; + let elapsed_ms = timer.elapsed().as_secs_f64() * 1_000.0; + metrics + .write_latency_ms + .with_label_values(&[label]) + .observe(elapsed_ms); + match result { + Ok(()) => { + metrics + .writes_total + .with_label_values(&[label, "ok"]) + .inc(); + } + Err(e) => { + metrics + .writes_total + .with_label_values(&[label, "err"]) + .inc(); + tracing::warn!( + target: "aether::ledger", + op = label, + error = %e, + elapsed_ms, + "ledger write failed; row dropped" + ); + } + } + } + tracing::info!(target: "aether::ledger", "PgLedger writer task exiting"); + }); +} + +async fn insert_arb_inner(pool: &PgPool, arb: &NewArb) -> Result<(), sqlx::Error> { + let arb_id = arb.arb_id; + let target_block = i64::try_from(arb.target_block).unwrap_or(i64::MAX); + let path_hash = arb.path_hash.as_slice(); + let hops = i16::from(arb.hops); + let flashloan_token = arb.flashloan_token.as_slice(); + let flashloan_amount = u256_to_decimal(arb.flashloan_amount); + let gross_profit = u256_to_decimal(arb.gross_profit_wei); + let net_profit = u256_to_decimal(arb.net_profit_wei); + let gas_estimate = i64::try_from(arb.gas_estimate).unwrap_or(i64::MAX); + let tip_bps = i32::try_from(arb.tip_bps).unwrap_or(i32::MAX); + let detection_us = arb + 
.detection_us + .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + let sim_us = arb.sim_us.map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + + sqlx::query( + r#" + INSERT INTO arbs ( + arb_id, target_block, path_hash, hops, + path, protocols, pool_addresses, + flashloan_token, flashloan_amount, + gross_profit_wei, net_profit_wei, + gas_estimate, tip_bps, + detection_us, sim_us, git_sha + ) VALUES ( + $1, $2, $3, $4, + $5, $6, $7, + $8, $9, + $10, $11, + $12, $13, + $14, $15, $16 + ) + ON CONFLICT (arb_id) DO NOTHING + "#, + ) + .bind(arb_id) + .bind(target_block) + .bind(path_hash) + .bind(hops) + .bind(&arb.path) + .bind(&arb.protocols) + .bind(&arb.pool_addresses) + .bind(flashloan_token) + .bind(&flashloan_amount) + .bind(&gross_profit) + .bind(&net_profit) + .bind(gas_estimate) + .bind(tip_bps) + .bind(detection_us) + .bind(sim_us) + .bind(arb.git_sha.as_deref()) + .execute(pool) + .await?; + Ok(()) +} + +async fn insert_pool_inner(pool: &PgPool, np: &NewPool) -> Result<(), sqlx::Error> { + let address = np.address.as_slice(); + let protocol = protocol_label(np.protocol); + let token0 = np.token0.as_slice(); + let token1 = np.token1.as_slice(); + let fee_bps = np.fee_bps.map(|v| i32::try_from(v).unwrap_or(i32::MAX)); + + sqlx::query( + r#" + INSERT INTO pool_registry ( + address, protocol, token0, token1, fee_bps, tier, source + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7 + ) + ON CONFLICT (address) DO UPDATE + SET last_seen = now() + "#, + ) + .bind(address) + .bind(protocol) + .bind(token0) + .bind(token1) + .bind(fee_bps) + .bind(np.tier.as_deref()) + .bind(&np.source) + .execute(pool) + .await?; + Ok(()) +} + +async fn update_inclusion_inner( + pool: &PgPool, + u: &InclusionUpdate, +) -> Result<(), sqlx::Error> { + let included_block = u + .included_block + .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); + let landed = u.landed_tx_hash.as_ref().map(|h| h.as_slice()); + + sqlx::query( + r#" + INSERT INTO inclusion_results ( + bundle_id, builder, included, 
included_block, landed_tx_hash, error + ) VALUES ( + $1, $2, $3, $4, $5, $6 + ) + ON CONFLICT (bundle_id, builder) DO UPDATE SET + included = EXCLUDED.included, + included_block = EXCLUDED.included_block, + landed_tx_hash = EXCLUDED.landed_tx_hash, + error = EXCLUDED.error, + resolved_at = now() + "#, + ) + .bind(u.bundle_id) + .bind(&u.builder) + .bind(u.included) + .bind(included_block) + .bind(landed) + .bind(u.error.as_deref()) + .execute(pool) + .await?; + Ok(()) +} + /// Build a [`Ledger`] from `DATABASE_URL`. Returns [`NoopLedger`] when the var /// is unset or empty so the engine stays runnable in dev / CI without /// Postgres. -pub async fn ledger_from_env() -> Arc { +pub async fn ledger_from_env(metrics: Arc) -> Arc { match std::env::var("DATABASE_URL") { - Ok(url) if !url.is_empty() => match PgLedger::connect(&url).await { + Ok(url) if !url.is_empty() => match PgLedger::connect(&url, metrics).await { Ok(p) => Arc::new(p) as Arc, Err(e) => { tracing::error!( @@ -365,23 +524,6 @@ fn protocol_label(p: ProtocolType) -> &'static str { } } -/// Spawn a future on the current tokio runtime if one exists; otherwise log -/// and drop. The engine always runs under tokio so the drop branch is dev / -/// test only. -fn spawn_detached(fut: F) -where - F: std::future::Future + Send + 'static, -{ - if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.spawn(fut); - } else { - tracing::debug!( - target: "aether::ledger", - "no tokio runtime; dropping ledger write" - ); - } -} - #[cfg(test)] mod tests { use super::*; @@ -423,4 +565,32 @@ mod tests { assert_eq!(serde_repr, format!("\"{expected}\"")); } } + + #[test] + fn ledger_metrics_register_round_trips() { + let registry = Registry::new(); + let m = LedgerMetrics::register(®istry); + // Exercise every path so a counter typo surfaces in CI. 
+ m.writes_total.with_label_values(&["insert_arb", "ok"]).inc(); + m.writes_total.with_label_values(&["insert_pool", "err"]).inc(); + m.drops_total.with_label_values(&["update_inclusion"]).inc(); + m.queue_depth.set(7); + m.write_latency_ms + .with_label_values(&["insert_arb"]) + .observe(2.5); + + let families = registry.gather(); + let names: Vec<_> = families.iter().map(|f| f.get_name().to_string()).collect(); + for required in [ + "aether_ledger_writes_total", + "aether_ledger_drops_total", + "aether_ledger_queue_depth", + "aether_ledger_write_latency_ms", + ] { + assert!( + names.iter().any(|n| n == required), + "missing metric family {required}" + ); + } + } } diff --git a/crates/grpc-server/src/main.rs b/crates/grpc-server/src/main.rs index d0c0e81..c303c94 100644 --- a/crates/grpc-server/src/main.rs +++ b/crates/grpc-server/src/main.rs @@ -61,7 +61,8 @@ async fn main() -> Result<(), Box> { } else { info!("ETH_RPC_URL not set — engine will use empty-state simulation"); } - let ledger = aether_common::db::ledger_from_env().await; + let ledger_metrics = aether_common::db::LedgerMetrics::register(metrics.registry()); + let ledger = aether_common::db::ledger_from_env(ledger_metrics).await; let engine = Arc::new(AetherEngine::new_with_metrics_and_ledger( engine_config, arb_tx, diff --git a/crates/grpc-server/src/metrics.rs b/crates/grpc-server/src/metrics.rs index 22143e7..366a227 100644 --- a/crates/grpc-server/src/metrics.rs +++ b/crates/grpc-server/src/metrics.rs @@ -142,6 +142,13 @@ impl EngineMetrics { self.decode_errors.with_label_values(&[reason]).inc(); } + /// Borrow the underlying `Registry` so foreign metric families (e.g. the + /// trade-ledger counters in `aether_common::db`) can register on the same + /// scrape endpoint without standing up a second `/metrics` server. + pub fn registry(&self) -> &Registry { + &self.registry + } + /// Render the registered metrics in Prometheus text exposition format. 
/// `pub(crate)` so sibling modules (`provider::tests`) can assert on /// rendered counter values without exposing the whole registry. From 1a2ab0adf64a07c6daee5bedfee1aea9e0a0c91f Mon Sep 17 00:00:00 2001 From: 0xfandom Date: Tue, 5 May 2026 15:38:27 +0530 Subject: [PATCH 3/4] fix(ledger): protocol_label everywhere + clock-authority + arb_id correlation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three reviewer-blockers from PR #119 round 1: 1. arbs.protocols was bound via format!("{:?}", h.protocol) while pool_registry.protocol used the pinned protocol_label. Renaming a ProtocolType variant would silently rewrite the JSONB array but not the TEXT column, breaking joins. protocol_label is now pub and used on both writers — single source of truth for on-disk protocol names. 2. Clock-authority policy. The migration declares event-time columns (arbs.ts, inclusion_results.resolved_at) CLIENT-SET; the previous PgLedger queries omitted both bindings, so the schema's DEFAULT now() silently fired and back-tests skewed by the dequeue lag. Adds ts: DateTime to NewArb and resolved_at: DateTime to InclusionUpdate, binds both, and updates the inclusion upsert branch to set resolved_at = EXCLUDED.resolved_at instead of now(). 3. arb_id was Uuid::new_v4() per row, so log lines (which print ArbOpportunity::id) had no join key into the trade ledger. Derives arb_id deterministically from opp.id via UUIDv5 against a stable namespace, and emits arb_id as a structured field on the ARB PUBLISHED log line so grep logs/* | psql works. Adds chrono to aether-grpc-server deps and the v5 feature to the uuid workspace dep. Refs PR #119 review. 
--- Cargo.toml | 2 +- crates/common/src/db.rs | 42 +++++++++++++++++++++++--------- crates/grpc-server/Cargo.toml | 1 + crates/grpc-server/src/engine.rs | 41 +++++++++++++++++++++++++------ 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 71e2aa5..bca6cbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,6 @@ serde_yml = "0.0.12" criterion = { version = "0.5", features = ["html_reports"] } # Database (trade ledger) sqlx = { version = "0.8", default-features = false, features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "json", "macros", "bigdecimal"] } -uuid = { version = "1", features = ["v4", "serde"] } +uuid = { version = "1", features = ["v4", "v5", "serde"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } bigdecimal = "0.4" diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index bd1d4fc..585c60a 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -27,6 +27,7 @@ use std::time::Instant; use alloy::primitives::{Address, B256, U256}; use bigdecimal::BigDecimal; +use chrono::{DateTime, Utc}; use prometheus::{ HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry, }; @@ -51,6 +52,10 @@ const LEDGER_CHANNEL_CAPACITY: usize = 1024; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct NewArb { pub arb_id: Uuid, + /// Event time — when the engine published the arb. Per the migration's + /// clock-authority policy this column is CLIENT-SET; producers MUST + /// populate it and never rely on the schema's `DEFAULT now()` fallback. + pub ts: DateTime, pub target_block: u64, pub path_hash: B256, pub hops: u8, @@ -97,6 +102,11 @@ pub struct InclusionUpdate { pub included_block: Option, pub landed_tx_hash: Option, pub error: Option, + /// Event time — when the GetBundleStats poll resolved. 
Per the + /// migration's clock-authority policy this column is CLIENT-SET and + /// must be populated by the writer; the schema `DEFAULT now()` is a + /// safety net for ad-hoc psql inserts only. + pub resolved_at: DateTime, } /// Persistence boundary for arb / pool / inclusion records. @@ -381,24 +391,25 @@ async fn insert_arb_inner(pool: &PgPool, arb: &NewArb) -> Result<(), sqlx::Error sqlx::query( r#" INSERT INTO arbs ( - arb_id, target_block, path_hash, hops, + arb_id, ts, target_block, path_hash, hops, path, protocols, pool_addresses, flashloan_token, flashloan_amount, gross_profit_wei, net_profit_wei, gas_estimate, tip_bps, detection_us, sim_us, git_sha ) VALUES ( - $1, $2, $3, $4, - $5, $6, $7, - $8, $9, - $10, $11, - $12, $13, - $14, $15, $16 + $1, $2, $3, $4, $5, + $6, $7, $8, + $9, $10, + $11, $12, + $13, $14, + $15, $16, $17 ) ON CONFLICT (arb_id) DO NOTHING "#, ) .bind(arb_id) + .bind(arb.ts) .bind(target_block) .bind(path_hash) .bind(hops) @@ -458,19 +469,23 @@ async fn update_inclusion_inner( .map(|v| i64::try_from(v).unwrap_or(i64::MAX)); let landed = u.landed_tx_hash.as_ref().map(|h| h.as_slice()); + // resolved_at is bound from the caller (CLIENT-SET, per the + // clock-authority policy in 0001_trade_ledger.sql). Both insert and + // update branches use the bound value so the column reflects when the + // GetBundleStats poll resolved in code, not when the row hit Postgres. 
sqlx::query( r#" INSERT INTO inclusion_results ( - bundle_id, builder, included, included_block, landed_tx_hash, error + bundle_id, builder, included, included_block, landed_tx_hash, error, resolved_at ) VALUES ( - $1, $2, $3, $4, $5, $6 + $1, $2, $3, $4, $5, $6, $7 ) ON CONFLICT (bundle_id, builder) DO UPDATE SET included = EXCLUDED.included, included_block = EXCLUDED.included_block, landed_tx_hash = EXCLUDED.landed_tx_hash, error = EXCLUDED.error, - resolved_at = now() + resolved_at = EXCLUDED.resolved_at "#, ) .bind(u.bundle_id) @@ -479,6 +494,7 @@ async fn update_inclusion_inner( .bind(included_block) .bind(landed) .bind(u.error.as_deref()) + .bind(u.resolved_at) .execute(pool) .await?; Ok(()) @@ -512,8 +528,10 @@ fn u256_to_decimal(v: U256) -> BigDecimal { /// Stable on-disk name for a [`ProtocolType`]. Matches the serde enum tag so /// rows written today and rows written by a future serde-driven impl stay -/// comparable. -fn protocol_label(p: ProtocolType) -> &'static str { +/// comparable. Public so the engine can use the same mapping when building +/// the JSONB `protocols` column on `NewArb` — keeping a single source of +/// truth for on-disk protocol names. 
+pub fn protocol_label(p: ProtocolType) -> &'static str { match p { ProtocolType::UniswapV2 => "UniswapV2", ProtocolType::UniswapV3 => "UniswapV3", diff --git a/crates/grpc-server/Cargo.toml b/crates/grpc-server/Cargo.toml index 01781e3..bd2766a 100644 --- a/crates/grpc-server/Cargo.toml +++ b/crates/grpc-server/Cargo.toml @@ -46,6 +46,7 @@ clap = { version = "4", features = ["derive"] } anyhow = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } +chrono = { workspace = true } sha2 = "0.10" [build-dependencies] tonic-build = { workspace = true } diff --git a/crates/grpc-server/src/engine.rs b/crates/grpc-server/src/engine.rs index 71039ab..f6c8df3 100644 --- a/crates/grpc-server/src/engine.rs +++ b/crates/grpc-server/src/engine.rs @@ -11,7 +11,7 @@ use alloy::primitives::{Address, U256}; use alloy::providers::{DynProvider, Provider}; use alloy::sol_types::SolCall; -use aether_common::db::{Ledger, NewArb, NewPool, NoopLedger}; +use aether_common::db::{protocol_label, Ledger, NewArb, NewPool, NoopLedger}; use aether_common::types::{ArbHop, ArbOpportunity, PoolId, ProtocolType, SwapStep}; use sha2::{Digest, Sha256}; use uuid::Uuid; @@ -174,11 +174,30 @@ fn token_label(addr: &Address) -> String { } } +/// Stable UUID namespace for deriving DB `arb_id` values from the engine's +/// log-side `ArbOpportunity::id` strings. Hard-coded so the same opportunity +/// id always maps to the same UUID across runs and machines, making +/// `grep logs/* | xargs psql -c 'SELECT … WHERE arb_id = …'` work without +/// a second lookup table. +const ARB_ID_NAMESPACE: Uuid = Uuid::from_bytes([ + 0x6e, 0xc6, 0xfd, 0x05, 0xb1, 0xc8, 0x4c, 0x4d, + 0x8d, 0x57, 0x4e, 0xc1, 0x77, 0xa2, 0x47, 0x6e, +]); + +/// Derive a deterministic `arb_id` (UUIDv5) from the engine's free-form +/// `ArbOpportunity::id`. Same input string always produces the same UUID, so +/// log-side ids and DB ids correlate without a side table. 
+pub(crate) fn arb_id_for_opp(opp_id: &str) -> Uuid { + Uuid::new_v5(&ARB_ID_NAMESPACE, opp_id.as_bytes()) +} + /// Build a [`NewArb`] row from a published opportunity. /// -/// `arb_id` is generated server-side because `ArbOpportunity::id` is a free-form -/// String not guaranteed to be UUID-shaped. `path_hash` is sha256 of the pool -/// address sequence so equivalent paths collapse to the same key for grouping. +/// `arb_id` is derived from `ArbOpportunity::id` via UUIDv5 so the engine's +/// log-side id and the DB row share a stable mapping; this is the only join +/// key between Loki / structured logs and the trade ledger. `path_hash` is +/// sha256 of the pool address sequence so equivalent paths collapse to the +/// same key for grouping. fn build_new_arb( opp: &ArbOpportunity, flashloan_token: Address, @@ -193,10 +212,15 @@ fn build_new_arb( .iter() .map(|h| format!("{:#x}", h.pool_address)) .collect(); - let protocols: Vec = opp + // Use the same `protocol_label` adapter the PgLedger uses for + // pool_registry.protocol so the JSONB array on arbs.protocols stays + // join-compatible with the TEXT column. format!("{:?}", _) silently + // diverges if ProtocolType variants are renamed; the serde-tag-pinned + // label is the single source of truth. 
+ let protocols: Vec<&'static str> = opp .hops .iter() - .map(|h| format!("{:?}", h.protocol)) + .map(|h| protocol_label(h.protocol)) .collect(); let mut hasher = Sha256::new(); @@ -208,7 +232,8 @@ fn build_new_arb( path_hash.copy_from_slice(&digest); NewArb { - arb_id: Uuid::new_v4(), + arb_id: arb_id_for_opp(&opp.id), + ts: chrono::Utc::now(), target_block: opp.block_number, path_hash: path_hash.into(), hops: u8::try_from(opp.hops.len()).unwrap_or(u8::MAX), @@ -1446,8 +1471,10 @@ impl AetherEngine { sim_us, "Published validated arb" ); + let arb_id = arb_id_for_opp(&input.opp.id); info!( id = %input.opp.id, + arb_id = %arb_id, path = %path, hops = hop_count, flashloan = %flashloan_label, From e2f7422bec91a0350cef661628ccb984bfbd5b73 Mon Sep 17 00:00:00 2001 From: 0xfandom Date: Tue, 5 May 2026 15:44:17 +0530 Subject: [PATCH 4/4] perf(ledger): concurrent writer + tighter connect timeout + loud parse + dead-code doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Forward-looking items from PR #119 review: - Writer was sequential on a pool of 8 connections — every conn but one sat idle while INSERTs serialised on the dispatcher's await. Adds a Semaphore(LEDGER_MAX_INFLIGHT=8) gate around tokio::spawn fan-out so the pool actually runs at capacity. Pool size and inflight cap are named constants tuned in lockstep. - Channel-capacity comment math was wrong (1024 / 200 ≈ 5.12 s, not ~3 s). Also surfaces the constants in the connect log line so operators can see the tuning at startup. - Connect acquire_timeout 5 s → 2 s. Misconfigured DATABASE_URL should fail fast and degrade to NoopLedger via ledger_from_env, not stall engine boot. Logs already cover the reason. - u256_to_decimal: replace .unwrap_or(BigDecimal::from(0)) with .expect. U256::to_string is a base-10 digit sequence which BigDecimal::from_str accepts by definition; a failure here means the alloy/bigdecimal contract changed. 
Failing loudly beats a silent zero in arb economics. - Documents update_inclusion as engine-side dead code (Go owns inclusion writes today) so future readers do not look for a Rust caller. Refs PR #119 review. --- crates/common/src/db.rs | 132 +++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 42 deletions(-) diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index 585c60a..0f1b037 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -33,17 +33,27 @@ use prometheus::{ }; use serde::{Deserialize, Serialize}; use sqlx::postgres::{PgPool, PgPoolOptions}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Semaphore}; use uuid::Uuid; use crate::types::ProtocolType; /// Channel depth between the engine hot path and the PgLedger writer task. -/// Sized for ~3 s of bursty inserts (engine tops out around 200 arbs/s under -/// peak load) before saturating and dropping. Breached only when Postgres -/// stalls; the drops counter is the alert signal. +/// Sized for ~5 s of bursty inserts at the engine's 200 arbs/s peak before +/// saturating (1024 / 200 ≈ 5.12 s). Breached only when Postgres stalls; the +/// drops counter is the alert signal. const LEDGER_CHANNEL_CAPACITY: usize = 1024; +/// Maximum simultaneous in-flight INSERTs. Matches the sqlx pool size so the +/// writer can saturate the pool without queueing on the connection acquire +/// path. Higher than the pool size = wasted spawns waiting for a connection; +/// lower = pool sits idle while writes serialise. +const LEDGER_MAX_INFLIGHT: usize = 8; + +/// sqlx connection pool size. Kept identical to LEDGER_MAX_INFLIGHT so the +/// two are tuned in lockstep. +const LEDGER_POOL_SIZE: u32 = 8; + /// Insert payload for the `arbs` table. 
/// /// Field shapes mirror the SQL schema 1:1 so the [`PgLedger`] impl maps @@ -261,8 +271,13 @@ impl PgLedger { metrics: Arc, ) -> Result { let pool = PgPoolOptions::new() - .max_connections(8) - .acquire_timeout(std::time::Duration::from_secs(5)) + .max_connections(LEDGER_POOL_SIZE) + // Short acquire timeout: misconfigured DATABASE_URL should fail + // boot in seconds, not block the engine while we wait. The + // ledger_from_env wrapper falls back to NoopLedger on this + // error, so a slow Postgres degrades gracefully instead of + // stalling startup. + .acquire_timeout(std::time::Duration::from_secs(2)) .connect(database_url) .await?; @@ -272,6 +287,8 @@ impl PgLedger { tracing::info!( target: "aether::ledger", channel_capacity = LEDGER_CHANNEL_CAPACITY, + pool_size = LEDGER_POOL_SIZE, + max_inflight = LEDGER_MAX_INFLIGHT, "PgLedger connected — trade ledger writes enabled" ); Ok(Self { tx, metrics }) @@ -318,57 +335,80 @@ impl Ledger for PgLedger { self.enqueue(LedgerOp::InsertPool(Box::new(pool_row.clone()))); } + /// `update_inclusion` is currently **unused on the engine side** — the Go + /// executor owns inclusion writes (it's the side that polls + /// `GetBundleStats`). This Rust path is reserved for a future + /// reconciliation worker that backfills `inclusion_results` from + /// on-chain block data when a builder API loses the race. Tests + /// exercise the wire-up so the code stays compilable; no engine-side + /// caller wires it yet. fn update_inclusion(&self, update: &InclusionUpdate) { self.enqueue(LedgerOp::UpdateInclusion(Box::new(update.clone()))); } } -/// Spawn the dedicated writer task. The task owns the `PgPool` so callers -/// hand off ownership at construction; queries run sequentially within the -/// task and concurrently across the pool's connections. +/// Spawn the dedicated writer dispatcher. 
The dispatcher dequeues from `rx` +/// and fans each op out to a tokio task gated by a semaphore so up to +/// [`LEDGER_MAX_INFLIGHT`] writes execute concurrently across the sqlx pool's +/// connections. Sequential await on the writer side previously left every +/// connection but one idle; the semaphore matches concurrency to the pool. fn spawn_writer( pool: PgPool, mut rx: mpsc::Receiver, metrics: Arc, ) { + let semaphore = Arc::new(Semaphore::new(LEDGER_MAX_INFLIGHT)); tokio::spawn(async move { while let Some(op) = rx.recv().await { metrics.queue_depth.dec(); - let label = op.label(); - let timer = Instant::now(); - let result = match op { - LedgerOp::InsertArb(arb) => insert_arb_inner(&pool, &arb).await, - LedgerOp::InsertPool(p) => insert_pool_inner(&pool, &p).await, - LedgerOp::UpdateInclusion(u) => update_inclusion_inner(&pool, &u).await, - }; - let elapsed_ms = timer.elapsed().as_secs_f64() * 1_000.0; - metrics - .write_latency_ms - .with_label_values(&[label]) - .observe(elapsed_ms); - match result { - Ok(()) => { - metrics - .writes_total - .with_label_values(&[label, "ok"]) - .inc(); + // Permit drops at task end, releasing one in-flight slot. + let permit = match Arc::clone(&semaphore).acquire_owned().await { + Ok(p) => p, + Err(_) => { + // Semaphore was closed; the dispatcher is shutting down. 
+ break; } - Err(e) => { - metrics - .writes_total - .with_label_values(&[label, "err"]) - .inc(); - tracing::warn!( - target: "aether::ledger", - op = label, - error = %e, - elapsed_ms, - "ledger write failed; row dropped" - ); + }; + let pool = pool.clone(); + let metrics = Arc::clone(&metrics); + tokio::spawn(async move { + let label = op.label(); + let timer = Instant::now(); + let result = match op { + LedgerOp::InsertArb(arb) => insert_arb_inner(&pool, &arb).await, + LedgerOp::InsertPool(p) => insert_pool_inner(&pool, &p).await, + LedgerOp::UpdateInclusion(u) => update_inclusion_inner(&pool, &u).await, + }; + let elapsed_ms = timer.elapsed().as_secs_f64() * 1_000.0; + metrics + .write_latency_ms + .with_label_values(&[label]) + .observe(elapsed_ms); + match result { + Ok(()) => { + metrics + .writes_total + .with_label_values(&[label, "ok"]) + .inc(); + } + Err(e) => { + metrics + .writes_total + .with_label_values(&[label, "err"]) + .inc(); + tracing::warn!( + target: "aether::ledger", + op = label, + error = %e, + elapsed_ms, + "ledger write failed; row dropped" + ); + } } - } + drop(permit); + }); } - tracing::info!(target: "aether::ledger", "PgLedger writer task exiting"); + tracing::info!(target: "aether::ledger", "PgLedger writer dispatcher exiting"); }); } @@ -522,8 +562,16 @@ pub async fn ledger_from_env(metrics: Arc) -> Arc { /// Map a U256 to the `NUMERIC(78,0)` representation sqlx accepts via /// [`BigDecimal`]. U256::MAX has 78 decimal digits, which fits. +/// +/// `expect`s the parse rather than masking with `unwrap_or(0)`: `U256::to_string` +/// emits a base-10 digit sequence which `BigDecimal::from_str` accepts by +/// definition. A failure here would mean the alloy / bigdecimal contract +/// changed under us — a programmer bug we want to surface loudly, not a +/// silent zero that quietly corrupts every arb's economics. 
fn u256_to_decimal(v: U256) -> BigDecimal { - BigDecimal::from_str(&v.to_string()).unwrap_or_else(|_| BigDecimal::from(0)) + let s = v.to_string(); + BigDecimal::from_str(&s) + .expect("U256::to_string is always a valid base-10 BigDecimal input") } /// Stable on-disk name for a [`ProtocolType`]. Matches the serde enum tag so