Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 55 additions & 21 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy_eips::Encodable2718;
use alloy_primitives::{b256, bytes::Bytes, keccak256, B256};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ExecutionPayloadV1;
use futures::StreamExt;
use futures::{stream, StreamExt};
use reth_chainspec::EthChainSpec;
use reth_network_api::{BlockDownloaderProvider, FullNetwork};
use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
Expand Down Expand Up @@ -87,6 +87,17 @@ const HEADER_FETCH_COUNT: u64 = 100;
/// The size of the event channel used to broadcast events to listeners.
const EVENT_CHANNEL_SIZE: usize = 5000;

/// The maximum number of concurrent tasks that can be executed.
#[cfg(not(any(test, feature = "test-utils")))]
const CONCURRENCY_LIMIT: usize = 10;
#[cfg(any(test, feature = "test-utils"))]
const CONCURRENCY_LIMIT: usize = 1;
/// The batch size for batch validation.
#[cfg(not(any(test, feature = "test-utils")))]
const BATCH_SIZE: usize = 100;
#[cfg(any(test, feature = "test-utils"))]
const BATCH_SIZE: usize = 1;

/// The [`ChainOrchestrator`] is responsible for orchestrating the progression of the L2 chain
/// based on data consolidated from L1 and the data received over the p2p network.
#[derive(Debug)]
Expand Down Expand Up @@ -1082,28 +1093,51 @@ impl<
if head_block_number == safe_block_number {
tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate");
} else {
let start_block_number = safe_block_number + 1;
// TODO: Make fetching parallel but ensure concurrency limits are respected.
let mut blocks_to_validate = vec![];
for block_number in start_block_number..=head_block_number {
let block = self
.l2_client
.get_block_by_number(block_number.into())
.full()
.await?
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
.into_consensus()
.map_transactions(|tx| tx.inner.into_inner());
blocks_to_validate.push(block);
}
let block_stream = stream::iter(safe_block_number + 1..=head_block_number)
.map(|block_number| {
let client = self.l2_client.clone();

self.validate_l1_messages(&blocks_to_validate).await?;
async move {
client
.get_block_by_number(block_number.into())
.full()
.await?
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))
.map(|b| {
b.into_consensus().map_transactions(|tx| tx.inner.into_inner())
})
}
})
.buffer_unordered(CONCURRENCY_LIMIT);

self.database
.update_l1_messages_from_l2_blocks(
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
)
.await?;
let mut buffered = vec![];
futures::pin_mut!(block_stream);

while let Some(block_result) = block_stream.next().await {
let block = block_result?;
buffered.push(block);

if buffered.len() >= BATCH_SIZE {
let blocks_to_validate = std::mem::take(&mut buffered);
self.validate_l1_messages(blocks_to_validate.as_slice()).await?;
self.database
.update_l1_messages_from_l2_blocks(
blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(),
)
.await?;
}
}

// Process any remaining blocks
if !buffered.is_empty() {
let blocks_to_validate = std::mem::take(&mut buffered);
self.validate_l1_messages(blocks_to_validate.as_slice()).await?;
self.database
.update_l1_messages_from_l2_blocks(
blocks_to_validate.as_slice().iter().map(|b| b.into()).collect(),
)
.await?;
}
};

// send a notification to the network that the chain is synced such that it accepts
Expand Down
Loading