Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5f92f5c
feat: add backon library
LeoPatOZ Oct 16, 2025
2ad6516
feat: add provider wrapper
LeoPatOZ Oct 16, 2025
41824f0
Merge branch 'main' into retry-logic
LeoPatOZ Oct 16, 2025
22c2448
feat: use internal
LeoPatOZ Oct 16, 2025
d738ab2
feat: add get logs to safe provider
LeoPatOZ Oct 16, 2025
d068c8e
chore: made var public
LeoPatOZ Oct 16, 2025
d1e12e5
feat: added safe provider conifgs to block range scanner
LeoPatOZ Oct 16, 2025
0e9aa3b
feat: use safe provider in event scanner
LeoPatOZ Oct 16, 2025
319940a
feat: undo safe provider errors
LeoPatOZ Oct 16, 2025
65527b8
feat: implement stream block
LeoPatOZ Oct 20, 2025
b5bf428
test: add basic testing to safe provider
LeoPatOZ Oct 20, 2025
6241e83
chore: delete other constants
LeoPatOZ Oct 20, 2025
e24a6f7
Merge branch 'main' into retry-logic
LeoPatOZ Oct 20, 2025
79eaaae
fix: fmt
LeoPatOZ Oct 20, 2025
112c860
feat: add logging to rpc calls
LeoPatOZ Oct 21, 2025
9fc6543
chore: add comments and rename timeout
LeoPatOZ Oct 21, 2025
8bd13b1
chore: doctest
LeoPatOZ Oct 21, 2025
af7ce01
feat: add total timeout
LeoPatOZ Oct 21, 2025
c1ff8b5
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
471f767
ref: imporving tracing message
LeoPatOZ Oct 23, 2025
855e167
ref: collapse timeout fn to one
LeoPatOZ Oct 23, 2025
ff40e8a
ref: better syntax
LeoPatOZ Oct 23, 2025
992853c
ref: remove with and address default nit
LeoPatOZ Oct 23, 2025
ab35dc5
fix: doctest
LeoPatOZ Oct 23, 2025
cdf9b95
Update src/safe_provider.rs
LeoPatOZ Oct 23, 2025
3c0a971
Merge remote-tracking branch 'refs/remotes/origin/retry-logic' into r…
LeoPatOZ Oct 23, 2025
74cd3d7
ref: use atomic usize
LeoPatOZ Oct 23, 2025
41cc733
ref: update test to match for error
LeoPatOZ Oct 23, 2025
4af01ab
ref: update doc
LeoPatOZ Oct 23, 2025
3e32b8e
Merge branch 'main' into retry-logic
LeoPatOZ Oct 23, 2025
3329c38
fix: merge errors with connect methods
LeoPatOZ Oct 23, 2025
586b03e
fix: root --> safe provider
LeoPatOZ Oct 23, 2025
8a080e4
ref: tracing update
LeoPatOZ Oct 23, 2025
982ea96
ref: remove doc
LeoPatOZ Oct 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio-stream = "0.1.17"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
hex = "0.4"
backon = "1.5.2"

[package]
name = "event-scanner"
Expand Down Expand Up @@ -66,6 +67,7 @@ alloy-node-bindings.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
backon.workspace = true

[lints]
workspace = true
Expand Down
66 changes: 52 additions & 14 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
//! }
//! ```

use std::{cmp::Ordering, ops::RangeInclusive};
use std::{cmp::Ordering, ops::RangeInclusive, time::Duration};
use tokio::{
join,
sync::{mpsc, oneshot},
Expand All @@ -72,14 +72,17 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream};

use crate::{
error::ScannerError,
safe_provider::{
DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, SafeProvider,
},
types::{ScannerMessage, ScannerStatus},
};
use alloy::{
consensus::BlockHeader,
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{B256, BlockNumber},
providers::{Provider, RootProvider},
providers::RootProvider,
pubsub::Subscription,
rpc::client::ClientBuilder,
transports::{
Expand Down Expand Up @@ -115,6 +118,9 @@ impl PartialEq<RangeInclusive<BlockNumber>> for Message {
#[derive(Clone, Copy)]
pub struct BlockRangeScanner {
pub max_block_range: u64,
pub max_timeout: Duration,
pub max_retries: usize,
pub retry_interval: Duration,
}

impl Default for BlockRangeScanner {
Expand All @@ -126,7 +132,12 @@ impl Default for BlockRangeScanner {
impl BlockRangeScanner {
#[must_use]
pub fn new() -> Self {
Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE }
Self {
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
max_timeout: DEFAULT_MAX_TIMEOUT,
max_retries: DEFAULT_MAX_RETRIES,
retry_interval: DEFAULT_RETRY_INTERVAL,
}
}

#[must_use]
Expand All @@ -135,6 +146,24 @@ impl BlockRangeScanner {
self
}

#[must_use]
pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self {
self.max_timeout = rpc_timeout;
self
}

#[must_use]
pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self {
self.max_retries = rpc_max_retries;
self
}

#[must_use]
pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self {
self.retry_interval = rpc_retry_interval;
self
}

/// Connects to the provider via WebSocket
///
/// # Errors
Expand Down Expand Up @@ -169,19 +198,26 @@ impl BlockRangeScanner {
/// Returns an error if the connection fails
#[must_use]
pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }
let safe_provider = SafeProvider::new(provider)
.max_timeout(self.max_timeout)
.max_retries(self.max_retries)
.retry_interval(self.retry_interval);
ConnectedBlockRangeScanner {
provider: safe_provider,
max_block_range: self.max_block_range,
}
}
}

pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RootProvider<N>,
provider: SafeProvider<N>,
max_block_range: u64,
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
/// Returns the underlying Provider.
/// Returns the `SafeProvider`
#[must_use]
pub fn provider(&self) -> &RootProvider<N> {
pub fn provider(&self) -> &SafeProvider<N> {
&self.provider
}

Expand Down Expand Up @@ -233,7 +269,7 @@ pub enum Command {
}

struct Service<N: Network> {
provider: RootProvider<N>,
provider: SafeProvider<N>,
max_block_range: u64,
subscriber: Option<mpsc::Sender<Message>>,
websocket_connected: bool,
Expand All @@ -244,7 +280,7 @@ struct Service<N: Network> {
}

impl<N: Network> Service<N> {
pub fn new(provider: RootProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
pub fn new(provider: SafeProvider<N>, max_block_range: u64) -> (Self, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);

let service = Self {
Expand Down Expand Up @@ -640,9 +676,9 @@ impl<N: Network> Service<N> {
Ok(())
}

async fn stream_live_blocks<P: Provider<N>>(
async fn stream_live_blocks(
mut range_start: BlockNumber,
provider: P,
provider: SafeProvider<N>,
sender: mpsc::Sender<Message>,
block_confirmations: u64,
max_block_range: u64,
Expand Down Expand Up @@ -747,7 +783,7 @@ impl<N: Network> Service<N> {
}

async fn get_block_subscription(
provider: &impl Provider<N>,
provider: &SafeProvider<N>,
) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
let ws_stream = provider
.subscribe_blocks()
Expand Down Expand Up @@ -966,6 +1002,7 @@ impl BlockRangeScannerClient {
#[cfg(test)]
mod tests {

use alloy::providers::{Provider, RootProvider};
use std::time::Duration;
use tokio::time::timeout;

Expand All @@ -981,8 +1018,9 @@ mod tests {
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
RootProvider::new(RpcClient::mocked(asserter))
fn mocked_provider(asserter: Asserter) -> SafeProvider<Ethereum> {
let root_provider = RootProvider::new(RpcClient::mocked(asserter));
SafeProvider::new(root_provider)
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions src/event_scanner/modes/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::ops::RangeInclusive;
use crate::{
block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage},
event_scanner::{filter::EventFilter, listener::EventListener, message::Message},
safe_provider::SafeProvider,
};
use alloy::{
network::Network,
providers::{Provider, RootProvider},
rpc::types::{Filter, Log},
transports::{RpcError, TransportErrorKind},
};
Expand All @@ -25,7 +25,7 @@ pub enum ConsumerMode {

pub async fn handle_stream<N: Network>(
mut stream: ReceiverStream<BlockRangeMessage>,
provider: &RootProvider<N>,
provider: &SafeProvider<N>,
listeners: &[EventListener],
mode: ConsumerMode,
) {
Expand All @@ -42,7 +42,7 @@ pub async fn handle_stream<N: Network>(
}

pub fn spawn_log_consumers<N: Network>(
provider: &RootProvider<N>,
provider: &SafeProvider<N>,
listeners: &[EventListener],
range_tx: &Sender<BlockRangeMessage>,
mode: ConsumerMode,
Expand Down Expand Up @@ -129,7 +129,7 @@ async fn get_logs<N: Network>(
range: RangeInclusive<u64>,
event_filter: &EventFilter,
log_filter: &Filter,
provider: &RootProvider<N>,
provider: &SafeProvider<N>,
) -> Result<Vec<Log>, RpcError<TransportErrorKind>> {
let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end());

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod block_range_scanner;
pub mod error;
pub mod event_scanner;
mod safe_provider;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
pub mod types;
Expand Down
Loading