From 198c02d71ac3b782d9499f981006bc13d8b8a08d Mon Sep 17 00:00:00 2001 From: 0xfandom Date: Thu, 23 Apr 2026 15:35:55 +0530 Subject: [PATCH] feat(metrics): split aether_decode_errors_total by reason Replace single IntCounter with IntCounterVec labelled by reason (unknown_topic / malformed_payload / insufficient_topics). decode_log now returns Result so the reason propagates to record_decode_failure. --- crates/grpc-server/src/metrics.rs | 32 ++-- crates/grpc-server/src/provider.rs | 111 +++++++++--- crates/ingestion/src/event_decoder.rs | 161 +++++++++++++----- .../tests/anvil_fork_test.rs | 6 +- 4 files changed, 235 insertions(+), 75 deletions(-) diff --git a/crates/grpc-server/src/metrics.rs b/crates/grpc-server/src/metrics.rs index bcf4afc..22143e7 100644 --- a/crates/grpc-server/src/metrics.rs +++ b/crates/grpc-server/src/metrics.rs @@ -2,7 +2,9 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Registry, TextEncoder}; +use prometheus::{ + Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, Opts, Registry, TextEncoder, +}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tracing::{info, warn}; @@ -15,7 +17,7 @@ pub struct EngineMetrics { simulations_run: IntCounter, arbs_published: IntCounter, blocks_processed: IntCounter, - decode_errors: IntCounter, + decode_errors: IntCounterVec, } impl EngineMetrics { @@ -58,11 +60,14 @@ impl EngineMetrics { "Total blocks processed", ) .expect("aether_blocks_processed_total counter"); - let decode_errors = IntCounter::new( - "aether_decode_errors_total", - "Total logs the event decoder could not parse", + let decode_errors = IntCounterVec::new( + Opts::new( + "aether_decode_errors_total", + "Total logs the event decoder could not parse, labelled by reason", + ), + &["reason"], ) - .expect("aether_decode_errors_total counter"); + .expect("aether_decode_errors_total counter vec"); registry .register(Box::new(detection_latency_ms.clone())) @@ -130,8 +135,11 @@ impl EngineMetrics { self.blocks_processed.inc(); } - pub fn inc_decode_errors(&self) { - self.decode_errors.inc(); + /// Bump `aether_decode_errors_total{reason="..."}` for the given reason. + /// Labels come from `DecodeReason::as_str()` so the label set stays + /// stable and enumerable for dashboards / alerts. + pub fn inc_decode_errors(&self, reason: &str) { + self.decode_errors.with_label_values(&[reason]).inc(); } /// Render the registered metrics in Prometheus text exposition format. @@ -247,7 +255,9 @@ mod tests { metrics.inc_simulations_run(3); metrics.inc_arbs_published(4); metrics.inc_blocks_processed(); - metrics.inc_decode_errors(); + metrics.inc_decode_errors("unknown_topic"); + metrics.inc_decode_errors("malformed_payload"); + metrics.inc_decode_errors("insufficient_topics"); let output = String::from_utf8(metrics.render()).expect("metrics output utf-8"); @@ -272,6 +282,8 @@ mod tests { assert!(output.contains("aether_simulations_run_total 3")); assert!(output.contains("aether_arbs_published_total 4")); assert!(output.contains("aether_blocks_processed_total 1")); - assert!(output.contains("aether_decode_errors_total 1")); + assert!(output.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 1"#)); + assert!(output.contains(r#"aether_decode_errors_total{reason="malformed_payload"} 1"#)); + assert!(output.contains(r#"aether_decode_errors_total{reason="insufficient_topics"} 1"#)); } } diff --git a/crates/grpc-server/src/provider.rs b/crates/grpc-server/src/provider.rs index f06d836..7b43f1b 100644 --- a/crates/grpc-server/src/provider.rs +++ b/crates/grpc-server/src/provider.rs @@ -450,8 +450,8 @@ impl RpcProvider { let topics = log.topics(); let data = &log.data().data; match event_decoder::decode_log(topics, data, address, None) { - Some(event) => self.event_channels.dispatch_pool_update(event), - None => self.record_decode_failure(address, topics), + Ok(event) => self.event_channels.dispatch_pool_update(event), + Err(reason) => self.record_decode_failure(address, topics, reason), } } @@ -469,33 +469,40 @@ impl RpcProvider { pub fn process_logs(&self, logs: &[(Address, Vec, Vec)]) { for (address, topics, data) in logs { match event_decoder::decode_log(topics, data, *address, None) { - Some(event) => self.event_channels.dispatch_pool_update(event), - None => self.record_decode_failure(*address, topics), + Ok(event) => self.event_channels.dispatch_pool_update(event), + Err(reason) => self.record_decode_failure(*address, topics, reason), } } } - /// Surface a decoder drop to operators. Bumps `aether_decode_errors_total` - /// (the primary ops signal — it's a monotonic counter wired to alerting) - /// and emits a `trace!` with the offending pool address and first topic - /// for triage. + /// Surface a decoder drop to operators. Bumps + /// `aether_decode_errors_total{reason="..."}` (the primary ops signal — + /// a labelled counter wired to alerting) and emits a `trace!` with the + /// offending pool address, first topic, and reason for triage. /// /// The per-event log is deliberately `trace!`, not `warn!`: in discovery /// mode (`monitored_pools = []`) every unmatched log on mainnet — tens - /// of thousands per block — lands here, and a `warn!` would swamp Loki. - /// Operators should watch the counter; the trace line exists only for - /// targeted debugging when someone actively raises `RUST_LOG`. + /// of thousands per block — lands here as `unknown_topic`, and a `warn!` + /// would swamp Loki. Operators should watch the per-reason counter; + /// `malformed_payload` / `insufficient_topics` spikes are the real + /// data-integrity signals worth paging on. /// /// Called from the hot path, so it must be cheap — the counter is a /// single atomic increment and `trace!` is compiled to a tiny level /// check at the disabled level. - fn record_decode_failure(&self, address: Address, topics: &[B256]) { - self.metrics.inc_decode_errors(); + fn record_decode_failure( + &self, + address: Address, + topics: &[B256], + reason: event_decoder::DecodeReason, + ) { + self.metrics.inc_decode_errors(reason.as_str()); let topic0 = topics.first().copied().unwrap_or_default(); trace!( pool = %address, %topic0, - "Event decoder returned None; log skipped" + reason = reason.as_str(), + "Event decoder drop" ); } @@ -641,9 +648,10 @@ mod tests { } /// End-to-end check that a dropped log actually moves the - /// `aether_decode_errors_total` counter. The unit test in `metrics.rs` - /// only exercises `inc_decode_errors()` directly; this one proves the - /// real call path through `process_logs → record_decode_failure → + /// `aether_decode_errors_total` counter and picks the correct + /// `reason` label. The unit test in `metrics.rs` only exercises + /// `inc_decode_errors()` directly; this one proves the real call path + /// through `process_logs → record_decode_failure → /// metrics.inc_decode_errors` is wired correctly end-to-end. #[test] fn test_process_logs_decode_failure_increments_counter() { @@ -656,7 +664,7 @@ mod tests { }; let provider = RpcProvider::new(config, channels, Arc::clone(&metrics)); - // Unknown topic0 → decode_log returns None → counter bumps by 1. + // Unknown topic0 → DecodeReason::UnknownTopic → counter bumps by 1. let unknown_topic = B256::repeat_byte(0xFF); provider.process_logs(&[( Address::ZERO, @@ -666,8 +674,8 @@ mod tests { let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8"); assert!( - rendered.contains("aether_decode_errors_total 1"), - "expected counter at 1, got: {rendered}" + rendered.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 1"#), + "expected unknown_topic counter at 1, got: {rendered}" ); // Second drop should advance the counter, not reset it. @@ -678,8 +686,67 @@ mod tests { )]); let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8"); assert!( - rendered.contains("aether_decode_errors_total 2"), - "expected counter at 2 after second drop, got: {rendered}" + rendered.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 2"#), + "expected unknown_topic counter at 2 after second drop, got: {rendered}" + ); + } + + /// Malformed payload on a known event signature must bump + /// `aether_decode_errors_total{reason="malformed_payload"}`, NOT the + /// unknown_topic series. + #[test] + fn test_process_logs_malformed_payload_reason_label() { + let channels = Arc::new(EventChannels::new()); + let metrics = Arc::new(EngineMetrics::new()); + + let config = ProviderConfig { + rpc_url: "http://localhost:8545".to_string(), + ..ProviderConfig::default() + }; + let provider = RpcProvider::new(config, channels, Arc::clone(&metrics)); + + // Sync needs 64 bytes; give it 32. + provider.process_logs(&[( + Address::ZERO, + vec![EventSignatures::sync_topic()], + vec![0u8; 32], + )]); + + let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8"); + assert!( + rendered.contains(r#"aether_decode_errors_total{reason="malformed_payload"} 1"#), + "expected malformed_payload counter at 1, got: {rendered}" + ); + assert!( + !rendered.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 1"#), + "unknown_topic counter must not be touched by malformed payload" + ); + } + + /// Too few topics on a known event signature must bump + /// `aether_decode_errors_total{reason="insufficient_topics"}`. + #[test] + fn test_process_logs_insufficient_topics_reason_label() { + let channels = Arc::new(EventChannels::new()); + let metrics = Arc::new(EngineMetrics::new()); + + let config = ProviderConfig { + rpc_url: "http://localhost:8545".to_string(), + ..ProviderConfig::default() + }; + let provider = RpcProvider::new(config, channels, Arc::clone(&metrics)); + + // PairCreated requires 3 topics; give it 2. + provider.process_logs(&[( + Address::ZERO, + vec![EventSignatures::pair_created_topic(), B256::ZERO], + vec![0u8; 64], + )]); + + let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8"); + assert!( + rendered.contains(r#"aether_decode_errors_total{reason="insufficient_topics"} 1"#), + "expected insufficient_topics counter at 1, got: {rendered}" ); } diff --git a/crates/ingestion/src/event_decoder.rs b/crates/ingestion/src/event_decoder.rs index 643e75e..a7621f7 100644 --- a/crates/ingestion/src/event_decoder.rs +++ b/crates/ingestion/src/event_decoder.rs @@ -110,16 +110,45 @@ impl EventSignatures { } } -/// Decode a raw log into a PoolEvent -/// Returns None if the log doesn't match any known event +/// Reason a log failed to decode. Surfaced as the `reason` label on +/// `aether_decode_errors_total` so ops can alert on malformed payloads +/// (real bug) without drowning in benign unknown-topic noise. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DecodeReason { + /// topic0 didn't match any known event signature (or topics was empty). + /// Expected to be high-volume in discovery mode. + UnknownTopic, + /// Payload length check failed in a known-event decoder — data-integrity + /// signal worth paging on. + MalformedPayload, + /// Fewer topics than the known-event decoder requires (e.g. PairCreated + /// short-path). Indicates an upstream / node-side producer bug. + InsufficientTopics, +} + +impl DecodeReason { + /// Stable label value for Prometheus. + pub fn as_str(self) -> &'static str { + match self { + DecodeReason::UnknownTopic => "unknown_topic", + DecodeReason::MalformedPayload => "malformed_payload", + DecodeReason::InsufficientTopics => "insufficient_topics", + } + } +} + +/// Decode a raw log into a PoolEvent. +/// +/// On failure the `DecodeReason` is propagated so callers can bump the +/// appropriate Prometheus label — see `aether_decode_errors_total`. pub fn decode_log( topics: &[B256], data: &[u8], source_address: Address, protocol_hint: Option, -) -> Option { +) -> Result { if topics.is_empty() { - return None; + return Err(DecodeReason::UnknownTopic); } let topic0 = topics[0]; @@ -135,18 +164,21 @@ pub fn decode_log( } else if topic0 == EventSignatures::pair_created_topic() { decode_pair_created(topics, data) } else { - None + Err(DecodeReason::UnknownTopic) } } -fn decode_swap_v2(topics: &[B256], data: &[u8], pool: Address) -> Option { +fn decode_swap_v2(topics: &[B256], data: &[u8], pool: Address) -> Result { // V2 Swap(address indexed sender, uint256 amount0In, uint256 amount1In, // uint256 amount0Out, uint256 amount1Out, address indexed to) // // topics: [topic0, sender (indexed), to (indexed)] // data: 4 × 32-byte words — amount0In | amount1In | amount0Out | amount1Out - if topics.len() < 3 || data.len() < 128 { - return None; + if topics.len() < 3 { + return Err(DecodeReason::InsufficientTopics); + } + if data.len() < 128 { + return Err(DecodeReason::MalformedPayload); } let sender = Address::from_slice(&topics[1].as_slice()[12..]); @@ -163,7 +195,7 @@ fn decode_swap_v2(topics: &[B256], data: &[u8], pool: Address) -> Option, -) -> Option { +) -> Result { if data.len() < 64 { - return None; + return Err(DecodeReason::MalformedPayload); } let reserve0 = U256::from_be_slice(&data[0..32]); let reserve1 = U256::from_be_slice(&data[32..64]); @@ -188,7 +220,7 @@ fn decode_sync( trace!(pool = %pool, r0 = %reserve0, r1 = %reserve1, "Sync event decoded"); - Some(PoolEvent::ReserveUpdate { + Ok(PoolEvent::ReserveUpdate { pool, protocol, reserve0, @@ -196,9 +228,9 @@ fn decode_sync( }) } -fn decode_swap_v3(topics: &[B256], data: &[u8], pool: Address) -> Option { +fn decode_swap_v3(topics: &[B256], data: &[u8], pool: Address) -> Result { if data.len() < 160 { - return None; + return Err(DecodeReason::MalformedPayload); } // amount0: int256 (bytes 0-32) // amount1: int256 (bytes 32-64) @@ -230,7 +262,7 @@ fn decode_swap_v3(topics: &[B256], data: &[u8], pool: Address) -> Option Option { +) -> Result { // Curve events update reserves; we'd need to query on-chain for new balances // For now, emit a generic reserve update that triggers a state refresh let _ = topics; - Some(PoolEvent::ReserveUpdate { + Ok(PoolEvent::ReserveUpdate { pool, protocol: ProtocolType::Curve, reserve0: U256::ZERO, // Will be refreshed from on-chain @@ -254,15 +286,18 @@ fn decode_token_exchange( }) } -fn decode_pair_created(topics: &[B256], data: &[u8]) -> Option { - if topics.len() < 3 || data.len() < 64 { - return None; +fn decode_pair_created(topics: &[B256], data: &[u8]) -> Result { + if topics.len() < 3 { + return Err(DecodeReason::InsufficientTopics); + } + if data.len() < 64 { + return Err(DecodeReason::MalformedPayload); } let token0 = Address::from_slice(&topics[1].as_slice()[12..]); let token1 = Address::from_slice(&topics[2].as_slice()[12..]); let pool = Address::from_slice(&data[12..32]); - Some(PoolEvent::PoolCreated { + Ok(PoolEvent::PoolCreated { token0, token1, pool, @@ -316,7 +351,7 @@ mod tests { let topics = vec![EventSignatures::sync_topic()]; let event = decode_log(&topics, &data, pool_addr, None); - assert!(event.is_some()); + assert!(event.is_ok()); let got = event.unwrap(); let PoolEvent::ReserveUpdate { @@ -360,7 +395,7 @@ mod tests { // Only 32 bytes instead of 64 let data = vec![0u8; 32]; let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert!(event.is_err()); } // ── V2 Swap event decode tests ── @@ -397,7 +432,7 @@ mod tests { data.extend_from_slice(&u256_to_be_bytes(amount1_out)); let event = decode_log(&topics, &data, pool_addr, None); - assert!(event.is_some(), "V2 Swap must decode, not fall through"); + assert!(event.is_ok(), "V2 Swap must decode, not fall through"); match event.unwrap() { PoolEvent::V2Swap { @@ -429,7 +464,10 @@ mod tests { // missing `to` topic ]; let data = vec![0u8; 128]; - assert!(decode_log(&topics, &data, Address::ZERO, None).is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::InsufficientTopics + ); } #[test] @@ -441,7 +479,10 @@ mod tests { ]; // 96 bytes instead of 128 let data = vec![0u8; 96]; - assert!(decode_log(&topics, &data, Address::ZERO, None).is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::MalformedPayload + ); } // ── V3 Swap event decode tests ── @@ -482,7 +523,7 @@ mod tests { ]; let event = decode_log(&topics, &data, pool_addr, None); - assert!(event.is_some()); + assert!(event.is_ok()); let got = event.unwrap(); let PoolEvent::V3Update { @@ -540,7 +581,7 @@ mod tests { // Only 128 bytes instead of 160 let data = vec![0u8; 128]; let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert!(event.is_err()); } // ── TokenExchange (Curve) decode tests ── @@ -555,7 +596,7 @@ mod tests { let data = vec![0u8; 128]; // sold_id, tokens_sold, bought_id, tokens_bought let event = decode_log(&topics, &data, pool_addr, None); - assert!(event.is_some()); + assert!(event.is_ok()); let got = event.unwrap(); let PoolEvent::ReserveUpdate { @@ -604,7 +645,7 @@ mod tests { ]; let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_some()); + assert!(event.is_ok()); let got = event.unwrap(); let PoolEvent::PoolCreated { @@ -628,8 +669,10 @@ mod tests { // Missing third topic ]; let data = vec![0u8; 64]; - let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::InsufficientTopics + ); } #[test] @@ -641,28 +684,66 @@ mod tests { ]; // Only 32 bytes instead of 64 let data = vec![0u8; 32]; - let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::MalformedPayload + ); } // ── Unknown event tests ── #[test] - fn test_decode_unknown_event_returns_none() { + fn test_decode_unknown_event_returns_unknown_topic() { let unknown_topic = B256::from([0xABu8; 32]); let topics = vec![unknown_topic]; let data = vec![0u8; 64]; - let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::UnknownTopic + ); } #[test] - fn test_decode_empty_topics_returns_none() { + fn test_decode_empty_topics_returns_unknown_topic() { let topics: Vec = vec![]; let data = vec![0u8; 64]; - let event = decode_log(&topics, &data, Address::ZERO, None); - assert!(event.is_none()); + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::UnknownTopic + ); + } + + #[test] + fn test_decode_sync_malformed_payload() { + // Sync requires 64-byte payload (2 × U256). Give 32 bytes. + let topics = vec![EventSignatures::sync_topic()]; + let data = vec![0u8; 32]; + + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::MalformedPayload + ); + } + + #[test] + fn test_decode_v3_swap_malformed_payload() { + let topics = vec![EventSignatures::swap_v3_topic()]; + let data = vec![0u8; 96]; // V3 needs 160 bytes + + assert_eq!( + decode_log(&topics, &data, Address::ZERO, None).unwrap_err(), + DecodeReason::MalformedPayload + ); + } + + #[test] + fn test_decode_reason_label_strings() { + // Guard the Prometheus label contract — these strings are baked into + // dashboards and alerts. + assert_eq!(DecodeReason::UnknownTopic.as_str(), "unknown_topic"); + assert_eq!(DecodeReason::MalformedPayload.as_str(), "malformed_payload"); + assert_eq!(DecodeReason::InsufficientTopics.as_str(), "insufficient_topics"); } } diff --git a/crates/integration-tests/tests/anvil_fork_test.rs b/crates/integration-tests/tests/anvil_fork_test.rs index 830b81a..5d9d480 100644 --- a/crates/integration-tests/tests/anvil_fork_test.rs +++ b/crates/integration-tests/tests/anvil_fork_test.rs @@ -201,7 +201,7 @@ async fn test_anvil_fork_log_fetch_and_decode() { for log in &logs { let topics = log.topics().to_vec(); let data = log.data().data.to_vec(); - if event_decoder::decode_log(&topics, &data, log.address(), None).is_some() { + if event_decoder::decode_log(&topics, &data, log.address(), None).is_ok() { decoded_count += 1; } } @@ -279,7 +279,7 @@ async fn test_anvil_fork_full_pipeline() { .collect(); for (address, topics, data) in &raw_logs { - if let Some(event) = event_decoder::decode_log(topics, data, *address, None) { + if let Ok(event) = event_decoder::decode_log(topics, data, *address, None) { channels.dispatch_pool_update(event); } } @@ -299,7 +299,7 @@ async fn test_anvil_fork_full_pipeline() { let mut pools_seen = std::collections::HashSet::new(); for (address, topics, data) in &raw_logs { - if let Some(event_decoder::PoolEvent::ReserveUpdate { + if let Ok(event_decoder::PoolEvent::ReserveUpdate { pool, reserve0, reserve1,