From 38beddf4580c218f9a33bccd4e64a90cd4eb058a Mon Sep 17 00:00:00 2001 From: varun-doshi Date: Mon, 27 Oct 2025 21:14:24 +0530 Subject: [PATCH 1/3] feat(orchestrator): parallelize consolidation pattern --- crates/chain-orchestrator/src/lib.rs | 76 ++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 0845c18c..5e6ddc14 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -5,7 +5,7 @@ use alloy_eips::Encodable2718; use alloy_primitives::{b256, bytes::Bytes, keccak256, B256}; use alloy_provider::Provider; use alloy_rpc_types_engine::ExecutionPayloadV1; -use futures::StreamExt; +use futures::{stream, StreamExt}; use reth_chainspec::EthChainSpec; use reth_network_api::{BlockDownloaderProvider, FullNetwork}; use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient}; @@ -87,6 +87,17 @@ const HEADER_FETCH_COUNT: u64 = 100; /// The size of the event channel used to broadcast events to listeners. const EVENT_CHANNEL_SIZE: usize = 5000; +/// The maximum number of concurrent tasks that can be executed. +#[cfg(not(any(test, feature = "test-utils")))] +const CONCURRENCY_LIMIT: usize = 10; +#[cfg(any(test, feature = "test-utils"))] +const CONCURRENCY_LIMIT: usize = 1; +/// The batch size for batch validation. +#[cfg(not(any(test, feature = "test-utils")))] +const BATCH_SIZE: usize = 100; +#[cfg(any(test, feature = "test-utils"))] +const BATCH_SIZE: usize = 1; + /// The [`ChainOrchestrator`] is responsible for orchestrating the progression of the L2 chain /// based on data consolidated from L1 and the data received over the p2p network. #[derive(Debug)] @@ -1082,28 +1093,51 @@ impl< if head_block_number == safe_block_number { tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate"); } else { - let start_block_number = safe_block_number + 1; - // TODO: Make fetching parallel but ensure concurrency limits are respected. - let mut blocks_to_validate = vec![]; - for block_number in start_block_number..=head_block_number { - let block = self - .l2_client - .get_block_by_number(block_number.into()) - .full() - .await? - .ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))? - .into_consensus() - .map_transactions(|tx| tx.inner.into_inner()); - blocks_to_validate.push(block); - } + let block_stream = stream::iter(safe_block_number + 1..=head_block_number) + .map(|block_number| { + let client = self.l2_client.clone(); - self.validate_l1_messages(&blocks_to_validate).await?; + async move { + client + .get_block_by_number(block_number.into()) + .full() + .await? + .ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number)) + .map(|b| { + b.into_consensus().map_transactions(|tx| tx.inner.into_inner()) + }) + } + }) + .buffer_unordered(CONCURRENCY_LIMIT); - self.database - .update_l1_messages_from_l2_blocks( - blocks_to_validate.into_iter().map(|b| (&b).into()).collect(), - ) - .await?; + let mut buffered = vec![]; + futures::pin_mut!(block_stream); + + while let Some(block_result) = block_stream.next().await { + let block = block_result?; + buffered.push(block); + + if buffered.len() >= BATCH_SIZE { + let blocks_to_validate = std::mem::take(&mut buffered); + self.validate_l1_messages(blocks_to_validate.as_slice()).await?; + self.database + .update_l1_messages_from_l2_blocks( + blocks_to_validate.as_slice().into_iter().map(|b| b.into()).collect(), + ) + .await?; + } + } + + // Process any remaining blocks + if !buffered.is_empty() { + let blocks_to_validate = std::mem::take(&mut buffered); + self.validate_l1_messages(blocks_to_validate.as_slice()).await?; + self.database + .update_l1_messages_from_l2_blocks( + blocks_to_validate.as_slice().into_iter().map(|b| b.into()).collect(), + ) + .await?; + } }; // send a notification to the network that the chain is synced such that it accepts From 788646ea418e69967a313896bf4cce65f77f6f16 Mon Sep 17 00:00:00 2001 From: varun-doshi Date: Mon, 27 Oct 2025 23:42:14 +0530 Subject: [PATCH 2/3] fix: lint --- crates/chain-orchestrator/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 5e6ddc14..ccc1992c 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -1122,7 +1122,7 @@ impl< self.validate_l1_messages(blocks_to_validate.as_slice()).await?; self.database .update_l1_messages_from_l2_blocks( - blocks_to_validate.as_slice().into_iter().map(|b| b.into()).collect(), + blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(), ) .await?; } @@ -1134,7 +1134,7 @@ impl< self.validate_l1_messages(blocks_to_validate.as_slice()).await?; self.database .update_l1_messages_from_l2_blocks( - blocks_to_validate.as_slice().into_iter().map(|b| b.into()).collect(), + blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(), ) .await?; } From 25edd8ed940b3cb00b9d2746793ad6584ff5c794 Mon Sep 17 00:00:00 2001 From: varun-doshi Date: Thu, 30 Oct 2025 00:38:35 +0530 Subject: [PATCH 3/3] fix: use buffered chunks --- crates/chain-orchestrator/src/lib.rs | 48 +++++++++++----------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index ccc1992c..32496073 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -5,7 +5,7 @@ use alloy_eips::Encodable2718; use alloy_primitives::{b256, bytes::Bytes, keccak256, B256}; use alloy_provider::Provider; use alloy_rpc_types_engine::ExecutionPayloadV1; -use futures::{stream, StreamExt}; +use futures::{stream, StreamExt, TryStreamExt}; use reth_chainspec::EthChainSpec; use reth_network_api::{BlockDownloaderProvider, FullNetwork}; use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient}; @@ -87,11 +87,6 @@ const HEADER_FETCH_COUNT: u64 = 100; /// The size of the event channel used to broadcast events to listeners. const EVENT_CHANNEL_SIZE: usize = 5000; -/// The maximum number of concurrent tasks that can be executed. -#[cfg(not(any(test, feature = "test-utils")))] -const CONCURRENCY_LIMIT: usize = 10; -#[cfg(any(test, feature = "test-utils"))] -const CONCURRENCY_LIMIT: usize = 1; /// The batch size for batch validation. #[cfg(not(any(test, feature = "test-utils")))] const BATCH_SIZE: usize = 100; @@ -1108,33 +1103,26 @@ impl< }) } }) - .buffer_unordered(CONCURRENCY_LIMIT); - - let mut buffered = vec![]; - futures::pin_mut!(block_stream); - - while let Some(block_result) = block_stream.next().await { - let block = block_result?; - buffered.push(block); - - if buffered.len() >= BATCH_SIZE { - let blocks_to_validate = std::mem::take(&mut buffered); - self.validate_l1_messages(blocks_to_validate.as_slice()).await?; - self.database - .update_l1_messages_from_l2_blocks( - blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(), - ) - .await?; + .buffered(BATCH_SIZE); + + let mut block_chunks = block_stream.try_chunks(BATCH_SIZE); + + while let Some(blocks_result) = block_chunks.next().await { + let blocks_to_validate = + blocks_result.map_err(|_| ChainOrchestratorError::InvalidBlock)?; + + if let Err(e) = self.validate_l1_messages(&blocks_to_validate).await { + tracing::error!( + target: "scroll::chain_orchestrator", + error = ?e, + "Validation failed — purging all L1→L2 message mappings" + ); + self.database.purge_l1_message_to_l2_block_mappings(None).await?; + return Err(e); } - } - - // Process any remaining blocks - if !buffered.is_empty() { - let blocks_to_validate = std::mem::take(&mut buffered); - self.validate_l1_messages(blocks_to_validate.as_slice()).await?; self.database .update_l1_messages_from_l2_blocks( - blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(), + blocks_to_validate.iter().map(|b| b.into()).collect(), ) .await?; }