diff --git a/crates/grpc-server/src/bin/aether_profit_scorer.rs b/crates/grpc-server/src/bin/aether_profit_scorer.rs index 4c08e40..47d352f 100644 --- a/crates/grpc-server/src/bin/aether_profit_scorer.rs +++ b/crates/grpc-server/src/bin/aether_profit_scorer.rs @@ -32,11 +32,11 @@ //! `decision=unprofitable` rows that PR-3 v2 (with full-block fetch) could //! re-score upward. //! -//! Inlined helpers (fetch_pool_state_at, build_graph, u256_to_f64, sol! -//! getReserves / slot0) are deliberate duplicates of the equivalents in -//! `bin/aether_replay.rs`. Extracting them into a shared module would -//! touch the merged replay file (2200+ lines) and inflate this PR's -//! review burden. Follow-up: deduplicate after the mempool phase lands. +//! Shared helpers (`fetch_pool_state_at`, `load_pools`, `u256_to_f64`, +//! `uniswap_v2_get_amount_out`, `load_executor_init_bytecode`, the `sol!` +//! `getReserves` / `slot0` calldata builders, and the `LoadedPool` / +//! `PoolState` types) live in `aether_grpc_server::historical` and are used +//! identically by `bin/aether_replay.rs`. //! //! Run with: //! @@ -56,14 +56,11 @@ use alloy::eips::{BlockId, BlockNumberOrTag}; use alloy::network::Ethereum; use alloy::primitives::{address, Address, U256}; use alloy::providers::{DynProvider, Provider, ProviderBuilder}; -use alloy::rpc::types::TransactionRequest; -use alloy::sol; -use alloy::sol_types::{SolCall, SolValue}; +use alloy::sol_types::SolValue; use anyhow::{Context, Result}; use chrono::Utc; use clap::Parser; use prometheus::{Encoder, Registry, TextEncoder}; -use serde::Deserialize; use sqlx::postgres::PgPoolOptions; use tokio::time::{interval, MissedTickBehavior}; use tracing::{debug, error, info, warn}; @@ -74,6 +71,10 @@ use aether_detector::bellman_ford::BellmanFord; use aether_detector::gas as gas_model; use aether_detector::opportunity::DetectedCycle; use aether_detector::optimizer::ternary_search_optimal_input; +use aether_grpc_server::historical::{ + fetch_pool_state_at, load_executor_init_bytecode, load_pools, u256_to_f64, + uniswap_v2_get_amount_out, LoadedPool, PoolState, Q96, +}; use aether_grpc_server::profitability_writer::{ profit_writer_from_env, NewProfitabilityScore, PgProfitabilityWriter, ProfitabilitySink, ProfitabilityWriterMetrics, UnscoredConfirmedPrediction, DECISION_NO_PATH, @@ -113,10 +114,6 @@ const MAX_HOPS: usize = 4; /// search is apples-to-apples with the production predictor. const DETECT_BUDGET_US: u64 = 3_000; -/// 2^96 as f64. Used to convert UniswapV3 `sqrtPriceX96` into a -/// floating-point price. -const Q96: f64 = 79_228_162_514_264_337_593_543_950_336.0; - /// Default base fee assumption (wei) when `eth_getBlock(latest)` is /// unavailable. 30 gwei matches the engine's typical assumption in /// quiet markets; replaced by the actual base fee on every refresh. @@ -173,11 +170,6 @@ const SIM_OWNER: Address = address!("1111111111111111111111111111111111111111"); const DEFAULT_EXECUTOR_ARTIFACT: &str = "contracts/out/AetherExecutor.sol/AetherExecutor.json"; -sol! { - function getReserves() external view returns (uint112 reserve0, uint112 reserve1, uint32 blockTimestampLast); - function slot0() external view returns (uint160 sqrtPriceX96, int24 tick, uint16 observationIndex, uint16 observationCardinality, uint16 observationCardinalityNext, uint8 feeProtocol, bool unlocked); -} - #[derive(Parser, Debug)] #[command(name = "aether-profit-scorer", about = "Compute realised P&L per confirmed mempool prediction")] struct Args { @@ -808,30 +800,6 @@ fn verify_cycle_u256( /// here is byte-identical to what would actually execute. Returns `None` /// when any leg has zero reserves / zero input (drained-pool guard) or any /// intermediate multiplication overflows U256. -fn uniswap_v2_get_amount_out( - amount_in: U256, - reserve_in: U256, - reserve_out: U256, - fee_bps: u32, -) -> Option { - if reserve_in.is_zero() || reserve_out.is_zero() || amount_in.is_zero() { - return None; - } - // fee_bps = 30 (0.30%) → multiplier 9970/10000. The 10_000 - fee_bps - // form matches the contract's hard-coded numerator for the default 30 - // bps pool and generalises to lower-fee Uni V2 forks. - let fee_multiplier = U256::from(10_000u64.saturating_sub(fee_bps as u64)); - let amount_in_with_fee = amount_in.checked_mul(fee_multiplier)?; - let numerator = amount_in_with_fee.checked_mul(reserve_out)?; - let denominator = reserve_in - .checked_mul(U256::from(10_000u64))? - .checked_add(amount_in_with_fee)?; - if denominator.is_zero() { - return None; - } - Some(numerator / denominator) -} - /// U256 → i128 with saturating overflow. The scorer's `net_profit_wei` /// column is i128; profits beyond i128::MAX wei (≈170 quadrillion ETH — /// numerically unreachable on Ethereum) saturate rather than wrap. The @@ -956,22 +924,6 @@ fn balance_slot_for_token(token: Address) -> Option { } /// Load AetherExecutor init-bytecode from the forge-compiled JSON artifact. -fn load_executor_init_bytecode(artifact_path: &PathBuf) -> Result> { - let raw = std::fs::read_to_string(artifact_path) - .with_context(|| format!("read executor artifact {}", artifact_path.display()))?; - let v: serde_json::Value = serde_json::from_str(&raw).context("parse executor artifact JSON")?; - let hex_str = v - .pointer("/bytecode/object") - .and_then(|x| x.as_str()) - .ok_or_else(|| anyhow::anyhow!("missing /bytecode/object in artifact"))?; - let stripped = hex_str.strip_prefix("0x").unwrap_or(hex_str); - let bytes = alloy::hex::decode(stripped).context("decode bytecode hex")?; - if bytes.is_empty() { - anyhow::bail!("executor bytecode is empty"); - } - Ok(bytes) -} - /// Build `Vec` from a detected cycle using pre-fetched running /// states (synchronous — no RPC calls). Ported from aether_replay's /// `build_steps_from_cycle` but sync and fed from `running_states`. @@ -1204,51 +1156,10 @@ fn protocol_label(p: ProtocolType) -> &'static str { // ----- inlined helpers (duplicate of aether_replay.rs; see module docstring) ----- -#[derive(Clone, Copy, Debug)] -enum PoolState { - V2 { r0: U256, r1: U256 }, - V3 { sqrt_price_x96: U256 }, -} - -#[derive(Clone, Debug)] -struct LoadedPool { - address: Address, - token0: Address, - token1: Address, - protocol: ProtocolType, - fee_bps: u32, -} - -#[derive(Deserialize)] -struct PoolsConfig { - pools: Vec, -} - -#[derive(Deserialize)] -struct PoolEntry { - address: String, - token0: String, - token1: String, - protocol: String, - fee_bps: u32, -} - -fn parse_protocol(s: &str) -> Option { - match s { - "uniswap_v2" => Some(ProtocolType::UniswapV2), - "sushiswap" => Some(ProtocolType::SushiSwap), - "uniswap_v3" => Some(ProtocolType::UniswapV3), - "curve" => Some(ProtocolType::Curve), - "balancer_v2" => Some(ProtocolType::BalancerV2), - "bancor_v3" => Some(ProtocolType::BancorV3), - _ => None, - } -} - /// Map the short-form protocol strings the engine writes into /// `mempool_predictions.protocol` (see `aether_grpc_server::mempool_writer` /// `PROTOCOL_*` constants) to `ProtocolType`. Distinct from -/// [`parse_protocol`], which reads the long-form names used in +/// `historical::parse_protocol`, which reads the long-form names used in /// `config/pools.toml`. Kept narrow on purpose: only the protocols the /// scorer can actually score are returned; Balancer / Curve / Bancor /// fall through to `None` so we don't add edges for hops the engine @@ -1343,83 +1254,6 @@ async fn load_predicted_pools( Ok(out) } -fn load_pools(path: &PathBuf) -> Result> { - let raw = std::fs::read_to_string(path) - .with_context(|| format!("read pool config {}", path.display()))?; - let cfg: PoolsConfig = toml::from_str(&raw).context("parse pool config")?; - let mut out = Vec::new(); - for entry in cfg.pools { - let Some(protocol) = parse_protocol(&entry.protocol) else { - continue; - }; - // v1 scorer supports the same protocols aether-replay supports. - if !matches!( - protocol, - ProtocolType::UniswapV2 | ProtocolType::SushiSwap | ProtocolType::UniswapV3 - ) { - continue; - } - out.push(LoadedPool { - address: entry.address.parse().context("pool address")?, - token0: entry.token0.parse().context("token0")?, - token1: entry.token1.parse().context("token1")?, - protocol, - fee_bps: entry.fee_bps, - }); - } - Ok(out) -} - -async fn fetch_pool_state_at( - provider: &impl Provider, - pool: &LoadedPool, - block: u64, -) -> Result> { - let block_id = BlockId::Number(BlockNumberOrTag::Number(block)); - let state = match pool.protocol { - ProtocolType::UniswapV2 | ProtocolType::SushiSwap => { - let calldata = getReservesCall {}.abi_encode(); - let tx = TransactionRequest::default() - .to(pool.address) - .input(calldata.into()); - let out = provider.call(tx).block(block_id).await?; - if out.len() >= 64 { - Some(PoolState::V2 { - r0: U256::from_be_slice(&out[0..32]), - r1: U256::from_be_slice(&out[32..64]), - }) - } else { - None - } - } - ProtocolType::UniswapV3 => { - let calldata = slot0Call {}.abi_encode(); - let tx = TransactionRequest::default() - .to(pool.address) - .input(calldata.into()); - let out = provider.call(tx).block(block_id).await?; - if out.len() >= 32 { - Some(PoolState::V3 { - sqrt_price_x96: U256::from_be_slice(&out[0..32]), - }) - } else { - None - } - } - _ => None, - }; - Ok(state) -} - -fn u256_to_f64(v: U256) -> f64 { - let limbs = v.as_limbs(); - let mut acc = 0.0f64; - for (i, &limb) in limbs.iter().enumerate() { - acc += (limb as f64) * (2f64).powi((64 * i) as i32); - } - acc -} - struct ScorerState { graph: PriceGraph, token_index: TokenIndex, diff --git a/crates/grpc-server/src/bin/aether_replay.rs b/crates/grpc-server/src/bin/aether_replay.rs index 3115157..fa6a498 100644 --- a/crates/grpc-server/src/bin/aether_replay.rs +++ b/crates/grpc-server/src/bin/aether_replay.rs @@ -29,6 +29,10 @@ use aether_detector::bellman_ford::BellmanFord; use aether_detector::gas as gas_model; use aether_detector::opportunity::DetectedCycle; use aether_detector::optimizer::ternary_search_optimal_input; +use aether_grpc_server::historical::{ + fetch_pool_state_at, getReservesCall, load_executor_init_bytecode, load_pools, slot0Call, + u256_to_f64, uniswap_v2_get_amount_out, LoadedPool, PoolState, Q96, +}; use aether_simulator::calldata::{ build_execute_arb_calldata, build_univ2_swap_calldata, build_univ3_swap_calldata, }; @@ -37,22 +41,6 @@ use aether_simulator::EvmSimulator; use aether_state::price_graph::PriceGraph; use aether_state::token_index::TokenIndex; -sol! { - function getReserves() external view returns (uint112 reserve0, uint112 reserve1, uint32 blockTimestampLast); - function slot0() external view returns (uint160 sqrtPriceX96, int24 tick, uint16 observationIndex, uint16 observationCardinality, uint16 observationCardinalityNext, uint8 feeProtocol, bool unlocked); -} - -/// 2^96 as f64, used to convert UniswapV3 `sqrtPriceX96` into a floating-point price. -const Q96: f64 = 79_228_162_514_264_337_593_543_950_336.0; - -/// Per-pool state fetched from the chain. V3 carries `sqrtPriceX96`; V2/Sushi -/// carry `(reserve0, reserve1)`. -#[derive(Clone, Copy)] -enum PoolState { - V2 { r0: U256, r1: U256 }, - V3 { sqrt_price_x96: U256 }, -} - /// Well-known mainnet token labels for readable output. fn token_label(addr: &Address) -> String { const WETH: Address = address!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); @@ -183,41 +171,6 @@ struct Args { no_optimizer: bool, } -#[derive(serde::Deserialize)] -struct PoolEntry { - protocol: String, - address: String, - token0: String, - token1: String, - fee_bps: u32, -} - -#[derive(serde::Deserialize)] -struct PoolsConfig { - #[serde(default)] - pools: Vec, -} - -struct LoadedPool { - address: Address, - token0: Address, - token1: Address, - protocol: ProtocolType, - fee_bps: u32, -} - -fn parse_protocol(s: &str) -> Option { - match s { - "uniswap_v2" => Some(ProtocolType::UniswapV2), - "sushiswap" => Some(ProtocolType::SushiSwap), - "uniswap_v3" => Some(ProtocolType::UniswapV3), - "curve" => Some(ProtocolType::Curve), - "balancer_v2" => Some(ProtocolType::BalancerV2), - "bancor_v3" => Some(ProtocolType::BancorV3), - _ => None, - } -} - /// Built-in 7-pool set matching the integration tests. Enough token diversity /// (WETH/USDC/USDT/DAI) to produce real triangle-arb cycles when reserves are /// fetched from any recent mainnet block. @@ -266,83 +219,6 @@ fn default_pool_set() -> Vec { ] } -fn load_pools(path: &PathBuf) -> Result> { - let raw = std::fs::read_to_string(path) - .with_context(|| format!("read pool config {}", path.display()))?; - let cfg: PoolsConfig = toml::from_str(&raw).context("parse pool config")?; - - let mut out = Vec::new(); - for entry in cfg.pools { - // Phase 1 supports V2/Sushi (via `getReserves`) and UniswapV3 (via - // `slot0().sqrtPriceX96`). Curve / Balancer / Bancor are deferred. - let Some(protocol) = parse_protocol(&entry.protocol) else { - continue; - }; - if !matches!( - protocol, - ProtocolType::UniswapV2 | ProtocolType::SushiSwap | ProtocolType::UniswapV3 - ) { - continue; - } - out.push(LoadedPool { - address: entry.address.parse().context("pool address")?, - token0: entry.token0.parse().context("token0")?, - token1: entry.token1.parse().context("token1")?, - protocol, - fee_bps: entry.fee_bps, - }); - } - Ok(out) -} - -async fn fetch_pool_state_at( - provider: &impl Provider, - pool: &LoadedPool, - block: u64, -) -> Option { - let block_id = BlockId::Number(BlockNumberOrTag::Number(block)); - match pool.protocol { - ProtocolType::UniswapV2 | ProtocolType::SushiSwap => { - let calldata = getReservesCall {}.abi_encode(); - let tx = TransactionRequest::default() - .to(pool.address) - .input(calldata.into()); - match provider.call(tx).block(block_id).await { - Ok(out) if out.len() >= 64 => Some(PoolState::V2 { - r0: U256::from_be_slice(&out[0..32]), - r1: U256::from_be_slice(&out[32..64]), - }), - _ => None, - } - } - ProtocolType::UniswapV3 => { - let calldata = slot0Call {}.abi_encode(); - let tx = TransactionRequest::default() - .to(pool.address) - .input(calldata.into()); - match provider.call(tx).block(block_id).await { - // slot0 returns 7 values; only the first 32-byte word (sqrtPriceX96) is used. - Ok(out) if out.len() >= 32 => Some(PoolState::V3 { - sqrt_price_x96: U256::from_be_slice(&out[0..32]), - }), - _ => None, - } - } - _ => None, - } -} - -/// Truncate a U256 reserve to f64 for graph weight computation. -/// Loss of precision is acceptable: we only care about the ratio. -fn u256_to_f64(v: U256) -> f64 { - let limbs = v.as_limbs(); - let mut acc = 0.0f64; - for (i, &limb) in limbs.iter().enumerate() { - acc += (limb as f64) * (2f64).powi((64 * i) as i32); - } - acc -} - fn build_graph( pools: &[LoadedPool], states: &[(usize, PoolState)], @@ -535,7 +411,11 @@ async fn main() -> Result<()> { let pre_state_block = args.block - 1; let mut states = Vec::with_capacity(pools.len()); for (i, pool) in pools.iter().enumerate() { - if let Some(state) = fetch_pool_state_at(&provider, pool, pre_state_block).await { + if let Some(state) = fetch_pool_state_at(&provider, pool, pre_state_block) + .await + .ok() + .flatten() + { states.push((i, state)); } } @@ -1697,22 +1577,6 @@ struct SimOutcome { /// We use `bytecode.object` (deploy bytecode, runs constructor + installs /// runtime) not `deployedBytecode` — the constructor fills `aavePool` /// immutable and sets `owner = msg.sender`. -fn load_executor_init_bytecode(artifact_path: &PathBuf) -> Result> { - let raw = std::fs::read_to_string(artifact_path) - .with_context(|| format!("read executor artifact {}", artifact_path.display()))?; - let v: serde_json::Value = serde_json::from_str(&raw).context("parse executor artifact JSON")?; - let hex_str = v - .pointer("/bytecode/object") - .and_then(|x| x.as_str()) - .ok_or_else(|| anyhow::anyhow!("missing /bytecode/object in artifact"))?; - let stripped = hex_str.strip_prefix("0x").unwrap_or(hex_str); - let bytes = alloy::hex::decode(stripped).context("decode bytecode hex")?; - if bytes.is_empty() { - anyhow::bail!("executor bytecode is empty — artifact may be abstract / interface-only"); - } - Ok(bytes) -} - /// Deploy `AetherExecutor` on the Anvil fork from `SIM_OWNER` with constructor /// args `(AAVE_POOL, BALANCER_VAULT, BANCOR_NETWORK)`. Returns the deployed /// contract address. Impersonation + balance seeding are done here so the @@ -1892,29 +1756,6 @@ async fn build_steps_from_cycle( Some(steps) } -/// UniswapV2 `getAmountOut` — exact math, no rounding. Returns `None` on -/// zero-liquidity input (prevents the "drained pool" outlier from producing -/// a non-zero forecast). -fn uniswap_v2_get_amount_out( - amount_in: U256, - reserve_in: U256, - reserve_out: U256, - fee_bps: u32, -) -> Option { - if reserve_in.is_zero() || reserve_out.is_zero() || amount_in.is_zero() { - return None; - } - // Default UniV2 fee: 30 bps → multiplier 997/1000. - let fee_multiplier = U256::from(10_000u64 - fee_bps as u64); - let amount_in_with_fee = amount_in.checked_mul(fee_multiplier)?; - let numerator = amount_in_with_fee.checked_mul(reserve_out)?; - let denom = reserve_in.checked_mul(U256::from(10_000u64))?.checked_add(amount_in_with_fee)?; - if denom.is_zero() { - return None; - } - Some(numerator / denom) -} - /// WETH's `balanceOf` mapping is at storage slot 3 (verified against mainnet /// WETH9 source). Used by `simulate_rpc_with_erc20_profit` to locate the /// `balanceOf(SIM_OWNER)` slot for pre/post-sim balance diff. @@ -2107,6 +1948,7 @@ fn truncate_err(s: &str) -> String { #[cfg(test)] mod tests { use super::*; + use aether_grpc_server::historical::parse_protocol; #[test] fn token_label_known_symbols() { diff --git a/crates/grpc-server/src/historical.rs b/crates/grpc-server/src/historical.rs new file mode 100644 index 0000000..999f66e --- /dev/null +++ b/crates/grpc-server/src/historical.rs @@ -0,0 +1,319 @@ +//! Shared helpers for historical-block tooling — replay (`aether-replay`) and +//! mempool profit scoring (`aether-profit-scorer`). Both binaries fetch pool +//! state at a specific block, load pools from `config/pools.toml`, decode +//! `getReserves`/`slot0` calldata, and run identical math to convert U256 +//! reserves into f64 graph weights. Before this module, each binary kept its +//! own inline copy of every helper; this file is the single source of truth. + +use std::path::PathBuf; + +use alloy::eips::{BlockId, BlockNumberOrTag}; +use alloy::primitives::{Address, U256}; +use alloy::providers::Provider; +use alloy::rpc::types::TransactionRequest; +use alloy::sol; +use alloy::sol_types::SolCall; +use anyhow::{Context, Result}; + +use aether_common::types::ProtocolType; + +sol! { + function getReserves() external view returns (uint112 reserve0, uint112 reserve1, uint32 blockTimestampLast); + function slot0() external view returns (uint160 sqrtPriceX96, int24 tick, uint16 observationIndex, uint16 observationCardinality, uint16 observationCardinalityNext, uint8 feeProtocol, bool unlocked); +} + +/// 2^96 as f64, used to convert UniswapV3 `sqrtPriceX96` into a floating-point +/// price. +pub const Q96: f64 = 79_228_162_514_264_337_593_543_950_336.0; + +/// Per-pool state fetched from the chain. V3 carries `sqrtPriceX96`; V2/Sushi +/// carry `(reserve0, reserve1)`. +#[derive(Clone, Copy, Debug)] +pub enum PoolState { + V2 { r0: U256, r1: U256 }, + V3 { sqrt_price_x96: U256 }, +} + +#[derive(Clone, Debug)] +pub struct LoadedPool { + pub address: Address, + pub token0: Address, + pub token1: Address, + pub protocol: ProtocolType, + pub fee_bps: u32, +} + +#[derive(serde::Deserialize)] +pub struct PoolEntry { + pub protocol: String, + pub address: String, + pub token0: String, + pub token1: String, + pub fee_bps: u32, +} + +#[derive(serde::Deserialize)] +pub struct PoolsConfig { + #[serde(default)] + pub pools: Vec, +} + +/// Parse the long-form protocol string used in `config/pools.toml` (e.g. +/// `"uniswap_v2"`, `"uniswap_v3"`). Distinct from the scorer-local +/// `parse_db_protocol`, which reads the short-form strings the engine writes +/// into `mempool_predictions.protocol`. +pub fn parse_protocol(s: &str) -> Option { + match s { + "uniswap_v2" => Some(ProtocolType::UniswapV2), + "sushiswap" => Some(ProtocolType::SushiSwap), + "uniswap_v3" => Some(ProtocolType::UniswapV3), + "curve" => Some(ProtocolType::Curve), + "balancer_v2" => Some(ProtocolType::BalancerV2), + "bancor_v3" => Some(ProtocolType::BancorV3), + _ => None, + } +} + +/// Load + filter pools from a `pools.toml` config. Only V2 / Sushi / V3 pools +/// are returned; Curve / Balancer / Bancor entries parse but are dropped +/// because the historical tooling can't compute reserves for them yet. +pub fn load_pools(path: &PathBuf) -> Result> { + let raw = std::fs::read_to_string(path) + .with_context(|| format!("read pool config {}", path.display()))?; + let cfg: PoolsConfig = toml::from_str(&raw).context("parse pool config")?; + + let mut out = Vec::new(); + for entry in cfg.pools { + let Some(protocol) = parse_protocol(&entry.protocol) else { + continue; + }; + if !matches!( + protocol, + ProtocolType::UniswapV2 | ProtocolType::SushiSwap | ProtocolType::UniswapV3 + ) { + continue; + } + out.push(LoadedPool { + address: entry.address.parse().context("pool address")?, + token0: entry.token0.parse().context("token0")?, + token1: entry.token1.parse().context("token1")?, + protocol, + fee_bps: entry.fee_bps, + }); + } + Ok(out) +} + +/// Fetch `(getReserves)` or `(slot0().sqrtPriceX96)` for a pool at a specific +/// block. Returns `Ok(None)` if the call returns fewer bytes than expected +/// (truncated response, non-pool address). RPC errors propagate via `Err` — +/// callers that prefer the swallow-as-None behaviour wrap with +/// `.ok().flatten()`. +pub async fn fetch_pool_state_at( + provider: &impl Provider, + pool: &LoadedPool, + block: u64, +) -> Result> { + let block_id = BlockId::Number(BlockNumberOrTag::Number(block)); + let state = match pool.protocol { + ProtocolType::UniswapV2 | ProtocolType::SushiSwap => { + let calldata = getReservesCall {}.abi_encode(); + let tx = TransactionRequest::default() + .to(pool.address) + .input(calldata.into()); + let out = provider.call(tx).block(block_id).await?; + if out.len() >= 64 { + Some(PoolState::V2 { + r0: U256::from_be_slice(&out[0..32]), + r1: U256::from_be_slice(&out[32..64]), + }) + } else { + None + } + } + ProtocolType::UniswapV3 => { + let calldata = slot0Call {}.abi_encode(); + let tx = TransactionRequest::default() + .to(pool.address) + .input(calldata.into()); + let out = provider.call(tx).block(block_id).await?; + if out.len() >= 32 { + Some(PoolState::V3 { + sqrt_price_x96: U256::from_be_slice(&out[0..32]), + }) + } else { + None + } + } + _ => None, + }; + Ok(state) +} + +/// Truncate a U256 to f64 by summing each 64-bit limb scaled by its power of +/// two. Loss of precision is acceptable: callers use the result for graph +/// edge weights (`-ln(price)`), where only the ratio matters. +pub fn u256_to_f64(v: U256) -> f64 { + let limbs = v.as_limbs(); + let mut acc = 0.0f64; + for (i, &limb) in limbs.iter().enumerate() { + acc += (limb as f64) * (2f64).powi((64 * i) as i32); + } + acc +} + +/// UniswapV2 / SushiSwap constant-product output formula: +/// `dy = (dx * (10_000 - fee_bps) * y) / (x * 10_000 + dx * (10_000 - fee_bps))`. +/// +/// Returns `None` on zero reserves, zero input, or U256 overflow at any step. +/// `saturating_sub` on `(10_000 - fee_bps)` guards against pathological +/// configs where `fee_bps > 10_000` would otherwise underflow — for real +/// configs (max 30 bps V2, 100 bps V3) the saturating form is identical to +/// straight subtraction. +pub fn uniswap_v2_get_amount_out( + amount_in: U256, + reserve_in: U256, + reserve_out: U256, + fee_bps: u32, +) -> Option { + if reserve_in.is_zero() || reserve_out.is_zero() || amount_in.is_zero() { + return None; + } + let fee_multiplier = U256::from(10_000u64.saturating_sub(fee_bps as u64)); + let amount_in_with_fee = amount_in.checked_mul(fee_multiplier)?; + let numerator = amount_in_with_fee.checked_mul(reserve_out)?; + let denominator = reserve_in + .checked_mul(U256::from(10_000u64))? + .checked_add(amount_in_with_fee)?; + if denominator.is_zero() { + return None; + } + Some(numerator / denominator) +} + +/// Load the deployed-bytecode field from a forge-compiled AetherExecutor +/// artifact JSON. The pointer `/bytecode/object` matches forge's standard +/// output shape (`out/AetherExecutor.sol/AetherExecutor.json`). +pub fn load_executor_init_bytecode(artifact_path: &PathBuf) -> Result> { + let raw = std::fs::read_to_string(artifact_path) + .with_context(|| format!("read executor artifact {}", artifact_path.display()))?; + let v: serde_json::Value = serde_json::from_str(&raw).context("parse executor artifact JSON")?; + let hex_str = v + .pointer("/bytecode/object") + .and_then(|x| x.as_str()) + .ok_or_else(|| anyhow::anyhow!("missing /bytecode/object in artifact"))?; + let stripped = hex_str.strip_prefix("0x").unwrap_or(hex_str); + let bytes = alloy::hex::decode(stripped).context("decode bytecode hex")?; + if bytes.is_empty() { + anyhow::bail!("executor bytecode is empty — artifact may be abstract / interface-only"); + } + Ok(bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_protocol_known_strings() { + assert!(matches!(parse_protocol("uniswap_v2"), Some(ProtocolType::UniswapV2))); + assert!(matches!(parse_protocol("sushiswap"), Some(ProtocolType::SushiSwap))); + assert!(matches!(parse_protocol("uniswap_v3"), Some(ProtocolType::UniswapV3))); + assert!(matches!(parse_protocol("curve"), Some(ProtocolType::Curve))); + assert!(matches!(parse_protocol("balancer_v2"), Some(ProtocolType::BalancerV2))); + assert!(matches!(parse_protocol("bancor_v3"), Some(ProtocolType::BancorV3))); + assert!(parse_protocol("unknown").is_none()); + assert!(parse_protocol("").is_none()); + } + + #[test] + fn u256_to_f64_small_values_exact() { + assert_eq!(u256_to_f64(U256::ZERO), 0.0); + assert_eq!(u256_to_f64(U256::from(1u64)), 1.0); + assert_eq!(u256_to_f64(U256::from(1_000_000_000_000_000_000u64)), 1e18); + } + + #[test] + fn uniswap_v2_get_amount_out_canonical() { + // 1 token in, 1000:1000 reserves, 30 bps fee. + // Expected: (1 * 9970 * 1000) / (1000 * 10000 + 1 * 9970) = 9970000 / 10009970 ≈ 0 + let out = uniswap_v2_get_amount_out(U256::from(1u64), U256::from(1000u64), U256::from(1000u64), 30); + assert_eq!(out, Some(U256::from(0u64))); + // Larger trade. + let out = uniswap_v2_get_amount_out( + U256::from(1_000_000_000_000_000_000u64), + U256::from(1_000_000_000_000_000_000_000u128), + U256::from(1_000_000_000_000_000_000_000u128), + 30, + ); + assert!(out.is_some()); + assert!(out.unwrap() > U256::ZERO); + } + + #[test] + fn uniswap_v2_get_amount_out_zero_inputs() { + assert_eq!( + uniswap_v2_get_amount_out(U256::ZERO, U256::from(1u64), U256::from(1u64), 30), + None + ); + assert_eq!( + uniswap_v2_get_amount_out(U256::from(1u64), U256::ZERO, U256::from(1u64), 30), + None + ); + assert_eq!( + uniswap_v2_get_amount_out(U256::from(1u64), U256::from(1u64), U256::ZERO, 30), + None + ); + } + + #[test] + fn load_pools_filters_unsupported_protocols() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + tmp.path(), + r#" +[[pools]] +protocol = "uniswap_v2" +address = "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc" +token0 = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" +token1 = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" +fee_bps = 30 + +[[pools]] +protocol = "curve" +address = "0xbEbc44782C7dB0a1A60Cb6fe97d0b483032FF1C7" +token0 = "0x6B175474E89094C44Da98b954EedeAC495271d0F" +token1 = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" +fee_bps = 4 + +[[pools]] +protocol = "uniswap_v3" +address = "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640" +token0 = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" +token1 = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" +fee_bps = 5 + +[[pools]] +protocol = "unknown_dex" +address = "0x0000000000000000000000000000000000000001" +token0 = "0x0000000000000000000000000000000000000002" +token1 = "0x0000000000000000000000000000000000000003" +fee_bps = 30 +"#, + ) + .unwrap(); + let pools = load_pools(&tmp.path().to_path_buf()).unwrap(); + // Curve + unknown filtered out; V2 + V3 retained. + assert_eq!(pools.len(), 2); + assert!(pools.iter().any(|p| matches!(p.protocol, ProtocolType::UniswapV2))); + assert!(pools.iter().any(|p| matches!(p.protocol, ProtocolType::UniswapV3))); + } + + #[test] + fn load_pools_handles_missing_pools_key() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), "# empty config\n").unwrap(); + let pools = load_pools(&tmp.path().to_path_buf()).unwrap(); + assert!(pools.is_empty()); + } +} diff --git a/crates/grpc-server/src/lib.rs b/crates/grpc-server/src/lib.rs index 19ef24d..0c0d447 100644 --- a/crates/grpc-server/src/lib.rs +++ b/crates/grpc-server/src/lib.rs @@ -5,6 +5,7 @@ /// without depending on the binary entry point. The `metrics` module is /// crate-private; only the two types the binary and integration tests /// actually need are re-exported publicly. +pub mod historical; pub(crate) mod metrics; pub mod profitability_writer; pub mod provider;