Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 11 additions & 177 deletions crates/grpc-server/src/bin/aether_profit_scorer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
//! `decision=unprofitable` rows that PR-3 v2 (with full-block fetch) could
//! re-score upward.
//!
//! Inlined helpers (fetch_pool_state_at, build_graph, u256_to_f64, sol!
//! getReserves / slot0) are deliberate duplicates of the equivalents in
//! `bin/aether_replay.rs`. Extracting them into a shared module would
//! touch the merged replay file (2200+ lines) and inflate this PR's
//! review burden. Follow-up: deduplicate after the mempool phase lands.
//! Shared helpers (`fetch_pool_state_at`, `load_pools`, `u256_to_f64`,
//! `uniswap_v2_get_amount_out`, `load_executor_init_bytecode`, the `sol!`
//! `getReserves` / `slot0` calldata builders, and the `LoadedPool` /
//! `PoolState` types) live in `aether_grpc_server::historical` and are used
//! identically by `bin/aether_replay.rs`.
//!
//! Run with:
//!
Expand All @@ -56,14 +56,11 @@ use alloy::eips::{BlockId, BlockNumberOrTag};
use alloy::network::Ethereum;
use alloy::primitives::{address, Address, U256};
use alloy::providers::{DynProvider, Provider, ProviderBuilder};
use alloy::rpc::types::TransactionRequest;
use alloy::sol;
use alloy::sol_types::{SolCall, SolValue};
use alloy::sol_types::SolValue;
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use prometheus::{Encoder, Registry, TextEncoder};
use serde::Deserialize;
use sqlx::postgres::PgPoolOptions;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error, info, warn};
Expand All @@ -74,6 +71,10 @@ use aether_detector::bellman_ford::BellmanFord;
use aether_detector::gas as gas_model;
use aether_detector::opportunity::DetectedCycle;
use aether_detector::optimizer::ternary_search_optimal_input;
use aether_grpc_server::historical::{
fetch_pool_state_at, load_executor_init_bytecode, load_pools, u256_to_f64,
uniswap_v2_get_amount_out, LoadedPool, PoolState, Q96,
};
use aether_grpc_server::profitability_writer::{
profit_writer_from_env, NewProfitabilityScore, PgProfitabilityWriter, ProfitabilitySink,
ProfitabilityWriterMetrics, UnscoredConfirmedPrediction, DECISION_NO_PATH,
Expand Down Expand Up @@ -113,10 +114,6 @@ const MAX_HOPS: usize = 4;
/// search is apples-to-apples with the production predictor.
const DETECT_BUDGET_US: u64 = 3_000;

/// 2^96 as f64. Used to convert UniswapV3 `sqrtPriceX96` into a
/// floating-point price.
const Q96: f64 = 79_228_162_514_264_337_593_543_950_336.0;

/// Default base fee assumption (wei) when `eth_getBlock(latest)` is
/// unavailable. 30 gwei matches the engine's typical assumption in
/// quiet markets; replaced by the actual base fee on every refresh.
Expand Down Expand Up @@ -173,11 +170,6 @@ const SIM_OWNER: Address = address!("1111111111111111111111111111111111111111");
const DEFAULT_EXECUTOR_ARTIFACT: &str =
"contracts/out/AetherExecutor.sol/AetherExecutor.json";

sol! {
function getReserves() external view returns (uint112 reserve0, uint112 reserve1, uint32 blockTimestampLast);
function slot0() external view returns (uint160 sqrtPriceX96, int24 tick, uint16 observationIndex, uint16 observationCardinality, uint16 observationCardinalityNext, uint8 feeProtocol, bool unlocked);
}

#[derive(Parser, Debug)]
#[command(name = "aether-profit-scorer", about = "Compute realised P&L per confirmed mempool prediction")]
struct Args {
Expand Down Expand Up @@ -808,30 +800,6 @@ fn verify_cycle_u256(
/// here is byte-identical to what would actually execute. Returns `None`
/// when any leg has zero reserves / zero input (drained-pool guard) or any
/// intermediate multiplication overflows U256.
fn uniswap_v2_get_amount_out(
amount_in: U256,
reserve_in: U256,
reserve_out: U256,
fee_bps: u32,
) -> Option<U256> {
if reserve_in.is_zero() || reserve_out.is_zero() || amount_in.is_zero() {
return None;
}
// fee_bps = 30 (0.30%) → multiplier 9970/10000. The 10_000 - fee_bps
// form matches the contract's hard-coded numerator for the default 30
// bps pool and generalises to lower-fee Uni V2 forks.
let fee_multiplier = U256::from(10_000u64.saturating_sub(fee_bps as u64));
let amount_in_with_fee = amount_in.checked_mul(fee_multiplier)?;
let numerator = amount_in_with_fee.checked_mul(reserve_out)?;
let denominator = reserve_in
.checked_mul(U256::from(10_000u64))?
.checked_add(amount_in_with_fee)?;
if denominator.is_zero() {
return None;
}
Some(numerator / denominator)
}

/// U256 → i128 with saturating overflow. The scorer's `net_profit_wei`
/// column is i128; profits beyond i128::MAX wei (≈170 quadrillion ETH —
/// numerically unreachable on Ethereum) saturate rather than wrap. The
Expand Down Expand Up @@ -956,22 +924,6 @@ fn balance_slot_for_token(token: Address) -> Option<U256> {
}

/// Load AetherExecutor init-bytecode from the forge-compiled JSON artifact.
fn load_executor_init_bytecode(artifact_path: &PathBuf) -> Result<Vec<u8>> {
let raw = std::fs::read_to_string(artifact_path)
.with_context(|| format!("read executor artifact {}", artifact_path.display()))?;
let v: serde_json::Value = serde_json::from_str(&raw).context("parse executor artifact JSON")?;
let hex_str = v
.pointer("/bytecode/object")
.and_then(|x| x.as_str())
.ok_or_else(|| anyhow::anyhow!("missing /bytecode/object in artifact"))?;
let stripped = hex_str.strip_prefix("0x").unwrap_or(hex_str);
let bytes = alloy::hex::decode(stripped).context("decode bytecode hex")?;
if bytes.is_empty() {
anyhow::bail!("executor bytecode is empty");
}
Ok(bytes)
}

/// Build `Vec<SwapStep>` from a detected cycle using pre-fetched running
/// states (synchronous — no RPC calls). Ported from aether_replay's
/// `build_steps_from_cycle` but sync and fed from `running_states`.
Expand Down Expand Up @@ -1204,51 +1156,10 @@ fn protocol_label(p: ProtocolType) -> &'static str {

// ----- inlined helpers (duplicate of aether_replay.rs; see module docstring) -----

#[derive(Clone, Copy, Debug)]
enum PoolState {
V2 { r0: U256, r1: U256 },
V3 { sqrt_price_x96: U256 },
}

#[derive(Clone, Debug)]
struct LoadedPool {
address: Address,
token0: Address,
token1: Address,
protocol: ProtocolType,
fee_bps: u32,
}

#[derive(Deserialize)]
struct PoolsConfig {
pools: Vec<PoolEntry>,
}

#[derive(Deserialize)]
struct PoolEntry {
address: String,
token0: String,
token1: String,
protocol: String,
fee_bps: u32,
}

fn parse_protocol(s: &str) -> Option<ProtocolType> {
match s {
"uniswap_v2" => Some(ProtocolType::UniswapV2),
"sushiswap" => Some(ProtocolType::SushiSwap),
"uniswap_v3" => Some(ProtocolType::UniswapV3),
"curve" => Some(ProtocolType::Curve),
"balancer_v2" => Some(ProtocolType::BalancerV2),
"bancor_v3" => Some(ProtocolType::BancorV3),
_ => None,
}
}

/// Map the short-form protocol strings the engine writes into
/// `mempool_predictions.protocol` (see `aether_grpc_server::mempool_writer`
/// `PROTOCOL_*` constants) to `ProtocolType`. Distinct from
/// [`parse_protocol`], which reads the long-form names used in
/// `historical::parse_protocol`, which reads the long-form names used in
/// `config/pools.toml`. Kept narrow on purpose: only the protocols the
/// scorer can actually score are returned; Balancer / Curve / Bancor
/// fall through to `None` so we don't add edges for hops the engine
Expand Down Expand Up @@ -1343,83 +1254,6 @@ async fn load_predicted_pools(
Ok(out)
}

fn load_pools(path: &PathBuf) -> Result<Vec<LoadedPool>> {
let raw = std::fs::read_to_string(path)
.with_context(|| format!("read pool config {}", path.display()))?;
let cfg: PoolsConfig = toml::from_str(&raw).context("parse pool config")?;
let mut out = Vec::new();
for entry in cfg.pools {
let Some(protocol) = parse_protocol(&entry.protocol) else {
continue;
};
// v1 scorer supports the same protocols aether-replay supports.
if !matches!(
protocol,
ProtocolType::UniswapV2 | ProtocolType::SushiSwap | ProtocolType::UniswapV3
) {
continue;
}
out.push(LoadedPool {
address: entry.address.parse().context("pool address")?,
token0: entry.token0.parse().context("token0")?,
token1: entry.token1.parse().context("token1")?,
protocol,
fee_bps: entry.fee_bps,
});
}
Ok(out)
}

async fn fetch_pool_state_at(
provider: &impl Provider,
pool: &LoadedPool,
block: u64,
) -> Result<Option<PoolState>> {
let block_id = BlockId::Number(BlockNumberOrTag::Number(block));
let state = match pool.protocol {
ProtocolType::UniswapV2 | ProtocolType::SushiSwap => {
let calldata = getReservesCall {}.abi_encode();
let tx = TransactionRequest::default()
.to(pool.address)
.input(calldata.into());
let out = provider.call(tx).block(block_id).await?;
if out.len() >= 64 {
Some(PoolState::V2 {
r0: U256::from_be_slice(&out[0..32]),
r1: U256::from_be_slice(&out[32..64]),
})
} else {
None
}
}
ProtocolType::UniswapV3 => {
let calldata = slot0Call {}.abi_encode();
let tx = TransactionRequest::default()
.to(pool.address)
.input(calldata.into());
let out = provider.call(tx).block(block_id).await?;
if out.len() >= 32 {
Some(PoolState::V3 {
sqrt_price_x96: U256::from_be_slice(&out[0..32]),
})
} else {
None
}
}
_ => None,
};
Ok(state)
}

fn u256_to_f64(v: U256) -> f64 {
let limbs = v.as_limbs();
let mut acc = 0.0f64;
for (i, &limb) in limbs.iter().enumerate() {
acc += (limb as f64) * (2f64).powi((64 * i) as i32);
}
acc
}

struct ScorerState {
graph: PriceGraph,
token_index: TokenIndex,
Expand Down
Loading