diff --git a/.env.example b/.env.example index 4bb5647..85d2803 100644 --- a/.env.example +++ b/.env.example @@ -55,3 +55,28 @@ SLACK_WEBHOOK_URL= # POSTGRES_USER=aether # POSTGRES_PASSWORD=aether # POSTGRES_DB=aether + +# Mempool tracking — opt-in. When set to a truthy value (1/true/yes/on), +# aether-rust spawns the Alchemy alchemy_pendingTransactions WS subscription +# and the calldata decoder, and aether-monitor connects to the Flashbots +# MEV-Share SSE stream. Both binaries log + emit metrics only — no bundle is +# constructed and no submission is made. +# +# IMPORTANT: alchemy_pendingTransactions is an Alchemy-proprietary subscribe +# method. Reth, QuickNode, Infura, and self-hosted Geth do NOT implement it — +# the WS handshake succeeds but the subscription returns no events, so this +# feature silently produces zero metrics on a non-Alchemy endpoint. The +# subscription task logs a warning at startup if MEMPOOL_WS_URL / ETH_RPC_URL +# does not look like an Alchemy host. To use a non-Alchemy provider you need +# a different mempool source (Chainbound Fiber, bloXroute, self-hosted Reth +# txpool IPC) — see crates/ingestion/src/mempool.rs MempoolSource trait. +# MEMPOOL_TRACKING=1 + +# Override the WS endpoint for the mempool subscription. Defaults to +# ETH_RPC_URL when unset; only set this if you want a different node for +# the mempool stream than your block / log subscriptions. MUST point at an +# Alchemy endpoint until a non-Alchemy MempoolSource lands. +# MEMPOOL_WS_URL=wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY} + +# Override the Flashbots MEV-Share SSE endpoint. Defaults to mainnet. +# MEV_SHARE_URL=https://mev-share.flashbots.net diff --git a/crates/grpc-server/src/engine.rs b/crates/grpc-server/src/engine.rs index fd50f14..e636e7e 100644 --- a/crates/grpc-server/src/engine.rs +++ b/crates/grpc-server/src/engine.rs @@ -1572,10 +1572,14 @@ impl AetherEngine { Err(_) => return None, }; if net_profit < self.config.min_profit_threshold_wei { - debug!( - net_profit, - threshold = self.config.min_profit_threshold_wei, - "Below min profit threshold" + let net_profit_eth = net_profit as f64 / 1e18; + let threshold_eth = self.config.min_profit_threshold_wei as f64 / 1e18; + info!( + net_profit_wei = net_profit, + net_profit_eth = format!("{:.6}", net_profit_eth), + threshold_eth = format!("{:.6}", threshold_eth), + hops = candidate.hops.len(), + "CYCLE REJECTED: below min profit threshold" ); return None; } @@ -1812,9 +1816,22 @@ impl AetherEngine { self.metrics.observe_simulation_latency_us(sim_us); if !sim_result.success { - debug!(sim_us, reason = ?sim_result.revert_reason, "Simulation failed, skipping"); + info!( + sim_us, + reason = ?sim_result.revert_reason, + hops = input.opp.hops.len(), + expected_net_wei = input.net_profit, + "REVM SIM REVERTED" + ); continue; } + info!( + sim_us, + hops = input.opp.hops.len(), + expected_net_wei = input.net_profit, + expected_net_eth = format!("{:.6}", input.net_profit as f64 / 1e18), + "REVM SIM OK" + ); // Post-sim cross-check (gate 4 of the candidate gating layer). // The pre-sim gates catch corruption signatures visible from diff --git a/crates/grpc-server/src/main.rs b/crates/grpc-server/src/main.rs index 47d314d..315803e 100644 --- a/crates/grpc-server/src/main.rs +++ b/crates/grpc-server/src/main.rs @@ -13,6 +13,7 @@ use tokio_stream::wrappers::UnixListenerStream; mod cycle_gating; mod engine; +mod mempool_pipeline; mod pipeline; mod service; mod tracing_init; @@ -77,8 +78,8 @@ async fn main() -> Result<(), Box> { // Bootstrap pools from config file at startup. // Supports AETHER_POOLS_CONFIG env var to override the default path, // so the binary works regardless of the working directory. - let pools_config = std::env::var("AETHER_POOLS_CONFIG") - .unwrap_or_else(|_| "config/pools.toml".to_string()); + let pools_config = + std::env::var("AETHER_POOLS_CONFIG").unwrap_or_else(|_| "config/pools.toml".to_string()); let pool_count = engine.bootstrap_pools(&pools_config).await; info!(pool_count, path = %pools_config, "Pools loaded at startup"); @@ -115,13 +116,64 @@ async fn main() -> Result<(), Box> { provider_clone.run(provider_shutdown_rx).await; }); + // Mempool tracking is opt-in via MEMPOOL_TRACKING=1. When unset the + // engine behaves identically to today; when set we spawn the Alchemy + // pending-tx subscription and the decode pipeline that consumes it. + let mempool_handles = if aether_ingestion::mempool::is_enabled() { + info!("MEMPOOL_TRACKING enabled — spawning pending-tx subscription + decode pipeline"); + let ws_url = std::env::var("MEMPOOL_WS_URL") + .or_else(|_| std::env::var("ETH_RPC_URL")) + .unwrap_or_default(); + if ws_url.is_empty() { + tracing::warn!( + "MEMPOOL_TRACKING set but neither MEMPOOL_WS_URL nor ETH_RPC_URL provided; skipping" + ); + None + } else { + let cfg = aether_ingestion::mempool::AlchemyMempoolConfig { + ws_url, + router_filter: aether_ingestion::mempool::default_router_addresses(), + }; + let source = Arc::new(aether_ingestion::mempool::AlchemyMempool::new(cfg)); + let channels = Arc::clone(engine.event_channels()); + let source_shutdown = shutdown_rx.clone(); + let source_handle = tokio::spawn(async move { + use aether_ingestion::mempool::MempoolSource; + source.run(channels, source_shutdown).await; + }); + // Build the post-state simulation context from the engine's + // live registry / token index / snapshot. The detector mirrors + // the engine's BellmanFord config so the analytical scan + // honours the same hop / latency budget as the main path. + let engine_cfg = EngineConfig::default(); + let sim_ctx = Arc::new(mempool_pipeline::SimContext::new( + Arc::clone(engine.pool_registry()), + Arc::clone(engine.token_index()), + Arc::clone(engine.snapshot_manager()), + aether_detector::bellman_ford::BellmanFord::new( + engine_cfg.max_hops, + engine_cfg.detection_time_budget_us, + ), + Arc::clone(engine.pool_states()), + )); + let pipeline_handle = mempool_pipeline::spawn_mempool_pipeline( + Arc::clone(engine.event_channels()), + Arc::clone(&metrics), + Some(sim_ctx), + shutdown_rx.clone(), + ); + Some((source_handle, pipeline_handle)) + } + } else { + None + }; + // Read the listen address from the environment so the systemd unit and // the binary always agree. Default to localhost TCP for development. // // Production (UDS): GRPC_ADDRESS=unix:///var/run/aether/engine.sock // Development (TCP): GRPC_ADDRESS=[::1]:50051 (default) - let addr_str = - std::env::var("GRPC_ADDRESS").unwrap_or_else(|_| "[::1]:50051".to_string()); + let addr_str = std::env::var("GRPC_ADDRESS").unwrap_or_else(|_| "[::1]:50051".to_string()); let server = Server::builder() .add_service(ArbServiceServer::new(arb_service)) @@ -136,7 +188,9 @@ async fn main() -> Result<(), Box> { match std::fs::remove_file(uds_path) { Ok(()) => info!(path = %uds_path, "Removed stale UDS socket"), Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} - Err(e) => tracing::warn!(path = %uds_path, error = %e, "Failed to remove stale UDS socket"), + Err(e) => { + tracing::warn!(path = %uds_path, error = %e, "Failed to remove stale UDS socket") + } } // Ensure parent directory exists. @@ -148,10 +202,7 @@ async fn main() -> Result<(), Box> { // Restrict socket access to the process owner — UDS bypasses // network-layer controls (iptables, mTLS), so file permissions // are the only access control for ControlService endpoints. - std::fs::set_permissions( - uds_path, - PermissionsExt::from_mode(0o600), - )?; + std::fs::set_permissions(uds_path, PermissionsExt::from_mode(0o600))?; info!(path = %uds_path, "gRPC server listening on UDS"); let stream = UnixListenerStream::new(uds); server.serve_with_incoming(stream).await.map_err(|e| { @@ -196,6 +247,15 @@ async fn main() -> Result<(), Box> { error!(error = %e, "Provider task panicked"); } + if let Some((source_handle, pipeline_handle)) = mempool_handles { + if let Err(e) = source_handle.await { + error!(error = %e, "Mempool source task panicked"); + } + if let Err(e) = pipeline_handle.await { + error!(error = %e, "Mempool pipeline task panicked"); + } + } + server_result?; Ok(()) diff --git a/crates/grpc-server/src/mempool_pipeline.rs b/crates/grpc-server/src/mempool_pipeline.rs new file mode 100644 index 0000000..1b2a8ba --- /dev/null +++ b/crates/grpc-server/src/mempool_pipeline.rs @@ -0,0 +1,861 @@ +//! Pipeline that consumes pending-tx events from the mempool subscription +//! and runs them through the router calldata decoder. +//! +//! When a [`SimContext`] is provided, decoded UniswapV2 / SushiSwap swaps +//! are also fed into an analytical post-state simulator: the victim's +//! constant-product swap is applied to a clone of the live price graph, +//! and Bellman-Ford runs over the affected vertices to surface profitable +//! cycles. Profitable cycles are counted in +//! `aether_pending_arb_candidates_total{router, profit_bucket}`. Nothing +//! is submitted — this is a *candidate* metric that proves the post-state +//! pipeline produces non-empty output on real traffic. +//! +//! UniswapV3 / Curve / Balancer post-state math is not implemented here; +//! those decode paths still bump `pending_dex_tx_total` and are skipped +//! at the simulator layer with a `protocol_unsupported` reason. A revm- +//! backed simulator covering every protocol is the planned follow-up +//! ("Phase B" in the issue) and reuses this same pipeline shape. +//! +//! The pipeline runs only when [`aether_ingestion::mempool::is_enabled`] +//! returns `true` (i.e. `MEMPOOL_TRACKING=1` in the environment), so default +//! `main`-branch behaviour is unchanged. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use aether_common::types::ProtocolType; +use aether_detector::bellman_ford::BellmanFord; +use aether_ingestion::subscription::{EventChannels, PendingTxEvent}; +use aether_pools::router_decoder::{decode_pending, DecodeError, DecodedSwap, Protocol}; +use aether_pools::{predict_post_state_with_fallback, PoolStateCache, UnifiedPostState}; +use aether_state::snapshot::SnapshotManager; +use aether_state::token_index::TokenIndex; +use alloy::primitives::{Address, U256}; +use arc_swap::ArcSwap; +use tokio::sync::watch; +use tracing::{debug, info, warn}; + +use crate::engine::PoolMetadata; +use crate::EngineMetrics; + +/// Pair-keyed pool index built from the live pool registry. Lookup is O(1) +/// vs the previous registry.values().find(...) which was O(N) per pending +/// swap and would dominate the per-event budget at 5000+ pools. +/// +/// The key uses the canonical ordering (`min(token0, token1), max(...)`) so +/// either swap direction returns the same bucket. +type PairKey = (Address, Address, ProtocolType); +type PairIndex = HashMap>; + +fn canonical_pair(a: Address, b: Address) -> (Address, Address) { + if a <= b { + (a, b) + } else { + (b, a) + } +} + +fn build_pair_index(registry: &HashMap) -> PairIndex { + let mut idx: PairIndex = HashMap::with_capacity(registry.len()); + for meta in registry.values() { + let (a, b) = canonical_pair(meta.token0, meta.token1); + idx.entry((a, b, meta.protocol)) + .or_default() + .push(meta.clone()); + } + idx +} + +/// State the post-state simulator needs to run after a successful decode. +/// Cheap to clone (everything is `Arc`), so the pipeline holds one +/// `Arc` and dispatches per-event work without re-locking. +pub struct SimContext { + pub pool_registry: Arc>>, + pub token_index: Arc>, + pub snapshot_manager: Arc, + pub detector: BellmanFord, + /// Live per-pool analytical state (V3 sqrt + tick + liquidity, Curve A + + /// balances, Balancer balances + weights) populated by the engine at + /// bootstrap and refreshed on `PoolEvent` updates. Used by the V3 / + /// Balancer mempool sim path to call `predict_post_state_with_fallback` + /// without round-tripping through the pool registry RPC. + pub pool_states: PoolStateCache, + /// Cached `(registry_ptr, PairIndex)` so the second and following pending + /// swaps under the same registry generation lookup in O(1). The Mutex + /// guards rebuild only — the steady-state path is `lock + ptr_eq + read`. + pair_index_cache: Mutex)>>, +} + +impl SimContext { + pub fn new( + pool_registry: Arc>>, + token_index: Arc>, + snapshot_manager: Arc, + detector: BellmanFord, + pool_states: PoolStateCache, + ) -> Self { + Self { + pool_registry, + token_index, + snapshot_manager, + detector, + pool_states, + pair_index_cache: Mutex::new(None), + } + } + + /// Look up a pool by `(token_in, token_out, protocol)` in O(1). + /// + /// Rebuilds the pair index when the underlying `pool_registry` Arc has + /// been swapped (detected via pointer comparison). All lookups under a + /// single registry generation share one Arc. + fn lookup_pool( + &self, + token_in: Address, + token_out: Address, + protocol: ProtocolType, + ) -> Option { + let registry_guard = self.pool_registry.load(); + let registry_ptr = Arc::as_ptr(®istry_guard) as usize; + + let index = { + let mut cache = self + .pair_index_cache + .lock() + .expect("pair_index_cache poisoned"); + let stale = cache.as_ref().is_none_or(|(p, _)| *p != registry_ptr); + if stale { + let fresh = Arc::new(build_pair_index(®istry_guard)); + *cache = Some((registry_ptr, Arc::clone(&fresh))); + fresh + } else { + Arc::clone(&cache.as_ref().expect("populated above").1) + } + }; + + let (a, b) = canonical_pair(token_in, token_out); + index.get(&(a, b, protocol))?.first().cloned() + } +} + +/// Spawn the mempool decode pipeline as a tokio task. +/// +/// When `sim_ctx` is `Some`, decoded V2/Sushi swaps are run through the +/// analytical post-state simulator. When `None`, behaviour is identical +/// to the prior log-only version. +pub fn spawn_mempool_pipeline( + channels: Arc, + metrics: Arc, + sim_ctx: Option>, + mut shutdown: watch::Receiver, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut rx = channels.subscribe_pending_txs(); + info!( + target: "aether::mempool", + sim = sim_ctx.is_some(), + "mempool decode pipeline started" + ); + loop { + tokio::select! { + next = rx.recv() => match next { + Ok(event) => handle_event(&metrics, sim_ctx.as_ref(), event), + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + metrics.add_pending_pipeline_lagged(n); + warn!( + target: "aether::mempool", + lagged = n, + "decode pipeline lagged behind broadcast; events dropped" + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + info!(target: "aether::mempool", "broadcast closed; pipeline exiting"); + return; + } + }, + changed = shutdown.changed() => { + if changed.is_err() || *shutdown.borrow() { + info!(target: "aether::mempool", "shutdown signalled; pipeline exiting"); + return; + } + } + } + } + }) +} + +/// Decode one pending tx and update metrics + logs. +/// +/// Pulled out as a free function so unit tests can drive it without spawning +/// the full pipeline task. The post-state scan (graph clone + Bellman-Ford) +/// is dispatched onto tokio's blocking pool to keep its CPU cost off the +/// main runtime workers — the engine's 3 ms p99 detection budget cannot +/// share worker threads with a 3.8 MB-per-event clone under load. +fn handle_event( + metrics: &Arc, + sim_ctx: Option<&Arc>, + event: PendingTxEvent, +) { + let Some(to) = event.to else { + // Contract creations and other anonymous calls don't have a router + // to attribute to — bump a generic `no_to` failure and move on. + metrics.inc_pending_decode_errors("no_to"); + return; + }; + let router_label = format!("{:#x}", to); + + match decode_pending(to, &event.input) { + Ok(swap) => { + emit_decoded(metrics, &router_label, &swap, &event); + if let Some(ctx) = sim_ctx { + if !pre_sim_filter(metrics, ctx, &swap) { + return; + } + let metrics = Arc::clone(metrics); + let ctx = Arc::clone(ctx); + let swap = swap.clone(); + let router_label = router_label.clone(); + tokio::task::spawn_blocking(move || { + try_post_state_scan(&metrics, &ctx, &router_label, &swap); + }); + } + } + Err(err) => emit_failure(metrics, &router_label, &err), + } +} + +/// Drop a decoded swap before any sim work is scheduled when it would land +/// nowhere useful: self-swap, zero amount, or a (token, token, protocol) +/// triple absent from the live `pool_registry`. Bumps +/// `aether_mempool_filtered_total{reason}` and returns `false` on drop; +/// returns `true` to pass the swap through to the sim task. +/// +/// The pool-registry check is the load-bearing one — without it every +/// shitcoin V2 swap on mainnet would queue a `spawn_blocking` that clones +/// the live graph (~3.8 MB) only to bump +/// `pending_arb_sim_skipped{token_in_unknown}` and discard the work. +fn pre_sim_filter(metrics: &EngineMetrics, ctx: &SimContext, swap: &DecodedSwap) -> bool { + if swap.token_in == swap.token_out { + metrics.inc_mempool_filtered("same_token"); + info!( + target: "aether::mempool", + reason = "same_token", + protocol = ?swap.protocol, + token = %swap.token_in, + "FILTER DROP" + ); + return false; + } + if swap.amount_in.is_zero() { + metrics.inc_mempool_filtered("zero_amount"); + info!( + target: "aether::mempool", + reason = "zero_amount", + protocol = ?swap.protocol, + token_in = %swap.token_in, + token_out = %swap.token_out, + "FILTER DROP" + ); + return false; + } + let target_protocol = match decoder_protocol_to_type(swap.protocol) { + Some(p) => p, + // Decoder-side protocols with no analytical predictor (none today — + // all four decoded variants land here). Pass through so the sim + // task can bump `pending_arb_sim_skipped{protocol_unsupported}` + // without double-counting under `mempool_filtered_total`. + None => return true, + }; + if ctx + .lookup_pool(swap.token_in, swap.token_out, target_protocol) + .is_none() + { + metrics.inc_mempool_filtered("not_in_registry"); + info!( + target: "aether::mempool", + reason = "not_in_registry", + protocol = ?swap.protocol, + token_in = %swap.token_in, + token_out = %swap.token_out, + "FILTER DROP" + ); + return false; + } + info!( + target: "aether::mempool", + protocol = ?swap.protocol, + token_in = %swap.token_in, + token_out = %swap.token_out, + amount_in = %swap.amount_in, + "FILTER PASS" + ); + true +} + +/// Map the router decoder's `Protocol` (a parser-side enum) to the workspace +/// `ProtocolType` used in the pool registry. Returns `None` for protocols +/// the post-state simulator doesn't yet handle so callers can route those +/// through the existing `protocol_unsupported` skip path instead of the +/// mempool filter. +fn decoder_protocol_to_type(p: Protocol) -> Option { + match p { + Protocol::UniswapV2 => Some(ProtocolType::UniswapV2), + Protocol::SushiSwap => Some(ProtocolType::SushiSwap), + Protocol::UniswapV3 => Some(ProtocolType::UniswapV3), + Protocol::BalancerV2 => Some(ProtocolType::BalancerV2), + } +} + +fn emit_decoded( + metrics: &EngineMetrics, + router_label: &str, + swap: &DecodedSwap, + event: &PendingTxEvent, +) { + metrics.inc_pending_dex_tx(router_label, protocol_label(swap.protocol), true); + debug!( + target: "aether::mempool", + tx_hash = %event.tx_hash, + router = %router_label, + protocol = ?swap.protocol, + token_in = %swap.token_in, + token_out = %swap.token_out, + amount_in = %swap.amount_in, + fee_bps = swap.fee_bps, + "PENDING DEX SWAP decoded" + ); +} + +fn emit_failure(metrics: &EngineMetrics, router_label: &str, err: &DecodeError) { + let reason = decode_error_label(err); + metrics.inc_pending_dex_tx(router_label, "unknown", false); + metrics.inc_pending_decode_errors(reason); + debug!( + target: "aether::mempool", + router = %router_label, + reason, + error = %err, + "pending tx decode failed" + ); +} + +/// Try to run the V2/Sushi post-state simulation for a decoded swap. +/// +/// On any miss (unsupported protocol, missing pool, missing token index, +/// no graph edge, zero reserves) bumps +/// `aether_pending_arb_sim_skipped_total{reason}` and returns. On success, +/// every profitable cycle increments +/// `aether_pending_arb_candidates_total{router, profit_bucket}` and is +/// logged at `info` so a tail of the log is enough to verify the path. +fn try_post_state_scan( + metrics: &EngineMetrics, + ctx: &SimContext, + router_label: &str, + swap: &DecodedSwap, +) { + let target_protocol = match swap.protocol { + Protocol::UniswapV2 => ProtocolType::UniswapV2, + Protocol::SushiSwap => ProtocolType::SushiSwap, + Protocol::UniswapV3 => ProtocolType::UniswapV3, + Protocol::BalancerV2 => ProtocolType::BalancerV2, + }; + + let token_idx = ctx.token_index.load(); + let Some(in_idx) = token_idx.get_index(&swap.token_in) else { + metrics.inc_pending_arb_sim_skipped("token_in_unknown"); + return; + }; + let Some(out_idx) = token_idx.get_index(&swap.token_out) else { + metrics.inc_pending_arb_sim_skipped("token_out_unknown"); + return; + }; + + // O(1) pair lookup via the cached PairIndex. The cache rebuilds only + // when the underlying pool_registry Arc has been swapped, so steady-state + // cost is one Mutex acquire + one HashMap probe — independent of the + // number of registered pools. + let Some(meta) = ctx.lookup_pool(swap.token_in, swap.token_out, target_protocol) else { + metrics.inc_pending_arb_sim_skipped("pool_not_registered"); + return; + }; + let pool_id = meta.pool_id; + let fee_factor = meta.fee_factor(); + + // Snapshot the live graph and find the edge for this swap direction so + // we can read the current reserves. The reverse edge is updated in the + // same `update_edge_from_reserves` call against the cloned graph. + let snapshot = ctx.snapshot_manager.load_full(); + let edge_fwd = snapshot + .graph + .edges_from(in_idx) + .iter() + .find(|e| e.to == out_idx && e.pool_id == pool_id) + .cloned(); + let Some(edge_fwd) = edge_fwd else { + metrics.inc_pending_arb_sim_skipped("graph_edge_missing"); + return; + }; + if edge_fwd.reserve_in <= 0.0 || edge_fwd.reserve_out <= 0.0 { + metrics.inc_pending_arb_sim_skipped("reserves_zero"); + return; + } + + // Compute the post-state reserves the graph clone should adopt. V2 / + // Sushi reuse the inline constant-product formula because the predictor + // for those protocols intentionally lives outside `aether-pools`. V3 / + // Balancer route through the analytical post-state predictor in + // `aether-pools` and the result is mapped back onto graph-edge reserves + // so Bellman-Ford treats the two protocol families identically. + let (post_in, post_out) = match swap.protocol { + Protocol::UniswapV2 | Protocol::SushiSwap => { + // V2 constant-product: `dx` is the victim's amountIn — bound to + // f64 via `u256_to_f64_saturating` since the f64 mantissa is + // enough for token amount magnitudes seen on-chain + // (up to ~2^53 ≈ 9e15 units of the smallest decimal). + let dx = u256_to_f64_saturating(swap.amount_in); + predict_v2_post_state(edge_fwd.reserve_in, edge_fwd.reserve_out, dx, fee_factor) + } + Protocol::UniswapV3 | Protocol::BalancerV2 => { + let pool_addr = meta.pool_id.address; + let Some(state_arc) = ctx.pool_states.get(&pool_addr).map(|r| Arc::clone(r.value())) + else { + metrics.inc_pending_arb_sim_skipped("pool_state_missing"); + return; + }; + let post = predict_post_state_with_fallback( + &state_arc, + swap.token_in, + swap.amount_in, + |reason| metrics.inc_sim_evm_fallback(reason), + ); + let Some(unified) = post else { + metrics.inc_pending_arb_sim_skipped("predictor_low_confidence"); + return; + }; + let (pin, pout) = unified_to_post_reserves(swap.token_in, &meta, &unified); + if pin <= 0.0 || pout <= 0.0 { + metrics.inc_pending_arb_sim_skipped("post_state_invalid"); + return; + } + (pin, pout) + } + }; + + // Clone the graph and apply the post-state to both directions of the + // affected pair. update_edge_from_reserves is idempotent for a given + // (from, to, pool_id) tuple and is a no-op if the edge is missing. + let mut graph = snapshot.graph.clone(); + graph.update_edge_from_reserves(in_idx, out_idx, pool_id, post_in, post_out, fee_factor); + graph.update_edge_from_reserves(out_idx, in_idx, pool_id, post_out, post_in, fee_factor); + + let cycles = ctx + .detector + .detect_from_affected(&graph, &[in_idx, out_idx]); + let profitable: Vec<_> = cycles.into_iter().filter(|c| c.is_profitable()).collect(); + + if profitable.is_empty() { + metrics.inc_pending_arb_sim_skipped("no_profitable_cycle"); + return; + } + + for cycle in &profitable { + let bucket = profit_bucket(cycle.profit_factor()); + metrics.inc_pending_arb_candidates(router_label, bucket); + } + + info!( + target: "aether::mempool", + router = %router_label, + protocol = ?swap.protocol, + pool = %meta.pool_id.address, + token_in = %swap.token_in, + token_out = %swap.token_out, + candidates = profitable.len(), + best_profit_bps = (profitable[0].profit_factor() * 10_000.0) as i64, + "MEMPOOL ARB CANDIDATE" + ); +} + +/// Map a V3 / Balancer post-state into the (post_in, post_out) reserves the +/// price graph stores per edge. Curve cannot reach here — the router +/// decoder rejects every Curve calldata shape with `CurveUnsupported` +/// before the pipeline sees it — but the variant is matched explicitly so +/// new protocol families fail the build instead of silently routing to +/// reserves of `(0.0, 0.0)`. +/// +/// **V3 mapping.** The predictor returns `new_sqrt_price_x96`. The marginal +/// post-state spot price (token1 per token0) is `(sqrt / 2^96)^2`. The +/// graph's `update_edge_from_reserves` derives the edge weight as +/// `(reserve_out / reserve_in) * fee_factor`, so we set the synthetic pair +/// `(reserve_in, reserve_out) = (1.0, spot_price_post)` for the +/// `token0 → token1` direction and the inverse for the reverse direction. +/// `fee_factor` is applied at the graph layer, matching the bootstrap +/// path that originally seeded the V3 edge with `price * fee`. +/// +/// **Balancer mapping.** For equal-weight 2-token pools the rate equals +/// `balance_out / balance_in` — directly usable as graph reserves with the +/// pool's `fee_factor`. The predictor only returns `analytical = true` for +/// the equal-weight case (unequal weights surface a low-confidence flag +/// and the call short-circuits to the EVM fallback metric). +fn unified_to_post_reserves( + swap_token_in: Address, + meta: &PoolMetadata, + post: &UnifiedPostState, +) -> (f64, f64) { + match post { + UnifiedPostState::UniswapV3(v3) => { + const TWO_POW_96: f64 = 79_228_162_514_264_337_593_543_950_336.0; + let sqrt_f = u256_to_f64_saturating(v3.new_sqrt_price_x96); + let price_t1_per_t0 = (sqrt_f / TWO_POW_96).powi(2); + if price_t1_per_t0 <= 0.0 { + return (0.0, 0.0); + } + if swap_token_in == meta.token0 { + (1.0, price_t1_per_t0) + } else { + (1.0, 1.0 / price_t1_per_t0) + } + } + UnifiedPostState::Balancer(b) => { + let b0 = u256_to_f64_saturating(b.new_balance0); + let b1 = u256_to_f64_saturating(b.new_balance1); + if swap_token_in == meta.token0 { + (b0, b1) + } else { + (b1, b0) + } + } + UnifiedPostState::Curve(_) => (0.0, 0.0), + } +} + +/// Predict V2 reserves after a swap of `dx` of `reserve_in` for `reserve_out`. +/// +/// `fee_factor` is `(10_000 - fee_bps) / 10_000` (e.g. `0.997` for 30 bps). +/// Math: with effective input `dx_eff = dx * fee_factor`, the constant- +/// product invariant gives `dy = (dx_eff * y) / (x + dx_eff)`, then +/// `x' = x + dx`, `y' = y - dy`. Returns `(0.0, 0.0)` when inputs are +/// non-positive so callers can detect an invalid swap. +fn predict_v2_post_state( + reserve_in: f64, + reserve_out: f64, + dx: f64, + fee_factor: f64, +) -> (f64, f64) { + if reserve_in <= 0.0 || reserve_out <= 0.0 || dx <= 0.0 || fee_factor <= 0.0 { + return (0.0, 0.0); + } + let dx_eff = dx * fee_factor; + let dy = (dx_eff * reserve_out) / (reserve_in + dx_eff); + let post_in = reserve_in + dx; + // dy is mathematically < reserve_out for any finite dx, but clamp to + // a positive epsilon to defend against f64 catastrophic cancellation + // on very large dx near reserve depletion. + let post_out = (reserve_out - dy).max(1.0); + (post_in, post_out) +} + +/// Coarse profit bucket for the candidate metric. Bounded cardinality so +/// dashboards can sum across routers without label explosion. +fn profit_bucket(profit_factor: f64) -> &'static str { + let bps = profit_factor * 10_000.0; + if bps < 10.0 { + "lt_10bps" + } else if bps < 50.0 { + "10_50bps" + } else if bps < 200.0 { + "50_200bps" + } else { + "gt_200bps" + } +} + +/// Saturating U256 → f64. The price graph already stores reserves as f64, +/// and Bellman-Ford runs in f64 weight space, so feeding the simulator a +/// f64 amount is consistent with the rest of the detection path. +/// +/// **Precision contract.** f64 has a 53-bit mantissa (~9.0e15). Any U256 +/// up to 2^53 - 1 round-trips losslessly. Above that the conversion +/// truncates lower bits — for an 18-decimal token this means amounts up to +/// ~9 million whole tokens are exact, and arbitrarily large amounts cap at +/// f64::MAX without panicking. Real on-chain swap sizes sit comfortably +/// below the lossless bound; the saturating return value protects against +/// adversarial calldata inflating dx beyond 2^256 → +inf in the math +/// kernel below. +fn u256_to_f64_saturating(v: U256) -> f64 { + let limbs = v.as_limbs(); + let mut result = 0.0f64; + let mut scale = 1.0f64; + for limb in limbs.iter() { + result += (*limb as f64) * scale; + // 2^64 — multiplying out limbs in increasing significance. + scale *= 18_446_744_073_709_551_616.0; + } + if result.is_finite() { + result + } else { + f64::MAX + } +} + +fn protocol_label(p: Protocol) -> &'static str { + match p { + Protocol::UniswapV2 => "uniswap_v2", + Protocol::UniswapV3 => "uniswap_v3", + Protocol::SushiSwap => "sushiswap", + Protocol::BalancerV2 => "balancer_v2", + } +} + +fn decode_error_label(err: &DecodeError) -> &'static str { + match err { + DecodeError::TooShort => "too_short", + DecodeError::UnknownSelector { .. } => "unknown_selector", + DecodeError::AbiDecode(_) => "abi_decode", + DecodeError::EmptyPath => "empty_path", + DecodeError::CurveUnsupported(_) => "curve_unsupported", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use aether_pools::router_decoder::IUniswapV2Router02::swapExactTokensForTokensCall; + use alloy::primitives::{address, B256, U256}; + use alloy::sol_types::SolCall; + + fn pending_event(to: Option, input: Vec) -> PendingTxEvent { + PendingTxEvent { + tx_hash: B256::ZERO, + from: alloy::primitives::Address::ZERO, + to, + value: U256::ZERO, + input, + gas_price: 0, + } + } + + #[test] + fn protocol_label_is_stable() { + assert_eq!(protocol_label(Protocol::UniswapV2), "uniswap_v2"); + assert_eq!(protocol_label(Protocol::UniswapV3), "uniswap_v3"); + assert_eq!(protocol_label(Protocol::SushiSwap), "sushiswap"); + assert_eq!(protocol_label(Protocol::BalancerV2), "balancer_v2"); + } + + #[test] + fn decode_error_label_covers_every_variant() { + assert_eq!(decode_error_label(&DecodeError::TooShort), "too_short"); + assert_eq!( + decode_error_label(&DecodeError::UnknownSelector { selector: [0; 4] }), + "unknown_selector" + ); + assert_eq!( + decode_error_label(&DecodeError::AbiDecode("x".into())), + "abi_decode" + ); + assert_eq!(decode_error_label(&DecodeError::EmptyPath), "empty_path"); + assert_eq!( + decode_error_label(&DecodeError::CurveUnsupported(alloy::primitives::Address::ZERO)), + "curve_unsupported" + ); + } + + #[test] + fn handle_event_decoded_swap_does_not_panic() { + let metrics = Arc::new(EngineMetrics::new()); + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let calldata = swapExactTokensForTokensCall { + amountIn: U256::from(1_000u64), + amountOutMin: U256::from(900u64), + path: vec![weth, usdc], + to: alloy::primitives::Address::ZERO, + deadline: U256::ZERO, + } + .abi_encode(); + let to = address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D"); + handle_event(&metrics, None, pending_event(Some(to), calldata)); + } + + #[test] + fn handle_event_unknown_selector_does_not_panic() { + let metrics = Arc::new(EngineMetrics::new()); + let mut calldata = vec![0xde, 0xad, 0xbe, 0xef]; + calldata.extend(std::iter::repeat_n(0u8, 64)); + let to = address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D"); + handle_event(&metrics, None, pending_event(Some(to), calldata)); + } + + #[test] + fn handle_event_no_to_does_not_panic() { + let metrics = Arc::new(EngineMetrics::new()); + handle_event( + &metrics, + None, + pending_event(None, vec![0x12, 0x34, 0x56, 0x78]), + ); + } + + // ----- predict_v2_post_state ----- + + #[test] + fn predict_v2_zero_inputs_return_zero() { + assert_eq!(predict_v2_post_state(0.0, 1.0, 1.0, 0.997), (0.0, 0.0)); + assert_eq!(predict_v2_post_state(1.0, 0.0, 1.0, 0.997), (0.0, 0.0)); + assert_eq!(predict_v2_post_state(1.0, 1.0, 0.0, 0.997), (0.0, 0.0)); + assert_eq!(predict_v2_post_state(1.0, 1.0, 1.0, 0.0), (0.0, 0.0)); + } + + #[test] + fn predict_v2_small_swap_matches_constant_product() { + // x=1000, y=1000, dx=10, fee=0.3% -> dy = 10*0.997*1000/(1000+10*0.997) + // dy ≈ 9.871 + let (post_in, post_out) = predict_v2_post_state(1000.0, 1000.0, 10.0, 0.997); + assert!((post_in - 1010.0).abs() < 1e-9); + let expected_dy = (10.0 * 0.997 * 1000.0) / (1000.0 + 10.0 * 0.997); + assert!((post_out - (1000.0 - expected_dy)).abs() < 1e-9); + } + + #[test] + fn predict_v2_invariant_grows_by_fee() { + // The k = x*y product increases by the fee accrual after a swap. + let (post_in, post_out) = predict_v2_post_state(1000.0, 1000.0, 100.0, 0.997); + let k_before = 1000.0 * 1000.0; + let k_after = post_in * post_out; + assert!(k_after > k_before, "fee should increase k"); + } + + // ----- profit_bucket ----- + + #[test] + fn profit_bucket_boundaries() { + // 5 bps → < 10 + assert_eq!(profit_bucket(0.0005), "lt_10bps"); + // 25 bps + assert_eq!(profit_bucket(0.0025), "10_50bps"); + // 100 bps + assert_eq!(profit_bucket(0.0100), "50_200bps"); + // 500 bps + assert_eq!(profit_bucket(0.0500), "gt_200bps"); + // exactly on boundary goes to upper bucket + assert_eq!(profit_bucket(0.0010), "10_50bps"); + assert_eq!(profit_bucket(0.0050), "50_200bps"); + assert_eq!(profit_bucket(0.0200), "gt_200bps"); + } + + // ----- u256_to_f64_saturating ----- + + #[test] + fn u256_to_f64_small_value() { + assert!((u256_to_f64_saturating(U256::from(1_000_000u64)) - 1_000_000.0).abs() < 1.0); + } + + // ----- pre_sim_filter ----- + + /// Build an empty SimContext suitable for tests that exercise the filter + /// without needing real graph state — registry is empty, token index + /// empty, snapshot has a zero-vertex graph. Any `lookup_pool` returns + /// `None`, which is what the `not_in_registry` test wants anyway. + fn empty_sim_ctx() -> Arc { + use aether_pools::new_pool_state_cache; + use aether_state::price_graph::PriceGraph; + Arc::new(SimContext::new( + Arc::new(ArcSwap::from_pointee(HashMap::::new())), + Arc::new(ArcSwap::from_pointee(TokenIndex::default())), + Arc::new(SnapshotManager::new(PriceGraph::new(0))), + BellmanFord::new(3, 1_000), + new_pool_state_cache(), + )) + } + + fn fake_swap(protocol: Protocol, token_in: Address, token_out: Address, amount_in: U256) -> DecodedSwap { + DecodedSwap { + protocol, + router: Address::ZERO, + token_in, + token_out, + amount_in, + amount_out_min: U256::ZERO, + recipient: Address::ZERO, + fee_bps: 0, + path_extra: vec![], + } + } + + fn filtered_count(metrics: &EngineMetrics, reason: &str) -> u64 { + metrics.mempool_filtered_count(reason) + } + + #[test] + fn pre_sim_filter_drops_same_token_swaps() { + let metrics = EngineMetrics::new(); + let ctx = empty_sim_ctx(); + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let swap = fake_swap(Protocol::UniswapV2, weth, weth, U256::from(1u64)); + assert!(!pre_sim_filter(&metrics, &ctx, &swap)); + assert_eq!(filtered_count(&metrics, "same_token"), 1); + } + + #[test] + fn pre_sim_filter_drops_zero_amount() { + let metrics = EngineMetrics::new(); + let ctx = empty_sim_ctx(); + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let swap = fake_swap(Protocol::UniswapV2, weth, usdc, U256::ZERO); + assert!(!pre_sim_filter(&metrics, &ctx, &swap)); + assert_eq!(filtered_count(&metrics, "zero_amount"), 1); + } + + #[test] + fn pre_sim_filter_drops_pair_absent_from_registry() { + let metrics = EngineMetrics::new(); + let ctx = empty_sim_ctx(); + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let swap = fake_swap(Protocol::UniswapV2, weth, usdc, U256::from(1_000u64)); + assert!(!pre_sim_filter(&metrics, &ctx, &swap)); + assert_eq!(filtered_count(&metrics, "not_in_registry"), 1); + } + + #[test] + fn pre_sim_filter_drops_v3_and_balancer_when_pair_not_registered() { + // V3 / Balancer now route through the same registry-membership + // check as V2 / Sushi: if the (token_in, token_out, protocol) + // triple is absent from `pool_registry`, the filter drops the + // swap under `not_in_registry` so the spawn_blocking + 3.8 MB + // graph clone is never scheduled. + let metrics = EngineMetrics::new(); + let ctx = empty_sim_ctx(); + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + for proto in [Protocol::UniswapV3, Protocol::BalancerV2] { + let swap = fake_swap(proto, weth, usdc, U256::from(1_000u64)); + assert!(!pre_sim_filter(&metrics, &ctx, &swap)); + } + assert_eq!(filtered_count(&metrics, "not_in_registry"), 2); + assert_eq!(filtered_count(&metrics, "same_token"), 0); + assert_eq!(filtered_count(&metrics, "zero_amount"), 0); + } + + #[test] + fn decoder_protocol_to_type_maps_all_decoded_protocols() { + assert_eq!( + decoder_protocol_to_type(Protocol::UniswapV2), + Some(ProtocolType::UniswapV2) + ); + assert_eq!( + decoder_protocol_to_type(Protocol::SushiSwap), + Some(ProtocolType::SushiSwap) + ); + assert_eq!( + decoder_protocol_to_type(Protocol::UniswapV3), + Some(ProtocolType::UniswapV3) + ); + assert_eq!( + decoder_protocol_to_type(Protocol::BalancerV2), + Some(ProtocolType::BalancerV2) + ); + } +} diff --git a/crates/grpc-server/src/metrics.rs b/crates/grpc-server/src/metrics.rs index 7e4bd1a..17afc3a 100644 --- a/crates/grpc-server/src/metrics.rs +++ b/crates/grpc-server/src/metrics.rs @@ -53,6 +53,45 @@ pub struct EngineMetrics { /// the detector's local graph is wrong or the simulation reverted /// in a way that masked the failure cycle_gate_dropped_total: IntCounterVec, + /// Pending DEX-router txs forwarded by the mempool subscription, labelled + /// by router (raw address) and the decoded protocol family. The + /// `decoded` label distinguishes successful ABI parses from + /// `decode_failure` so dashboards can surface decoder gaps directly. + pending_dex_tx_total: IntCounterVec, + /// Reason-tagged decoder failure counter. Reasons match + /// `aether_pools::router_decoder::DecodeError` variants so a dashboard + /// drill-down points at the exact path that needs work next. + pending_decode_errors_total: IntCounterVec, + /// Profitable cycles found by the post-state mempool simulator, labelled + /// by router and a coarse profit bucket. Counts candidates only — these + /// are not validated arbs and never get submitted; they prove the + /// post-state pipeline produces non-empty output on real traffic. + pending_arb_candidates_total: IntCounterVec, + /// Reasons the post-state simulator skipped a decoded swap (no pool in + /// registry, missing token index, no graph edge, zero reserves, etc.). + /// Mirrors `pending_decode_errors_total` for the layer above the decoder. + pending_arb_sim_skipped_total: IntCounterVec, + /// Pending-tx broadcast events the decode pipeline failed to receive + /// because it lagged behind the producer (tokio broadcast `Lagged(n)`). + /// Bumped by the `n` returned by the broadcast receiver so dashboards + /// can show *how many events* were dropped, not just how many lag + /// events fired. Sustained non-zero growth = pipeline is the bottleneck; + /// either widen the channel or shed mempool sources. + pending_pipeline_lagged_total: IntCounter, + /// Reason-tagged counter for pending swaps the pipeline drops *after* + /// the router decoder succeeds but *before* the post-state simulator + /// gets a chance to run. Distinct from `pending_arb_sim_skipped_total` + /// (which fires once the sim task has started and discovered a missing + /// graph edge / zero reserves / etc.). Bumping here short-circuits the + /// 3.8 MB graph clone the sim does, so it is cheap to be aggressive. + /// + /// Stable label set: + /// - `not_in_registry` — neither (token_in, token_out, protocol) + /// tuple is present in `pool_registry` + /// - `same_token` — decoder returned a self-swap (likely + /// fee-on-transfer wrapper) + /// - `zero_amount` — `amount_in == 0` (no profit possible) + mempool_filtered_total: IntCounterVec, } impl EngineMetrics { @@ -72,7 +111,9 @@ impl EngineMetrics { "aether_simulation_latency_ms", "EVM simulation latency in milliseconds", ) - .buckets(vec![0.5, 1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0]), + .buckets(vec![ + 0.5, 1.0, 2.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, + ]), ) .expect("aether_simulation_latency_ms histogram"); let cycles_detected = IntCounter::new( @@ -80,21 +121,17 @@ impl EngineMetrics { "Total negative cycles detected", ) .expect("aether_cycles_detected_total counter"); - let simulations_run = IntCounter::new( - "aether_simulations_run_total", - "Total simulations executed", - ) - .expect("aether_simulations_run_total counter"); + let simulations_run = + IntCounter::new("aether_simulations_run_total", "Total simulations executed") + .expect("aether_simulations_run_total counter"); let arbs_published = IntCounter::new( "aether_arbs_published_total", "Total validated arbs published", ) .expect("aether_arbs_published_total counter"); - let blocks_processed = IntCounter::new( - "aether_blocks_processed_total", - "Total blocks processed", - ) - .expect("aether_blocks_processed_total counter"); + let blocks_processed = + IntCounter::new("aether_blocks_processed_total", "Total blocks processed") + .expect("aether_blocks_processed_total counter"); let decode_errors = IntCounterVec::new( Opts::new( "aether_decode_errors_total", @@ -119,6 +156,51 @@ impl EngineMetrics { &["reason"], ) .expect("aether_cycle_gate_dropped_total counter vec"); + let pending_dex_tx_total = IntCounterVec::new( + Opts::new( + "aether_pending_dex_tx_total", + "Pending DEX-router txs forwarded by the mempool subscription, by router and decoded protocol", + ), + &["router", "protocol", "decoded"], + ) + .expect("aether_pending_dex_tx_total counter vec"); + let pending_decode_errors_total = IntCounterVec::new( + Opts::new( + "aether_pending_decode_errors_total", + "Pending-tx calldata decoder failures, by reason", + ), + &["reason"], + ) + .expect("aether_pending_decode_errors_total counter vec"); + let pending_arb_candidates_total = IntCounterVec::new( + Opts::new( + "aether_pending_arb_candidates_total", + "Profitable cycles found by the post-state mempool simulator, by router and profit bucket", + ), + &["router", "profit_bucket"], + ) + .expect("aether_pending_arb_candidates_total counter vec"); + let pending_arb_sim_skipped_total = IntCounterVec::new( + Opts::new( + "aether_pending_arb_sim_skipped_total", + "Decoded swaps the post-state simulator skipped, by reason", + ), + &["reason"], + ) + .expect("aether_pending_arb_sim_skipped_total counter vec"); + let pending_pipeline_lagged_total = IntCounter::new( + "aether_pending_pipeline_lagged_total", + "Pending-tx events dropped because the decode pipeline lagged behind the broadcast", + ) + .expect("aether_pending_pipeline_lagged_total counter"); + let mempool_filtered_total = IntCounterVec::new( + Opts::new( + "aether_mempool_filtered_total", + "Decoded pending swaps dropped before the post-state simulator runs, by reason", + ), + &["reason"], + ) + .expect("aether_mempool_filtered_total counter vec"); registry .register(Box::new(detection_latency_ms.clone())) @@ -147,6 +229,24 @@ impl EngineMetrics { registry .register(Box::new(cycle_gate_dropped_total.clone())) .expect("register aether_cycle_gate_dropped_total"); + registry + .register(Box::new(pending_dex_tx_total.clone())) + .expect("register aether_pending_dex_tx_total"); + registry + .register(Box::new(pending_decode_errors_total.clone())) + .expect("register aether_pending_decode_errors_total"); + registry + .register(Box::new(pending_arb_candidates_total.clone())) + .expect("register aether_pending_arb_candidates_total"); + registry + .register(Box::new(pending_arb_sim_skipped_total.clone())) + .expect("register aether_pending_arb_sim_skipped_total"); + registry + .register(Box::new(pending_pipeline_lagged_total.clone())) + .expect("register aether_pending_pipeline_lagged_total"); + registry + .register(Box::new(mempool_filtered_total.clone())) + .expect("register aether_mempool_filtered_total"); Self { registry, @@ -159,6 +259,12 @@ impl EngineMetrics { decode_errors, sim_evm_fallback_total, cycle_gate_dropped_total, + pending_dex_tx_total, + pending_decode_errors_total, + pending_arb_candidates_total, + pending_arb_sim_skipped_total, + pending_pipeline_lagged_total, + mempool_filtered_total, } } @@ -248,6 +354,69 @@ impl EngineMetrics { &self.registry } + /// Bump `aether_pending_dex_tx_total{router, protocol, decoded}` for a + /// pending DEX-router tx the mempool source forwarded. `protocol` is + /// `unknown` when decoding failed; `decoded` is `"true"` or `"false"`. + pub fn inc_pending_dex_tx(&self, router: &str, protocol: &str, decoded: bool) { + self.pending_dex_tx_total + .with_label_values(&[router, protocol, if decoded { "true" } else { "false" }]) + .inc(); + } + + /// Bump `aether_pending_decode_errors_total{reason="..."}`. Reasons + /// should be a small fixed set (`too_short`, `unknown_selector`, + /// `abi_decode`, `empty_path`) so dashboards can rely on stable labels. + pub fn inc_pending_decode_errors(&self, reason: &str) { + self.pending_decode_errors_total + .with_label_values(&[reason]) + .inc(); + } + + /// Bump `aether_pending_arb_candidates_total{router, profit_bucket}`. + /// Buckets are coarse (`<10bps`, `10-50bps`, `50-200bps`, `>200bps`) so + /// the cardinality stays bounded. + pub fn inc_pending_arb_candidates(&self, router: &str, profit_bucket: &str) { + self.pending_arb_candidates_total + .with_label_values(&[router, profit_bucket]) + .inc(); + } + + /// Bump `aether_pending_arb_sim_skipped_total{reason="..."}`. + pub fn inc_pending_arb_sim_skipped(&self, reason: &str) { + self.pending_arb_sim_skipped_total + .with_label_values(&[reason]) + .inc(); + } + + /// Add `n` to `aether_pending_pipeline_lagged_total`. Pass the count + /// returned by `broadcast::error::RecvError::Lagged(n)` so the metric + /// reflects events dropped, not lag events fired. + pub fn add_pending_pipeline_lagged(&self, n: u64) { + if n > 0 { + self.pending_pipeline_lagged_total.inc_by(n); + } + } + + /// Bump `aether_mempool_filtered_total{reason="..."}` for a decoded + /// pending swap the pipeline rejects before any sim work is scheduled. + /// See the field doc on `mempool_filtered_total` for the stable label + /// set. + pub fn inc_mempool_filtered(&self, reason: &str) { + self.mempool_filtered_total + .with_label_values(&[reason]) + .inc(); + } + + /// Read the current value of `aether_mempool_filtered_total{reason}`. + /// Public so tests in the `aether-rust` bin (a separate crate from the + /// `aether-grpc-server` lib) can assert filter behaviour without + /// re-implementing Prometheus text parsing. + pub fn mempool_filtered_count(&self, reason: &str) -> u64 { + self.mempool_filtered_total + .with_label_values(&[reason]) + .get() + } + /// Render the registered metrics in Prometheus text exposition format. /// `pub(crate)` so sibling modules (`provider::tests`) can assert on /// rendered counter values without exposing the whole registry. diff --git a/crates/ingestion/Cargo.toml b/crates/ingestion/Cargo.toml index f8ac163..6a56fc2 100644 --- a/crates/ingestion/Cargo.toml +++ b/crates/ingestion/Cargo.toml @@ -13,3 +13,5 @@ dashmap = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_yml = { workspace = true } +async-trait = "0.1" +futures = "0.3" diff --git a/crates/ingestion/src/lib.rs b/crates/ingestion/src/lib.rs index ac2569f..686e5a5 100644 --- a/crates/ingestion/src/lib.rs +++ b/crates/ingestion/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; -pub mod node_pool; pub mod event_decoder; +pub mod mempool; +pub mod node_pool; pub mod subscription; diff --git a/crates/ingestion/src/mempool.rs b/crates/ingestion/src/mempool.rs new file mode 100644 index 0000000..4e09f2f --- /dev/null +++ b/crates/ingestion/src/mempool.rs @@ -0,0 +1,345 @@ +//! Mempool tracking — pending-transaction subscription layer. +//! +//! This module subscribes to a node's pending-tx stream and fans out +//! [`PendingTxEvent`]s through the existing [`EventChannels`] broadcast. +//! Today the only supported source is Alchemy's `alchemy_pendingTransactions` +//! WebSocket method, which is the lowest-friction free option for plumbing +//! validation. A `MempoolSource` trait isolates that choice so future paid +//! feeds (Chainbound Fiber gRPC, bloXroute, self-hosted Reth `txpool` IPC) +//! can be added without touching downstream consumers. +//! +//! The subscription is **opt-in** via the `MEMPOOL_TRACKING` env var. With it +//! unset the module compiles in but never runs, so binaries on `main` keep +//! their current startup shape. +//! +//! # Privacy and scope +//! +//! - We filter by `toAddress` so only txs aimed at the configured DEX +//! router set reach the broadcast channel; mempool decoding lives in a +//! downstream module (`aether-pools::router_decoder`) and is not invoked +//! here. This module is purely transport. +//! - No bundle is constructed, no submission is performed. The Go executor +//! never sees these events. The rule "log-only until further notice" +//! exists to keep the testing scaffold isolated from execution risk. + +use std::sync::Arc; +use std::time::Duration; + +use alloy::consensus::Transaction as TransactionTrait; +use alloy::primitives::Address; +use alloy::providers::{Provider, ProviderBuilder, WsConnect}; +use alloy::rpc::types::Transaction; +use futures::StreamExt; +use serde_json::json; +use tokio::sync::watch; +use tracing::{debug, error, info, warn}; + +use crate::subscription::{EventChannels, PendingTxEvent}; + +/// Default reconnect backoff after a transport error. +const RECONNECT_BACKOFF: Duration = Duration::from_secs(2); + +/// Returns `true` when `MEMPOOL_TRACKING` is set to a truthy value. +/// +/// Accepted truthy values: `1`, `true`, `yes`, `on` (case-insensitive). Any +/// other value (including unset) disables the subscription, so default +/// behaviour on `main` is unchanged. +pub fn is_enabled() -> bool { + is_enabled_from_str(&std::env::var("MEMPOOL_TRACKING").unwrap_or_default()) +} + +/// Pure parser used by [`is_enabled`]; split out so unit tests can exercise the +/// truthy-string rules without mutating process-wide env (which is `unsafe` on +/// edition 2024 and race-prone under parallel `cargo test`). +fn is_enabled_from_str(value: &str) -> bool { + matches!( + value.to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) +} + +/// Configuration for the Alchemy pending-tx subscription. +#[derive(Debug, Clone)] +pub struct AlchemyMempoolConfig { + /// Full WebSocket URL including the `wss://` scheme and Alchemy API key. + /// Reuse the same `ETH_RPC_URL` value when it already points at Alchemy + /// over WebSocket; otherwise pass an explicit `ETH_WS_URL`. + pub ws_url: String, + /// Filter set: only txs whose `to` field is in this list are emitted. + /// Empty means "no filter" — emit every pending tx Alchemy sees, which + /// is firehose-grade and not recommended for production wiring. + pub router_filter: Vec
, +} + +impl AlchemyMempoolConfig { + /// Build the JSON params for the `alchemy_pendingTransactions` subscribe + /// call, applying the configured `toAddress` filter when non-empty. + fn subscribe_params(&self) -> serde_json::Value { + if self.router_filter.is_empty() { + json!(["alchemy_pendingTransactions"]) + } else { + let to_addresses: Vec = self + .router_filter + .iter() + .map(|a| format!("{:#x}", a)) + .collect(); + json!([ + "alchemy_pendingTransactions", + { "toAddress": to_addresses } + ]) + } + } +} + +/// Trait for any source that produces a stream of [`PendingTxEvent`]s. +/// +/// Implementations own their own reconnection / backoff logic and dispatch +/// directly to [`EventChannels::dispatch_pending_tx`]. Returning from `run` +/// indicates the source has shut down; callers may restart it. +#[async_trait::async_trait] +pub trait MempoolSource: Send + Sync { + /// Run the subscription loop until shutdown is signalled. + async fn run(&self, channels: Arc, shutdown: watch::Receiver); + + /// Human-readable identifier for logs / metrics. + fn name(&self) -> &'static str; +} + +/// Alchemy `alchemy_pendingTransactions` WebSocket subscription. +pub struct AlchemyMempool { + config: AlchemyMempoolConfig, +} + +impl AlchemyMempool { + pub fn new(config: AlchemyMempoolConfig) -> Self { + warn_if_non_alchemy_endpoint(&config.ws_url); + Self { config } + } + + /// One subscription attempt: connect, subscribe, drain, return on error. + /// Errors are returned to the outer reconnect loop in [`run`]. + async fn subscribe_once( + &self, + channels: &EventChannels, + shutdown: &mut watch::Receiver, + ) -> Result<(), Box> { + let ws = WsConnect::new(self.config.ws_url.clone()); + let provider = ProviderBuilder::new().connect_ws(ws).await?; + + let params = self.config.subscribe_params(); + info!( + target: "aether::mempool", + params = %params, + "subscribing to alchemy_pendingTransactions" + ); + + // alchemy_pendingTransactions is a non-standard subscription; route + // through the raw `eth_subscribe` path with the method-specific + // params object. + let sub = provider + .subscribe::<_, Transaction>(params) + .await?; + let mut stream = sub.into_stream(); + + loop { + tokio::select! { + next = stream.next() => { + match next { + Some(tx) => self.forward(channels, tx), + None => { + warn!( + target: "aether::mempool", + "alchemy pending stream closed by remote; will reconnect" + ); + return Err("stream closed".into()); + } + } + } + changed = shutdown.changed() => { + if changed.is_err() || *shutdown.borrow() { + info!( + target: "aether::mempool", + "shutdown signalled; exiting alchemy mempool subscription" + ); + return Ok(()); + } + } + } + } + } + + /// Map an alloy [`Transaction`] into the workspace [`PendingTxEvent`] and + /// dispatch it. Lossy by design — any field we don't surface today (gas + /// limit, type, access list) is recoverable via the tx hash later. + fn forward(&self, channels: &EventChannels, tx: Transaction) { + let from = tx.inner.signer(); + let envelope = tx.as_ref(); + let to: Option
= envelope.kind().to().copied(); + let event = PendingTxEvent { + tx_hash: *envelope.tx_hash(), + from, + to, + value: envelope.value(), + input: envelope.input().to_vec(), + gas_price: envelope.max_fee_per_gas(), + }; + debug!( + target: "aether::mempool", + tx_hash = %event.tx_hash, + to = ?event.to, + input_len = event.input.len(), + "pending tx forwarded" + ); + channels.dispatch_pending_tx(event); + } +} + +#[async_trait::async_trait] +impl MempoolSource for AlchemyMempool { + fn name(&self) -> &'static str { + "alchemy" + } + + async fn run(&self, channels: Arc, mut shutdown: watch::Receiver) { + let mut backoff = RECONNECT_BACKOFF; + loop { + if *shutdown.borrow() { + info!(target: "aether::mempool", "alchemy source shutting down"); + return; + } + + match self.subscribe_once(&channels, &mut shutdown).await { + Ok(()) => return, // clean shutdown + Err(e) => { + error!( + target: "aether::mempool", + error = %e, + backoff_secs = backoff.as_secs(), + "alchemy mempool subscribe failed; reconnecting" + ); + tokio::select! { + _ = tokio::time::sleep(backoff) => {} + changed = shutdown.changed() => { + if changed.is_err() || *shutdown.borrow() { + return; + } + } + } + // Exponential bounded backoff (cap 30 s) — we do not want + // to give up but also do not want to hammer the endpoint. + backoff = (backoff * 2).min(Duration::from_secs(30)); + } + } + } + } +} + +/// Warn loudly when the configured WebSocket endpoint is unlikely to be +/// Alchemy. `alchemy_pendingTransactions` is an Alchemy-proprietary +/// subscribe method — Reth, QuickNode, Infura and self-hosted Geth accept +/// the WS upgrade but never deliver events, so this case otherwise produces +/// zero metrics with no obvious failure mode. Heuristic only: matches the +/// hostnames Alchemy issues for mainnet/sepolia. +fn warn_if_non_alchemy_endpoint(ws_url: &str) { + let lower = ws_url.to_ascii_lowercase(); + let alchemy_markers = ["alchemy.com", "g.alchemy.com", "alchemyapi.io"]; + if alchemy_markers.iter().any(|m| lower.contains(m)) { + return; + } + warn!( + target: "aether::mempool", + ws_url = %ws_url, + "MEMPOOL_TRACKING enabled but WS endpoint does not look like Alchemy; \ + alchemy_pendingTransactions is Alchemy-only and will return no events \ + on Reth/QuickNode/Infura/Geth — see .env.example" + ); +} + +/// Default DEX router addresses on Ethereum mainnet that Aether watches. +/// +/// Curated for the testing scaffold: UniswapV2 Router02, UniswapV3 +/// SwapRouter, UniswapV3 SwapRouter02, SushiSwap Router02, Curve Router, +/// Balancer Vault. 1inch v6 AggregationRouter is intentionally absent — its +/// multi-step calldata does not decode against a simple `sol!` ABI and would +/// inflate the decode-failure counter without yielding usable hits in the +/// scaffold; revisit once the decoder has the multi-encode path. +pub fn default_router_addresses() -> Vec
{ + use alloy::primitives::address; + vec![ + address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D"), // UniswapV2 Router02 + address!("E592427A0AEce92De3Edee1F18E0157C05861564"), // UniswapV3 SwapRouter + address!("68b3465833fb72A70ecDF485E0e4C7bD8665Fc45"), // UniswapV3 SwapRouter02 + address!("d9e1cE17f2641f24aE83637ab66a2cca9C378B9F"), // SushiSwap Router02 + address!("99a58482BD75cbab83b27EC03CA68fF489b5788f"), // Curve Router + address!("BA12222222228d8Ba445958a75a0704d566BF2C8"), // Balancer V2 Vault + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_enabled_respects_truthy_strings() { + for v in ["1", "true", "TRUE", "True", "yes", "YES", "on", "On"] { + assert!(is_enabled_from_str(v), "{v} should enable"); + } + for v in ["", "0", "false", "no", "off", "anything", " 1 "] { + assert!(!is_enabled_from_str(v), "{v} should not enable"); + } + } + + #[test] + fn subscribe_params_omit_filter_when_empty() { + let cfg = AlchemyMempoolConfig { + ws_url: "wss://example".into(), + router_filter: vec![], + }; + let v = cfg.subscribe_params(); + assert_eq!(v, json!(["alchemy_pendingTransactions"])); + } + + #[test] + fn subscribe_params_apply_lowercase_addresses() { + use alloy::primitives::address; + let cfg = AlchemyMempoolConfig { + ws_url: "wss://example".into(), + router_filter: vec![address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D")], + }; + let v = cfg.subscribe_params(); + let expected = json!([ + "alchemy_pendingTransactions", + { + "toAddress": ["0x7a250d5630b4cf539739df2c5dacb4c659f2488d"] + } + ]); + assert_eq!(v, expected); + } + + #[test] + fn alchemy_marker_detection() { + // Marker present → no warn (cannot directly assert log absence here, + // but exercise both branches of the heuristic). + for url in [ + "wss://eth-mainnet.g.alchemy.com/v2/key", + "wss://eth-mainnet.alchemyapi.io/v2/key", + "wss://eth.alchemy.com/v2/key", + ] { + // Ensure no panic on any path; explicit assertion lives in the + // `warn_if_non_alchemy_endpoint` heuristic comment. + warn_if_non_alchemy_endpoint(url); + } + warn_if_non_alchemy_endpoint("wss://reth.local:8546"); + warn_if_non_alchemy_endpoint("wss://eth-mainnet.quiknode.pro/key"); + } + + #[test] + fn default_router_set_is_non_empty_and_uniqued() { + let v = default_router_addresses(); + assert!(!v.is_empty()); + let mut sorted = v.clone(); + sorted.sort(); + sorted.dedup(); + assert_eq!(sorted.len(), v.len(), "duplicate addresses in default set"); + } +} diff --git a/crates/pools/Cargo.toml b/crates/pools/Cargo.toml index d7bce4d..ffad571 100644 --- a/crates/pools/Cargo.toml +++ b/crates/pools/Cargo.toml @@ -11,3 +11,4 @@ toml = { workspace = true } tracing = { workspace = true } ruint = { workspace = true } dashmap = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/pools/src/lib.rs b/crates/pools/src/lib.rs index 6706d22..1b1d401 100644 --- a/crates/pools/src/lib.rs +++ b/crates/pools/src/lib.rs @@ -1,10 +1,11 @@ -pub mod uniswap_v2; -pub mod uniswap_v3; -pub mod sushiswap; -pub mod curve; pub mod balancer; pub mod bancor; +pub mod curve; pub mod registry; +pub mod router_decoder; +pub mod sushiswap; +pub mod uniswap_v2; +pub mod uniswap_v3; use std::sync::Arc; diff --git a/crates/pools/src/router_decoder.rs b/crates/pools/src/router_decoder.rs new file mode 100644 index 0000000..003ffeb --- /dev/null +++ b/crates/pools/src/router_decoder.rs @@ -0,0 +1,691 @@ +//! Pending-tx calldata decoder for known DEX routers. +//! +//! Maps a raw `(to, calldata)` pair from a pending transaction to a +//! protocol-tagged [`DecodedSwap`] when the call selector matches one of the +//! supported router shapes. Anything we don't recognise returns +//! [`DecodeError::UnknownSelector`] so the caller can bump a decode-failure +//! metric and move on without taking the engine down. +//! +//! ## Coverage in this scaffold +//! +//! - **UniswapV2 Router02** and **SushiSwap Router02** share an ABI: we decode +//! the family of `swapExact*` / `swap*ForExact*` calls, extracting the +//! first hop only (the rest of the path is recoverable downstream). +//! - **UniswapV3 SwapRouter** and **SwapRouter02**: `exactInputSingle` and +//! `exactInput` (multi-hop bytes-encoded path). +//! - **Balancer V2 Vault**: `swap(SingleSwap, FundManagement, limit, deadline)` +//! single-pool variant. +//! +//! Out of scope (returns `UnknownSelector`): +//! +//! - Curve router — its `exchange` / `exchange_multiple` shape varies per +//! pool registry version and would inflate the decoder without yielding +//! reliable hits in the testing scaffold. +//! - 1inch v6 AggregationRouter — multi-encoded calldata; deferred so the +//! `decode_failure` counter can quantify the gap before we invest. +//! - `multicall` / `execute` wrappers (UniV3 SwapRouter `multicall`) — +//! handled in a follow-up that recursively peels nested calldata. +//! +//! Every decoded swap is paired with a [`Protocol`] tag so downstream +//! simulators can route to the right post-state computation. + +use alloy::primitives::{address, Address, U256}; +use alloy::sol; +use alloy::sol_types::SolCall; + +/// Curve Router addresses we knowingly cannot decode yet. +/// +/// Curve's `exchange` / `exchange_multiple` selectors vary per pool registry +/// version and the calldata shape is too divergent to handle in the scaffold. +/// We still want pending txs to these routers in the address filter (so the +/// firehose stays representative of real router traffic), but unknown-selector +/// errors against them otherwise drown out genuine decoder gaps in +/// other protocols. Marking them up-front lets the caller bump a dedicated +/// `curve_unsupported` reason instead of `unknown_selector`. +const CURVE_ROUTERS: &[Address] = + &[address!("99a58482BD75cbab83b27EC03CA68fF489b5788f")]; + +/// Returns `true` when `router` is a known Curve router that the decoder +/// cannot parse. Caller should short-circuit with a dedicated metric. +pub fn is_unsupported_curve_router(router: Address) -> bool { + CURVE_ROUTERS.contains(&router) +} + +/// Routers that share the UniswapV2 ABI but should be tagged as SushiSwap. +/// +/// SushiSwap forked Router02 verbatim, so its calldata is byte-for-byte +/// indistinguishable from UniswapV2's at the selector layer; the only signal +/// the decoder has is the `to` address. Without this dispatch every Sushi +/// pending tx falls into the UniswapV2 metric label and the registry lookup +/// hunts in the wrong protocol's pool set. +/// +/// Update this list when adding a new Sushi-flavoured router (e.g. SushiX, +/// Sushi RouteProcessor) — adding the address here is the only change +/// required for correct protocol attribution downstream. +const SUSHISWAP_ROUTERS: &[Address] = + &[address!("d9e1cE17f2641f24aE83637ab66a2cca9C378B9F")]; + +fn router_to_v2_protocol(router: Address) -> Protocol { + if SUSHISWAP_ROUTERS.contains(&router) { + Protocol::SushiSwap + } else { + Protocol::UniswapV2 + } +} + +/// Protocol tag attached to every successful decode. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Protocol { + UniswapV2, + UniswapV3, + SushiSwap, + BalancerV2, +} + +/// Minimal swap shape produced by the decoder. +/// +/// Multi-hop paths (V2 chained, V3 `exactInput`) collapse to first-hop +/// fields here; the full path is preserved in `path_extra` for callers that +/// need it (post-state simulation reapplies the full hop list). +#[derive(Debug, Clone)] +pub struct DecodedSwap { + pub protocol: Protocol, + /// Router address the tx is calling — useful for metric labelling. + pub router: Address, + /// First-hop input token. + pub token_in: Address, + /// First-hop output token (or final token for V3 `exactInputSingle`). + pub token_out: Address, + /// Amount of `token_in` the user is committing. + pub amount_in: U256, + /// Minimum `token_out` required by user for slippage protection. + pub amount_out_min: U256, + /// Recipient (`to`) the swap will pay out to. + pub recipient: Address, + /// Pool fee in hundredths-of-a-bp (V3) or `0` for non-V3 protocols. + pub fee_bps: u32, + /// Remaining path tokens past the first hop, in order. Empty for + /// single-hop swaps. + pub path_extra: Vec
, +} + +/// Reasons a pending tx might fail to decode. Caller maps these to a +/// `decode_failure` counter; the variants are intentionally fine-grained so +/// dashboards can show *why* coverage is low. +#[derive(Debug, Clone, thiserror::Error)] +pub enum DecodeError { + #[error("calldata too short for any selector")] + TooShort, + #[error("unknown selector {selector:?}")] + UnknownSelector { selector: [u8; 4] }, + #[error("known selector but ABI decode failed: {0}")] + AbiDecode(String), + #[error("path is empty or malformed")] + EmptyPath, + /// Recipient is a Curve router but the decoder does not yet support + /// Curve's `exchange` / `exchange_multiple` shapes. Distinct from + /// `UnknownSelector` so the `curve_unsupported` metric reason isolates + /// the known gap from genuine unmapped selectors elsewhere. + #[error("curve router {0} not yet supported by decoder")] + CurveUnsupported(Address), +} + +// ── Router ABIs ── +// +// Selectors are computed at compile time via the `sol!` macro. Only the +// methods we actually decode are listed; the rest are intentionally absent +// so an unsupported variant fails the selector lookup loudly. + +sol! { + /// UniswapV2 / SushiSwap Router02 surface — they share the ABI. + /// Includes the fee-on-transfer variants because meme-token routing + /// dominates the live mempool and the non-FOT shapes alone produce a + /// near-zero decode hit rate against real Alchemy traffic. + #[allow(missing_docs)] + interface IUniswapV2Router02 { + function swapExactTokensForTokens(uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline) external; + function swapTokensForExactTokens(uint256 amountOut, uint256 amountInMax, address[] path, address to, uint256 deadline) external; + function swapExactETHForTokens(uint256 amountOutMin, address[] path, address to, uint256 deadline) external payable; + function swapTokensForExactETH(uint256 amountOut, uint256 amountInMax, address[] path, address to, uint256 deadline) external; + function swapExactTokensForETH(uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline) external; + function swapETHForExactTokens(uint256 amountOut, address[] path, address to, uint256 deadline) external payable; + function swapExactTokensForTokensSupportingFeeOnTransferTokens(uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline) external; + function swapExactETHForTokensSupportingFeeOnTransferTokens(uint256 amountOutMin, address[] path, address to, uint256 deadline) external payable; + function swapExactTokensForETHSupportingFeeOnTransferTokens(uint256 amountIn, uint256 amountOutMin, address[] path, address to, uint256 deadline) external; + } + + /// UniswapV3 SwapRouter (deadline) and SwapRouter02 (no deadline) flavours. + /// The structs carry distinct selectors because of the deadline field + /// shift, so we declare both and try each. + #[allow(missing_docs)] + interface IUniswapV3Router { + struct ExactInputSingleParams { + address tokenIn; + address tokenOut; + uint24 fee; + address recipient; + uint256 deadline; + uint256 amountIn; + uint256 amountOutMinimum; + uint160 sqrtPriceLimitX96; + } + struct ExactInputSingleParams02 { + address tokenIn; + address tokenOut; + uint24 fee; + address recipient; + uint256 amountIn; + uint256 amountOutMinimum; + uint160 sqrtPriceLimitX96; + } + struct ExactInputParams { + bytes path; + address recipient; + uint256 deadline; + uint256 amountIn; + uint256 amountOutMinimum; + } + struct ExactInputParams02 { + bytes path; + address recipient; + uint256 amountIn; + uint256 amountOutMinimum; + } + function exactInputSingle(ExactInputSingleParams params) external payable returns (uint256); + function exactInputSingle02(ExactInputSingleParams02 params) external payable returns (uint256); + function exactInput(ExactInputParams params) external payable returns (uint256); + function exactInput02(ExactInputParams02 params) external payable returns (uint256); + } + + /// Balancer V2 Vault `swap` for the SingleSwap shape. + #[allow(missing_docs)] + interface IBalancerVault { + struct SingleSwap { + bytes32 poolId; + uint8 kind; + address assetIn; + address assetOut; + uint256 amount; + bytes userData; + } + struct FundManagement { + address sender; + bool fromInternalBalance; + address recipient; + bool toInternalBalance; + } + function swap(SingleSwap singleSwap, FundManagement funds, uint256 limit, uint256 deadline) external payable returns (uint256); + } +} + +/// Decode a pending tx's `(to, calldata)` into a [`DecodedSwap`]. +/// +/// `to` is required: anonymous calls (contract creation) always return +/// [`DecodeError::TooShort`]. The caller is expected to filter by router +/// address before calling — this function does not validate that the `to` +/// matches a known router; it only consumes the selector + payload. +pub fn decode_pending(to: Address, calldata: &[u8]) -> Result { + if calldata.len() < 4 { + return Err(DecodeError::TooShort); + } + // Short-circuit known Curve routers before selector dispatch so they map + // to a dedicated reason instead of inflating `unknown_selector`. They + // remain in the address filter because dropping them would skew the + // firehose's protocol mix away from real router traffic. + if is_unsupported_curve_router(to) { + return Err(DecodeError::CurveUnsupported(to)); + } + let selector: [u8; 4] = calldata[0..4].try_into().expect("4 bytes by check above"); + + // ── UniV2 / Sushi family ── + if let Some(swap) = try_uni_v2_family(selector, calldata, to)? { + return Ok(swap); + } + // ── UniV3 SwapRouter / SwapRouter02 ── + if let Some(swap) = try_uni_v3_family(selector, calldata, to)? { + return Ok(swap); + } + // ── Balancer V2 Vault ── + if let Some(swap) = try_balancer(selector, calldata, to)? { + return Ok(swap); + } + + Err(DecodeError::UnknownSelector { selector }) +} + +fn try_uni_v2_family( + selector: [u8; 4], + calldata: &[u8], + router: Address, +) -> Result, DecodeError> { + use IUniswapV2Router02::*; + if selector == swapExactTokensForTokensCall::SELECTOR { + let c = swapExactTokensForTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountIn, + c.amountOutMin, + c.to, + )?)); + } + if selector == swapTokensForExactTokensCall::SELECTOR { + let c = swapTokensForExactTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountInMax, + c.amountOut, + c.to, + )?)); + } + if selector == swapExactETHForTokensCall::SELECTOR { + let c = swapExactETHForTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + U256::ZERO, // amount_in carried as msg.value, unknown from calldata alone + c.amountOutMin, + c.to, + )?)); + } + if selector == swapExactTokensForETHCall::SELECTOR { + let c = swapExactTokensForETHCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountIn, + c.amountOutMin, + c.to, + )?)); + } + if selector == swapTokensForExactETHCall::SELECTOR { + let c = swapTokensForExactETHCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountInMax, + c.amountOut, + c.to, + )?)); + } + if selector == swapETHForExactTokensCall::SELECTOR { + let c = swapETHForExactTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + U256::ZERO, + c.amountOut, + c.to, + )?)); + } + if selector == swapExactTokensForTokensSupportingFeeOnTransferTokensCall::SELECTOR { + let c = swapExactTokensForTokensSupportingFeeOnTransferTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountIn, + c.amountOutMin, + c.to, + )?)); + } + if selector == swapExactETHForTokensSupportingFeeOnTransferTokensCall::SELECTOR { + let c = swapExactETHForTokensSupportingFeeOnTransferTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + U256::ZERO, + c.amountOutMin, + c.to, + )?)); + } + if selector == swapExactTokensForETHSupportingFeeOnTransferTokensCall::SELECTOR { + let c = swapExactTokensForETHSupportingFeeOnTransferTokensCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(decode_v2_call( + router, + c.path, + c.amountIn, + c.amountOutMin, + c.to, + )?)); + } + Ok(None) +} + +fn decode_v2_call( + router: Address, + path: Vec
, + amount_in: U256, + amount_out_min: U256, + to: Address, +) -> Result { + if path.len() < 2 { + return Err(DecodeError::EmptyPath); + } + let token_in = path[0]; + let token_out = path[1]; + let path_extra = path.iter().skip(2).copied().collect(); + Ok(DecodedSwap { + protocol: router_to_v2_protocol(router), + router, + token_in, + token_out, + amount_in, + amount_out_min, + recipient: to, + fee_bps: 0, + path_extra, + }) +} + +fn try_uni_v3_family( + selector: [u8; 4], + calldata: &[u8], + router: Address, +) -> Result, DecodeError> { + use IUniswapV3Router::*; + if selector == exactInputSingleCall::SELECTOR { + let c = exactInputSingleCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(DecodedSwap { + protocol: Protocol::UniswapV3, + router, + token_in: c.params.tokenIn, + token_out: c.params.tokenOut, + amount_in: c.params.amountIn, + amount_out_min: c.params.amountOutMinimum, + recipient: c.params.recipient, + fee_bps: c.params.fee.to::(), + path_extra: vec![], + })); + } + if selector == exactInputSingle02Call::SELECTOR { + let c = exactInputSingle02Call::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(DecodedSwap { + protocol: Protocol::UniswapV3, + router, + token_in: c.params.tokenIn, + token_out: c.params.tokenOut, + amount_in: c.params.amountIn, + amount_out_min: c.params.amountOutMinimum, + recipient: c.params.recipient, + fee_bps: c.params.fee.to::(), + path_extra: vec![], + })); + } + if selector == exactInputCall::SELECTOR { + let c = exactInputCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + let (token_in, token_out, fee, extras) = parse_v3_path(&c.params.path)?; + return Ok(Some(DecodedSwap { + protocol: Protocol::UniswapV3, + router, + token_in, + token_out, + amount_in: c.params.amountIn, + amount_out_min: c.params.amountOutMinimum, + recipient: c.params.recipient, + fee_bps: fee, + path_extra: extras, + })); + } + if selector == exactInput02Call::SELECTOR { + let c = exactInput02Call::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + let (token_in, token_out, fee, extras) = parse_v3_path(&c.params.path)?; + return Ok(Some(DecodedSwap { + protocol: Protocol::UniswapV3, + router, + token_in, + token_out, + amount_in: c.params.amountIn, + amount_out_min: c.params.amountOutMinimum, + recipient: c.params.recipient, + fee_bps: fee, + path_extra: extras, + })); + } + Ok(None) +} + +/// Decode a UniV3 packed path: `address(20) | fee(3) | address(20) | fee(3) | ... | address(20)`. +/// +/// Returns `(token_in, token_out_first, fee_first_hop, [remaining tokens])`. +fn parse_v3_path(path: &[u8]) -> Result<(Address, Address, u32, Vec
), DecodeError> { + const ADDR_LEN: usize = 20; + const FEE_LEN: usize = 3; + const HOP_LEN: usize = ADDR_LEN + FEE_LEN; + + // Minimum well-formed path: token_in | fee | token_out = 43 bytes. + if path.len() < HOP_LEN + ADDR_LEN { + return Err(DecodeError::EmptyPath); + } + + let token_in = Address::from_slice(&path[0..ADDR_LEN]); + let fee_bytes = &path[ADDR_LEN..ADDR_LEN + FEE_LEN]; + let fee = (u32::from(fee_bytes[0]) << 16) + | (u32::from(fee_bytes[1]) << 8) + | u32::from(fee_bytes[2]); + + let mut tokens: Vec
= Vec::new(); + let mut cursor = ADDR_LEN + FEE_LEN; + while cursor + ADDR_LEN <= path.len() { + tokens.push(Address::from_slice(&path[cursor..cursor + ADDR_LEN])); + cursor += ADDR_LEN; + // Skip the next fee chunk if there are more tokens to follow. + if cursor + FEE_LEN < path.len() { + cursor += FEE_LEN; + } + } + if tokens.is_empty() { + return Err(DecodeError::EmptyPath); + } + let token_out = tokens.remove(0); + Ok((token_in, token_out, fee, tokens)) +} + +fn try_balancer( + selector: [u8; 4], + calldata: &[u8], + router: Address, +) -> Result, DecodeError> { + use IBalancerVault::*; + if selector == swapCall::SELECTOR { + let c = swapCall::abi_decode(calldata) + .map_err(|e| DecodeError::AbiDecode(e.to_string()))?; + return Ok(Some(DecodedSwap { + protocol: Protocol::BalancerV2, + router, + token_in: c.singleSwap.assetIn, + token_out: c.singleSwap.assetOut, + amount_in: c.singleSwap.amount, + amount_out_min: c.limit, + recipient: c.funds.recipient, + fee_bps: 0, + path_extra: vec![], + })); + } + Ok(None) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{address, U256}; + + #[test] + fn too_short_calldata_rejected() { + let to = Address::ZERO; + let err = decode_pending(to, &[0x12, 0x34]).unwrap_err(); + assert!(matches!(err, DecodeError::TooShort)); + } + + #[test] + fn unknown_selector_returned_for_random_bytes() { + let to = Address::ZERO; + // 4-byte selector + 32 bytes of payload. + let mut data = vec![0xde, 0xad, 0xbe, 0xef]; + data.extend(std::iter::repeat_n(0u8, 32)); + let err = decode_pending(to, &data).unwrap_err(); + match err { + DecodeError::UnknownSelector { selector } => { + assert_eq!(selector, [0xde, 0xad, 0xbe, 0xef]); + } + other => panic!("expected UnknownSelector, got {:?}", other), + } + } + + #[test] + fn decode_uniswap_v2_swap_exact_tokens_for_tokens() { + use IUniswapV2Router02::swapExactTokensForTokensCall; + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let to_recipient = address!("000000000000000000000000000000000000dEaD"); + let amount_in = U256::from(1_000_000_000_000_000_000u128); // 1 ETH + let amount_out_min = U256::from(2_500_000_000u128); // 2500 USDC (6dp) + let path = vec![weth, usdc]; + let deadline = U256::from(99_999_999_999u64); + + let calldata = swapExactTokensForTokensCall { + amountIn: amount_in, + amountOutMin: amount_out_min, + path: path.clone(), + to: to_recipient, + deadline, + } + .abi_encode(); + + let router = address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D"); + let decoded = decode_pending(router, &calldata).expect("should decode"); + assert_eq!(decoded.protocol, Protocol::UniswapV2); + assert_eq!(decoded.router, router); + assert_eq!(decoded.token_in, weth); + assert_eq!(decoded.token_out, usdc); + assert_eq!(decoded.amount_in, amount_in); + assert_eq!(decoded.amount_out_min, amount_out_min); + assert_eq!(decoded.recipient, to_recipient); + assert_eq!(decoded.fee_bps, 0); + assert!(decoded.path_extra.is_empty()); + } + + #[test] + fn decode_uniswap_v3_exact_input_single_with_deadline() { + use IUniswapV3Router::{exactInputSingleCall, ExactInputSingleParams}; + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let recip = address!("000000000000000000000000000000000000dEaD"); + + let params = ExactInputSingleParams { + tokenIn: weth, + tokenOut: usdc, + fee: alloy::primitives::aliases::U24::from(3000), // 30 bps + recipient: recip, + deadline: U256::from(99u64), + amountIn: U256::from(2_000u64), + amountOutMinimum: U256::from(1_000u64), + sqrtPriceLimitX96: alloy::primitives::U160::ZERO, + }; + let calldata = exactInputSingleCall { params }.abi_encode(); + let router = address!("E592427A0AEce92De3Edee1F18E0157C05861564"); + let decoded = decode_pending(router, &calldata).expect("should decode"); + assert_eq!(decoded.protocol, Protocol::UniswapV3); + assert_eq!(decoded.token_in, weth); + assert_eq!(decoded.token_out, usdc); + assert_eq!(decoded.fee_bps, 3000); + assert_eq!(decoded.amount_in, U256::from(2_000u64)); + } + + #[test] + fn parse_v3_path_extracts_first_hop_and_extras() { + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let dai = address!("6B175474E89094C44Da98b954EedeAC495271d0F"); + + // Build path: WETH | 3000 | USDC | 500 | DAI (43 + 3 + 20 = 66 bytes). + let mut path = Vec::new(); + path.extend_from_slice(weth.as_slice()); + path.extend_from_slice(&[0x00, 0x0b, 0xb8]); // 3000 + path.extend_from_slice(usdc.as_slice()); + path.extend_from_slice(&[0x00, 0x01, 0xf4]); // 500 + path.extend_from_slice(dai.as_slice()); + + let (token_in, token_out, fee, extras) = parse_v3_path(&path).expect("parse"); + assert_eq!(token_in, weth); + assert_eq!(token_out, usdc); + assert_eq!(fee, 3000); + assert_eq!(extras, vec![dai]); + } + + #[test] + fn parse_v3_path_rejects_too_short() { + let res = parse_v3_path(&[0u8; 10]); + assert!(matches!(res, Err(DecodeError::EmptyPath))); + } + + #[test] + fn decode_sushiswap_router_tagged_as_sushi_not_uni_v2() { + use IUniswapV2Router02::swapExactTokensForTokensCall; + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let calldata = swapExactTokensForTokensCall { + amountIn: U256::from(1_000u64), + amountOutMin: U256::from(900u64), + path: vec![weth, usdc], + to: Address::ZERO, + deadline: U256::ZERO, + } + .abi_encode(); + let sushi_router = address!("d9e1cE17f2641f24aE83637ab66a2cca9C378B9F"); + let decoded = decode_pending(sushi_router, &calldata).expect("decode"); + assert_eq!( + decoded.protocol, + Protocol::SushiSwap, + "Sushi Router02 must dispatch to Protocol::SushiSwap so registry \ + lookups hit the Sushi pool set, not UniV2's" + ); + assert_eq!(decoded.router, sushi_router); + } + + #[test] + fn decode_uni_v2_router_still_tagged_as_uni_v2() { + use IUniswapV2Router02::swapExactTokensForTokensCall; + let weth = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); + let usdc = address!("A0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); + let calldata = swapExactTokensForTokensCall { + amountIn: U256::from(1u64), + amountOutMin: U256::from(0u64), + path: vec![weth, usdc], + to: Address::ZERO, + deadline: U256::ZERO, + } + .abi_encode(); + let uni_v2_router = address!("7a250d5630B4cF539739dF2C5dAcb4c659F2488D"); + let decoded = decode_pending(uni_v2_router, &calldata).expect("decode"); + assert_eq!(decoded.protocol, Protocol::UniswapV2); + } + + #[test] + fn empty_v2_path_rejected() { + use IUniswapV2Router02::swapExactTokensForTokensCall; + let calldata = swapExactTokensForTokensCall { + amountIn: U256::from(1u64), + amountOutMin: U256::from(0u64), + path: vec![], // intentionally empty + to: Address::ZERO, + deadline: U256::ZERO, + } + .abi_encode(); + let err = decode_pending(Address::ZERO, &calldata).unwrap_err(); + assert!(matches!(err, DecodeError::EmptyPath)); + } +} diff --git a/deploy/docker/grafana/dashboards/mempool.json b/deploy/docker/grafana/dashboards/mempool.json new file mode 100644 index 0000000..785c33e --- /dev/null +++ b/deploy/docker/grafana/dashboards/mempool.json @@ -0,0 +1,200 @@ +{ + "uid": "aether-mempool", + "title": "Aether \u2014 Mempool (testing scaffold)", + "tags": [ + "aether", + "mempool" + ], + "timezone": "browser", + "schemaVersion": 39, + "version": 1, + "refresh": "10s", + "time": { + "from": "now-1h", + "to": "now" + }, + "panels": [ + { + "id": 1, + "type": "stat", + "title": "Pending DEX txs (1m rate)", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum(rate(aether_pending_dex_tx_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "tx/s" + } + ], + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "" + }, + "colorMode": "value", + "graphMode": "area", + "textMode": "value_and_name" + }, + "gridPos": { + "x": 0, + "y": 0, + "w": 6, + "h": 6 + } + }, + { + "id": 2, + "type": "stat", + "title": "Decode success rate", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "100 * sum(rate(aether_pending_dex_tx_total{job=~\"aether-rust|aether-host-rust\",decoded=\"true\"}[5m])) / clamp_min(sum(rate(aether_pending_dex_tx_total{job=~\"aether-rust|aether-host-rust\"}[5m])), 1e-9)", + "legendFormat": "%" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100 + } + }, + "gridPos": { + "x": 6, + "y": 0, + "w": 6, + "h": 6 + } + }, + { + "id": 5, + "type": "timeseries", + "title": "Pending DEX tx rate by protocol", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (protocol) (rate(aether_pending_dex_tx_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "{{protocol}}" + } + ], + "gridPos": { + "x": 0, + "y": 6, + "w": 12, + "h": 9 + } + }, + { + "id": 6, + "type": "timeseries", + "title": "Decoder failures by reason", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (reason) (rate(aether_pending_decode_errors_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "{{reason}}" + } + ], + "gridPos": { + "x": 12, + "y": 6, + "w": 12, + "h": 9 + } + }, + { + "id": 8, + "type": "stat", + "title": "Arb candidates (1m rate)", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum(rate(aether_pending_arb_candidates_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "candidates/s" + } + ], + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "" + }, + "colorMode": "value", + "graphMode": "area" + }, + "gridPos": { + "x": 0, + "y": 24, + "w": 6, + "h": 6 + } + }, + { + "id": 9, + "type": "timeseries", + "title": "Arb candidates by profit bucket", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (profit_bucket) (rate(aether_pending_arb_candidates_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "{{profit_bucket}}" + } + ], + "gridPos": { + "x": 6, + "y": 24, + "w": 9, + "h": 9 + } + }, + { + "id": 10, + "type": "timeseries", + "title": "Sim skipped by reason", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "targets": [ + { + "refId": "A", + "expr": "sum by (reason) (rate(aether_pending_arb_sim_skipped_total{job=~\"aether-rust|aether-host-rust\"}[1m]))", + "legendFormat": "{{reason}}" + } + ], + "gridPos": { + "x": 15, + "y": 24, + "w": 9, + "h": 9 + } + } + ] +} \ No newline at end of file diff --git a/deploy/docker/prometheus.yml b/deploy/docker/prometheus.yml index a94c998..07b5393 100644 --- a/deploy/docker/prometheus.yml +++ b/deploy/docker/prometheus.yml @@ -22,3 +22,16 @@ scrape_configs: - job_name: "alertmanager" static_configs: - targets: ["alertmanager:9093"] + # Local-dev scrape targets when binaries run on the host while only the + # observability stack is in compose. host.docker.internal resolves from + # inside the container to the Docker Desktop bridge gateway. Failures + # here are expected in pure-compose deployments and harmless. + - job_name: "aether-host-rust" + static_configs: + - targets: ["host.docker.internal:9092"] + # The monitor binary defaults METRICS_PORT to 9090 (cmd/monitor/metrics.go); + # scrape that port so the host-side run is reachable without an explicit + # METRICS_PORT override. Mempool / MEV-Share counters land on this scrape. + - job_name: "aether-host-monitor" + static_configs: + - targets: ["host.docker.internal:9090"] diff --git a/scripts/mempool_capture.sh b/scripts/mempool_capture.sh new file mode 100755 index 0000000..7ae5f2b --- /dev/null +++ b/scripts/mempool_capture.sh @@ -0,0 +1,250 @@ +#!/usr/bin/env bash +# +# mempool_capture.sh — record a longer live-mainnet run of the mempool path +# under reports/mempool-stage1-proof/. Boots aether-rust with WS subscriptions +# (newHeads + alchemy_pendingTransactions), scrapes /metrics at the midpoint +# and at the end, parses the log into CSVs, and writes a summary.md with +# counters + sample tx hashes verifiable on Etherscan. +# +# Companion of scripts/mempool_smoke.sh — same env + boot, longer window, +# more artefacts, intended to be committed as proof of stage 1. +# +# Usage: +# ./scripts/mempool_capture.sh # 600 s window (default) +# DURATION=300 ./scripts/mempool_capture.sh +# +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT" + +if [ -f .env ]; then + set -a + # shellcheck disable=SC1091 + . ./.env + set +a +fi + +if [ -z "${ALCHEMY_API_KEY:-}" ]; then + echo "ERROR: ALCHEMY_API_KEY unset (looked in env + .env)" >&2 + exit 2 +fi + +DURATION="${DURATION:-600}" +HALF=$((DURATION / 2)) +METRICS_PORT="${RUST_METRICS_PORT:-9092}" + +export MEMPOOL_TRACKING=1 +export MEMPOOL_WS_URL="wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}" +export ETH_RPC_URL="${ETH_RPC_URL:-wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}}" +export RUST_METRICS_PORT="$METRICS_PORT" +# Keep mempool at debug for full tx hash trail in the log; everything else info. +export RUST_LOG="${RUST_LOG:-info,aether=info,aether::mempool=debug}" +unset DATABASE_URL || true + +OUT_DIR="$ROOT/reports/mempool-stage1-proof" +mkdir -p "$OUT_DIR" +LOG="$OUT_DIR/02_engine.log" +M_MID="$OUT_DIR/03_metrics_${HALF}s.txt" +M_END="$OUT_DIR/04_metrics_${DURATION}s.txt" +ENV_FILE="$OUT_DIR/01_env.txt" + +BIN="target/release/aether-rust" +if [ ! -x "$BIN" ]; then + echo "ERROR: missing $BIN — run: cargo build --release -p aether-grpc-server" >&2 + exit 2 +fi + +# Redacted env dump — show key prefix/suffix only. +{ + echo "## Capture environment" + echo "binary: $BIN" + echo "ALCHEMY_API_KEY: ${ALCHEMY_API_KEY:0:4}...${ALCHEMY_API_KEY: -4} (len=${#ALCHEMY_API_KEY})" + echo "ETH_RPC_URL: ${ETH_RPC_URL%${ALCHEMY_API_KEY}}" + echo "MEMPOOL_WS_URL: ${MEMPOOL_WS_URL%${ALCHEMY_API_KEY}}" + echo "MEMPOOL_TRACKING: $MEMPOOL_TRACKING" + echo "RUST_LOG: $RUST_LOG" + echo "metrics port: $METRICS_PORT" + echo "duration: ${DURATION}s" + echo "started at: $(date -u +%Y-%m-%dT%H:%M:%SZ)" +} > "$ENV_FILE" + +echo "==> capture starting (${DURATION}s) → $OUT_DIR" +"$BIN" >"$LOG" 2>&1 & +PID=$! +echo "==> engine pid=$PID" + +cleanup() { + if kill -0 "$PID" 2>/dev/null; then + kill "$PID" 2>/dev/null || true + sleep 1 + kill -9 "$PID" 2>/dev/null || true + fi +} +trap cleanup EXIT + +# Wait for /metrics to come up. +for i in $(seq 1 30); do + if curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >/dev/null 2>&1; then + echo "==> /metrics live after ${i}s" + break + fi + if ! kill -0 "$PID" 2>/dev/null; then + echo "ERROR: engine exited during boot — see $LOG" >&2 + tail -30 "$LOG" >&2 + exit 3 + fi + sleep 1 +done + +echo "==> running for ${DURATION}s, scrape at +${HALF}s and +${DURATION}s" +sleep "$HALF" +curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >"$M_MID" +echo "==> mid-run metrics scraped" +sleep $((DURATION - HALF)) +curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >"$M_END" +echo "==> end-run metrics scraped" + +kill "$PID" 2>/dev/null || true +wait "$PID" 2>/dev/null || true + +# Strip ANSI from the log once so all parsers see plain text. +PLAIN_LOG="$OUT_DIR/02_engine.log.plain" +perl -pe 's/\e\[[0-9;]*m//g' "$LOG" > "$PLAIN_LOG" + +# Parse pending DEX swaps into CSV. +DEX_CSV="$OUT_DIR/05_pending_dex_txs.csv" +{ + echo "ts,tx_hash,router,protocol,token_in,token_out,amount_in,fee_bps" + # Addresses are emitted in EIP-55 mixed case; the regex must tolerate + # both cases or it truncates at the first uppercase byte and the CSV + # ends up with two-character "token_in" values. Use [[:xdigit:]] so the + # parser is locale-stable. + grep "PENDING DEX SWAP decoded" "$PLAIN_LOG" \ + | sed -nE 's/^([^ ]+).*tx_hash=(0x[[:xdigit:]]+).*router=(0x[[:xdigit:]]+).*protocol=([A-Za-z0-9]+).*token_in=(0x[[:xdigit:]]+).*token_out=(0x[[:xdigit:]]+).*amount_in=([0-9]+).*fee_bps=([0-9]+).*/\1,\2,\3,\4,\5,\6,\7,\8/p' +} > "$DEX_CSV" + +# Parse filter drops + decode errors into a "reasons" CSV by scraping end metrics. +DROPS_CSV="$OUT_DIR/06_filter_drops.csv" +{ + echo "metric,label_reason,count" + awk ' + /^aether_mempool_filtered_total\{reason=/ { + match($0, /reason="[^"]+"/); r=substr($0,RSTART+8,RLENGTH-9); + print "aether_mempool_filtered_total," r "," $NF + } + /^aether_pending_decode_errors_total\{reason=/ { + match($0, /reason="[^"]+"/); r=substr($0,RSTART+8,RLENGTH-9); + print "aether_pending_decode_errors_total," r "," $NF + } + /^aether_pending_arb_sim_skipped_total\{reason=/ { + match($0, /reason="[^"]+"/); r=substr($0,RSTART+8,RLENGTH-9); + print "aether_pending_arb_sim_skipped_total," r "," $NF + } + ' "$M_END" +} > "$DROPS_CSV" + +# Headline counters. +extract() { + awk -v m="$1" '$1 ~ "^"m"(\\{|$)" { sum += $NF } END { print sum+0 }' "$M_END" +} +DEX=$(extract aether_pending_dex_tx_total) +DEC_ERR=$(extract aether_pending_decode_errors_total) +FILT=$(extract aether_mempool_filtered_total) +LAG=$(extract aether_pending_pipeline_lagged_total) +SKP=$(extract aether_pending_arb_sim_skipped_total) +CAN=$(extract aether_pending_arb_candidates_total) +BLOCKS=$(grep -c "Bellman-Ford detection complete" "$PLAIN_LOG" || true) +DECODED_SAMPLES=$(wc -l < "$DEX_CSV" | awk '{print $1-1}') + +# Pull a few real tx hashes for Etherscan cross-check. +SAMPLE_HASHES=$(awk -F, 'NR>1 && NR<=4 {print $2}' "$DEX_CSV") + +# WS engagement evidence. +WS_LINES=$(grep -E "transport=WebSocket|Subscriptions active|subscribing to alchemy_pendingTransactions" "$PLAIN_LOG" | head -5 || true) + +# Block heights observed. +BLOCK_HEIGHTS=$(grep -oE "block_number=[0-9]+|provider connected block=[0-9]+" "$PLAIN_LOG" \ + | grep -oE "[0-9]+" | sort -u) +FIRST_BLOCK=$(echo "$BLOCK_HEIGHTS" | head -1) +LAST_BLOCK=$(echo "$BLOCK_HEIGHTS" | tail -1) +BLOCK_COUNT=$(echo "$BLOCK_HEIGHTS" | wc -l | awk '{print $1}') + +SUMMARY="$OUT_DIR/07_summary.md" +{ + echo "# Mempool stage 1 — live mainnet capture" + echo + echo "**Date:** $(date -u +%Y-%m-%d) (UTC)" + echo "**Duration:** ${DURATION}s" + echo "**Branch:** \`feat/mempool-tracking-scaffold\` (PR #118)" + echo + echo "## Verdict" + echo + if [ "$DEX" -gt 0 ] && [ "$BLOCKS" -gt 0 ]; then + echo "**PASS** — both WS subscriptions delivered live mainnet events end-to-end." + else + echo "**FAIL** — see counters below + log." + fi + echo + echo "## WS subscriptions engaged" + echo + echo '```' + echo "$WS_LINES" + echo '```' + echo + echo "## Counters at +${DURATION}s" + echo + echo '| metric | value |' + echo '|---|---|' + echo "| aether_pending_dex_tx_total | $DEX |" + echo "| aether_pending_decode_errors_total | $DEC_ERR |" + echo "| aether_mempool_filtered_total | $FILT |" + echo "| aether_pending_pipeline_lagged_total | $LAG |" + echo "| aether_pending_arb_sim_skipped_total | $SKP |" + echo "| aether_pending_arb_candidates_total | $CAN |" + echo "| detection cycles run | $BLOCKS |" + echo "| decoded swap samples in CSV | $DECODED_SAMPLES |" + echo + echo "## Block heights observed (verify on https://etherscan.io/block/)" + echo + echo '```' + echo "$BLOCK_HEIGHTS" + echo '```' + echo + echo "first: $FIRST_BLOCK, last: $LAST_BLOCK, total unique: $BLOCK_COUNT" + echo + echo "## Sample pending tx hashes (verify on https://etherscan.io/tx/)" + echo + if [ -n "$SAMPLE_HASHES" ]; then + echo '```' + echo "$SAMPLE_HASHES" + echo '```' + else + echo "(no decoded swaps in this window — see CSV for raw forwards)" + fi + echo + echo "## Filter drop breakdown (from 06_filter_drops.csv)" + echo + echo '```' + cat "$DROPS_CSV" + echo '```' + echo + echo "## Files in this directory" + echo + echo '| file | purpose |' + echo '|---|---|' + echo "| 01_env.txt | redacted env vars used for capture |" + echo "| 02_engine.log | full engine stdout/stderr (with ANSI) |" + echo "| 02_engine.log.plain | ANSI-stripped copy for grep tooling |" + echo "| 03_metrics_${HALF}s.txt | /metrics scrape at midpoint |" + echo "| 04_metrics_${DURATION}s.txt | /metrics scrape at end |" + echo "| 05_pending_dex_txs.csv | parsed decoded swaps (ts, tx_hash, router, protocol, ...) |" + echo "| 06_filter_drops.csv | drop counts by metric + reason |" + echo "| 07_summary.md | this file |" +} > "$SUMMARY" + +echo +echo "==> capture done" +echo "==> summary: $SUMMARY" +echo +cat "$SUMMARY" diff --git a/scripts/mempool_smoke.sh b/scripts/mempool_smoke.sh new file mode 100755 index 0000000..9c9399f --- /dev/null +++ b/scripts/mempool_smoke.sh @@ -0,0 +1,157 @@ +#!/usr/bin/env bash +# +# mempool_smoke.sh — verify the Alchemy public-mempool path actually receives +# events on whatever tier the configured ALCHEMY_API_KEY belongs to. +# +# Boots aether-rust with MEMPOOL_TRACKING=1, lets the WS subscription run for +# DURATION seconds, scrapes /metrics twice (mid + end), then kills the binary +# and prints a verdict. +# +# PASS → at least one pending DEX tx forwarded by the decoder +# (aether_pending_dex_tx_total > 0) +# FAIL → zero events seen during the window +# +# No Postgres / Anvil / executor needed — engine boots with NoopLedger when +# DATABASE_URL is unset, and we just want to read the mempool counters. +# +# Usage: +# ./scripts/mempool_smoke.sh # 60s window, default +# DURATION=30 ./scripts/mempool_smoke.sh +# +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT" + +# Load .env so ALCHEMY_API_KEY / ETH_RPC_URL are available. +if [ -f .env ]; then + set -a + # shellcheck disable=SC1091 + . ./.env + set +a +fi + +if [ -z "${ALCHEMY_API_KEY:-}" ]; then + echo "ERROR: ALCHEMY_API_KEY unset (looked in env + .env)" >&2 + exit 2 +fi + +DURATION="${DURATION:-60}" +METRICS_PORT="${RUST_METRICS_PORT:-9092}" + +# Force the WS URL from the API key so we don't accidentally inherit an HTTPS +# ETH_RPC_URL (alchemy mempool needs wss://). +export MEMPOOL_TRACKING=1 +export MEMPOOL_WS_URL="wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}" +export RUST_METRICS_PORT="$METRICS_PORT" +export RUST_LOG="${RUST_LOG:-info,aether=info,aether::mempool=debug}" +# Keep ledger off the smoke — NoopLedger path. Anything previously set wins. +unset DATABASE_URL || true + +LOG_DIR="$ROOT/build/mempool-smoke" +mkdir -p "$LOG_DIR" +LOG="$LOG_DIR/engine.log" +M30="$LOG_DIR/metrics_30s.txt" +M60="$LOG_DIR/metrics_${DURATION}s.txt" + +# Prefer release binary (faster, less log noise). +BIN="target/release/aether-rust" +if [ ! -x "$BIN" ]; then + BIN="target/debug/aether-rust" +fi +if [ ! -x "$BIN" ]; then + echo "ERROR: no aether-rust binary found at target/{release,debug}/" >&2 + echo "Run: cargo build --release -p aether-grpc-server" >&2 + exit 2 +fi + +echo "==> using binary: $BIN" +echo "==> WS URL: wss://eth-mainnet.g.alchemy.com/v2/****${ALCHEMY_API_KEY: -4}" +echo "==> duration: ${DURATION}s, metrics port: $METRICS_PORT" +echo "==> logs: $LOG" + +# Boot engine in background. +"$BIN" >"$LOG" 2>&1 & +PID=$! +echo "==> engine pid=$PID" + +# Always clean up on exit even if we error out. +cleanup() { + if kill -0 "$PID" 2>/dev/null; then + kill "$PID" 2>/dev/null || true + sleep 1 + kill -9 "$PID" 2>/dev/null || true + fi +} +trap cleanup EXIT + +# Wait for the metrics endpoint to come up (binary boots, opens port). +echo "==> waiting for /metrics on port $METRICS_PORT ..." +for i in $(seq 1 30); do + if curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >/dev/null 2>&1; then + echo "==> /metrics live after ${i}s" + break + fi + if ! kill -0 "$PID" 2>/dev/null; then + echo "ERROR: engine exited during boot — see $LOG" >&2 + tail -30 "$LOG" >&2 + exit 3 + fi + sleep 1 +done + +if ! curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >/dev/null 2>&1; then + echo "ERROR: /metrics never came up — see $LOG" >&2 + exit 3 +fi + +# First sample at half the window, second at full. +HALF=$((DURATION / 2)) +echo "==> sampling at +${HALF}s and +${DURATION}s ..." +sleep "$HALF" +curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >"$M30" +sleep $((DURATION - HALF)) +curl -fsS "http://127.0.0.1:$METRICS_PORT/metrics" >"$M60" + +# Tear down. +kill "$PID" 2>/dev/null || true +wait "$PID" 2>/dev/null || true + +# Parse. +extract() { + local metric="$1" + local file="$2" + awk -v m="$metric" '$1 ~ "^"m"(\\{|$)" { sum += $NF } END { print sum+0 }' "$file" +} + +DEX_30=$(extract aether_pending_dex_tx_total "$M30") +DEX_60=$(extract aether_pending_dex_tx_total "$M60") +DEC_30=$(extract aether_pending_decode_errors_total "$M30") +DEC_60=$(extract aether_pending_decode_errors_total "$M60") +LAG_60=$(extract aether_pending_pipeline_lagged_total "$M60") +SKP_60=$(extract aether_pending_arb_sim_skipped_total "$M60") +CAN_60=$(extract aether_pending_arb_candidates_total "$M60") + +echo +echo "============================================================" +echo "SMOKE RESULT (window: ${DURATION}s)" +echo "============================================================" +printf " pending DEX tx forwarded: t+%2ds %-6s t+%ds %-6s\n" "$HALF" "$DEX_30" "$DURATION" "$DEX_60" +printf " pending decode errors: t+%2ds %-6s t+%ds %-6s\n" "$HALF" "$DEC_30" "$DURATION" "$DEC_60" +printf " pipeline lagged events: %s\n" "$LAG_60" +printf " arb sim skipped (any reason): %s\n" "$SKP_60" +printf " arb candidates produced: %s\n" "$CAN_60" +echo + +if [ "$DEX_60" -gt 0 ] || [ "$DEC_60" -gt 0 ]; then + echo "VERDICT: PASS — Alchemy tier delivers pending txns to the subscription." + echo " Free tier sufficient for stage 1 live capture." + exit 0 +else + echo "VERDICT: FAIL — zero pending DEX txns AND zero decoder failures." + echo " Either the WS subscription is silent (tier issue) or the" + echo " router-allowlist filter is rejecting everything before" + echo " counters increment. Inspect log:" + echo " $LOG" + exit 1 +fi