Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b509314
feat: implement gap recovery mechanism for L1 watcher and use in Chai…
jonastheis Oct 30, 2025
0ea4ef7
make sure that there's no deadlock with command receiver as L1Watcher…
jonastheis Oct 30, 2025
5670af8
feat: add skipping logic for duplicate L1 messages and batch commits …
jonastheis Oct 30, 2025
ba20206
remove todo
jonastheis Oct 31, 2025
476d906
use select in watcher main loop
jonastheis Oct 31, 2025
f6eaf09
add test to test reset functionality
jonastheis Oct 31, 2025
21588bc
add test for preventing deadlock if send channel is full
jonastheis Oct 31, 2025
c907bd4
fmt
jonastheis Oct 31, 2025
10bc36c
add initial test setup
jonastheis Nov 4, 2025
51100a5
add L1WatcherHandleTrait for easier testability
jonastheis Nov 4, 2025
46c09f9
fix deadlock in test
jonastheis Nov 4, 2025
b96bda5
add testing of gap recovery for batch
jonastheis Nov 4, 2025
abcc90b
fix lint
jonastheis Nov 5, 2025
937b0e0
fix watcher tests
jonastheis Nov 5, 2025
f15ffb9
add possibility to filter by processed to get_batch_by_index
jonastheis Nov 5, 2025
02fb909
make test easier to debug by failing instead of hanging
jonastheis Nov 5, 2025
49d38e5
Revert "add possibility to filter by processed to get_batch_by_index"
jonastheis Nov 5, 2025
6a23c25
address review comments
jonastheis Nov 5, 2025
dce07df
embed L1Notification channel receiver inside of the L1WatcherHandle
jonastheis Nov 6, 2025
47d35e7
Merge remote-tracking branch 'origin/feat/l1-reorg' into feat/self-he…
jonastheis Nov 12, 2025
f4a999e
fixes after merge
jonastheis Nov 12, 2025
41b3ead
add l1_watcher_command_rx to addons for testing like l1_watcher_tx
jonastheis Nov 12, 2025
90fc085
Merge remote-tracking branch 'origin/feat/l1-reorg' into feat/self-he…
jonastheis Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ alloy-transport.workspace = true
# rollup-node
scroll-db = { workspace = true, features = ["test-utils"] }
rollup-node-primitives = { workspace = true, features = ["arbitrary"] }
rollup-node-watcher = { workspace = true, features = ["test-utils"] }

# scroll
reth-scroll-chainspec.workspace = true
reth-scroll-forks.workspace = true
reth-scroll-node = { workspace = true, features = ["test-utils"] }

# reth
reth-eth-wire-types.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions crates/chain-orchestrator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum ChainOrchestratorError {
/// missing.
#[error("L1 message queue gap detected at index {0}, previous L1 message not found")]
L1MessageQueueGap(u64),
/// A duplicate L1 message was detected at index {0}.
#[error("Duplicate L1 message detected at index {0}")]
DuplicateL1Message(u64),
/// An inconsistency was detected when trying to consolidate the chain.
#[error("Chain inconsistency detected")]
ChainInconsistency,
Expand All @@ -60,6 +63,9 @@ pub enum ChainOrchestratorError {
/// A gap was detected in batch commit events: the previous batch before index {0} is missing.
#[error("Batch commit gap detected at index {0}, previous batch commit not found")]
BatchCommitGap(u64),
/// A duplicate batch commit was detected at index {0}.
#[error("Duplicate batch commit detected at {0}")]
DuplicateBatchCommit(BatchInfo),
/// An error occurred while making a network request.
#[error("Network request error: {0}")]
NetworkRequestError(#[from] reth_network_p2p::error::RequestError),
Expand Down Expand Up @@ -92,6 +98,9 @@ pub enum ChainOrchestratorError {
/// An error occurred while handling rollup node primitives.
#[error("An error occurred while handling rollup node primitives: {0}")]
RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
/// An error occurred during gap reset.
#[error("Gap reset error: {0}")]
GapResetError(String),
}

impl CanRetry for ChainOrchestratorError {
Expand Down
136 changes: 125 additions & 11 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use reth_network_api::{BlockDownloaderProvider, FullNetwork};
use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_scroll_primitives::ScrollBlock;

use reth_tasks::shutdown::Shutdown;
use reth_tokio_util::{EventSender, EventStream};
use rollup_node_primitives::{
Expand All @@ -20,7 +21,7 @@ use rollup_node_primitives::{
use rollup_node_providers::L1MessageProvider;
use rollup_node_sequencer::{Sequencer, SequencerEvent};
use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
use rollup_node_watcher::L1Notification;
use rollup_node_watcher::{L1Notification, L1WatcherHandle};
use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
Expand All @@ -35,7 +36,7 @@ use scroll_network::{
BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
};
use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};
use tokio::sync::mpsc::{self, UnboundedReceiver};

mod config;
pub use config::ChainOrchestratorConfig;
Expand Down Expand Up @@ -115,8 +116,8 @@ pub struct ChainOrchestrator<
database: Arc<Database>,
/// The current sync state of the [`ChainOrchestrator`].
sync_state: SyncState,
/// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
l1_notification_rx: Receiver<Arc<L1Notification>>,
/// Handle to send commands to the L1 watcher (e.g., for gap recovery).
l1_watcher_handle: L1WatcherHandle,
/// The network manager that manages the scroll p2p network.
network: ScrollNetwork<N>,
/// The consensus algorithm used by the rollup node.
Expand Down Expand Up @@ -150,7 +151,7 @@ impl<
config: ChainOrchestratorConfig<ChainSpec>,
block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
l2_provider: L2P,
l1_notification_rx: Receiver<Arc<L1Notification>>,
l1_watcher_handle: L1WatcherHandle,
network: ScrollNetwork<N>,
consensus: Box<dyn Consensus + 'static>,
engine: Engine<EC>,
Expand All @@ -167,7 +168,7 @@ impl<
database,
config,
sync_state: SyncState::default(),
l1_notification_rx,
l1_watcher_handle,
network,
consensus,
engine,
Expand Down Expand Up @@ -224,7 +225,7 @@ impl<
let res = self.handle_network_event(event).await;
self.handle_outcome(res);
}
Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
Some(notification) = self.l1_watcher_handle.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
let res = self.handle_l1_notification(notification).await;
self.handle_outcome(res);
}
Expand Down Expand Up @@ -550,7 +551,40 @@ impl<
metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number))
}
L1Notification::BatchCommit { block_info, data } => {
metered!(Task::BatchCommit, self, handle_batch_commit(*block_info, data.clone()))
match metered!(
Task::BatchCommit,
self,
handle_batch_commit(*block_info, data.clone())
) {
Err(ChainOrchestratorError::BatchCommitGap(batch_index)) => {
// Query database for the L1 block of the last known batch
let reset_block =
self.database.get_last_batch_commit_l1_block().await?.unwrap_or(0);

tracing::warn!(
target: "scroll::chain_orchestrator",
"Batch commit gap detected at index {}, last known batch at L1 block {}",
batch_index,
reset_block
);

// Trigger gap recovery
self.l1_watcher_handle.trigger_gap_recovery(reset_block).await;

// Return no event, recovery will re-process
Ok(None)
}
Err(ChainOrchestratorError::DuplicateBatchCommit(batch_info)) => {
tracing::info!(
target: "scroll::chain_orchestrator",
"Duplicate batch commit detected at {:?}, skipping",
batch_info
);
// Return no event, as the batch has already been processed
Ok(None)
}
result => result,
}
}
L1Notification::BatchRevert { batch_info, block_info } => {
metered!(
Expand All @@ -567,7 +601,40 @@ impl<
)
}
L1Notification::L1Message { message, block_info, block_timestamp: _ } => {
metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_info))
match metered!(
Task::L1Message,
self,
handle_l1_message(message.clone(), *block_info)
) {
Err(ChainOrchestratorError::L1MessageQueueGap(queue_index)) => {
// Query database for the L1 block of the last known L1 message
let reset_block =
self.database.get_last_l1_message_l1_block().await?.unwrap_or(0);

tracing::warn!(
target: "scroll::chain_orchestrator",
"L1 message queue gap detected at index {}, last known message at L1 block {}",
queue_index,
reset_block
);

// Trigger gap recovery
self.l1_watcher_handle.trigger_gap_recovery(reset_block).await;

// Return no event, recovery will re-process
Ok(None)
}
Err(ChainOrchestratorError::DuplicateL1Message(queue_index)) => {
tracing::info!(
target: "scroll::chain_orchestrator",
"Duplicate L1 message detected at {:?}, skipping",
queue_index
);
// Return no event, as the message has already been processed
Ok(None)
}
result => result,
}
}
L1Notification::Synced => {
tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced");
Expand Down Expand Up @@ -758,6 +825,21 @@ impl<
return Err(ChainOrchestratorError::BatchCommitGap(batch.index));
}

// Check if batch already exists in DB.
if let Some(existing_batch) = tx.get_batch_by_index(batch.index).await? {
if existing_batch.hash == batch.hash {
// This means we have already processed this batch commit, we will skip
// it.
return Err(ChainOrchestratorError::DuplicateBatchCommit(
BatchInfo::new(batch.index, batch.hash),
));
}
// TODO: once batch reverts are implemented, we need to handle this
// case.
// If we have a batch at the same index in the DB this means we have
// missed a batch revert event.
}

let event = ChainOrchestratorEvent::BatchCommitIndexed {
batch_info: (&batch).into(),
l1_block_number: batch.block_number,
Expand Down Expand Up @@ -866,21 +948,53 @@ impl<
.tx_mut(move |tx| {
let l1_message = l1_message.clone();
async move {
// check for gaps in the L1 message queue
if l1_message.transaction.queue_index > 0 &&
tx.get_n_l1_messages(
Some(L1MessageKey::from_queue_index(
l1_message.transaction.queue_index - 1,
)),
1,
)
.await?
.is_empty()
.await?
.is_empty()
{
return Err(ChainOrchestratorError::L1MessageQueueGap(
l1_message.transaction.queue_index,
));
}

// check if the L1 message already exists in the DB
if let Some(existing_message) = tx
.get_n_l1_messages(
Some(L1MessageKey::from_queue_index(
l1_message.transaction.queue_index,
)),
1,
)
.await?
.pop()
{
if existing_message.transaction.tx_hash() ==
l1_message.transaction.tx_hash()
{
// We have already processed this L1 message, we will skip it.
return Err(ChainOrchestratorError::DuplicateL1Message(
l1_message.transaction.queue_index,
));
}

// This should not happen in normal operation as messages should be
// deleted when a L1 reorg is handled, log warning.
tracing::warn!(
target: "scroll::chain_orchestrator",
"L1 message queue index {} already exists with different hash in DB {:?} vs {:?}",
l1_message.transaction.queue_index,
existing_message.transaction.tx_hash(),
l1_message.transaction.tx_hash()
);
}

tx.insert_l1_message(l1_message.clone()).await?;
tx.insert_l1_block_info(l1_block_info).await?;
Ok::<_, ChainOrchestratorError>(())
Expand Down
16 changes: 16 additions & 0 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,22 @@ impl DatabaseReadOperations for Database {
)
}

async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
metered!(
DatabaseOperation::GetLastBatchCommitL1Block,
self,
tx(|tx| async move { tx.get_last_batch_commit_l1_block().await })
)
}

async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
metered!(
DatabaseOperation::GetLastL1MessageL1Block,
self,
tx(|tx| async move { tx.get_last_l1_message_l1_block().await })
)
}

async fn get_n_l1_messages(
&self,
start: Option<L1MessageKey>,
Expand Down
4 changes: 4 additions & 0 deletions crates/database/db/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub(crate) enum DatabaseOperation {
GetFinalizedL1BlockNumber,
GetProcessedL1BlockNumber,
GetL2HeadBlockNumber,
GetLastBatchCommitL1Block,
GetLastL1MessageL1Block,
GetNL1Messages,
GetNL2BlockDataHint,
GetL2BlockAndBatchInfoByHash,
Expand Down Expand Up @@ -128,6 +130,8 @@ impl DatabaseOperation {
Self::GetFinalizedL1BlockNumber => "get_finalized_l1_block_number",
Self::GetProcessedL1BlockNumber => "get_processed_l1_block_number",
Self::GetL2HeadBlockNumber => "get_l2_head_block_number",
Self::GetLastBatchCommitL1Block => "get_last_batch_commit_l1_block",
Self::GetLastL1MessageL1Block => "get_last_l1_message_l1_block",
Self::GetNL1Messages => "get_n_l1_messages",
Self::GetNL2BlockDataHint => "get_n_l2_block_data_hint",
Self::GetL2BlockAndBatchInfoByHash => "get_l2_block_and_batch_info_by_hash",
Expand Down
28 changes: 28 additions & 0 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,12 @@ pub trait DatabaseReadOperations {
/// Get the latest L2 head block info.
async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError>;

/// Get the L1 block number of the last batch commit in the database.
async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError>;

/// Get the L1 block number of the last L1 message in the database.
async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError>;

/// Get a vector of n [`L1MessageEnvelope`]s in the database starting from the provided `start`
/// point.
async fn get_n_l1_messages(
Expand Down Expand Up @@ -1143,6 +1149,28 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
.expect("l2_head_block should always be a valid u64"))
}

async fn get_last_batch_commit_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
Ok(models::batch_commit::Entity::find()
.order_by_desc(models::batch_commit::Column::BlockNumber)
.select_only()
.column(models::batch_commit::Column::BlockNumber)
.into_tuple::<i64>()
.one(self.get_connection())
.await?
.map(|block_number| block_number as u64))
}

async fn get_last_l1_message_l1_block(&self) -> Result<Option<u64>, DatabaseError> {
Ok(models::l1_message::Entity::find()
.order_by_desc(models::l1_message::Column::L1BlockNumber)
.select_only()
.column(models::l1_message::Column::L1BlockNumber)
.into_tuple::<i64>()
.one(self.get_connection())
.await?
.map(|block_number| block_number as u64))
}

async fn get_n_l1_messages(
&self,
start: Option<L1MessageKey>,
Expand Down
8 changes: 8 additions & 0 deletions crates/node/src/add_ons/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use reth_rpc_eth_api::EthApiTypes;
use reth_scroll_node::ScrollNetworkPrimitives;
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
#[cfg(feature = "test-utils")]
use tokio::sync::mpsc::UnboundedReceiver;
#[cfg(feature = "test-utils")]
use tokio::sync::Mutex;
#[cfg(feature = "test-utils")]
use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};

/// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
Expand All @@ -20,6 +24,10 @@ pub struct ScrollAddOnsHandle<
/// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
#[cfg(feature = "test-utils")]
pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
/// An optional channel used to receive commands from the `RollupNodeManager` to the
/// `L1Watcher`.
#[cfg(feature = "test-utils")]
pub l1_watcher_command_rx: Arc<Mutex<UnboundedReceiver<rollup_node_watcher::L1WatcherCommand>>>,
}

impl<
Expand Down
Loading