Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8a37a89
feat(mempool): live tracking scaffold (subscribe + decode + SSE)
0xfandom Apr 30, 2026
c333731
feat(decoder): add UniV2 fee-on-transfer swap variants
0xfandom Apr 30, 2026
f9ca2e1
chore(infra): scrape host.docker.internal for hybrid local-dev
0xfandom Apr 30, 2026
e3a2378
feat(mempool): analytical V2 post-state simulator
0xfandom Apr 30, 2026
e80bedf
fix(decoder): tag SushiSwap router as Protocol::SushiSwap
0xfandom May 5, 2026
b5fe2e6
fix(observability): scrape monitor on its default port (9090)
0xfandom May 5, 2026
78b136a
perf(mempool): O(1) pool lookup via cached pair index
0xfandom May 5, 2026
d5271b0
feat(mempool): meter pipeline broadcast drops via lagged counter
0xfandom May 5, 2026
1f97d85
fix(monitor): force MEV-Share reconnect after 60s read silence
0xfandom May 5, 2026
689e9fe
fix(decoder): tag known Curve routers with dedicated reason
0xfandom May 5, 2026
15539d6
docs(mempool): document Alchemy-only requirement and warn at startup
0xfandom May 5, 2026
7b19078
perf(mempool): offload post-state scan to blocking pool
0xfandom May 5, 2026
843dcc7
chore(mempool): nit cleanups from PR review
0xfandom May 5, 2026
32ffa7e
chore(monitor): drop MEV-Share SSE consumer + dashboard panels
0xfandom May 6, 2026
808dbe7
feat(mempool): pre-sim filter + aether_mempool_filtered_total
0xfandom May 6, 2026
ad62370
chore(scripts): add mempool_capture.sh for live mainnet proof runs
0xfandom May 7, 2026
373ec1f
fix(metrics): dedent doc-list items to satisfy clippy
0xfandom May 8, 2026
dc61df2
feat(mempool): wire V3 + Balancer post-state into the mempool sim
0xfandom May 8, 2026
12a31fa
feat(mempool,engine): info-log pipeline stages for live observability
0xfandom May 11, 2026
39e920f
fix(mempool): pure is_enabled_from_str helper to avoid env mutation i…
Pablosinyores May 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,28 @@ SLACK_WEBHOOK_URL=
# POSTGRES_USER=aether
# POSTGRES_PASSWORD=aether
# POSTGRES_DB=aether

# Mempool tracking — opt-in. When set to a truthy value (1/true/yes/on),
# aether-rust spawns the Alchemy alchemy_pendingTransactions WS subscription
# and the calldata decoder, and aether-monitor connects to the Flashbots
# MEV-Share SSE stream. Both binaries log + emit metrics only — no bundle is
# constructed and no submission is made.
#
# IMPORTANT: alchemy_pendingTransactions is an Alchemy-proprietary subscribe
# method. Reth, QuickNode, Infura, and self-hosted Geth do NOT implement it —
# the WS handshake succeeds but the subscription returns no events, so this
# feature silently produces zero metrics on a non-Alchemy endpoint. The
# subscription task logs a warning at startup if MEMPOOL_WS_URL / ETH_RPC_URL
# does not look like an Alchemy host. To use a non-Alchemy provider you need
# a different mempool source (Chainbound Fiber, bloXroute, self-hosted Reth
# txpool IPC) — see crates/ingestion/src/mempool.rs MempoolSource trait.
# MEMPOOL_TRACKING=1

# Override the WS endpoint for the mempool subscription. Defaults to
# ETH_RPC_URL when unset; only set this if you want a different node for
# the mempool stream than your block / log subscriptions. MUST point at an
# Alchemy endpoint until a non-Alchemy MempoolSource lands.
# MEMPOOL_WS_URL=wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}

# Override the Flashbots MEV-Share SSE endpoint. Defaults to mainnet.
# MEV_SHARE_URL=https://mev-share.flashbots.net
27 changes: 22 additions & 5 deletions crates/grpc-server/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,10 +1572,14 @@ impl AetherEngine {
Err(_) => return None,
};
if net_profit < self.config.min_profit_threshold_wei {
debug!(
net_profit,
threshold = self.config.min_profit_threshold_wei,
"Below min profit threshold"
let net_profit_eth = net_profit as f64 / 1e18;
let threshold_eth = self.config.min_profit_threshold_wei as f64 / 1e18;
info!(
net_profit_wei = net_profit,
net_profit_eth = format!("{:.6}", net_profit_eth),
threshold_eth = format!("{:.6}", threshold_eth),
hops = candidate.hops.len(),
"CYCLE REJECTED: below min profit threshold"
);
return None;
}
Expand Down Expand Up @@ -1812,9 +1816,22 @@ impl AetherEngine {
self.metrics.observe_simulation_latency_us(sim_us);

if !sim_result.success {
debug!(sim_us, reason = ?sim_result.revert_reason, "Simulation failed, skipping");
info!(
sim_us,
reason = ?sim_result.revert_reason,
hops = input.opp.hops.len(),
expected_net_wei = input.net_profit,
"REVM SIM REVERTED"
);
continue;
}
info!(
sim_us,
hops = input.opp.hops.len(),
expected_net_wei = input.net_profit,
expected_net_eth = format!("{:.6}", input.net_profit as f64 / 1e18),
"REVM SIM OK"
);

// Post-sim cross-check (gate 4 of the candidate gating layer).
// The pre-sim gates catch corruption signatures visible from
Expand Down
78 changes: 69 additions & 9 deletions crates/grpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio_stream::wrappers::UnixListenerStream;

mod cycle_gating;
mod engine;
mod mempool_pipeline;
mod pipeline;
mod service;
mod tracing_init;
Expand Down Expand Up @@ -77,8 +78,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Bootstrap pools from config file at startup.
// Supports AETHER_POOLS_CONFIG env var to override the default path,
// so the binary works regardless of the working directory.
let pools_config = std::env::var("AETHER_POOLS_CONFIG")
.unwrap_or_else(|_| "config/pools.toml".to_string());
let pools_config =
std::env::var("AETHER_POOLS_CONFIG").unwrap_or_else(|_| "config/pools.toml".to_string());
let pool_count = engine.bootstrap_pools(&pools_config).await;
info!(pool_count, path = %pools_config, "Pools loaded at startup");

Expand Down Expand Up @@ -115,13 +116,64 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
provider_clone.run(provider_shutdown_rx).await;
});

// Mempool tracking is opt-in via MEMPOOL_TRACKING=1. When unset the
// engine behaves identically to today; when set we spawn the Alchemy
// pending-tx subscription and the decode pipeline that consumes it.
let mempool_handles = if aether_ingestion::mempool::is_enabled() {
info!("MEMPOOL_TRACKING enabled — spawning pending-tx subscription + decode pipeline");
let ws_url = std::env::var("MEMPOOL_WS_URL")
.or_else(|_| std::env::var("ETH_RPC_URL"))
.unwrap_or_default();
if ws_url.is_empty() {
tracing::warn!(
"MEMPOOL_TRACKING set but neither MEMPOOL_WS_URL nor ETH_RPC_URL provided; skipping"
);
None
} else {
let cfg = aether_ingestion::mempool::AlchemyMempoolConfig {
ws_url,
router_filter: aether_ingestion::mempool::default_router_addresses(),
};
let source = Arc::new(aether_ingestion::mempool::AlchemyMempool::new(cfg));
let channels = Arc::clone(engine.event_channels());
let source_shutdown = shutdown_rx.clone();
let source_handle = tokio::spawn(async move {
use aether_ingestion::mempool::MempoolSource;
source.run(channels, source_shutdown).await;
});
// Build the post-state simulation context from the engine's
// live registry / token index / snapshot. The detector mirrors
// the engine's BellmanFord config so the analytical scan
// honours the same hop / latency budget as the main path.
let engine_cfg = EngineConfig::default();
let sim_ctx = Arc::new(mempool_pipeline::SimContext::new(
Arc::clone(engine.pool_registry()),
Arc::clone(engine.token_index()),
Arc::clone(engine.snapshot_manager()),
aether_detector::bellman_ford::BellmanFord::new(
engine_cfg.max_hops,
engine_cfg.detection_time_budget_us,
),
Arc::clone(engine.pool_states()),
));
let pipeline_handle = mempool_pipeline::spawn_mempool_pipeline(
Arc::clone(engine.event_channels()),
Arc::clone(&metrics),
Some(sim_ctx),
shutdown_rx.clone(),
);
Some((source_handle, pipeline_handle))
}
} else {
None
};

// Read the listen address from the environment so the systemd unit and
// the binary always agree. Default to localhost TCP for development.
//
// Production (UDS): GRPC_ADDRESS=unix:///var/run/aether/engine.sock
// Development (TCP): GRPC_ADDRESS=[::1]:50051 (default)
let addr_str =
std::env::var("GRPC_ADDRESS").unwrap_or_else(|_| "[::1]:50051".to_string());
let addr_str = std::env::var("GRPC_ADDRESS").unwrap_or_else(|_| "[::1]:50051".to_string());

let server = Server::builder()
.add_service(ArbServiceServer::new(arb_service))
Expand All @@ -136,7 +188,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match std::fs::remove_file(uds_path) {
Ok(()) => info!(path = %uds_path, "Removed stale UDS socket"),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => tracing::warn!(path = %uds_path, error = %e, "Failed to remove stale UDS socket"),
Err(e) => {
tracing::warn!(path = %uds_path, error = %e, "Failed to remove stale UDS socket")
}
}

// Ensure parent directory exists.
Expand All @@ -148,10 +202,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Restrict socket access to the process owner — UDS bypasses
// network-layer controls (iptables, mTLS), so file permissions
// are the only access control for ControlService endpoints.
std::fs::set_permissions(
uds_path,
PermissionsExt::from_mode(0o600),
)?;
std::fs::set_permissions(uds_path, PermissionsExt::from_mode(0o600))?;
info!(path = %uds_path, "gRPC server listening on UDS");
let stream = UnixListenerStream::new(uds);
server.serve_with_incoming(stream).await.map_err(|e| {
Expand Down Expand Up @@ -196,6 +247,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
error!(error = %e, "Provider task panicked");
}

if let Some((source_handle, pipeline_handle)) = mempool_handles {
if let Err(e) = source_handle.await {
error!(error = %e, "Mempool source task panicked");
}
if let Err(e) = pipeline_handle.await {
error!(error = %e, "Mempool pipeline task panicked");
}
}

server_result?;

Ok(())
Expand Down
Loading