diff --git a/Cargo.lock b/Cargo.lock
index 27692e3..c1dcde3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1034,6 +1034,17 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
 
+[[package]]
+name = "backon"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d"
+dependencies = [
+ "fastrand",
+ "gloo-timers",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.75"
@@ -1590,6 +1601,7 @@ dependencies = [
  "alloy-node-bindings",
  "anyhow",
  "async-trait",
+ "backon",
  "chrono",
  "serde",
  "serde_json",
@@ -1839,6 +1851,18 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
 
+[[package]]
+name = "gloo-timers"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "group"
 version = "0.13.0"
diff --git a/Cargo.toml b/Cargo.toml
index f5995d1..820629b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ tokio-stream = "0.1.17"
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
 hex = "0.4"
+backon = "1.5.2"
 
 [package]
 name = "event-scanner"
@@ -66,6 +67,7 @@ alloy-node-bindings.workspace = true
 tokio-stream.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
+backon.workspace = true
 
 [lints]
 workspace = true
diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs
index c541674..1fe78f5 100644
--- a/src/block_range_scanner.rs
+++ b/src/block_range_scanner.rs
@@ -63,7 +63,7 @@
 //! }
 //! ```
 
-use std::{cmp::Ordering, ops::RangeInclusive};
+use std::{cmp::Ordering, ops::RangeInclusive, time::Duration};
 use tokio::{
     join,
     sync::{mpsc, oneshot},
@@ -72,6 +72,9 @@
 use tokio_stream::{StreamExt, wrappers::ReceiverStream};
 use crate::{
     error::ScannerError,
+    safe_provider::{
+        DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, SafeProvider,
+    },
     types::{ScannerMessage, ScannerStatus},
 };
 use alloy::{
@@ -79,7 +82,7 @@
     eips::BlockNumberOrTag,
     network::{BlockResponse, Network, primitives::HeaderResponse},
     primitives::{B256, BlockNumber},
-    providers::{Provider, RootProvider},
+    providers::RootProvider,
     pubsub::Subscription,
     rpc::client::ClientBuilder,
     transports::{
@@ -115,6 +118,9 @@ impl PartialEq<RangeInclusive<BlockNumber>> for Message {
 #[derive(Clone, Copy)]
 pub struct BlockRangeScanner {
     pub max_block_range: u64,
+    pub max_timeout: Duration,
+    pub max_retries: usize,
+    pub retry_interval: Duration,
 }
 
 impl Default for BlockRangeScanner {
@@ -126,7 +132,12 @@ impl BlockRangeScanner {
     #[must_use]
     pub fn new() -> Self {
-        Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
+        Self {
+            max_block_range: DEFAULT_MAX_BLOCK_RANGE,
+            max_timeout: DEFAULT_MAX_TIMEOUT,
+            max_retries: DEFAULT_MAX_RETRIES,
+            retry_interval: DEFAULT_RETRY_INTERVAL,
+        }
     }
 
     #[must_use]
@@ -135,6 +146,24 @@
         self
     }
 
+    #[must_use]
+    pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self {
+        self.max_timeout = rpc_timeout;
+        self
+    }
+
+    #[must_use]
+    pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self {
+        self.max_retries = rpc_max_retries;
+        self
+    }
+
+    #[must_use]
+    pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self {
+        self.retry_interval = rpc_retry_interval;
+        self
+    }
+
     /// Connects to the provider via WebSocket
     ///
     /// # Errors
@@ -169,19 +198,26 @@
     /// Returns an error if the connection fails
     #[must_use]
     pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
-        ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
+        let safe_provider = SafeProvider::new(provider)
+            .max_timeout(self.max_timeout)
+            .max_retries(self.max_retries)
+            .retry_interval(self.retry_interval);
+        ConnectedBlockRangeScanner {
+            provider: safe_provider,
+            max_block_range: self.max_block_range,
+        }
     }
 }
 
 pub struct ConnectedBlockRangeScanner<N: Network> {
-    provider: RootProvider<N>,
+    provider: SafeProvider<N>,
     max_block_range: u64,
 }
 
 impl<N: Network> ConnectedBlockRangeScanner<N> {
-    /// Returns the underlying Provider.
+    /// Returns the `SafeProvider`.
     #[must_use]
-    pub fn provider(&self) -> &RootProvider<N> {
+    pub fn provider(&self) -> &SafeProvider<N> {
         &self.provider
     }
 
@@ -233,7 +269,7 @@ pub enum Command {
 }
 
 struct Service<N: Network> {
-    provider: RootProvider<N>,
+    provider: SafeProvider<N>,
     max_block_range: u64,
     subscriber: Option<mpsc::Sender<Message>>,
     websocket_connected: bool,
@@ -244,7 +280,7 @@
 }
 
 impl<N: Network> Service<N> {
-    pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
+    pub fn new(provider: SafeProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
         let (cmd_tx, cmd_rx) = mpsc::channel(100);
 
         let service = Self {
@@ -640,9 +676,9 @@
         Ok(())
     }
 
-    async fn stream_live_blocks<P: Provider<N>>(
+    async fn stream_live_blocks(
         mut range_start: BlockNumber,
-        provider: P,
+        provider: SafeProvider<N>,
         sender: mpsc::Sender<Message>,
         block_confirmations: u64,
         max_block_range: u64,
@@ -747,7 +783,7 @@
     }
 
     async fn get_block_subscription(
-        provider: &impl Provider<N>,
+        provider: &SafeProvider<N>,
    ) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
         let ws_stream = provider
             .subscribe_blocks()
             .await
@@ -966,6 +1002,7 @@
 #[cfg(test)]
 mod tests {
+    use alloy::providers::{Provider, RootProvider};
     use std::time::Duration;
 
     use tokio::time::timeout;
@@ -981,8 +1018,9 @@
     use tokio::sync::mpsc;
     use tokio_stream::StreamExt;
 
-    fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
-        RootProvider::new(RpcClient::mocked(asserter))
+    fn mocked_provider(asserter: Asserter) -> SafeProvider<Ethereum> {
+        let root_provider = RootProvider::new(RpcClient::mocked(asserter));
+        SafeProvider::new(root_provider)
     }
 
     #[test]
diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs
index a841f4f..b31c4de 100644
--- a/src/event_scanner/modes/common.rs
+++ b/src/event_scanner/modes/common.rs
@@ -3,10 +3,10 @@ use std::ops::RangeInclusive;
 use crate::{
     block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage},
     event_scanner::{filter::EventFilter, listener::EventListener, message::Message},
+    safe_provider::SafeProvider,
 };
 use alloy::{
     network::Network,
-    providers::{Provider, RootProvider},
     rpc::types::{Filter, Log},
     transports::{RpcError, TransportErrorKind},
 };
@@ -25,7 +25,7 @@ pub enum ConsumerMode {
 
 pub async fn handle_stream<N: Network>(
     mut stream: ReceiverStream<BlockRangeMessage>,
-    provider: &RootProvider<N>,
+    provider: &SafeProvider<N>,
     listeners: &[EventListener],
     mode: ConsumerMode,
 ) {
@@ -42,7 +42,7 @@
 }
 
 pub fn spawn_log_consumers<N: Network>(
-    provider: &RootProvider<N>,
+    provider: &SafeProvider<N>,
     listeners: &[EventListener],
     range_tx: &Sender<BlockRangeMessage>,
     mode: ConsumerMode,
 ) {
@@ -129,7 +129,7 @@
 async fn get_logs<N: Network>(
     range: RangeInclusive<u64>,
     event_filter: &EventFilter,
     log_filter: &Filter,
-    provider: &RootProvider<N>,
+    provider: &SafeProvider<N>,
 ) -> Result<Vec<Log>, RpcError<TransportErrorKind>> {
     let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end());
 
diff --git a/src/lib.rs b/src/lib.rs
index a5f4456..28165f6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod block_range_scanner;
 pub mod error;
 pub mod event_scanner;
+mod safe_provider;
 #[cfg(any(test, feature = "test-utils"))]
 pub mod test_utils;
 pub mod types;
diff --git a/src/safe_provider.rs b/src/safe_provider.rs
new file mode 100644
index 0000000..4b31039
--- /dev/null
+++ b/src/safe_provider.rs
@@ -0,0 +1,290 @@
+use std::{future::Future, time::Duration};
+
+use alloy::{
+    eips::BlockNumberOrTag,
+    network::Network,
+    providers::{Provider, RootProvider},
+    pubsub::Subscription,
+    rpc::types::{Filter, Log},
+    transports::{RpcError, TransportErrorKind},
+};
TransportErrorKind}, +}; +use backon::{ExponentialBuilder, Retryable}; +use tracing::{error, info}; + +/// Safe provider wrapper with built-in retry and timeout mechanisms. +/// +/// This wrapper around Alloy providers automatically handles retries, +/// timeouts, and error logging for RPC calls. +#[derive(Clone)] +pub struct SafeProvider { + provider: RootProvider, + max_timeout: Duration, + max_retries: usize, + retry_interval: Duration, +} + +// RPC retry and timeout settings +/// Default timeout used by `SafeProvider` +pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); +/// Default maximum number of retry attempts. +pub const DEFAULT_MAX_RETRIES: usize = 5; +/// Default base delay between retries. +pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); + +impl SafeProvider { + /// Create a new `SafeProvider` with default settings. + #[must_use] + pub fn new(provider: RootProvider) -> Self { + Self { + provider, + max_timeout: DEFAULT_MAX_TIMEOUT, + max_retries: DEFAULT_MAX_RETRIES, + retry_interval: DEFAULT_RETRY_INTERVAL, + } + } + + #[must_use] + pub fn max_timeout(mut self, timeout: Duration) -> Self { + self.max_timeout = timeout; + self + } + + #[must_use] + pub fn max_retries(mut self, max_retries: usize) -> Self { + self.max_retries = max_retries; + self + } + + #[must_use] + pub fn retry_interval(mut self, retry_interval: Duration) -> Self { + self.retry_interval = retry_interval; + self + } + + /// Fetch a block by number with retry and timeout. + /// + /// # Errors + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. + pub async fn get_block_by_number( + &self, + number: BlockNumberOrTag, + ) -> Result, RpcError> { + info!("eth_getBlockByNumber called"); + let provider = self.provider.clone(); + let result = self + .retry_with_total_timeout(|| async { provider.get_block_by_number(number).await }) + .await; + if let Err(e) = &result { + error!(error = %e, "eth_getByBlockNumber failed"); + } + result + } + + /// Fetch the latest block number with retry and timeout. + /// + /// # Errors + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. + pub async fn get_block_number(&self) -> Result> { + info!("eth_getBlockNumber called"); + let operation = || self.provider.get_block_number(); + let result = self.retry_with_total_timeout(operation).await; + if let Err(e) = &result { + error!(error = %e, "eth_getBlockNumber failed"); + } + result + } + + /// Fetch a block by hash with retry and timeout. + /// + /// # Errors + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. + pub async fn get_block_by_hash( + &self, + hash: alloy::primitives::BlockHash, + ) -> Result, RpcError> { + info!("eth_getBlockByHash called"); + let provider = self.provider.clone(); + let result = self + .retry_with_total_timeout(|| async { provider.get_block_by_hash(hash).await }) + .await; + if let Err(e) = &result { + error!(error = %e, "eth_getBlockByHash failed"); + } + result + } + + /// Fetch logs for the given filter with retry and timeout. + /// + /// # Errors + /// + /// Returns an error if RPC call fails repeatedly even + /// after exhausting retries or if the call times out. 
+    pub async fn get_logs(
+        &self,
+        filter: &Filter,
+    ) -> Result<Vec<Log>, RpcError<TransportErrorKind>> {
+        info!("eth_getLogs called");
+        let provider = self.provider.clone();
+        let result =
+            self.retry_with_total_timeout(|| async { provider.get_logs(filter).await }).await;
+        if let Err(e) = &result {
+            error!(error = %e, "eth_getLogs failed");
+        }
+        result
+    }
+
+    /// Subscribe to new block headers with retry and timeout.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the RPC call still fails after all retries
+    /// have been exhausted or if the call times out.
+    pub async fn subscribe_blocks(
+        &self,
+    ) -> Result<Subscription<N::HeaderResponse>, RpcError<TransportErrorKind>> {
+        info!("eth_subscribe called");
+        let provider = self.provider.clone();
+        let result =
+            self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await;
+        if let Err(e) = &result {
+            error!(error = %e, "eth_subscribe failed");
+        }
+        result
+    }
+
+    /// Execute `operation` with exponential backoff and a total timeout.
+    ///
+    /// Wraps the retry logic with `tokio::time::timeout(self.max_timeout, ...)` so
+    /// the entire operation (including time spent inside the RPC call) cannot exceed
+    /// `max_timeout`.
+    ///
+    /// # Errors
+    ///
+    /// - Returns [`RpcError`] with message "total operation timeout exceeded"
+    ///   if the overall timeout elapses.
+    /// - Propagates any [`RpcError`] from the underlying retries.
+    async fn retry_with_total_timeout<T, F, Fut>(
+        &self,
+        operation: F,
+    ) -> Result<T, RpcError<TransportErrorKind>>
+    where
+        F: Fn() -> Fut,
+        Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
+    {
+        let retry_strategy = ExponentialBuilder::default()
+            .with_max_times(self.max_retries)
+            .with_min_delay(self.retry_interval);
+
+        match tokio::time::timeout(
+            self.max_timeout,
+            operation.retry(retry_strategy).sleep(tokio::time::sleep),
+        )
+        .await
+        {
+            Ok(res) => res,
+            Err(_) => Err(TransportErrorKind::custom_str("total operation timeout exceeded")),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use alloy::network::Ethereum;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use tokio::time::sleep;
+
+    fn test_provider(
+        timeout: u64,
+        max_retries: usize,
+        retry_interval: u64,
+    ) -> SafeProvider<Ethereum> {
+        SafeProvider {
+            provider: RootProvider::new_http("http://localhost:8545".parse().unwrap()),
+            max_timeout: Duration::from_millis(timeout),
+            max_retries,
+            retry_interval: Duration::from_millis(retry_interval),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_retry_with_timeout_succeeds_on_first_attempt() {
+        let provider = test_provider(100, 3, 10);
+
+        let call_count = AtomicUsize::new(0);
+
+        let result = provider
+            .retry_with_total_timeout(|| async {
+                call_count.fetch_add(1, Ordering::SeqCst);
+                Ok(42)
+            })
+            .await;
+
+        assert!(matches!(result, Ok(42)));
+        assert_eq!(call_count.load(Ordering::SeqCst), 1);
+    }
+
+    #[tokio::test]
+    async fn test_retry_with_timeout_retries_on_error() {
+        let provider = test_provider(100, 3, 10);
+
+        let call_count = AtomicUsize::new(0);
+
+        let result = provider
+            .retry_with_total_timeout(|| async {
+                call_count.fetch_add(1, Ordering::SeqCst);
+                if call_count.load(Ordering::SeqCst) < 3 {
+                    Err(TransportErrorKind::custom_str("temporary error"))
+                } else {
+                    Ok(42)
+                }
+            })
+            .await;
+
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), 42);
+        assert_eq!(call_count.load(Ordering::SeqCst), 3);
+    }
+
+    #[tokio::test]
+    async fn test_retry_with_timeout_fails_after_max_retries() {
+        let provider = test_provider(100, 2, 10);
+
+        let call_count = AtomicUsize::new(0);
+
+        let result = provider
+            .retry_with_total_timeout(|| async {
+                call_count.fetch_add(1, Ordering::SeqCst);
+                Err::<u64, RpcError<TransportErrorKind>>(TransportErrorKind::custom_str(
+                    "permanent error",
+                ))
+            })
+            .await;
+
+        let err = result.unwrap_err();
+        assert!(err.to_string().contains("permanent error"));
+        assert_eq!(call_count.load(Ordering::SeqCst), 3);
+    }
+
+    #[tokio::test]
+    async fn test_retry_with_timeout_respects_total_delay() {
+        let max_timeout = 50;
+        let provider = test_provider(max_timeout, 10, 1);
+
+        let result = provider
+            .retry_with_total_timeout(move || async move {
+                sleep(Duration::from_millis(max_timeout + 10)).await;
+                Ok(42)
+            })
+            .await;
+
+        let err = result.unwrap_err();
+        assert!(err.to_string().contains("total operation timeout exceeded"));
+    }
+}
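
For reference, a minimal usage sketch of the new configuration surface added by this diff. It assumes the crate is consumed as `event_scanner`, an `Ethereum` `RootProvider` pointed at a hypothetical local JSON-RPC endpoint, and `tokio`/`anyhow` available in the caller's crate; it is illustrative only and not part of the change itself.

```rust
use std::time::Duration;

use alloy::{network::Ethereum, providers::RootProvider};
use event_scanner::block_range_scanner::BlockRangeScanner;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical local node URL; any Ethereum JSON-RPC endpoint works here.
    let provider: RootProvider<Ethereum> =
        RootProvider::new_http("http://localhost:8545".parse()?);

    // The three new builder knobs are forwarded into the SafeProvider wrapper by `connect`.
    let scanner = BlockRangeScanner::new()
        .with_max_timeout(Duration::from_secs(10)) // total budget per RPC call, retries included
        .with_max_retries(3) // retry attempts on top of the first try
        .with_retry_interval(Duration::from_millis(500)) // base delay for the exponential backoff
        .connect(provider);

    // Calls issued through the wrapped provider now retry and time out transparently.
    let latest = scanner.provider().get_block_number().await?;
    println!("latest block: {latest}");
    Ok(())
}
```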
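The retry arithmetic the new tests rely on can also be seen in isolation: with `backon`, `with_max_times(2)` allows two retries on top of the initial attempt, so a permanently failing operation runs three times before the last error is returned, which is what `test_retry_with_timeout_fails_after_max_retries` asserts. A standalone sketch (the closure, counter, and error string below are illustrative, not taken from the crate):

```rust
use std::{
    sync::atomic::{AtomicUsize, Ordering},
    time::Duration,
};

use backon::{ExponentialBuilder, Retryable};

#[tokio::main]
async fn main() {
    let attempts = AtomicUsize::new(0);

    // Two retries after the initial attempt: the failing operation is
    // executed three times in total before the error is surfaced.
    let result = (|| async {
        attempts.fetch_add(1, Ordering::SeqCst);
        Err::<(), &str>("permanent error")
    })
    .retry(
        ExponentialBuilder::default()
            .with_max_times(2)
            .with_min_delay(Duration::from_millis(10)),
    )
    .sleep(tokio::time::sleep)
    .await;

    assert!(result.is_err());
    assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
```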