diff --git a/Cargo.lock b/Cargo.lock index 27692e3..87d9bff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -828,6 +828,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" +dependencies = [ + "windows-sys 0.60.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.60.2", +] + [[package]] name = "anyhow" version = "1.0.99" @@ -1216,6 +1266,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "const-hex" version = "1.15.0" @@ -1566,6 +1622,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "env_filter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +dependencies = [ + "log", +] + +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1593,6 +1670,7 @@ dependencies = [ "chrono", "serde", "serde_json", + "test-log", "thiserror", "tokio", "tokio-stream", @@ -2249,6 +2327,12 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.10.5" @@ -2330,6 +2414,20 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "latest_events_then_live_scanning" +version = "0.3.0-alpha" +dependencies = [ + "alloy", + "alloy-node-bindings", + "anyhow", + "event-scanner", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2566,6 +2664,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + [[package]] name = "openssl" version = "0.10.73" @@ -3654,6 +3758,28 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "test-log" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e33b98a582ea0be1168eba097538ee8dd4bbe0f2b01b22ac92ea30054e5be7b" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "thiserror" version = "2.0.16" @@ -4053,6 +4179,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index f5995d1..e3570ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "examples/historical_scanning", "examples/live_scanning", "examples/latest_events_scanning", - "examples/sync_scanning" + "examples/latest_events_then_live_scanning", + "examples/sync_scanning", ] resolver = "2" @@ -32,6 +33,7 @@ chrono = { version = "0.4", features = ["serde"] } tokio-stream = "0.1.17" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } +test-log = { version = "0.2.18", features = ["trace"] } hex = "0.4" [package] @@ -65,7 +67,10 @@ chrono.workspace = true alloy-node-bindings.workspace = true tokio-stream.workspace = true tracing.workspace = true + +[dev-dependencies] tracing-subscriber.workspace = true +test-log.workspace = true [lints] workspace = true diff --git a/README.md b/README.md index 997813a..4ead2e1 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Event Scanner is a Rust library for streaming EVM-based smart contract events. I - [Defining Event Filters](#defining-event-filters) - [Scanning Modes](#scanning-modes) - [Scanning Latest Events](#scanning-latest-events) + - [Scanning Latest Events Then Live](#scanning-latest-events-then-live) - [Examples](#examples) - [Testing](#testing) @@ -97,7 +98,14 @@ async fn run_scanner( ### Building a Scanner -`EventScanner` provides mode-specific constructors and a builder pattern to configure settings before connecting: +`EventScanner` provides mode-specific constructors and a builder pattern to configure settings before connecting. +Once configured, connect using one of: + +- `connect_ws::(ws_url)` +- `connect_ipc::(path)` +- `connect::(provider)` + +This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes). 
```rust
// Live streaming mode
@@ -122,17 +130,6 @@ let scanner = EventScanner::latest()
    .connect_ws::<Ethereum>(ws_url).await?;
```

-**Available Modes:**
-- `EventScanner::live()` – Streams new blocks as they arrive
-- `EventScanner::historic()` – Processes historical block ranges
-- `EventScanner::sync()` – Processes historical data then transitions to live streaming
-- `EventScanner::latest()` – Processes a specific number of events then optionally switches to live scanning mode
-
-**Global Configuration Options:**
-- `block_read_limit(usize)` – Sets the maximum number of blocks to process per read operation. This prevents RPC provider errors from overly large block range queries.
-- Connect with `connect_ws::<N>(url)`, `connect_ipc::<N>(path)`, or `connect(provider)`.
-
-**Starting the Scanner:** Invoking `scanner.start()` starts the scanner in the specified mode.

### Defining Event Filters

@@ -168,20 +165,41 @@ Register multiple filters by invoking `subscribe` repeatedly.

The flexibility provided by `EventFilter` allows you to build sophisticated event monitoring systems that can track events at different granularities depending on your application's needs.

+Batch builder examples:
+
+```rust
+// Multiple contract addresses at once
+let multi_addr = EventFilter::new()
+    .with_contract_addresses([*counter_contract.address(), *other_counter_contract.address()]);
+
+// Multiple event names at once
+let multi_events = EventFilter::new()
+    .with_events([Counter::CountIncreased::SIGNATURE, Counter::CountDecreased::SIGNATURE]);
+
+// Multiple event signature hashes at once
+let multi_sigs = EventFilter::new()
+    .with_event_signatures([
+        Counter::CountIncreased::SIGNATURE_HASH,
+        Counter::CountDecreased::SIGNATURE_HASH,
+    ]);
+```
+
### Scanning Modes

-- **Live mode** – `EventScanner::live()` creates a scanner that subscribes to new blocks as they arrive.
-- **Historical mode** – `EventScanner::historic()` creates a scanner for processing historical block ranges.
-- **Sync mode** – `EventScanner::sync()` creates a scanner that processes historical data then automatically transitions to live streaming.
-- **Latest mode** – `EventScanner::latest()` creates a scanner that processes a set number of events.
+- **Live** – `EventScanner::live()` creates a scanner that streams new blocks as they arrive. On detecting a reorg, the scanner emits `ScannerStatus::ReorgDetected` and recalculates the confirmed window, streaming logs from the corrected confirmed block range.
+- **Historic** – `EventScanner::historic()` creates a scanner for streaming events from a past block range. Reorg handling is not yet implemented (⚠️ WIP).
+- **Latest Events** – `EventScanner::latest()` creates a scanner that streams the specified number of recently emitted events. On detecting a reorg, the scanner re-fetches all of the events in the specified block range (default: `Earliest..=Latest`).
+- **Sync from Block** – `EventScanner::sync().from_block(start)` creates a scanner that streams events from a given start block, then automatically transitions to live streaming. Reorgs are handled according to the phase the scanner is currently in (historic or live).
+- **Sync from Latest** – `EventScanner::sync().from_latest(count)` creates a scanner that streams the most recent `count` events, then automatically transitions to live streaming. Reorgs are handled according to the phase the scanner is currently in (latest events or live). Both sync variants are sketched below.
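+
+For instance, the two sync variants side by side (a minimal sketch; the `Ethereum` network type and the endpoint argument are assumptions; see `examples/` for runnable programs):
+
+```rust
+use alloy::network::Ethereum;
+use event_scanner::EventScanner;
+
+async fn sync_variants(ws_url: alloy::transports::http::reqwest::Url) -> eyre::Result<()> {
+    // Replay everything from block 100, then follow the chain live.
+    let from_block =
+        EventScanner::sync().from_block(100).connect_ws::<Ethereum>(ws_url.clone()).await?;
+
+    // Deliver the 10 most recent matching events, then follow the chain live.
+    let from_latest =
+        EventScanner::sync().from_latest(10).connect_ws::<Ethereum>(ws_url).await?;
+
+    let _ = (from_block, from_latest);
+    Ok(())
+}
+```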
+
+#### Configuration Tips

-**Configuration Tips:**
- Set `block_read_limit` based on your RPC provider's limits (e.g., Alchemy, Infura may limit queries to 2000 blocks)
- For live mode, if the WebSocket subscription lags significantly (e.g., >2000 blocks), ranges are automatically capped to prevent RPC errors
-- Each mode has its own configuration options for start block, end block, confirmations, etc. where it makes sense
-- The modes come with sensible defaults for example not specify a start block for historic mode automatically sets the start block to the earliest one
+- Each mode exposes its own configuration options (start block, end block, confirmations, etc.) where applicable
+- The modes come with sensible defaults; for example, not specifying a start block for historic mode automatically sets the start block to the genesis block.

-See integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.
+See the integration tests under `tests/` for concrete examples.

### Scanning Latest Events

@@ -233,7 +251,74 @@ The scanner periodically checks the tip to detect reorgs. On reorg, the scanner

Notes:

- Ensure you create streams via `subscribe()` before calling `start` so listeners are registered.
-
+- The stream ends after the requested events are delivered; to continue streaming new blocks afterwards, use `EventScanner::sync().from_latest(count)` (described below).
+
+### Scanning Latest Events Then Live
+
+`EventScanner::sync().from_latest(count)` combines the latest-events and live modes: it fetches recent historical events and then seamlessly transitions to live streaming for continuous monitoring.
+
+**What it does:**
+
+1. **Historical rewind phase**: Scans backwards from the current chain tip to collect up to the `count` most recent matching events
+2. **Automatic transition**: Emits `ScannerStatus::SwitchingToLive` to signal the mode change
+3. **Live streaming phase**: Continuously monitors and streams new events as they arrive on-chain
+
+**How it works:**
+
+Before starting, the scanner captures the latest block number to establish a clear boundary between phases. The historical phase scans from `Earliest` to `latest_block`, while the live phase starts from `latest_block + 1`. This design prevents duplicate events and handles race conditions where new blocks arrive during setup.
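+
+The sketch below illustrates that boundary computation (a hypothetical helper mirroring the description above, not the scanner's actual internals; the `Ethereum` network type is an assumption):
+
+```rust
+use alloy::{
+    consensus::BlockHeader,
+    eips::BlockNumberOrTag,
+    network::{BlockResponse, Ethereum},
+    providers::{Provider, RootProvider},
+};
+
+// Hypothetical helper: pin the chain tip once and derive both phase ranges from it.
+async fn phase_boundary(provider: &RootProvider<Ethereum>) -> eyre::Result<(u64, u64)> {
+    let latest_block = provider
+        .get_block_by_number(BlockNumberOrTag::Latest)
+        .await?
+        .expect("chain should have a latest block")
+        .header()
+        .number();
+
+    // Historical rewind covers Earliest..=latest_block; the live phase starts
+    // strictly after it, so no event is delivered twice across the transition.
+    let live_start = latest_block + 1;
+    Ok((latest_block, live_start))
+}
+```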
+
+**Key behaviors:**
+
+- **No duplicates**: Events are not delivered twice across the phase transition
+- **Flexible count**: If fewer than `count` events exist, returns all available events
+- **Reorg handling**: Both phases handle reorgs appropriately:
+  - Historical phase: resets and rescans on reorg detection
+  - Live phase: resets the stream to the first post-reorg block that satisfies the block confirmations set via `block_confirmations`
+- **Continuous operation**: The live phase continues indefinitely until the scanner is dropped
+
+**Example:**
+
+```rust
+use alloy::network::Ethereum;
+use event_scanner::{EventFilter, EventScanner, EventScannerMessage};
+use tokio_stream::StreamExt;
+
+async fn latest_then_live_example(
+    ws_url: alloy::transports::http::reqwest::Url,
+    addr: alloy::primitives::Address,
+) -> eyre::Result<()> {
+    let mut scanner =
+        EventScanner::sync().from_latest(10).connect_ws::<Ethereum>(ws_url).await?;
+
+    let filter = EventFilter::new().contract_address(addr);
+    let mut stream = scanner.subscribe(filter);
+
+    // Fetch the latest 10 events, then stream new events continuously
+    scanner.start().await?;
+
+    while let Some(msg) = stream.next().await {
+        match msg {
+            EventScannerMessage::Data(logs) => {
+                println!("Received {} events", logs.len());
+            }
+            EventScannerMessage::Status(status) => {
+                println!("Status update: {:?}", status);
+                // You'll see ScannerStatus::SwitchingToLive when transitioning
+            }
+            EventScannerMessage::Error(e) => {
+                eprintln!("Error: {}", e);
+            }
+        }
+    }
+
+    Ok(())
+}
+```
+
+**Important notes:**
+
+- Create event streams via `subscribe()` **before** calling `start`
+- `start` returns immediately; events are delivered asynchronously
+- The live phase continues indefinitely until the scanner is dropped or encounters an error

---

@@ -241,17 +326,17 @@ Notes:

- `examples/live_scanning` – minimal live-mode scanner using `EventScanner::live()`
- `examples/historical_scanning` – demonstrates replaying historical data using `EventScanner::historic()`
+- `examples/sync_scanning` – demonstrates replaying from genesis (block 0) before continuing to stream the latest blocks using `EventScanner::sync().from_block(0)`
- `examples/latest_events_scanning` – demonstrates scanning the latest events using `EventScanner::latest()`
+- `examples/latest_events_then_live_scanning` – demonstrates scanning the latest events before switching to live mode using `EventScanner::sync().from_latest(count)`.

Run an example with:

```bash
-RUST_LOG=info cargo run -p live_scanning
-# or
-RUST_LOG=info cargo run -p historical_scanning
+RUST_LOG=info cargo run -p live_scanning
```

-Both examples spin up a local `anvil` instance, deploy a demo counter contract, and demonstrate using event streams to process events.
+All examples spin up a local `anvil` instance, deploy a demo counter contract, and demonstrate using event streams to process events.

---

@@ -265,3 +350,17 @@ Integration tests cover all modes:

cargo nextest run --features test-utils
```

+---
+
+## Errors
+
+The scanner surfaces errors via `EventScannerError`:
+
+- `EventScannerError::BlockRangeScanner(BlockRangeScannerError)` – issues from the underlying block range service (e.g., subscription/channel constraints, historical/sync failures).
+- `EventScannerError::Provider(RpcError)` – transport/provider-level failures (e.g., connection problems, RPC errors).
+ +Status notifications are emitted as `EventScannerMessage::Status(ScannerStatus)`, including: + +- `ScannerStatus::ReorgDetected` +- `ScannerStatus::SwitchingToLive` + diff --git a/examples/latest_events_then_live_scanning/Cargo.toml b/examples/latest_events_then_live_scanning/Cargo.toml new file mode 100644 index 0000000..c7d5411 --- /dev/null +++ b/examples/latest_events_then_live_scanning/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "latest_events_then_live_scanning" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +publish = false + +[[bin]] +name = "latest_events_then_live_scanning" +path = "main.rs" + +[dependencies] +alloy.workspace = true +alloy-node-bindings.workspace = true +tokio.workspace = true +tokio-stream.workspace = true +anyhow.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +event-scanner = { path = "../.." } diff --git a/examples/latest_events_then_live_scanning/main.rs b/examples/latest_events_then_live_scanning/main.rs new file mode 100644 index 0000000..ab14077 --- /dev/null +++ b/examples/latest_events_then_live_scanning/main.rs @@ -0,0 +1,88 @@ +use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy_node_bindings::Anvil; +use event_scanner::{ + EventFilter, + event_scanner::{EventScanner, Message}, +}; + +use tokio_stream::StreamExt; +use tracing::{error, info}; +use tracing_subscriber::EnvFilter; + +sol! { + #[allow(missing_docs)] + #[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")] + contract Counter { + uint256 public count; + + event CountIncreased(uint256 newCount); + event CountDecreased(uint256 newCount); + + function increase() public { + count += 1; + emit CountIncreased(count); + } + + function decrease() public { + require(count > 0, "Count cannot be negative"); + count -= 1; + emit CountDecreased(count); + } + + function getCount() public view returns (uint256) { + return count; + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + + let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?; + let wallet = anvil.wallet(); + let provider = + ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; + let counter_contract = Counter::deploy(provider).await?; + + let contract_address = counter_contract.address(); + + let increase_filter = EventFilter::new() + .contract_address(*contract_address) + .event(Counter::CountIncreased::SIGNATURE); + + let mut client = + 
EventScanner::sync().from_latest(5).connect_ws::(anvil.ws_endpoint_url()).await?; + + let mut stream = client.subscribe(increase_filter); + + for _ in 0..10 { + _ = counter_contract.increase().send().await?; + } + + client.start().await.expect("failed to start scanner"); + + // emit some events for live mode to pick up + _ = counter_contract.increase().send().await?; + _ = counter_contract.increase().send().await?; + _ = counter_contract.increase().send().await?; + + // only the last 5 events will be streamed before switching to live mode + while let Some(message) = stream.next().await { + match message { + Message::Data(logs) => { + for log in logs { + info!("Callback successfully executed with event {:?}", log.inner.data); + } + } + Message::Error(e) => { + error!("Received error: {}", e); + } + Message::Status(info) => { + info!("Received info: {:?}", info); + } + } + } + + Ok(()) +} diff --git a/examples/sync_scanning/main.rs b/examples/sync_scanning/main.rs index a7c3169..1044c19 100644 --- a/examples/sync_scanning/main.rs +++ b/examples/sync_scanning/main.rs @@ -57,7 +57,8 @@ async fn main() -> anyhow::Result<()> { info!("Historical event {} created", i + 1); } - let mut scanner = EventScanner::sync().connect_ws::(anvil.ws_endpoint_url()).await?; + let mut scanner = + EventScanner::sync().from_block(0).connect_ws::(anvil.ws_endpoint_url()).await?; let mut stream = scanner.subscribe(increase_filter); diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index c541674..b84a62c 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -237,7 +237,6 @@ struct Service { max_block_range: u64, subscriber: Option>, websocket_connected: bool, - processed_count: u64, error_count: u64, command_receiver: mpsc::Receiver, shutdown: bool, @@ -252,7 +251,6 @@ impl Service { max_block_range, subscriber: None, websocket_connected: false, - processed_count: 0, error_count: 0, command_receiver: cmd_rx, shutdown: false, @@ -390,38 +388,49 @@ impl Service { ) -> Result<(), ScannerError> { let sender = self.subscriber.clone().ok_or_else(|| ScannerError::ServiceShutdown)?; + let provider = self.provider.clone(); let max_block_range = self.max_block_range; + let get_start_block = async || -> Result { + let block = match start_height { + BlockNumberOrTag::Number(num) => num, + block_tag => provider + .get_block_by_number(block_tag) + .await? + .ok_or_else(|| ScannerError::BlockNotFound(block_tag))? + .header() + .number(), + }; + Ok(block) + }; + + let get_latest_block = async || -> Result { + let block = provider + .get_block_by_number(BlockNumberOrTag::Latest) + .await? + .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? + .header() + .number(); + Ok(block) + }; + // Step 1: // Fetches the starting block and end block for historical sync in parallel - let (start_block, latest_block) = tokio::try_join!( - self.provider.get_block_by_number(start_height), - self.provider.get_block_by_number(BlockNumberOrTag::Latest) - )?; - - let start_block_num = - start_block.ok_or_else(|| ScannerError::BlockNotFound(start_height))?.header().number(); - let latest_block = latest_block - .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))? 
- .header() - .number(); + let (start_block, latest_block) = tokio::try_join!(get_start_block(), get_latest_block())?; - let confirmed_tip_num = latest_block.saturating_sub(block_confirmations); + let confirmed_tip = latest_block.saturating_sub(block_confirmations); // If start is beyond confirmed tip, skip historical and go straight to live - if start_block_num > confirmed_tip_num { + if start_block > confirmed_tip { info!( - start_block = start_block_num, - confirmed_tip = confirmed_tip_num, + start_block = start_block, + confirmed_tip = confirmed_tip, "Start block is beyond confirmed tip, starting live stream" ); - let sender = self.subscriber.clone().ok_or_else(|| ScannerError::ServiceShutdown)?; - - let provider = self.provider.clone(); tokio::spawn(async move { Self::stream_live_blocks( - start_block_num, + start_block, provider, sender, block_confirmations, @@ -433,22 +442,16 @@ impl Service { return Ok(()); } - info!( - start_block = start_block_num, - end_block = confirmed_tip_num, - "Syncing historical data" - ); + info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data"); // Step 2: Setup the live streaming buffer // This channel will accumulate while historical sync is running let (live_block_buffer_sender, live_block_buffer_receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); - let provider = self.provider.clone(); - // The cutoff is the last block we have synced historically // Any block > cutoff will come from the live stream - let cutoff = confirmed_tip_num; + let cutoff = confirmed_tip; // This task runs independently, accumulating new blocks while wehistorical data is syncing let live_subscription_task = tokio::spawn(async move { @@ -462,36 +465,35 @@ impl Service { .await; }); - // Step 4: Perform historical synchronization - // This processes blocks from start_block to end_block (cutoff) - // If this fails, we need to abort the live streaming task - if let Err(e) = Self::stream_historical_blocks( - start_block_num, - confirmed_tip_num, - self.max_block_range, - &sender, - ) - .await - { - warn!("aborting live_subscription_task"); - live_subscription_task.abort(); - return Err(ScannerError::HistoricalSyncError(e.to_string())); - } + tokio::spawn(async move { + // Step 4: Perform historical synchronization + // This processes blocks from start_block to end_block (cutoff) + // If this fails, we need to abort the live streaming task + if Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender) + .await + .is_err() + { + error!("Error during syncing past blocks, stopping stream"); + live_subscription_task.abort(); + return; + } - self.send_to_subscriber(ScannerMessage::Status(ScannerStatus::ChainTipReached)).await; + if !Self::try_send(&sender, ScannerStatus::SwitchingToLive).await { + return; + } - // Step 5: - // Spawn the buffer processor task - // This will: - // 1. Process all buffered blocks, filtering out any ≤ cutoff - // 2. Forward blocks > cutoff to the user - // 3. Continue forwarding until the buffer if exhausted (waits for new blocks from live - // stream) - tokio::spawn(async move { + info!("Successfully transitioned from historical to live data"); + + // Step 5: + // Spawn the buffer processor task + // This will: + // 1. Process all buffered blocks, filtering out any ≤ cutoff + // 2. Forward blocks > cutoff to the user + // 3. 
Continue forwarding until the buffer is exhausted (waits for new blocks from live
+        //    stream)
            Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await;
        });

-        info!("Successfully transitioned from historical to live data");
        Ok(())
    }
@@ -500,6 +502,11 @@ impl Service {
        start_height: BlockNumberOrTag,
        end_height: BlockNumberOrTag,
    ) -> Result<(), ScannerError> {
+        let sender = self.subscriber.take().ok_or_else(|| ScannerError::NoSubscriber)?;
+
+        let provider = self.provider.clone();
+        let max_block_range = self.max_block_range;
+
        let (start_block, end_block) = join!(
            self.provider.get_block_by_number(start_height),
            self.provider.get_block_by_number(end_height),
        );
@@ -514,9 +521,9 @@ _ => (end_block, start_block),
        };

-        self.stream_rewind(from, to).await?;
-
-        _ = self.subscriber.take();
+        tokio::spawn(async move {
+            Self::stream_rewind(sender, provider, from, to, max_block_range).await
+        });

        Ok(())
    }
@@ -529,12 +536,13 @@
    ///
    /// Returns an error if the stream fails
    async fn stream_rewind(
-        &mut self,
+        sender: mpsc::Sender<Message>,
+        provider: RootProvider<N>,
        from: N::BlockResponse,
        to: N::BlockResponse,
+        max_block_range: u64,
    ) -> Result<(), ScannerError> {
        let mut batch_count = 0;
-        let max_block_range = self.max_block_range;

        // for checking whether reorg occurred
        let mut tip_hash = from.header().hash();
@@ -549,7 +557,9 @@
            let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);

            // stream the range regularly, i.e. from smaller block number to greater
-            self.send_to_subscriber(Message::Data(batch_to..=batch_from)).await;
+            if !Self::try_send(&sender, batch_to..=batch_from).await {
+                break;
+            }

            batch_count += 1;
            if batch_count % 10 == 0 {
@@ -562,16 +572,27 @@
                break;
            }

-            if self.reorg_detected(tip_hash).await? {
-                info!(block_number = %from, hash = %tip_hash, "Reorg detected");
+            let reorg_detected = match Self::reorg_detected(&provider, tip_hash).await {
+                Ok(reorg_detected) => {
+                    reorg_detected
+                }
+                Err(e) => {
+                    error!(error = %e, "Failed the reorg check");
+                    _ = Self::try_send(&sender, e).await;
+                    break;
+                }
+            };

-                self.send_to_subscriber(Message::Status(ScannerStatus::ReorgDetected)).await;
+            if reorg_detected {
+                info!(block_number = %from, hash = %tip_hash, "Reorg detected");
+                if !Self::try_send(&sender, ScannerStatus::ReorgDetected).await {
+                    break;
+                }

                // restart rewind
                batch_from = from;

                // store the updated end block hash
-                tip_hash = self
-                    .provider
+                tip_hash = provider
                    .get_block_by_number(from.into())
                    .await?
                    .expect("Chain should have the same height post-reorg")
@@ -589,13 +610,11 @@
        Ok(())
    }

-    async fn reorg_detected(&self, hash_to_check: B256) -> Result<bool, ScannerError> {
-        Ok(self
-            .provider
-            .get_block_by_hash(hash_to_check)
-            .await
-            .map_err(ScannerError::from)?
- .is_none()) + async fn reorg_detected( + provider: &RootProvider, + hash_to_check: B256, + ) -> Result { + Ok(provider.get_block_by_hash(hash_to_check).await?.is_none()) } async fn stream_historical_blocks( @@ -683,6 +702,12 @@ impl Service { let range_end = confirmed.min(range_start.saturating_add(max_block_range - 1)); + info!( + range_start = range_start, + range_end = range_end, + "Sending live block range" + ); + if sender.send(Message::Data(range_start..=range_end)).await.is_err() { warn!("Downstream channel closed, stopping live blocks task"); return; @@ -757,18 +782,6 @@ impl Service { Ok(ws_stream) } - async fn send_to_subscriber(&mut self, message: Message) { - if let Some(ref sender) = self.subscriber { - if let Err(err) = sender.send(message).await { - warn!(error = %err, "Downstream channel closed, failed sending the message to subscriber"); - self.subscriber = None; - self.websocket_connected = false; - } else { - self.processed_count += 1; - } - } - } - async fn try_send>(sender: &mpsc::Sender, msg: T) -> bool { if let Err(err) = sender.send(msg.into()).await { warn!(error = %err, "Downstream channel closed, stopping stream"); @@ -846,10 +859,10 @@ impl BlockRangeScannerClient { /// # Errors /// /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn stream_historical>( + pub async fn stream_historical( &self, - start_height: N, - end_height: N, + start_height: impl Into, + end_height: impl Into, ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -880,7 +893,7 @@ impl BlockRangeScannerClient { /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. pub async fn stream_from( &self, - start_height: BlockNumberOrTag, + start_height: impl Into, block_confirmations: u64, ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); @@ -888,7 +901,7 @@ impl BlockRangeScannerClient { let command = Command::StreamFrom { sender: blocks_sender, - start_height, + start_height: start_height.into(), block_confirmations, response: response_tx, }; @@ -910,10 +923,10 @@ impl BlockRangeScannerClient { /// # Errors /// /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. 
- pub async fn rewind>( + pub async fn rewind( &self, - start_height: BN, - end_height: BN, + start_height: impl Into, + end_height: impl Into, ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -1001,50 +1014,6 @@ mod tests { assert_eq!(scanner.max_block_range, max_block_range); } - #[tokio::test] - async fn send_to_subscriber_increments_processed_count() -> anyhow::Result<()> { - let asserter = Asserter::new(); - let provider = mocked_provider(asserter); - let (mut service, _cmd) = Service::new(provider, DEFAULT_MAX_BLOCK_RANGE); - - let (tx, mut rx) = mpsc::channel(1); - service.subscriber = Some(tx); - - let expected_range = 10..=11; - service.send_to_subscriber(Message::Data(expected_range.clone())).await; - - assert_eq!(service.processed_count, 1); - assert!(service.subscriber.is_some()); - - let Message::Data(received) = rx.recv().await.expect("range received") else { - panic!("expected BlockRange message") - }; - assert_eq!(received, expected_range); - - Ok(()) - } - - #[tokio::test] - async fn send_to_subscriber_removes_closed_channel() -> anyhow::Result<()> { - let asserter = Asserter::new(); - let provider = mocked_provider(asserter); - let (mut service, _cmd) = Service::new(provider, DEFAULT_MAX_BLOCK_RANGE); - - let (tx, rx) = mpsc::channel(1); - service.websocket_connected = true; - service.subscriber = Some(tx); - // channel is closed - drop(rx); - - service.send_to_subscriber(Message::Data(15..=15)).await; - - assert!(service.subscriber.is_none()); - assert!(!service.websocket_connected); - assert_eq!(service.processed_count, 0); - - Ok(()) - } - #[test] fn handle_unsubscribe_clears_subscriber() { let asserter = Asserter::new(); @@ -1611,16 +1580,12 @@ mod tests { #[tokio::test] async fn forwards_errors_to_subscribers() -> anyhow::Result<()> { - let asserter = Asserter::new(); - let provider = mocked_provider(asserter); - let (mut service, _cmd) = Service::new(provider, DEFAULT_MAX_BLOCK_RANGE); - let (tx, mut rx) = mpsc::channel(1); - service.subscriber = Some(tx); - service - .send_to_subscriber(Message::Error(ScannerError::WebSocketConnectionFailed(4))) - .await; + let sent = + Service::::try_send(&tx, ScannerError::WebSocketConnectionFailed(4)).await; + + assert!(sent); match rx.recv().await.expect("subscriber should stay open") { Message::Error(ScannerError::WebSocketConnectionFailed(attempts)) => { @@ -1768,9 +1733,8 @@ mod tests { .await? 
.run()?; - let mut stream = client - .rewind::(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest) - .await?; + let mut stream = + client.rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; assert_next!(stream, 14..=20); assert_next!(stream, 7..=13); diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index f33fe5e..9a54a11 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -8,5 +8,6 @@ pub mod modes; pub use filter::EventFilter; pub use message::Message; pub use modes::{ - EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner, SyncEventScanner, + EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner, + SyncFromBlockEventScanner, SyncFromLatestEventScanner, }; diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index a841f4f..16a81c2 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -10,51 +10,61 @@ use alloy::{ rpc::types::{Filter, Log}, transports::{RpcError, TransportErrorKind}, }; -use tokio::sync::{ - broadcast::{self, Sender, error::RecvError}, - mpsc, +use tokio::{ + sync::{ + broadcast::{self, Sender, error::RecvError}, + mpsc, + }, + task::JoinSet, }; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; +use tokio_stream::{Stream, StreamExt}; use tracing::{error, info, warn}; -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] pub enum ConsumerMode { Stream, CollectLatest { count: usize }, } -pub async fn handle_stream( - mut stream: ReceiverStream, +pub async fn handle_stream + Unpin>( + mut stream: S, provider: &RootProvider, listeners: &[EventListener], mode: ConsumerMode, ) { let (range_tx, _) = broadcast::channel::(MAX_BUFFERED_MESSAGES); - spawn_log_consumers(provider, listeners, &range_tx, mode); + let consumers = spawn_log_consumers(provider, listeners, &range_tx, mode); while let Some(message) = stream.next().await { if let Err(err) = range_tx.send(message) { - error!(error = %err, "No receivers, stopping broadcast"); + warn!(error = %err, "No log consumers, stopping stream"); break; } } + + // Close the channel sender to signal to the log consumers that streaming is done. 
+ drop(range_tx); + + // ensure all consumers finish before they're dropped + consumers.join_all().await; } +#[must_use] pub fn spawn_log_consumers( provider: &RootProvider, listeners: &[EventListener], range_tx: &Sender, mode: ConsumerMode, -) { - for listener in listeners { +) -> JoinSet<()> { + listeners.iter().fold(JoinSet::new(), |mut set, listener| { let provider = provider.clone(); let filter = listener.filter.clone(); let base_filter = Filter::from(&filter); let sender = listener.sender.clone(); let mut sub = range_tx.subscribe(); - tokio::spawn(async move { + set.spawn(async move { // Only used for CollectLatest let mut collected: Vec = match mode { ConsumerMode::CollectLatest { count } => Vec::with_capacity(count), @@ -78,11 +88,13 @@ pub fn spawn_log_consumers( } ConsumerMode::CollectLatest { count } => { let take = count.saturating_sub(collected.len()); + // if we have enough logs, break if take == 0 { break; } // take latest within this range collected.extend(logs.into_iter().rev().take(take)); + // if we have enough logs, break if collected.len() == count { break; } @@ -97,11 +109,13 @@ pub fn spawn_log_consumers( } } Ok(BlockRangeMessage::Error(e)) => { + error!(error = ?e, "Received error message"); if !try_send(&sender, e).await { break; } } Ok(BlockRangeMessage::Status(status)) => { + info!(status = ?status, "Received status message"); if !try_send(&sender, status).await { break; } @@ -119,10 +133,13 @@ pub fn spawn_log_consumers( collected.reverse(); // restore chronological order } + info!("Sending collected logs to consumer"); _ = try_send(&sender, collected).await; } }); - } + + set + }) } async fn get_logs( @@ -166,5 +183,6 @@ async fn try_send>(sender: &mpsc::Sender, msg: T) -> b warn!(error = %err, "Downstream channel closed, stopping stream"); return false; } + info!("Sent message to consumer"); true } diff --git a/src/event_scanner/modes/historic.rs b/src/event_scanner/modes/historic.rs index 02f0838..ad8e5c2 100644 --- a/src/event_scanner/modes/historic.rs +++ b/src/event_scanner/modes/historic.rs @@ -122,13 +122,13 @@ impl HistoricEventScanner { pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?; - handle_stream( - stream, - self.block_range_scanner.provider(), - &self.listeners, - ConsumerMode::Stream, - ) - .await; + + let provider = self.block_range_scanner.provider().clone(); + let listeners = self.listeners.clone(); + + tokio::spawn(async move { + handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await; + }); Ok(()) } } diff --git a/src/event_scanner/modes/latest.rs b/src/event_scanner/modes/latest.rs index 2af3e0a..120889c 100644 --- a/src/event_scanner/modes/latest.rs +++ b/src/event_scanner/modes/latest.rs @@ -148,13 +148,19 @@ impl LatestEventScanner { pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.rewind(self.config.from_block, self.config.to_block).await?; - handle_stream( - stream, - self.block_range_scanner.provider(), - &self.listeners, - ConsumerMode::CollectLatest { count: self.config.count }, - ) - .await; + + let provider = self.block_range_scanner.provider().clone(); + let listeners = self.listeners.clone(); + + tokio::spawn(async move { + handle_stream( + stream, + &provider, + &listeners, + ConsumerMode::CollectLatest { count: self.config.count }, + ) + .await; + }); Ok(()) } } diff --git 
a/src/event_scanner/modes/live.rs b/src/event_scanner/modes/live.rs index 90fd746..8a55617 100644 --- a/src/event_scanner/modes/live.rs +++ b/src/event_scanner/modes/live.rs @@ -118,13 +118,13 @@ impl LiveEventScanner { pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.stream_live(self.config.block_confirmations).await?; - handle_stream( - stream, - self.block_range_scanner.provider(), - &self.listeners, - ConsumerMode::Stream, - ) - .await; + + let provider = self.block_range_scanner.provider().clone(); + let listeners = self.listeners.clone(); + + tokio::spawn(async move { + handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await; + }); Ok(()) } } diff --git a/src/event_scanner/modes/mod.rs b/src/event_scanner/modes/mod.rs index 372d8d2..1fa9251 100644 --- a/src/event_scanner/modes/mod.rs +++ b/src/event_scanner/modes/mod.rs @@ -7,7 +7,11 @@ mod sync; pub use historic::{HistoricEventScanner, HistoricScannerBuilder}; pub use latest::{LatestEventScanner, LatestScannerBuilder}; pub use live::{LiveEventScanner, LiveScannerBuilder}; -pub use sync::{SyncEventScanner, SyncScannerBuilder}; +pub use sync::{ + SyncScannerBuilder, + from_block::{SyncFromBlockEventScanner, SyncFromBlockEventScannerBuilder}, + from_latest::{SyncFromLatestEventScanner, SyncFromLatestScannerBuilder}, +}; pub struct EventScanner; diff --git a/src/event_scanner/modes/sync.rs b/src/event_scanner/modes/sync.rs index acbd197..1415963 100644 --- a/src/event_scanner/modes/sync.rs +++ b/src/event_scanner/modes/sync.rs @@ -1,225 +1,124 @@ -use alloy::{ - eips::BlockNumberOrTag, - network::Network, - providers::RootProvider, - transports::{TransportResult, http::reqwest::Url}, -}; +use alloy::eips::BlockNumberOrTag; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; +use crate::event_scanner::modes::common::{ConsumerMode, handle_stream}; -use crate::{ - ScannerError, - block_range_scanner::{ - BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, - MAX_BUFFERED_MESSAGES, - }, - event_scanner::{ - filter::EventFilter, - listener::EventListener, - message::Message, - modes::common::{ConsumerMode, handle_stream}, - }, -}; +pub(crate) mod from_block; +pub(crate) mod from_latest; -pub struct SyncScannerBuilder { - block_range_scanner: BlockRangeScanner, - from_block: BlockNumberOrTag, - block_confirmations: u64, -} +use from_block::SyncFromBlockEventScannerBuilder; +use from_latest::SyncFromLatestScannerBuilder; -pub struct SyncEventScanner { - config: SyncScannerBuilder, - block_range_scanner: ConnectedBlockRangeScanner, - listeners: Vec, -} +pub struct SyncScannerBuilder; impl SyncScannerBuilder { #[must_use] pub(crate) fn new() -> Self { - Self { - block_range_scanner: BlockRangeScanner::new(), - from_block: BlockNumberOrTag::Earliest, - block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, - } + Self } - #[must_use] - pub fn max_block_range(mut self, max_block_range: u64) -> Self { - self.block_range_scanner.max_block_range = max_block_range; - self - } - - #[must_use] - pub fn from_block(mut self, block: impl Into) -> Self { - self.from_block = block.into(); - self - } - - #[must_use] - pub fn block_confirmations(mut self, count: u64) -> Self { - self.block_confirmations = count; - self - } - - /// Connects to the provider via WebSocket. + /// Scans the latest `count` matching events per registered listener, then automatically + /// transitions to live streaming mode. 
/// - /// Final builder method: consumes the builder and returns the built [`SyncEventScanner`]. + /// This method combines two scanning phases into a single operation: + /// 1. **Latest events phase**: Collects up to `count` most recent events by scanning backwards + /// from the current chain tip + /// 2. **Live streaming phase**: Continuously monitors and streams new events as they arrive + /// on-chain /// - /// # Errors + /// # Two-Phase Operation /// - /// Returns an error if the connection fails - pub async fn connect_ws(self, ws_url: Url) -> TransportResult> { - let block_range_scanner = self.block_range_scanner.connect_ws::(ws_url).await?; - Ok(SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() }) - } - - /// Connects to the provider via IPC. + /// The method captures the latest block number before starting both phases to establish a + /// clear boundary. The historical phase scans from `Earliest` to `latest_block`, while the + /// live phase uses sync mode starting from `latest_block + 1`. This design prevents duplicate + /// events and handles race conditions where new blocks arrive during setup. /// - /// Final builder method: consumes the builder and returns the built [`SyncEventScanner`]. + /// Between phases, the scanner emits [`ScannerStatus::SwitchingToLive`] to notify listeners + /// of the transition. As previously mentioned, the live phase internally uses sync mode + /// (historical → live) to ensure no events are missed if blocks were mined during the + /// transition or if reorgs occur. /// - /// # Errors + /// # Arguments /// - /// Returns an error if the connection fails - pub async fn connect_ipc( - self, - ipc_path: String, - ) -> TransportResult> { - let block_range_scanner = self.block_range_scanner.connect_ipc::(ipc_path).await?; - Ok(SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() }) - } - - /// Connects to an existing provider. + /// * `count` - Maximum number of recent events to collect per listener before switching to + /// live. + /// + /// # Example + /// + /// ```no_run + /// # use alloy::network::Ethereum; + /// # use event_scanner::{EventFilter, EventScanner, EventScannerMessage}; + /// # use tokio_stream::StreamExt; + /// # + /// # async fn example() -> Result<(), Box> { + /// # let ws_url = "ws://localhost:8545".parse()?; + /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); + /// let mut scanner = EventScanner::sync() + /// .from_latest(10) + /// .connect_ws::(ws_url) + /// .await?; + /// + /// let filter = EventFilter::new().contract_address(contract_address); + /// let mut stream = scanner.subscribe(filter); + /// + /// scanner.start().await?; + /// + /// while let Some(msg) = stream.next().await { + /// match msg { + /// EventScannerMessage::Data(logs) => { + /// println!("Received {} events", logs.len()); + /// } + /// EventScannerMessage::Status(status) => { + /// println!("Status: {:?}", status); + /// } + /// EventScannerMessage::Error(e) => { + /// eprintln!("Error: {}", e); + /// } + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Edge Cases + /// + /// - **No historical events**: If fewer than `count` events exist (or none at all), the method + /// returns all available events, then transitions to live streaming normally. + /// - **Duplicate prevention**: The boundary at `latest_block` ensures events are never + /// delivered twice across the phase transition. 
+    /// - **Race conditions**: Fetching `latest_block` before setting up streams prevents missing
+    ///   events that arrive during initialization.
+    ///
+    /// # Reorg Behavior
+    ///
+    /// - **Historical rewind phase**: Reverse-ordered rewind over `Earliest..=latest_block`. On
+    ///   detecting a reorg, emits [`ScannerStatus::ReorgDetected`], resets the rewind start to the
+    ///   new tip, and continues until collectors accumulate `count` logs. Final delivery to
+    ///   listeners preserves chronological order.
+    /// - **Live streaming phase**: Starts from `latest_block + 1` and respects block confirmations
+    ///   configured via [`block_confirmations`](SyncFromLatestScannerBuilder::block_confirmations).
+    ///   On reorg, emits [`ScannerStatus::ReorgDetected`], adjusts the next confirmed window
+    ///   (possibly re-emitting confirmed portions), and continues streaming.
    ///
-    /// Final builder method: consumes the builder and returns the built [`SyncEventScanner`].
+    /// # Usage Notes
    ///
-    /// # Errors
+    /// - Call `subscribe` on the built `SyncFromLatestEventScanner` to register listeners
+    ///   **before** calling `start`, otherwise no events will be delivered.
+    /// - `start` returns immediately after spawning the scanning task. Events are delivered
+    ///   asynchronously through the registered streams.
+    /// - The live phase continues indefinitely until the scanner is dropped or an error occurs.
    ///
-    /// Returns an error if the connection fails
+    /// [`ScannerStatus::ReorgDetected`]: crate::types::ScannerStatus::ReorgDetected
+    /// [`ScannerStatus::SwitchingToLive`]: crate::types::ScannerStatus::SwitchingToLive
    #[must_use]
-    pub fn connect(self, provider: RootProvider<N>) -> SyncEventScanner<N> {
-        let block_range_scanner = self.block_range_scanner.connect::<N>(provider);
-        SyncEventScanner { config: self, block_range_scanner, listeners: Vec::new() }
+    pub fn from_latest(self, count: usize) -> SyncFromLatestScannerBuilder {
+        SyncFromLatestScannerBuilder::new(count)
    }
-}
-
-impl<N: Network> SyncEventScanner<N> {
    #[must_use]
-    pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
-        let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
-        self.listeners.push(EventListener { filter, sender });
-        ReceiverStream::new(receiver)
-    }
-
-    /// Starts the scanner in sync (historical → live) mode.
-    ///
-    /// Streams from `from_block` up to the current confirmed tip using the configured
-    /// `block_confirmations`, then continues streaming new confirmed ranges live.
-    ///
-    /// # Reorg behavior
-    ///
-    /// - In live mode, emits [`ScannerStatus::ReorgDetected`] and adjusts the next confirmed range
-    ///   using `block_confirmations` to re-emit the confirmed portion.
-    ///
-    /// # Errors
-    ///
-    /// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
- pub async fn start(self) -> Result<(), ScannerError> { - let client = self.block_range_scanner.run()?; - let stream = - client.stream_from(self.config.from_block, self.config.block_confirmations).await?; - handle_stream( - stream, - self.block_range_scanner.provider(), - &self.listeners, - ConsumerMode::Stream, - ) - .await; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloy::{network::Ethereum, rpc::client::RpcClient, transports::mock::Asserter}; - - #[test] - fn test_sync_scanner_config_defaults() { - let config = SyncScannerBuilder::new(); - - assert!(matches!(config.from_block, BlockNumberOrTag::Earliest)); - assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); - } - - #[test] - fn test_sync_scanner_builder_pattern() { - let config = SyncScannerBuilder::new() - .max_block_range(25) - .block_confirmations(5) - .from_block(BlockNumberOrTag::Number(50)); - - assert_eq!(config.block_range_scanner.max_block_range, 25); - assert_eq!(config.block_confirmations, 5); - assert!(matches!(config.from_block, BlockNumberOrTag::Number(50))); - } - - #[test] - fn test_sync_scanner_builder_with_different_block_types() { - let config = SyncScannerBuilder::new() - .from_block(BlockNumberOrTag::Earliest) - .block_confirmations(20) - .max_block_range(100); - - assert!(matches!(config.from_block, BlockNumberOrTag::Earliest)); - assert_eq!(config.block_confirmations, 20); - assert_eq!(config.block_range_scanner.max_block_range, 100); - } - - #[test] - fn test_sync_scanner_builder_with_zero_confirmations() { - let config = - SyncScannerBuilder::new().from_block(0).block_confirmations(0).max_block_range(75); - - assert!(matches!(config.from_block, BlockNumberOrTag::Number(0))); - assert_eq!(config.block_confirmations, 0); - assert_eq!(config.block_range_scanner.max_block_range, 75); - } - - #[test] - fn test_sync_scanner_builder_last_call_wins() { - let config = SyncScannerBuilder::new() - .max_block_range(25) - .max_block_range(55) - .max_block_range(105) - .from_block(1) - .from_block(2) - .block_confirmations(5) - .block_confirmations(7); - - assert_eq!(config.block_range_scanner.max_block_range, 105); - assert!(matches!(config.from_block, BlockNumberOrTag::Number(2))); - assert_eq!(config.block_confirmations, 7); - } - - #[test] - fn test_sync_event_stream_listeners_vector_updates() { - let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = SyncScannerBuilder::new().connect::(provider); - assert_eq!(scanner.listeners.len(), 0); - let _stream1 = scanner.subscribe(EventFilter::new()); - assert_eq!(scanner.listeners.len(), 1); - let _stream2 = scanner.subscribe(EventFilter::new()); - let _stream3 = scanner.subscribe(EventFilter::new()); - assert_eq!(scanner.listeners.len(), 3); - } - - #[test] - fn test_sync_event_stream_channel_capacity() { - let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = SyncScannerBuilder::new().connect::(provider); - let _stream = scanner.subscribe(EventFilter::new()); - let sender = &scanner.listeners[0].sender; - assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES); + pub fn from_block( + self, + block: impl Into, + ) -> SyncFromBlockEventScannerBuilder { + SyncFromBlockEventScannerBuilder::new(block.into()) } } diff --git a/src/event_scanner/modes/sync/from_block.rs b/src/event_scanner/modes/sync/from_block.rs new file mode 100644 index 0000000..16ba585 --- /dev/null +++ b/src/event_scanner/modes/sync/from_block.rs @@ -0,0 +1,222 @@ +use alloy::{ + 
eips::BlockNumberOrTag, + network::Network, + providers::RootProvider, + transports::{TransportResult, http::reqwest::Url}, +}; + +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use crate::{ + ScannerError, + block_range_scanner::{ + BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, + MAX_BUFFERED_MESSAGES, + }, + event_scanner::{ + filter::EventFilter, + listener::EventListener, + message::Message, + modes::common::{ConsumerMode, handle_stream}, + }, +}; + +pub struct SyncFromBlockEventScannerBuilder { + block_range_scanner: BlockRangeScanner, + from_block: BlockNumberOrTag, + block_confirmations: u64, +} + +pub struct SyncFromBlockEventScanner { + config: SyncFromBlockEventScannerBuilder, + block_range_scanner: ConnectedBlockRangeScanner, + listeners: Vec, +} + +impl SyncFromBlockEventScannerBuilder { + #[must_use] + pub(crate) fn new(from_block: impl Into) -> Self { + Self { + block_range_scanner: BlockRangeScanner::new(), + from_block: from_block.into(), + block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, + } + } + + #[must_use] + pub fn max_block_range(mut self, max_block_range: u64) -> Self { + self.block_range_scanner.max_block_range = max_block_range; + self + } + + #[must_use] + pub fn block_confirmations(mut self, count: u64) -> Self { + self.block_confirmations = count; + self + } + + /// Connects to the provider via WebSocket. + /// + /// Final builder method: consumes the builder and returns the built + /// [`SyncFromBlockEventScanner`]. + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ws( + self, + ws_url: Url, + ) -> TransportResult> { + let block_range_scanner = self.block_range_scanner.connect_ws::(ws_url).await?; + Ok(SyncFromBlockEventScanner { config: self, block_range_scanner, listeners: Vec::new() }) + } + + /// Connects to the provider via IPC. + /// + /// Final builder method: consumes the builder and returns the built + /// [`SyncFromBlockEventScanner`]. + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ipc( + self, + ipc_path: String, + ) -> TransportResult> { + let block_range_scanner = self.block_range_scanner.connect_ipc::(ipc_path).await?; + Ok(SyncFromBlockEventScanner { config: self, block_range_scanner, listeners: Vec::new() }) + } + + /// Connects to an existing provider. + /// + /// Final builder method: consumes the builder and returns the built + /// [`SyncFromBlockEventScanner`]. + /// + /// # Errors + /// + /// Returns an error if the connection fails + #[must_use] + pub fn connect(self, provider: RootProvider) -> SyncFromBlockEventScanner { + let block_range_scanner = self.block_range_scanner.connect::(provider); + SyncFromBlockEventScanner { config: self, block_range_scanner, listeners: Vec::new() } + } +} + +impl SyncFromBlockEventScanner { + #[must_use] + pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream { + let (sender, receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); + self.listeners.push(EventListener { filter, sender }); + ReceiverStream::new(receiver) + } + + /// Starts the scanner in sync (historical → live) mode. + /// + /// Streams from `from_block` up to the current confirmed tip using the configured + /// `block_confirmations`, then continues streaming new confirmed ranges live. + /// + /// # Reorg behavior + /// + /// - In live mode, emits [`ScannerStatus::ReorgDetected`] and adjusts the next confirmed range + /// using `block_confirmations` to re-emit the confirmed portion. 
+    ///
+    /// # Errors
+    ///
+    /// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
+    pub async fn start(self) -> Result<(), ScannerError> {
+        let client = self.block_range_scanner.run()?;
+        let stream =
+            client.stream_from(self.config.from_block, self.config.block_confirmations).await?;
+
+        let provider = self.block_range_scanner.provider().clone();
+        let listeners = self.listeners.clone();
+
+        tokio::spawn(async move {
+            handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await;
+        });
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use alloy::{network::Ethereum, rpc::client::RpcClient, transports::mock::Asserter};
+
+    #[test]
+    fn test_sync_scanner_config_defaults() {
+        let config = SyncFromBlockEventScannerBuilder::new(BlockNumberOrTag::Earliest);
+
+        assert!(matches!(config.from_block, BlockNumberOrTag::Earliest));
+        assert_eq!(config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
+    }
+
+    #[test]
+    fn test_sync_scanner_builder_pattern() {
+        let config =
+            SyncFromBlockEventScannerBuilder::new(50).max_block_range(25).block_confirmations(5);
+
+        assert_eq!(config.block_range_scanner.max_block_range, 25);
+        assert_eq!(config.block_confirmations, 5);
+        assert!(matches!(config.from_block, BlockNumberOrTag::Number(50)));
+    }
+
+    #[test]
+    fn test_sync_scanner_builder_with_different_block_types() {
+        let config = SyncFromBlockEventScannerBuilder::new(BlockNumberOrTag::Earliest)
+            .block_confirmations(20)
+            .max_block_range(100);
+
+        assert!(matches!(config.from_block, BlockNumberOrTag::Earliest));
+        assert_eq!(config.block_confirmations, 20);
+        assert_eq!(config.block_range_scanner.max_block_range, 100);
+    }
+
+    #[test]
+    fn test_sync_scanner_builder_with_zero_confirmations() {
+        let config =
+            SyncFromBlockEventScannerBuilder::new(0).block_confirmations(0).max_block_range(75);
+
+        assert!(matches!(config.from_block, BlockNumberOrTag::Number(0)));
+        assert_eq!(config.block_confirmations, 0);
+        assert_eq!(config.block_range_scanner.max_block_range, 75);
+    }
+
+    #[test]
+    fn test_sync_scanner_builder_last_call_wins() {
+        let config = SyncFromBlockEventScannerBuilder::new(1)
+            .max_block_range(25)
+            .max_block_range(55)
+            .max_block_range(105)
+            .block_confirmations(5)
+            .block_confirmations(7);
+
+        assert_eq!(config.block_range_scanner.max_block_range, 105);
+        assert!(matches!(config.from_block, BlockNumberOrTag::Number(1)));
+        assert_eq!(config.block_confirmations, 7);
+    }
+
+    #[test]
+    fn test_sync_event_stream_listeners_vector_updates() {
+        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
+        let mut scanner = SyncFromBlockEventScannerBuilder::new(BlockNumberOrTag::Earliest)
+            .connect::<Ethereum>(provider);
+        assert_eq!(scanner.listeners.len(), 0);
+        let _stream1 = scanner.subscribe(EventFilter::new());
+        assert_eq!(scanner.listeners.len(), 1);
+        let _stream2 = scanner.subscribe(EventFilter::new());
+        let _stream3 = scanner.subscribe(EventFilter::new());
+        assert_eq!(scanner.listeners.len(), 3);
+    }
+
+    #[test]
+    fn test_sync_event_stream_channel_capacity() {
+        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
+        let mut scanner = SyncFromBlockEventScannerBuilder::new(BlockNumberOrTag::Earliest)
+            .connect::<Ethereum>(provider);
+        let _stream = scanner.subscribe(EventFilter::new());
+        let sender = &scanner.listeners[0].sender;
+        assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
+    }
+}
diff --git a/src/event_scanner/modes/sync/from_latest.rs b/src/event_scanner/modes/sync/from_latest.rs
new file mode
index 0000000..98e0905
--- /dev/null
+++ b/src/event_scanner/modes/sync/from_latest.rs
@@ -0,0 +1,175 @@
+use alloy::{
+    consensus::BlockHeader,
+    eips::BlockNumberOrTag,
+    network::{BlockResponse, Network},
+    providers::{Provider, RootProvider},
+    transports::{TransportResult, http::reqwest::Url},
+};
+
+use tokio::sync::mpsc;
+use tokio_stream::{StreamExt, wrappers::ReceiverStream};
+use tracing::{info, warn};
+
+use crate::{
+    ScannerError, ScannerStatus,
+    block_range_scanner::{
+        BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
+        MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage,
+    },
+    event_scanner::{filter::EventFilter, listener::EventListener, message::Message},
+};
+
+use super::{ConsumerMode, handle_stream};
+
+pub struct SyncFromLatestScannerBuilder {
+    block_range_scanner: BlockRangeScanner,
+    latest_events_count: usize,
+    block_confirmations: u64,
+}
+
+pub struct SyncFromLatestEventScanner<N: Network> {
+    config: SyncFromLatestScannerBuilder,
+    block_range_scanner: ConnectedBlockRangeScanner<N>,
+    listeners: Vec<EventListener>,
+}
+
+impl SyncFromLatestScannerBuilder {
+    #[must_use]
+    pub(crate) fn new(count: usize) -> Self {
+        Self {
+            block_range_scanner: BlockRangeScanner::new(),
+            latest_events_count: count,
+            block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
+        }
+    }
+
+    #[must_use]
+    pub fn block_confirmations(mut self, count: u64) -> Self {
+        self.block_confirmations = count;
+        self
+    }
+
+    /// Connects to the provider via WebSocket.
+    ///
+    /// Final builder method: consumes the builder and returns the built
+    /// [`SyncFromLatestEventScanner`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the connection fails.
+    pub async fn connect_ws<N: Network>(
+        self,
+        ws_url: Url,
+    ) -> TransportResult<SyncFromLatestEventScanner<N>> {
+        let block_range_scanner = self.block_range_scanner.connect_ws::<N>(ws_url).await?;
+        Ok(SyncFromLatestEventScanner { config: self, block_range_scanner, listeners: Vec::new() })
+    }
+
+    /// Connects to the provider via IPC.
+    ///
+    /// Final builder method: consumes the builder and returns the built
+    /// [`SyncFromLatestEventScanner`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the connection fails.
+    pub async fn connect_ipc<N: Network>(
+        self,
+        ipc_path: String,
+    ) -> TransportResult<SyncFromLatestEventScanner<N>> {
+        let block_range_scanner = self.block_range_scanner.connect_ipc::<N>(ipc_path).await?;
+        Ok(SyncFromLatestEventScanner { config: self, block_range_scanner, listeners: Vec::new() })
+    }
+
+    /// Connects to an existing provider.
+    ///
+    /// Final builder method: consumes the builder and returns the built
+    /// [`SyncFromLatestEventScanner`]. This variant is infallible.
+    #[must_use]
+    pub fn connect<N: Network>(self, provider: RootProvider<N>) -> SyncFromLatestEventScanner<N> {
+        let block_range_scanner = self.block_range_scanner.connect::<N>(provider);
+        SyncFromLatestEventScanner { config: self, block_range_scanner, listeners: Vec::new() }
+    }
+}
+
+impl<N: Network> SyncFromLatestEventScanner<N> {
+    #[must_use]
+    pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
+        let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
+        self.listeners.push(EventListener { filter, sender });
+        ReceiverStream::new(receiver)
+    }
+
+    /// Starts the scanner.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the scanner fails to start.
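+    ///
+    /// # Example
+    ///
+    /// An illustrative sketch (marked `ignore`, not compiled as a doctest);
+    /// the `EventScanner::sync().from_latest(n)` entry point shown here
+    /// mirrors how the integration tests construct this scanner:
+    ///
+    /// ```ignore
+    /// let mut scanner = EventScanner::sync()
+    ///     .from_latest(3)
+    ///     .block_confirmations(0)
+    ///     .connect_ws::<Ethereum>(ws_url)
+    ///     .await?;
+    /// let mut stream = scanner.subscribe(EventFilter::new());
+    /// scanner.start().await?;
+    /// // The stream yields the latest events first, then a
+    /// // `SwitchingToLive` status, then live events.
+    /// ```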
+    pub async fn start(self) -> Result<(), ScannerError> {
+        let count = self.config.latest_events_count;
+        let provider = self.block_range_scanner.provider().clone();
+        let listeners = self.listeners.clone();
+
+        info!(count = count, "Starting scanner, mode: fetch latest events and switch to live");
+
+        let client = self.block_range_scanner.run()?;
+
+        // Fetch the latest block number.
+        // This is used to determine the starting point for both the rewind
+        // stream and the live stream. We do this before starting the streams
+        // to avoid a race condition where the latest block changes while
+        // we're setting up the streams.
+        let latest_block = provider
+            .get_block_by_number(BlockNumberOrTag::Latest)
+            .await?
+            .ok_or_else(|| ScannerError::BlockNotFound(BlockNumberOrTag::Latest))?
+            .header()
+            .number();
+
+        // Set up the rewind and live streams to run in parallel.
+        let rewind_stream = client.rewind(BlockNumberOrTag::Earliest, latest_block).await?;
+        // We rely on the sync mode for the live stream to ensure that we
+        // don't miss any events in case a new block was mined while we were
+        // setting up the streams, or a reorg happens.
+        let sync_stream =
+            client.stream_from(latest_block + 1, self.config.block_confirmations).await?;
+
+        // Start streaming...
+        tokio::spawn(async move {
+            // Since both the rewind and live log consumers ultimately stream
+            // to the same channel, we must ensure that all latest events are
+            // streamed before consuming the live stream; otherwise the log
+            // consumers may send events out of order.
+            handle_stream(
+                rewind_stream,
+                &provider,
+                &listeners,
+                ConsumerMode::CollectLatest { count },
+            )
+            .await;
+
+            // Notify the client that we're now streaming live.
+            info!("Switching to live stream");
+
+            // Use a one-off channel for the notification.
+            let (tx, rx) = mpsc::channel::<BlockRangeMessage>(1);
+            let stream = ReceiverStream::new(rx);
+            if tx.send(BlockRangeMessage::Status(ScannerStatus::SwitchingToLive)).await.is_err() {
+                warn!("No log consumers, stopping stream");
+                return;
+            }
+            // Close the sender so the status stream ends after the single
+            // message and the chained stream can move on to the live stream.
+            drop(tx);
+
+            let sync_stream = stream.chain(sync_stream);
+
+            // Start the live (sync) stream.
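+            // `StreamExt::chain` drains the one-off status stream before it
+            // polls `sync_stream`, so consumers always observe the
+            // `SwitchingToLive` status before the first live log.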
+            handle_stream(sync_stream, &provider, &listeners, ConsumerMode::Stream).await;
+        });
+
+        Ok(())
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index a5f4456..53cac73 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,5 +10,5 @@ pub use types::{ScannerMessage, ScannerStatus};
 pub use event_scanner::{
     EventFilter, EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner, Message,
-    SyncEventScanner,
+    SyncFromBlockEventScanner, SyncFromLatestEventScanner,
 };
diff --git a/src/types.rs b/src/types.rs
index 637b86e..a9930e6 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -9,7 +9,7 @@ pub enum ScannerMessage {
 #[derive(Copy, Debug, Clone, PartialEq)]
 pub enum ScannerStatus {
-    ChainTipReached,
+    SwitchingToLive,
     ReorgDetected,
 }
diff --git a/tests/common.rs b/tests/common.rs
index 542a559..e788e76 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -1,11 +1,13 @@
 #![allow(clippy::missing_errors_doc)]
 #![allow(clippy::missing_panics_doc)]
+#![allow(missing_docs)]
+
 use std::sync::Arc;
 
 use alloy::{
     eips::BlockNumberOrTag,
     network::Ethereum,
-    primitives::FixedBytes,
+    primitives::{FixedBytes, U256},
     providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi},
     rpc::types::anvil::{ReorgOptions, TransactionData},
     sol,
@@ -14,7 +16,7 @@ use alloy::{
 use alloy_node_bindings::{Anvil, AnvilInstance};
 use event_scanner::{
     EventFilter, EventScanner, HistoricEventScanner, LatestEventScanner, LiveEventScanner, Message,
-    SyncEventScanner,
+    SyncFromBlockEventScanner, SyncFromLatestEventScanner, test_utils::LogMetadata,
 };
 use tokio_stream::wrappers::ReceiverStream;
@@ -60,7 +62,8 @@ where
 pub type LiveScannerSetup<P> = ScannerSetup<LiveEventScanner<Ethereum>, P>;
 pub type HistoricScannerSetup<P> = ScannerSetup<HistoricEventScanner<Ethereum>, P>;
-pub type SyncScannerSetup<P> = ScannerSetup<SyncEventScanner<Ethereum>, P>;
+pub type SyncScannerSetup<P> = ScannerSetup<SyncFromBlockEventScanner<Ethereum>, P>;
+pub type SyncFromLatestScannerSetup<P> = ScannerSetup<SyncFromLatestEventScanner<Ethereum>, P>;
 pub type LatestScannerSetup<P> = ScannerSetup<LatestEventScanner<Ethereum>, P>;
 
 pub async fn setup_common(
@@ -105,11 +108,32 @@ pub async fn setup_live_scanner(
 pub async fn setup_sync_scanner(
     block_interval: Option<f64>,
     filter: Option<EventFilter>,
+    from: impl Into<BlockNumberOrTag>,
     confirmations: u64,
 ) -> anyhow::Result<SyncScannerSetup<impl Provider + Clone>> {
     let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
 
     let mut scanner = EventScanner::sync()
+        .from_block(from)
         .block_confirmations(confirmations)
         .connect_ws(anvil.ws_endpoint_url())
         .await?;
+
+    let stream = scanner.subscribe(filter);
+
+    Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
+}
+
+pub async fn setup_sync_from_latest_scanner(
+    block_interval: Option<f64>,
+    filter: Option<EventFilter>,
+    latest: usize,
+    confirmations: u64,
+) -> anyhow::Result<SyncFromLatestScannerSetup<impl Provider + Clone>> {
+    let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
+
+    let mut scanner = EventScanner::sync()
+        .from_latest(latest)
+        .block_confirmations(confirmations)
+        .connect_ws(anvil.ws_endpoint_url())
+        .await?;
 
     let stream = scanner.subscribe(filter);
 
     Ok(ScannerSetup { provider, contract, scanner, stream, anvil })
 }
@@ -241,7 +265,6 @@ pub async fn build_provider(anvil: &AnvilInstance) -> anyhow::Result(provider: P) -> anyhow::Result> where
     P: alloy::providers::Provider + Clone,
@@ -249,3 +272,16 @@ where
     let contract = TestCounter::deploy(provider).await?;
     Ok(contract)
 }
+
+pub async fn increase(
+    contract: &TestCounter::TestCounterInstance<impl Provider + Clone>,
+) -> anyhow::Result<LogMetadata<TestCounter::CountIncreased>> {
+    let receipt = contract.increase().send().await?.get_receipt().await?;
+    let tx_hash = receipt.transaction_hash;
+    let new_count = receipt.decoded_log::<TestCounter::CountIncreased>().unwrap().data.newCount;
+    Ok(LogMetadata {
+        event: TestCounter::CountIncreased { newCount: U256::from(new_count) },
+        address: *contract.address(),
+        tx_hash,
+    })
+}
diff --git a/tests/historic_mode/basic.rs b/tests/historic_mode/basic.rs
index b0bcda9..77beb04 100644
--- a/tests/historic_mode/basic.rs
+++ b/tests/historic_mode/basic.rs
@@ -1,55 +1,33 @@
-use std::{
-    sync::{
-        Arc,
-        atomic::{AtomicUsize, Ordering},
-    },
-    time::Duration,
-};
+use alloy::eips::BlockNumberOrTag;
+use event_scanner::assert_next;
 
-use event_scanner::Message;
-use tokio::time::timeout;
-use tokio_stream::StreamExt;
-
-use crate::common::{TestCounter, setup_historic_scanner};
+use crate::common::{increase, setup_historic_scanner};
 
 #[tokio::test]
 async fn processes_events_within_specified_historical_range() -> anyhow::Result<()> {
     let setup = setup_historic_scanner(
         Some(0.1),
         None,
-        alloy::eips::BlockNumberOrTag::Earliest,
-        alloy::eips::BlockNumberOrTag::Latest,
+        BlockNumberOrTag::Earliest,
+        BlockNumberOrTag::Latest,
     )
     .await?;
-
-    let expected_event_count = 4;
-
-    for _ in 0..expected_event_count {
-        setup.contract.increase().send().await?.watch().await?;
-    }
-
+    let contract = setup.contract;
     let scanner = setup.scanner;
-    let mut stream = setup.stream.take(expected_event_count);
+    let mut stream = setup.stream;
 
-    tokio::spawn(async move { scanner.start().await });
-
-    let event_count = Arc::new(AtomicUsize::new(0));
-    let event_count_clone = Arc::clone(&event_count);
-    let event_counting = async move {
-        let mut expected_new_count = 1;
-        while let Some(Message::Data(logs)) = stream.next().await {
-            event_count_clone.fetch_add(logs.len(), Ordering::SeqCst);
-            for log in logs {
-                let TestCounter::CountIncreased { newCount } = log.log_decode().unwrap().inner.data;
-                assert_eq!(newCount, expected_new_count);
-                expected_new_count += 1;
-            }
-        }
-    };
+    let expected = &[
+        increase(&contract).await?,
+        increase(&contract).await?,
+        increase(&contract).await?,
+        increase(&contract).await?,
+        increase(&contract).await?,
+    ];
 
-    _ = timeout(Duration::from_secs(3), event_counting).await;
+    tokio::spawn(async move { scanner.start().await });
 
-    assert_eq!(event_count.load(Ordering::SeqCst), expected_event_count);
+    assert_next!(stream, expected);
+    assert_next!(stream, None);
 
     Ok(())
 }
diff --git a/tests/historic_to_live/basic.rs b/tests/historic_to_live/basic.rs
index 5ed9d2a..f902513 100644
--- a/tests/historic_to_live/basic.rs
+++ b/tests/historic_to_live/basic.rs
@@ -1,11 +1,11 @@
-use alloy::primitives::U256;
+use alloy::{eips::BlockNumberOrTag, primitives::U256};
 
 use event_scanner::{assert_next, types::ScannerStatus};
 
 use crate::common::{TestCounter, setup_sync_scanner};
 
 #[tokio::test]
 async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
-    let setup = setup_sync_scanner(Some(0.1), None, 0).await?;
+    let setup = setup_sync_scanner(Some(0.1), None, BlockNumberOrTag::Earliest, 0).await?;
 
     let contract = setup.contract.clone();
     let historical_events = 3;
@@ -35,7 +35,7 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
     );
 
     // chain tip reached
-    assert_next!(stream, ScannerStatus::ChainTipReached);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
 
     // live events
     assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(4) },]);
diff --git a/tests/historic_to_live/from_latest.rs b/tests/historic_to_live/from_latest.rs
new file mode 100644
index 0000000..f48cd06
--- /dev/null
+++ b/tests/historic_to_live/from_latest.rs
@@ -0,0 +1,172 @@
+use alloy::providers::ext::AnvilApi;
+
+use crate::common::{TestCounter, increase, setup_sync_from_latest_scanner};
+use event_scanner::{assert_next, test_utils::LogMetadata, types::ScannerStatus};
+
+#[tokio::test]
+async fn scan_latest_then_live_happy_path_no_duplicates() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 3, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    // Historical: produce 6 events total; only the last 3 should be replayed
+    _ = increase(&contract).await?;
+    _ = increase(&contract).await?;
+    _ = increase(&contract).await?;
+
+    let mut expected_latest = vec![];
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+
+    // Ask for the latest 3, then live
+    scanner.start().await?;
+
+    // Latest phase
+    assert_next!(stream, expected_latest);
+    // Transition to live
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // Live phase: emit two more, which should arrive in order without
+    // duplicating the latest events
+    let live1 = increase(&contract).await?;
+    let live2 = increase(&contract).await?;
+
+    assert_next!(stream, &[live1]);
+    assert_next!(stream, &[live2]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn scan_latest_then_live_fewer_historical_then_continues_live() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 5, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    // Historical: only 2 available
+    let mut expected_latest = vec![];
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+
+    scanner.start().await?;
+
+    // Latest phase returns all available
+    assert_next!(stream, &expected_latest);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // Live: two more arrive
+    let live1 = increase(&contract).await?;
+    let live2 = increase(&contract).await?;
+    assert_next!(stream, &[live1]);
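+    // Each mined block is delivered to the stream as its own message here,
+    // so the two live events are asserted one at a time.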
+    assert_next!(stream, &[live2]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn scan_latest_then_live_exact_historical_count_then_live() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 4, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    // Historical: produce exactly 4 events
+    let mut expected_latest = vec![];
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+
+    scanner.start().await?;
+
+    assert_next!(stream, expected_latest);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // Live continues
+    let live = increase(&contract).await?;
+    assert_next!(stream, &[live]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn scan_latest_then_live_no_historical_only_live_streams() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 5, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    scanner.start().await?;
+
+    // Latest is empty
+    let expected: &[LogMetadata<TestCounter::CountIncreased>] = &[];
+    assert_next!(stream, expected);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // Live events arrive
+    let live1 = increase(&contract).await?;
+    let live2 = increase(&contract).await?;
+    assert_next!(stream, &[live1]);
+    assert_next!(stream, &[live2]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn scan_latest_then_live_boundary_no_duplication() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 3, 0).await?;
+    let provider = setup.provider;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    // Historical: emit 3 events, mining empty blocks in between to form a
+    // clear boundary
+    let mut expected_latest = vec![];
+    expected_latest.push(increase(&contract).await?);
+
+    provider.anvil_mine(Some(1), None).await?;
+
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+
+    provider.anvil_mine(Some(1), None).await?;
+
+    scanner.start().await?;
+
+    // Latest phase
+    assert_next!(stream, &expected_latest);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // Immediately produce a new live event in a new block
+    let live = increase(&contract).await?;
+    assert_next!(stream, &[live]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn scan_latest_then_live_waiting_on_live_logs_arriving() -> anyhow::Result<()> {
+    let setup = setup_sync_from_latest_scanner(None, None, 3, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+    let mut stream = setup.stream;
+
+    // Historical: emit 3 events
+    let mut expected_latest = vec![];
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+    expected_latest.push(increase(&contract).await?);
+
+    scanner.start().await?;
+
+    // Latest phase
+    assert_next!(stream, &expected_latest);
+    assert_next!(stream, ScannerStatus::SwitchingToLive);
+
+    // No live logs have been produced yet, so the channel must be empty
+    let inner = stream.into_inner();
+    assert!(inner.is_empty());
+
+    Ok(())
+}
diff --git a/tests/historic_to_live/mod.rs b/tests/historic_to_live/mod.rs
index b471e59..598273b 100644
--- a/tests/historic_to_live/mod.rs
+++ b/tests/historic_to_live/mod.rs
@@ -1,2 +1,4 @@
 pub mod basic;
+pub mod from_latest;
 pub mod reorg;
+pub mod sync_from_future;
diff --git a/tests/historic_to_live/reorg.rs b/tests/historic_to_live/reorg.rs
index d4a85ab..01686da 100644
--- a/tests/historic_to_live/reorg.rs
+++ b/tests/historic_to_live/reorg.rs
@@ -1,6 +1,6 @@
 use std::{sync::Arc, time::Duration};
 
-use alloy::providers::ext::AnvilApi;
+use alloy::{eips::BlockNumberOrTag, providers::ext::AnvilApi};
 use event_scanner::{Message, types::ScannerStatus};
 use tokio::{sync::Mutex, time::timeout};
 use tokio_stream::StreamExt;
@@ -12,7 +12,9 @@ async fn block_confirmations_mitigate_reorgs_historic_to_live() -> anyhow::Resul
     // any reorg ≤ 5 should be invisible to consumers
     let block_confirmations = 5;
 
-    let setup = setup_sync_scanner(Some(1.0), None, block_confirmations).await?;
+    let setup =
+        setup_sync_scanner(Some(1.0), None, BlockNumberOrTag::Earliest, block_confirmations)
+            .await?;
 
     let provider = setup.provider.clone();
     let contract = setup.contract.clone();
diff --git a/tests/historic_to_live/sync_from_future.rs b/tests/historic_to_live/sync_from_future.rs
new file mode 100644
index 0000000..53ab82c
--- /dev/null
+++ b/tests/historic_to_live/sync_from_future.rs
@@ -0,0 +1,31 @@
+use crate::common::{increase, setup_sync_scanner};
+use event_scanner::assert_next;
+use tokio_stream::wrappers::ReceiverStream;
+
+#[tokio::test]
+async fn sync_from_future_block_waits_until_minted() -> anyhow::Result<()> {
+    let future_start_block = 4;
+    let setup = setup_sync_scanner(None, None, future_start_block, 0).await?;
+    let contract = setup.contract;
+    let scanner = setup.scanner;
+
+    // Start the scanner in sync mode from the future block
+    scanner.start().await?;
+
+    // Send 2 transactions that should not appear in the stream
+    _ = increase(&contract).await?;
+    _ = increase(&contract).await?;
+
+    // Assert: no messages should be received before reaching the start height
+    let inner = setup.stream.into_inner();
+    assert!(inner.is_empty());
+    let mut stream = ReceiverStream::new(inner);
+
+    // Act: emit an event that will be mined in block == future_start_block
+    let expected = &[increase(&contract).await?];
+
+    // Assert: the first streamed message arrives and contains the expected event
+    assert_next!(stream, expected);
+
+    Ok(())
+}