Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 400 additions & 0 deletions cmd/reconciler/main.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions crates/grpc-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ path = "src/main.rs"
name = "aether-replay"
path = "src/bin/aether_replay.rs"

[[bin]]
name = "aether-profit-scorer"
path = "src/bin/aether_profit_scorer.rs"

[dependencies]
aether-common = { path = "../common" }
aether-ingestion = { path = "../ingestion" }
Expand Down Expand Up @@ -48,6 +52,8 @@ serde_json = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
sha2 = "0.10"
sqlx = { workspace = true }
bigdecimal = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }
prost-build = { workspace = true }
Expand Down
2,085 changes: 2,085 additions & 0 deletions crates/grpc-server/src/bin/aether_profit_scorer.rs

Large diffs are not rendered by default.

94 changes: 94 additions & 0 deletions crates/grpc-server/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,24 @@ impl AetherEngine {
(1.0 / price) * fee, meta.pool_id, pool_addr,
meta.protocol, liq,
);
// Seed the synthetic `(1.0, spot_price)` reserve
// pair on the edge. `add_edge` only sets weight;
// without this follow-up the edge stays at
// `reserve_in = reserve_out = 0.0` and the
// `reserves_zero` guard in
// `mempool_pipeline::try_post_state_scan` drops
// every V3 swap before it reaches the post-state
// predictor. The convention `(1.0, spot_price)`
// matches the scorer's `state_to_graph_reserves`
// V3 branch so the two sides stay in lockstep.
graph.update_edge_from_reserves(
meta.token0_idx, meta.token1_idx,
meta.pool_id, 1.0, price, fee,
);
graph.update_edge_from_reserves(
meta.token1_idx, meta.token0_idx,
meta.pool_id, 1.0, 1.0 / price, fee,
);
// Seed the V3 pool-state cache. Liquidity is set
// to zero here because slot0 does not expose it —
// a separate `liquidity()` RPC would be required
Expand Down Expand Up @@ -1240,6 +1258,28 @@ impl AetherEngine {
meta.protocol,
liq,
);
// Refresh the synthetic `(1.0, spot_price)` reserve
// pair on the edge so live V3 sqrtPrice updates flow
// through to `mempool_pipeline::try_post_state_scan`'s
// `reserves_zero` guard. Same convention used by the
// bootstrap branch and the scorer's
// `state_to_graph_reserves`.
graph.update_edge_from_reserves(
meta.token0_idx,
meta.token1_idx,
meta.pool_id,
1.0,
price,
fee,
);
graph.update_edge_from_reserves(
meta.token1_idx,
meta.token0_idx,
meta.pool_id,
1.0,
1.0 / price,
fee,
);
// Snapshot is published once per detection cycle, not per event.
// Refresh the V3 pool-state cache entry. The event
// carries everything `predict_post_state` needs
Expand Down Expand Up @@ -2570,6 +2610,60 @@ mod tests {
assert!(graph.has_dirty_edges());
}

/// V3 graph edges must carry the synthetic `(1.0, spot_price)` reserve
/// pair after a V3Update event. Regression guard for the bug where
/// `add_edge` set the weight but left `reserve_in == reserve_out == 0.0`,
/// causing `mempool_pipeline::try_post_state_scan`'s `reserves_zero`
/// guard to drop every V3 swap before reaching the post-state predictor.
#[tokio::test]
async fn test_v3_update_seeds_synthetic_reserves() {
let (tx, _rx) = broadcast::channel(100);
let engine = AetherEngine::new(EngineConfig::default(), tx);

let pool = Address::repeat_byte(0xCD);
let token0 = Address::repeat_byte(0x31);
let token1 = Address::repeat_byte(0x41);

engine
.register_pool(pool, token0, token1, ProtocolType::UniswapV3, 5)
.await;

// sqrt_price_x96 = 2 * 2^96 → price = 4.0. Asymmetric value catches
// any direction-swap bug between forward and reverse edges.
let sqrt_x96 = U256::from(2u128) * (U256::from(1u128) << 96);
let event = PoolEvent::V3Update {
pool,
sqrt_price_x96: sqrt_x96,
liquidity: 1_000_000,
tick: 0,
};
engine.handle_pool_update(event).await;

let reg = engine.pool_registry.load();
let meta = reg.get(&pool).expect("V3 pool registered");
let t0 = meta.token0_idx;
let t1 = meta.token1_idx;
let pool_id = meta.pool_id;

let graph = engine.working_graph.lock().await;
let fwd = graph
.edges_from(t0)
.iter()
.find(|e| e.to == t1 && e.pool_id == pool_id)
.expect("V3 forward edge present");
let rev = graph
.edges_from(t1)
.iter()
.find(|e| e.to == t0 && e.pool_id == pool_id)
.expect("V3 reverse edge present");

// price = (sqrt/2^96)^2 = 2^2 = 4.0
assert!((fwd.reserve_in - 1.0).abs() < 1e-9, "fwd reserve_in {}", fwd.reserve_in);
assert!((fwd.reserve_out - 4.0).abs() < 1e-6, "fwd reserve_out {}", fwd.reserve_out);
assert!((rev.reserve_in - 1.0).abs() < 1e-9, "rev reserve_in {}", rev.reserve_in);
assert!((rev.reserve_out - 0.25).abs() < 1e-9, "rev reserve_out {}", rev.reserve_out);
}

#[tokio::test]
async fn test_pool_created_auto_registers() {
let (tx, _rx) = broadcast::channel(100);
Expand Down
1 change: 1 addition & 0 deletions crates/grpc-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/// crate-private; only the two types the binary and integration tests
/// actually need are re-exported publicly.
pub(crate) mod metrics;
pub mod profitability_writer;
pub mod provider;

pub use metrics::{start_metrics_server, EngineMetrics};
12 changes: 12 additions & 0 deletions crates/grpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio_stream::wrappers::UnixListenerStream;
mod cycle_gating;
mod engine;
mod mempool_pipeline;
mod mempool_writer;
mod pipeline;
mod service;
mod tracing_init;
Expand Down Expand Up @@ -146,6 +147,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// the engine's BellmanFord config so the analytical scan
// honours the same hop / latency budget as the main path.
let engine_cfg = EngineConfig::default();
// Mempool prediction writer: optional persistence to a separate
// Postgres DSN. MEMPOOL_LEDGER_DSN unset → NoopMempoolSink, no
// DB writes, behaviour identical to today. Distinct from the
// trade ledger's DATABASE_URL so an operator can enable mempool
// observability without provisioning the executor schema.
let writer_metrics =
mempool_writer::MempoolWriterMetrics::register(metrics.registry());
let prediction_sink = mempool_writer::mempool_writer_from_env(writer_metrics).await;
let engine_git_sha = std::env::var("AETHER_GIT_SHA").ok();
let sim_ctx = Arc::new(mempool_pipeline::SimContext::new(
Arc::clone(engine.pool_registry()),
Arc::clone(engine.token_index()),
Expand All @@ -155,6 +165,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
engine_cfg.detection_time_budget_us,
),
Arc::clone(engine.pool_states()),
prediction_sink,
engine_git_sha,
));
let pipeline_handle = mempool_pipeline::spawn_mempool_pipeline(
Arc::clone(engine.event_channels()),
Expand Down
81 changes: 80 additions & 1 deletion crates/grpc-server/src/mempool_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ use aether_state::snapshot::SnapshotManager;
use aether_state::token_index::TokenIndex;
use alloy::primitives::{Address, U256};
use arc_swap::ArcSwap;
use chrono::Utc;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::engine::PoolMetadata;
use crate::mempool_writer::{
MempoolPredictionSink, NewMempoolPrediction, PredictedPostState, PROTOCOL_BALANCER,
PROTOCOL_SUSHI, PROTOCOL_UNI_V2, PROTOCOL_UNI_V3,
};
use crate::EngineMetrics;

/// Pair-keyed pool index built from the live pool registry. Lookup is O(1)
Expand Down Expand Up @@ -80,26 +86,40 @@ pub struct SimContext {
/// Balancer mempool sim path to call `predict_post_state_with_fallback`
/// without round-tripping through the pool registry RPC.
pub pool_states: PoolStateCache,
/// Optional persistence sink for mempool predictions. `Arc<NoopMempoolSink>`
/// when `MEMPOOL_LEDGER_DSN` is unset (no DB writes, no behaviour
/// change); `Arc<PgMempoolWriter>` when set. Always present so the
/// post-state path can call `insert_prediction` unconditionally.
pub prediction_sink: Arc<dyn MempoolPredictionSink>,
/// Engine build's git sha, copied onto every persisted prediction so
/// the reconciler / scorer can correlate row outcomes with the engine
/// version that produced them. `None` when the env var is unset.
pub engine_git_sha: Option<String>,
/// Cached `(registry_ptr, PairIndex)` so the second and following pending
/// swaps under the same registry generation lookup in O(1). The Mutex
/// guards rebuild only — the steady-state path is `lock + ptr_eq + read`.
pair_index_cache: Mutex<Option<(usize, Arc<PairIndex>)>>,
}

impl SimContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
pool_registry: Arc<ArcSwap<HashMap<Address, PoolMetadata>>>,
token_index: Arc<ArcSwap<TokenIndex>>,
snapshot_manager: Arc<SnapshotManager>,
detector: BellmanFord,
pool_states: PoolStateCache,
prediction_sink: Arc<dyn MempoolPredictionSink>,
engine_git_sha: Option<String>,
) -> Self {
Self {
pool_registry,
token_index,
snapshot_manager,
detector,
pool_states,
prediction_sink,
engine_git_sha,
pair_index_cache: Mutex::new(None),
}
}
Expand Down Expand Up @@ -215,8 +235,9 @@ fn handle_event(
let ctx = Arc::clone(ctx);
let swap = swap.clone();
let router_label = router_label.clone();
let tx_hash = event.tx_hash;
tokio::task::spawn_blocking(move || {
try_post_state_scan(&metrics, &ctx, &router_label, &swap);
try_post_state_scan(&metrics, &ctx, &router_label, &swap, tx_hash, to);
});
}
}
Expand Down Expand Up @@ -352,6 +373,8 @@ fn try_post_state_scan(
ctx: &SimContext,
router_label: &str,
swap: &DecodedSwap,
event_tx_hash: alloy::primitives::B256,
event_to: Address,
) {
let target_protocol = match swap.protocol {
Protocol::UniswapV2 => ProtocolType::UniswapV2,
Expand Down Expand Up @@ -453,6 +476,47 @@ fn try_post_state_scan(
.detect_from_affected(&graph, &[in_idx, out_idx]);
let profitable: Vec<_> = cycles.into_iter().filter(|c| c.is_profitable()).collect();

// Persist the prediction unconditionally — both profitable and
// unprofitable swaps are useful signal for the reconciler (issue #131
// Go half), which needs the full population of decoded mempool swaps
// to compute block / ordering / pool-path accuracy. The
// `profit_factor_predicted` column is the SQL signal that the engine
// would have considered acting on the swap.
let post_state_json = match swap.protocol {
Protocol::UniswapV2 | Protocol::SushiSwap => PredictedPostState::V2 {
reserve_in: post_in,
reserve_out: post_out,
},
Protocol::UniswapV3 => PredictedPostState::V3 {
reserve_in: post_in,
reserve_out: post_out,
},
Protocol::BalancerV2 => PredictedPostState::Balancer {
reserve_in: post_in,
reserve_out: post_out,
},
}
.into_json();
let prediction = NewMempoolPrediction {
prediction_id: Uuid::new_v4(),
decoded_at: Utc::now(),
pending_tx_hash: event_tx_hash,
router_address: event_to,
protocol: decoder_protocol_label(swap.protocol),
token_in: swap.token_in,
token_out: swap.token_out,
amount_in: swap.amount_in,
pool_address: Some(meta.pool_id.address),
predicted_target_block: snapshot.block_number.saturating_add(1),
predicted_post_state: post_state_json,
profit_factor_predicted: profitable.first().map(|c| c.profit_factor()),
// Reserved for the MEV-Share SSE path; Alchemy WS pendings carry
// no builder-side timestamp today.
detection_lead_ms: None,
engine_git_sha: ctx.engine_git_sha.clone(),
};
ctx.prediction_sink.insert_prediction(prediction);

if profitable.is_empty() {
metrics.inc_pending_arb_sim_skipped("no_profitable_cycle");
return;
Expand All @@ -476,6 +540,18 @@ fn try_post_state_scan(
);
}

/// Wire label for the `protocol` column on `mempool_predictions`. Pinned to
/// the strings declared in [`crate::mempool_writer`] so the writer and the
/// pipeline cannot drift. Matches issue #131's schema body.
fn decoder_protocol_label(p: Protocol) -> &'static str {
match p {
Protocol::UniswapV2 => PROTOCOL_UNI_V2,
Protocol::SushiSwap => PROTOCOL_SUSHI,
Protocol::UniswapV3 => PROTOCOL_UNI_V3,
Protocol::BalancerV2 => PROTOCOL_BALANCER,
}
}

/// Map a V3 / Balancer post-state into the (post_in, post_out) reserves the
/// price graph stores per edge. Curve cannot reach here — the router
/// decoder rejects every Curve calldata shape with `CurveUnsupported`
Expand Down Expand Up @@ -758,6 +834,7 @@ mod tests {
/// empty, snapshot has a zero-vertex graph. Any `lookup_pool` returns
/// `None`, which is what the `not_in_registry` test wants anyway.
fn empty_sim_ctx() -> Arc<SimContext> {
use crate::mempool_writer::NoopMempoolSink;
use aether_pools::new_pool_state_cache;
use aether_state::price_graph::PriceGraph;
Arc::new(SimContext::new(
Expand All @@ -766,6 +843,8 @@ mod tests {
Arc::new(SnapshotManager::new(PriceGraph::new(0))),
BellmanFord::new(3, 1_000),
new_pool_state_cache(),
Arc::new(NoopMempoolSink::new()),
None,
))
}

Expand Down
Loading