Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions crates/grpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use prometheus::{Encoder, Histogram, HistogramOpts, IntCounter, Registry, TextEncoder};
use prometheus::{
Encoder, Histogram, HistogramOpts, IntCounter, IntCounterVec, Opts, Registry, TextEncoder,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tracing::{info, warn};
Expand All @@ -15,7 +17,7 @@ pub struct EngineMetrics {
simulations_run: IntCounter,
arbs_published: IntCounter,
blocks_processed: IntCounter,
decode_errors: IntCounter,
decode_errors: IntCounterVec,
}

impl EngineMetrics {
Expand Down Expand Up @@ -58,11 +60,14 @@ impl EngineMetrics {
"Total blocks processed",
)
.expect("aether_blocks_processed_total counter");
let decode_errors = IntCounter::new(
"aether_decode_errors_total",
"Total logs the event decoder could not parse",
let decode_errors = IntCounterVec::new(
Opts::new(
"aether_decode_errors_total",
"Total logs the event decoder could not parse, labelled by reason",
),
&["reason"],
)
.expect("aether_decode_errors_total counter");
.expect("aether_decode_errors_total counter vec");

registry
.register(Box::new(detection_latency_ms.clone()))
Expand Down Expand Up @@ -130,8 +135,11 @@ impl EngineMetrics {
self.blocks_processed.inc();
}

pub fn inc_decode_errors(&self) {
self.decode_errors.inc();
/// Bump `aether_decode_errors_total{reason="..."}` for the given reason.
/// Callers are expected to pass `DecodeReason::as_str()` so the label set
/// stays stable and enumerable for dashboards / alerts; the `&str`
/// parameter itself does not enforce this.
pub fn inc_decode_errors(&self, reason: &str) {
self.decode_errors.with_label_values(&[reason]).inc();
}

/// Render the registered metrics in Prometheus text exposition format.
Expand Down Expand Up @@ -247,7 +255,9 @@ mod tests {
metrics.inc_simulations_run(3);
metrics.inc_arbs_published(4);
metrics.inc_blocks_processed();
metrics.inc_decode_errors();
metrics.inc_decode_errors("unknown_topic");
metrics.inc_decode_errors("malformed_payload");
metrics.inc_decode_errors("insufficient_topics");

let output = String::from_utf8(metrics.render()).expect("metrics output utf-8");

Expand All @@ -272,6 +282,8 @@ mod tests {
assert!(output.contains("aether_simulations_run_total 3"));
assert!(output.contains("aether_arbs_published_total 4"));
assert!(output.contains("aether_blocks_processed_total 1"));
assert!(output.contains("aether_decode_errors_total 1"));
assert!(output.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 1"#));
assert!(output.contains(r#"aether_decode_errors_total{reason="malformed_payload"} 1"#));
assert!(output.contains(r#"aether_decode_errors_total{reason="insufficient_topics"} 1"#));
}
}
111 changes: 89 additions & 22 deletions crates/grpc-server/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ impl RpcProvider {
let topics = log.topics();
let data = &log.data().data;
match event_decoder::decode_log(topics, data, address, None) {
Some(event) => self.event_channels.dispatch_pool_update(event),
None => self.record_decode_failure(address, topics),
Ok(event) => self.event_channels.dispatch_pool_update(event),
Err(reason) => self.record_decode_failure(address, topics, reason),
}
}

Expand All @@ -469,33 +469,40 @@ impl RpcProvider {
pub fn process_logs(&self, logs: &[(Address, Vec<B256>, Vec<u8>)]) {
for (address, topics, data) in logs {
match event_decoder::decode_log(topics, data, *address, None) {
Some(event) => self.event_channels.dispatch_pool_update(event),
None => self.record_decode_failure(*address, topics),
Ok(event) => self.event_channels.dispatch_pool_update(event),
Err(reason) => self.record_decode_failure(*address, topics, reason),
}
}
}

/// Surface a decoder drop to operators. Bumps `aether_decode_errors_total`
/// (the primary ops signal — it's a monotonic counter wired to alerting)
/// and emits a `trace!` with the offending pool address and first topic
/// for triage.
/// Surface a decoder drop to operators. Bumps
/// `aether_decode_errors_total{reason="..."}` (the primary ops signal —
/// a labelled counter wired to alerting) and emits a `trace!` with the
/// offending pool address, first topic, and reason for triage.
///
/// The per-event log is deliberately `trace!`, not `warn!`: in discovery
/// mode (`monitored_pools = []`) every unmatched log on mainnet — tens
/// of thousands per block — lands here, and a `warn!` would swamp Loki.
/// Operators should watch the counter; the trace line exists only for
/// targeted debugging when someone actively raises `RUST_LOG`.
/// of thousands per block — lands here as `unknown_topic`, and a `warn!`
/// would swamp Loki. Operators should watch the per-reason counter;
/// `malformed_payload` / `insufficient_topics` spikes are the real
/// data-integrity signals worth paging on.
///
/// Called from the hot path, so it must be cheap — the counter bump is a
/// label lookup on the `IntCounterVec` followed by an increment, and
/// `trace!` is compiled to a tiny level check at the disabled level.
fn record_decode_failure(&self, address: Address, topics: &[B256]) {
self.metrics.inc_decode_errors();
fn record_decode_failure(
&self,
address: Address,
topics: &[B256],
reason: event_decoder::DecodeReason,
) {
self.metrics.inc_decode_errors(reason.as_str());
let topic0 = topics.first().copied().unwrap_or_default();
trace!(
pool = %address,
%topic0,
"Event decoder returned None; log skipped"
reason = reason.as_str(),
"Event decoder drop"
);
}

Expand Down Expand Up @@ -641,9 +648,10 @@ mod tests {
}

/// End-to-end check that a dropped log actually moves the
/// `aether_decode_errors_total` counter. The unit test in `metrics.rs`
/// only exercises `inc_decode_errors()` directly; this one proves the
/// real call path through `process_logs → record_decode_failure →
/// `aether_decode_errors_total` counter and picks the correct
/// `reason` label. The unit test in `metrics.rs` only exercises
/// `inc_decode_errors()` directly; this one proves the real call path
/// through `process_logs → record_decode_failure →
/// metrics.inc_decode_errors` is wired correctly end-to-end.
#[test]
fn test_process_logs_decode_failure_increments_counter() {
Expand All @@ -656,7 +664,7 @@ mod tests {
};
let provider = RpcProvider::new(config, channels, Arc::clone(&metrics));

// Unknown topic0 → decode_log returns None → counter bumps by 1.
// Unknown topic0 → DecodeReason::UnknownTopic → counter bumps by 1.
let unknown_topic = B256::repeat_byte(0xFF);
provider.process_logs(&[(
Address::ZERO,
Expand All @@ -666,8 +674,8 @@ mod tests {

let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8");
assert!(
rendered.contains("aether_decode_errors_total 1"),
"expected counter at 1, got: {rendered}"
rendered.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 1"#),
"expected unknown_topic counter at 1, got: {rendered}"
);

// Second drop should advance the counter, not reset it.
Expand All @@ -678,8 +686,67 @@ mod tests {
)]);
let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8");
assert!(
rendered.contains("aether_decode_errors_total 2"),
"expected counter at 2 after second drop, got: {rendered}"
rendered.contains(r#"aether_decode_errors_total{reason="unknown_topic"} 2"#),
"expected unknown_topic counter at 2 after second drop, got: {rendered}"
);
}

/// Malformed payload on a known event signature must bump
/// `aether_decode_errors_total{reason="malformed_payload"}`, NOT the
/// unknown_topic series.
///
/// The negative check inspects the rendered value of the unknown_topic
/// series instead of looking for the literal text `... 1`, so it also
/// fails if that series was bumped more than once.
#[test]
fn test_process_logs_malformed_payload_reason_label() {
    let channels = Arc::new(EventChannels::new());
    let metrics = Arc::new(EngineMetrics::new());

    let config = ProviderConfig {
        rpc_url: "http://localhost:8545".to_string(),
        ..ProviderConfig::default()
    };
    let provider = RpcProvider::new(config, channels, Arc::clone(&metrics));

    // Sync needs 64 bytes; give it 32.
    provider.process_logs(&[(
        Address::ZERO,
        vec![EventSignatures::sync_topic()],
        vec![0u8; 32],
    )]);

    let rendered = String::from_utf8(metrics.render()).expect("metrics utf-8");
    assert!(
        rendered.contains(r#"aether_decode_errors_total{reason="malformed_payload"} 1"#),
        "expected malformed_payload counter at 1, got: {rendered}"
    );

    // The unknown_topic series must be either absent or still at zero.
    // Matching on the series prefix (rather than on `... 1`) catches any
    // non-zero value, e.g. 2 or 10.
    let unknown_prefix = r#"aether_decode_errors_total{reason="unknown_topic"} "#;
    let unknown_touched = rendered
        .lines()
        .filter_map(|line| line.strip_prefix(unknown_prefix))
        .any(|value| value.trim() != "0");
    assert!(
        !unknown_touched,
        "unknown_topic counter must not be touched by malformed payload, got: {rendered}"
    );
}

/// A known event signature that arrives with too few topics has to be
/// counted under `aether_decode_errors_total{reason="insufficient_topics"}`.
#[test]
fn test_process_logs_insufficient_topics_reason_label() {
    let event_channels = Arc::new(EventChannels::new());
    let engine_metrics = Arc::new(EngineMetrics::new());
    let provider = RpcProvider::new(
        ProviderConfig {
            rpc_url: "http://localhost:8545".to_string(),
            ..ProviderConfig::default()
        },
        event_channels,
        Arc::clone(&engine_metrics),
    );

    // PairCreated requires 3 topics; only 2 are supplied here.
    let short_topics = vec![EventSignatures::pair_created_topic(), B256::ZERO];
    let payload = vec![0u8; 64];
    provider.process_logs(&[(Address::ZERO, short_topics, payload)]);

    let rendered = String::from_utf8(engine_metrics.render()).expect("metrics utf-8");
    let series_at_one =
        rendered.contains(r#"aether_decode_errors_total{reason="insufficient_topics"} 1"#);
    assert!(
        series_at_one,
        "expected insufficient_topics counter at 1, got: {rendered}"
    );
}

Expand Down
Loading
Loading