diff --git a/CHANGELOG.md b/CHANGELOG.md index 59a710e02..f44f65cb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Replaced blocking-in-async operations in the validator, remote prover, and ntx-builder with `spawn_blocking` to avoid starving the Tokio runtime ([#2041](https://github.com/0xMiden/node/pull/2041)). - Implement persistent RocksDB backend for `AccountStateForest`, improving startup time ([#2020](https://github.com/0xMiden/node/pull/2020)). +- Bounded the ntx-builder's startup load on the store and added a separate `store_sync_checkpoint` watermark so a partial-catch-up crash can no longer leave the builder thinking it's caught up when it isn't ([#1952](https://github.com/0xMiden/node/issues/1952)). ## v0.14.10 (2026-05-29) diff --git a/Cargo.lock b/Cargo.lock index 6dc505da8..ca7aa6bf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3059,7 +3059,7 @@ dependencies = [ [[package]] name = "miden-genesis" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "clap", @@ -3077,7 +3077,7 @@ dependencies = [ [[package]] name = "miden-large-smt-backend-rocksdb" -version = "0.14.10" +version = "0.14.11" dependencies = [ "miden-crypto", "miden-node-rocksdb-cxx-linkage-fix", @@ -3138,7 +3138,7 @@ dependencies = [ [[package]] name = "miden-network-monitor" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "axum", @@ -3166,7 +3166,7 @@ dependencies = [ [[package]] name = "miden-node" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "clap", @@ -3186,7 +3186,7 @@ dependencies = [ [[package]] name = "miden-node-block-producer" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "assert_matches", @@ -3218,7 +3218,7 @@ dependencies = [ [[package]] name = "miden-node-db" -version = "0.14.10" +version = "0.14.11" dependencies = [ "deadpool", "deadpool-diesel", @@ -3231,7 +3231,7 @@ dependencies = [ [[package]] name = "miden-node-grpc-error-macro" -version = "0.14.10" +version = "0.14.11" 
dependencies = [ "quote", "syn 2.0.117", @@ -3239,7 +3239,7 @@ dependencies = [ [[package]] name = "miden-node-ntx-builder" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "build-rs", @@ -3271,7 +3271,7 @@ dependencies = [ [[package]] name = "miden-node-proto" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "build-rs", @@ -3295,7 +3295,7 @@ dependencies = [ [[package]] name = "miden-node-proto-build" -version = "0.14.10" +version = "0.14.11" dependencies = [ "build-rs", "fs-err", @@ -3306,11 +3306,11 @@ dependencies = [ [[package]] name = "miden-node-rocksdb-cxx-linkage-fix" -version = "0.14.10" +version = "0.14.11" [[package]] name = "miden-node-rpc" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "futures", @@ -3342,7 +3342,7 @@ dependencies = [ [[package]] name = "miden-node-store" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "assert_matches", @@ -3386,7 +3386,7 @@ dependencies = [ [[package]] name = "miden-node-stress-test" -version = "0.14.10" +version = "0.14.11" dependencies = [ "clap", "fs-err", @@ -3414,7 +3414,7 @@ dependencies = [ [[package]] name = "miden-node-utils" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "bytes", @@ -3447,7 +3447,7 @@ dependencies = [ [[package]] name = "miden-node-validator" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "aws-config", @@ -3583,7 +3583,7 @@ dependencies = [ [[package]] name = "miden-remote-prover" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "assert_matches", @@ -3618,7 +3618,7 @@ dependencies = [ [[package]] name = "miden-remote-prover-client" -version = "0.14.10" +version = "0.14.11" dependencies = [ "build-rs", "fs-err", diff --git a/Cargo.toml b/Cargo.toml index e2416f9e8..cbad0e51b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ license = "MIT" readme = "README.md" repository = "https://github.com/0xMiden/node" rust-version = "1.93" -version = 
"0.14.10" +version = "0.14.11" [workspace.dependencies] # Workspace crates. diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index fa49ccccf..b1a596bc9 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -104,7 +104,7 @@ impl AccountActorContext { validator: ValidatorClient::new(url.clone()), prover: None, chain_state, - store: StoreClient::new(url), + store: StoreClient::new(url, NonZeroUsize::new(1).unwrap()), script_cache: LruCache::new(NonZeroUsize::new(1).unwrap()), max_notes_per_tx: NonZeroUsize::new(1).unwrap(), max_note_attempts: 1, diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index b9b962d8e..43203a883 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; @@ -6,6 +7,7 @@ use futures::Stream; use miden_node_proto::domain::account::NetworkAccountId; use miden_node_proto::domain::mempool::MempoolEvent; use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::block::BlockNumber; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio::task::JoinSet; @@ -60,6 +62,36 @@ pub struct NetworkTransactionBuilder { /// We keep database writes centralized so this is how actors communicate /// items to write. actor_request_rx: mpsc::Receiver, + /// Bookkeeping for the startup catch-up phase. Owned by the run loop and consulted on every + /// `BlockCommitted` event to decide whether to advance the store-sync watermark. + catch_up: CatchUpState, +} + +/// State that drives the ntx-builder's startup catch-up. +pub(crate) struct CatchUpState { + /// Persisted store-sync checkpoint from the previous run, or `None` on first-ever startup + /// (or before any successful catch-up). Scopes the gap that startup catch-up needs to + /// bridge. 
+ prev_local_block: Option, + /// Account IDs that had inflight rows at startup and therefore must be reconciled against + /// the store before normal operation: their inflight tx may have landed in a block during + /// downtime, leaving locally-committed state stale. + inflight_affected: Vec, + /// Whether the startup catch-up has completed; gates advancing the store-sync checkpoint. + complete: bool, +} + +impl CatchUpState { + pub(crate) fn new( + prev_local_block: Option, + inflight_affected: Vec, + ) -> Self { + Self { + prev_local_block, + inflight_affected, + complete: false, + } + } +} } impl NetworkTransactionBuilder { @@ -73,6 +105,7 @@ impl NetworkTransactionBuilder { actor_context: AccountActorContext, mempool_events: MempoolEventStream, actor_request_rx: mpsc::Receiver, + catch_up: CatchUpState, ) -> Self { Self { config, @@ -83,6 +116,7 @@ impl NetworkTransactionBuilder { actor_context, mempool_events, actor_request_rx, + catch_up, } } @@ -127,17 +161,40 @@ impl NetworkTransactionBuilder { /// Runs the main event loop. async fn run_event_loop(mut self) -> anyhow::Result<()> { - // Spawn a background task to load network accounts from the store. - // Accounts are sent through a channel and processed in the main event loop. + // Reconcile inflight-affected accounts. + // + // For each account that had an inflight row at startup, do a full hydration + // (account-details + unconsumed-notes) from the store. The inflight tx may have + // landed in a block we didn't witness, so locally-committed state cannot be + // trusted for these specific accounts. The set is bounded by `max_concurrent_txs` + // so this is small and we run it sequentially before opening the main loop. 
+ let inflight_set: HashSet = + self.catch_up.inflight_affected.iter().copied().collect(); + for account_id in std::mem::take(&mut self.catch_up.inflight_affected) { + if let Err(err) = self.handle_loaded_account(account_id).await { + tracing::error!( + %account_id, + error = %err, + "failed to reconcile inflight-affected account; will retry on next event" + ); + } + } + + // Setup hydration channels. let (account_tx, mut account_rx) = mpsc::channel::(self.config.account_channel_capacity); - let account_loader_store = self.store.clone(); - let mut account_loader_handle = tokio::spawn(async move { - account_loader_store - .stream_network_account_ids(account_tx) - .await - .context("failed to load network accounts from store") - }); + let (note_refresh_done_tx, mut note_refresh_done_rx) = + mpsc::channel::(self.config.account_channel_capacity); + let (catch_up_done_tx, mut catch_up_done_rx) = mpsc::channel::(1); + + let prev_local_block = self.catch_up.prev_local_block; + let mut catch_up_started = false; + let mut account_loader_handle: tokio::task::JoinHandle> = + tokio::spawn(async move { Ok(()) }); + // Take the senders out of `self` so we can move them into the kickoff closure. + let mut account_tx_holder = Some(account_tx); + let mut note_refresh_done_tx_holder = Some(note_refresh_done_tx); + let mut catch_up_done_tx_holder = Some(catch_up_done_tx); // Main event loop. loop { @@ -155,14 +212,63 @@ impl NetworkTransactionBuilder { .context("mempool event stream ended")? .context("mempool event stream failed")?; + let kickoff_block = if catch_up_started { + None + } else { + match &event { + MempoolEvent::BlockCommitted { header, .. 
} => { + Some(header.block_num()) + }, + _ => None, + } + }; + self.handle_mempool_event(event).await?; + + if let Some(m) = kickoff_block { + catch_up_started = true; + let account_tx = account_tx_holder.take().expect("kickoff runs once"); + let note_refresh_done_tx = + note_refresh_done_tx_holder.take().expect("kickoff runs once"); + let catch_up_done_tx = + catch_up_done_tx_holder.take().expect("kickoff runs once"); + account_loader_handle = Self::kickoff_catch_up( + self.store.clone(), + self.db.clone(), + prev_local_block, + m, + &inflight_set, + account_tx, + note_refresh_done_tx, + catch_up_done_tx, + ) + .await?; + } }, - // Handle account batches loaded from the store. - // Once all accounts are loaded, the channel closes and this branch - // becomes inactive (recv returns None and we stop matching). + // Gap discovery: a new account ID created during downtime. Full 2-call hydration. Some(account_id) = account_rx.recv() => { self.handle_loaded_account(account_id).await?; }, + // Per-known-account note refresh complete: spawn its actor. + Some(account_id) = note_refresh_done_rx.recv() => { + self.coordinator + .spawn_actor(AccountOrigin::store(account_id), &self.actor_context); + }, + // Store catch-up finished: flip the flag and bump store_sync_checkpoint to + // `block_number`. + Some(block_number) = catch_up_done_rx.recv() => { + self.catch_up.complete = true; + if let Err(err) = self.db.set_store_sync_checkpoint(block_number).await { + tracing::error!( + error = %err, + "failed to persist store_sync_checkpoint after catch-up" + ); + } + tracing::info!( + catch_up_to = %block_number, + "ntx-builder catch-up complete; sync watermark set" + ); + }, // Handle requests from actors. 
Some(request) = self.actor_request_rx.recv() => { self.handle_actor_request(request).await?; @@ -182,6 +288,113 @@ impl NetworkTransactionBuilder { } } + /// Spawns the store catch-up tasks (gap discovery + per-account note refresh) and + /// returns the join handle for the gap-discovery task. + /// + /// For each locally-known committed account that is NOT in `inflight_set` (those have + /// already been reconciled by the inflight-reconcile pass), spawns a task that fetches + /// the unconsumed-notes delta as of block `M` and writes it. On task completion, the account + /// ID flows through `note_refresh_done_tx` so the main loop can spawn the actor. + /// + /// A coordinator task waits for both gap discovery and all note-refresh tasks to + /// finish, then signals via `catch_up_done_tx` so the main loop can flip the + /// catch-up flag and persist the watermark. + #[expect(clippy::too_many_arguments)] + async fn kickoff_catch_up( + store: StoreClient, + db: Db, + prev_local_block: Option, + catch_up_target: BlockNumber, + inflight_set: &HashSet, + account_tx: mpsc::Sender, + note_refresh_done_tx: mpsc::Sender, + catch_up_done_tx: mpsc::Sender, + ) -> anyhow::Result>> { + let gap_from = match prev_local_block { + Some(prev) if prev < catch_up_target => { + Some(BlockNumber::from(prev.as_u32().saturating_add(1))) + }, + Some(_) => None, + None => Some(BlockNumber::GENESIS), + }; + + // Spawn the gap-discovery loader (or a no-op task if there's nothing to bridge). + let account_loader_handle: tokio::task::JoinHandle> = + if let Some(from_block) = gap_from { + let loader_store = store.clone(); + tokio::spawn(async move { + loader_store + .stream_network_account_ids(from_block, account_tx) + .await + .context("failed to load network accounts from store") + }) + } else { + drop(account_tx); + tokio::spawn(async move { Ok::<(), anyhow::Error>(()) }) + }; + + // Per-account note refresh for locally-known committed accounts (excluding inflight). 
+ let known_accounts = db + .list_committed_account_ids() + .await + .context("failed to list committed accounts")?; + let known_to_refresh: Vec<_> = + known_accounts.into_iter().filter(|id| !inflight_set.contains(id)).collect(); + + tracing::info!( + ?gap_from, + catch_up_target = %catch_up_target, + inflight_reconciled = inflight_set.len(), + known_to_refresh = known_to_refresh.len(), + "ntx-builder store catch-up kicked off" + ); + + let mut refresh_handles: Vec> = + Vec::with_capacity(known_to_refresh.len()); + for account_id in known_to_refresh { + let task_store = store.clone(); + let task_db = db.clone(); + let done_tx = note_refresh_done_tx.clone(); + refresh_handles.push(tokio::spawn(async move { + // The hydration semaphore is acquired inside `get_unconsumed_network_notes`. + match task_store + .get_unconsumed_network_notes(account_id, catch_up_target.as_u32()) + .await + { + Ok(notes) => { + if let Err(err) = task_db.upsert_committed_notes(notes).await { + tracing::error!( + %account_id, + error = %err, + "failed to persist refreshed notes" + ); + } + }, + Err(err) => { + tracing::warn!( + %account_id, + error = %err, + "note refresh failed; spawning actor with possibly stale notes" + ); + }, + } + // Always signal completion so the actor still spawns even if refresh failed. + let _ = done_tx.send(account_id).await; + })); + } + // Drop the local sender so the channel closes once all spawned refresh tasks finish. + drop(note_refresh_done_tx); + + tokio::spawn(async move { + for handle in refresh_handles { + let _ = handle.await; + } + let _ = catch_up_done_tx.send(catch_up_target).await; + }); + + Ok(account_loader_handle) + } + /// Handles account IDs loaded from the store by syncing state to DB and spawning actors. #[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))] async fn handle_loaded_account( @@ -221,7 +434,7 @@ impl NetworkTransactionBuilder { MempoolEvent::TransactionAdded { account_delta, .. 
} => { // Write event effects to DB first. self.coordinator - .write_event(&event) + .write_event(&event, self.catch_up.complete) .await .context("failed to write TransactionAdded to DB")?; @@ -248,7 +461,7 @@ // Write event effects to DB first. let result = self .coordinator - .write_event(&event) + .write_event(&event, self.catch_up.complete) .await .context("failed to write BlockCommitted to DB")?; @@ -263,7 +476,7 @@ // Write event effects to DB first. let result = self .coordinator - .write_event(&event) + .write_event(&event, self.catch_up.complete) .await .context("failed to write TransactionsReverted to DB")?; diff --git a/crates/ntx-builder/src/clients/store.rs b/crates/ntx-builder/src/clients/store.rs index 49bebb522..60c4711d8 100644 --- a/crates/ntx-builder/src/clients/store.rs +++ b/crates/ntx-builder/src/clients/store.rs @@ -1,5 +1,7 @@ use std::collections::BTreeSet; +use std::num::NonZeroUsize; use std::ops::RangeInclusive; +use std::sync::Arc; use std::time::Duration; use miden_node_proto::clients::{Builder, StoreNtxBuilderClient}; @@ -44,11 +46,14 @@ use crate::COMPONENT; #[derive(Clone, Debug)] pub struct StoreClient { inner: StoreNtxBuilderClient, + /// Caps concurrent in-flight startup-hydration RPCs (account details + unconsumed notes) so + /// the ntx-builder doesn't hammer the store on restart. + hydration_permits: Arc, } impl StoreClient { /// Creates a new store client with a lazy connection. 
- pub fn new(store_url: Url) -> Self { + pub fn new(store_url: Url, hydration_concurrency: NonZeroUsize) -> Self { info!(target: COMPONENT, store_endpoint = %store_url, "Initializing store client"); let store = Builder::new(store_url) @@ -59,7 +64,10 @@ impl StoreClient { .with_otel_context_injection() .connect_lazy::(); - Self { inner: store } + Self { + inner: store, + hydration_permits: Arc::new(tokio::sync::Semaphore::new(hydration_concurrency.get())), + } } /// Returns the block header and MMR peaks at the current chain tip. @@ -130,6 +138,13 @@ impl StoreClient { &self, account_id: NetworkAccountId, ) -> Result, StoreError> { + let _permit = self + .hydration_permits + .clone() + .acquire_owned() + .await + .expect("hydration semaphore is never closed"); + let request = proto::account::AccountId::from(account_id.inner()); let store_response = self @@ -204,6 +219,15 @@ impl StoreClient { // Upper bound of each note is ~10KB. Limit page size to ~10MB. const PAGE_SIZE: u64 = 1024; + // Hold one hydration permit for the entire call: an account's note refresh counts as one + // in-flight slot regardless of how many pages it spans. + let _permit = self + .hydration_permits + .clone() + .acquire_owned() + .await + .expect("hydration semaphore is never closed"); + let mut all_notes = Vec::new(); let mut page_token: Option = None; @@ -241,9 +265,10 @@ impl StoreClient { /// without waiting for all accounts to be preloaded. pub async fn stream_network_account_ids( &self, + from_block: BlockNumber, sender: tokio::sync::mpsc::Sender, ) -> Result<(), StoreError> { - let mut block_range = BlockNumber::GENESIS..=BlockNumber::MAX; + let mut block_range = from_block..=BlockNumber::MAX; while let Some(next_start) = self.load_accounts_page(block_range, &sender).await? 
{ block_range = next_start..=BlockNumber::MAX; diff --git a/crates/ntx-builder/src/coordinator.rs b/crates/ntx-builder/src/coordinator.rs index 87aa5edcd..0e60f346a 100644 --- a/crates/ntx-builder/src/coordinator.rs +++ b/crates/ntx-builder/src/coordinator.rs @@ -289,9 +289,14 @@ impl Coordinator { /// /// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`] /// with the accounts to notify and cancel. + /// + /// `advance_store_sync_checkpoint` is forwarded to `handle_block_committed` so the builder + /// can defer advancing the store-sync watermark while startup catch-up is still running. It + /// has no effect for non-`BlockCommitted` events. pub async fn write_event( &self, event: &MempoolEvent, + advance_store_sync_checkpoint: bool, ) -> Result { match event { MempoolEvent::TransactionAdded { @@ -317,6 +322,7 @@ impl Coordinator { txs.clone(), header.block_num(), header.as_ref().clone(), + advance_store_sync_checkpoint, ) .await?; Ok(WriteEventResult { accounts_to_notify: affected_accounts }) diff --git a/crates/ntx-builder/src/db/migrations/2026050800000_store_sync_checkpoint/down.sql b/crates/ntx-builder/src/db/migrations/2026050800000_store_sync_checkpoint/down.sql new file mode 100644 index 000000000..e69de29bb diff --git a/crates/ntx-builder/src/db/migrations/2026050800000_store_sync_checkpoint/up.sql b/crates/ntx-builder/src/db/migrations/2026050800000_store_sync_checkpoint/up.sql new file mode 100644 index 000000000..76aa4f4e1 --- /dev/null +++ b/crates/ntx-builder/src/db/migrations/2026050800000_store_sync_checkpoint/up.sql @@ -0,0 +1,7 @@ +-- Store-sync checkpoint: the highest block whose state has been fully ingested into the local +-- DB by the startup catch-up sync. +-- +-- NULL means "never sync'd": treated as a full GENESIS-onward sync on first boot after upgrade. 
+ALTER TABLE chain_state ADD COLUMN store_sync_checkpoint INTEGER + CHECK (store_sync_checkpoint IS NULL + OR store_sync_checkpoint BETWEEN 0 AND 0xFFFFFFFF); diff --git a/crates/ntx-builder/src/db/mod.rs b/crates/ntx-builder/src/db/mod.rs index 28f1fef93..f052a2af5 100644 --- a/crates/ntx-builder/src/db/mod.rs +++ b/crates/ntx-builder/src/db/mod.rs @@ -141,15 +141,28 @@ impl Db { /// Handles a `BlockCommitted` mempool event by committing transaction effects. /// /// Returns the list of affected account IDs that should be notified. + /// + /// `advance_store_sync_checkpoint` controls whether the store-sync checkpoint is advanced + /// alongside the chain tip. During startup catch-up the caller passes `false` so a partial + /// crash can't leave the watermark inflated past what was actually sync'd from the store. In + /// steady-state operation the caller passes `true`: the mempool stream is delivering all the + /// data we'd otherwise need to fetch from the store, so the checkpoint can follow the chain + /// tip. pub async fn handle_block_committed( &self, txs: Vec, block_num: BlockNumber, header: BlockHeader, + advance_store_sync_checkpoint: bool, ) -> Result> { self.inner .transact("handle_block_committed", move |conn| { - queries::commit_block(conn, &txs, block_num, &header) + let affected = queries::commit_block_effects(conn, &txs)?; + queries::upsert_chain_state(conn, block_num, &header)?; + if advance_store_sync_checkpoint { + queries::set_store_sync_checkpoint(conn, block_num)?; + } + Ok(affected) }) .await } @@ -173,6 +186,51 @@ impl Db { self.inner.transact("purge_inflight", queries::purge_inflight).await } + /// Returns the persisted store-sync checkpoint, or `None` on first-ever startup / before any + /// successful catch-up has run. 
+ pub async fn read_store_sync_checkpoint(&self) -> Result> { + self.inner + .query("read_store_sync_checkpoint", queries::read_store_sync_checkpoint) + .await + } + + /// Monotonically advances the store-sync checkpoint to `block_num`. + pub async fn set_store_sync_checkpoint(&self, block_num: BlockNumber) -> Result<()> { + self.inner + .transact("set_store_sync_checkpoint", move |conn| { + queries::set_store_sync_checkpoint(conn, block_num) + }) + .await + } + + /// Returns all account IDs with a committed row in the local DB. + pub async fn list_committed_account_ids(&self) -> Result> { + self.inner + .query("list_committed_account_ids", queries::list_committed_account_ids) + .await + } + + /// Returns the distinct account IDs that have at least one inflight row at the time of the + /// call. Intended for use *before* `purge_inflight` so the caller can reconcile those + /// accounts' committed state from the store. + pub async fn list_inflight_account_ids(&self) -> Result> { + self.inner + .query("list_inflight_account_ids", queries::list_inflight_account_ids) + .await + } + + /// Replaces committed note rows for the given set of notes. Account state is left untouched. + /// + /// Used by the per-account note refresh that runs at startup to catch up on notes that + /// landed during the builder's downtime. + pub async fn upsert_committed_notes(&self, notes: Vec) -> Result<()> { + self.inner + .transact("upsert_committed_notes", move |conn| { + queries::insert_committed_notes(conn, ¬es) + }) + .await + } + /// Inserts or replaces the singleton chain state row. pub async fn upsert_chain_state( &self, @@ -247,3 +305,85 @@ impl Db { (db, dir) } } + +#[cfg(test)] +mod tests { + use miden_protocol::block::BlockNumber; + + use super::*; + use crate::test_utils::mock_block_header; + + /// `handle_block_committed` advances the chain tip but NOT the `store_sync_checkpoint` when + /// `advance_store_sync_checkpoint=false`. 
+ #[tokio::test] + async fn handle_block_committed_does_not_advance_checkpoint_when_gated() { + let (db, _dir) = Db::test_setup().await; + + // Seed chain_state at block 0 so the row exists. + let zero = BlockNumber::from(0u32); + db.upsert_chain_state(zero, mock_block_header(zero)).await.unwrap(); + + let block_num = BlockNumber::from(5u32); + let header = mock_block_header(block_num); + db.handle_block_committed(vec![], block_num, header, false).await.unwrap(); + + let checkpoint = db.read_store_sync_checkpoint().await.unwrap(); + assert!( + checkpoint.is_none(), + "checkpoint must remain unset while catch-up is gated; got {checkpoint:?}", + ); + } + + /// `handle_block_committed` advances both the chain tip AND the `store_sync_checkpoint` when + /// `advance_store_sync_checkpoint=true`. This is the steady-state path: the mempool stream + /// is delivering all the data we'd otherwise need from the store, so the watermark can + /// follow the chain tip. + #[tokio::test] + async fn handle_block_committed_advances_checkpoint_when_caught_up() { + let (db, _dir) = Db::test_setup().await; + + let zero = BlockNumber::from(0u32); + db.upsert_chain_state(zero, mock_block_header(zero)).await.unwrap(); + + let block_num = BlockNumber::from(5u32); + let header = mock_block_header(block_num); + db.handle_block_committed(vec![], block_num, header, true).await.unwrap(); + + let checkpoint = db.read_store_sync_checkpoint().await.unwrap(); + assert_eq!(checkpoint, Some(block_num)); + } + + /// Going from gated → caught-up → gated again, the checkpoint should advance only when the + /// flag is true and never regress when it's false. + #[tokio::test] + async fn handle_block_committed_checkpoint_is_monotone_across_modes() { + let (db, _dir) = Db::test_setup().await; + + let zero = BlockNumber::from(0u32); + db.upsert_chain_state(zero, mock_block_header(zero)).await.unwrap(); + + // Gated: chain tip moves to 5, checkpoint stays unset. 
+ db.handle_block_committed(vec![], BlockNumber::from(5u32), mock_block_header(zero), false) + .await + .unwrap(); + assert_eq!(db.read_store_sync_checkpoint().await.unwrap(), None); + + // Caught up: checkpoint advances to 7. + db.handle_block_committed(vec![], BlockNumber::from(7u32), mock_block_header(zero), true) + .await + .unwrap(); + assert_eq!(db.read_store_sync_checkpoint().await.unwrap(), Some(BlockNumber::from(7u32))); + + // Gated again (shouldn't normally happen, but the guard must hold): checkpoint stays at 7. + db.handle_block_committed(vec![], BlockNumber::from(9u32), mock_block_header(zero), false) + .await + .unwrap(); + assert_eq!(db.read_store_sync_checkpoint().await.unwrap(), Some(BlockNumber::from(7u32))); + + // Caught up at higher block — advances to 10. + db.handle_block_committed(vec![], BlockNumber::from(10u32), mock_block_header(zero), true) + .await + .unwrap(); + assert_eq!(db.read_store_sync_checkpoint().await.unwrap(), Some(BlockNumber::from(10u32))); + } +} diff --git a/crates/ntx-builder/src/db/models/queries/accounts.rs b/crates/ntx-builder/src/db/models/queries/accounts.rs index 7d52c6554..a99eb45c7 100644 --- a/crates/ntx-builder/src/db/models/queries/accounts.rs +++ b/crates/ntx-builder/src/db/models/queries/accounts.rs @@ -103,6 +103,46 @@ pub fn get_account( .transpose() } +/// Returns the IDs of all accounts that have a committed row (`transaction_id IS NULL`). +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT account_id FROM accounts WHERE transaction_id IS NULL +/// ``` +pub fn list_committed_account_ids( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let rows: Vec> = schema::accounts::table + .filter(schema::accounts::transaction_id.is_null()) + .select(schema::accounts::account_id) + .load(conn)?; + rows.iter() + .map(|bytes| conversions::network_account_id_from_bytes(bytes)) + .collect() +} + +/// Returns the distinct IDs of all accounts that have at least one inflight row at the time of +/// the call. 
+/// +/// # Raw SQL +/// +/// ```sql +/// SELECT DISTINCT account_id FROM accounts WHERE transaction_id IS NOT NULL +/// ``` +pub fn list_inflight_account_ids( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let rows: Vec> = schema::accounts::table + .filter(schema::accounts::transaction_id.is_not_null()) + .select(schema::accounts::account_id) + .distinct() + .load(conn)?; + rows.iter() + .map(|bytes| conversions::network_account_id_from_bytes(bytes)) + .collect() +} + /// Returns `true` when an inflight account row exists with the given `transaction_id`. /// /// # Raw SQL diff --git a/crates/ntx-builder/src/db/models/queries/chain_state.rs b/crates/ntx-builder/src/db/models/queries/chain_state.rs index 9b529cadc..23e2544e5 100644 --- a/crates/ntx-builder/src/db/models/queries/chain_state.rs +++ b/crates/ntx-builder/src/db/models/queries/chain_state.rs @@ -7,40 +7,91 @@ use miden_protocol::block::{BlockHeader, BlockNumber}; use crate::db::models::conv as conversions; use crate::db::schema; -// MODELS -// ================================================================================================ - -#[derive(Debug, Clone, Insertable)] -#[diesel(table_name = schema::chain_state)] -#[diesel(check_for_backend(diesel::sqlite::Sqlite))] -pub struct ChainStateInsert { - /// Singleton row ID. Always `0` to satisfy the `CHECK (id = 0)` constraint. - pub id: i32, - pub block_num: i64, - pub block_header: Vec, -} - // QUERIES // ================================================================================================ /// Inserts or replaces the singleton chain state row. /// +/// Preserves the existing `store_sync_checkpoint` if a row already exists, so the mempool-driven +/// chain tip update doesn't clobber the sync watermark. 
+/// /// # Raw SQL /// /// ```sql -/// INSERT OR REPLACE INTO chain_state (id, block_num, block_header) +/// INSERT INTO chain_state (id, block_num, block_header) /// VALUES (0, ?1, ?2) +/// ON CONFLICT(id) DO UPDATE SET +/// block_num = excluded.block_num, +/// block_header = excluded.block_header /// ``` pub fn upsert_chain_state( conn: &mut SqliteConnection, block_num: BlockNumber, block_header: &BlockHeader, ) -> Result<(), DatabaseError> { - let row = ChainStateInsert { - id: 0, - block_num: conversions::block_num_to_i64(block_num), - block_header: conversions::block_header_to_bytes(block_header), - }; - diesel::replace_into(schema::chain_state::table).values(&row).execute(conn)?; + use diesel::sql_types::{BigInt, Binary}; + + let block_num_val = conversions::block_num_to_i64(block_num); + let header_bytes = conversions::block_header_to_bytes(block_header); + + diesel::sql_query( + "INSERT INTO chain_state (id, block_num, block_header) \ + VALUES (0, ?1, ?2) \ + ON CONFLICT(id) DO UPDATE SET \ + block_num = excluded.block_num, \ + block_header = excluded.block_header", + ) + .bind::(block_num_val) + .bind::(&header_bytes) + .execute(conn)?; + Ok(()) +} + +/// Returns the persisted store-sync checkpoint, or `None` if either: +/// - the row hasn't been initialized (first-ever startup), or +/// - the row exists but has never run a successful catch-up. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT store_sync_checkpoint FROM chain_state WHERE id = 0 +/// ``` +pub fn read_store_sync_checkpoint( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let row: Option> = schema::chain_state::table + .filter(schema::chain_state::id.eq(0)) + .select(schema::chain_state::store_sync_checkpoint) + .first(conn) + .optional()?; + Ok(row.flatten().map(conversions::block_num_from_i64)) +} + +/// Monotonically advances `store_sync_checkpoint` to the given block number. +/// +/// Returns `Ok(())` regardless of whether the row was updated. 
+/// +/// # Raw SQL +/// +/// ```sql +/// UPDATE chain_state +/// SET store_sync_checkpoint = ?1 +/// WHERE id = 0 +/// AND (store_sync_checkpoint IS NULL OR store_sync_checkpoint < ?1) +/// ``` +pub fn set_store_sync_checkpoint( + conn: &mut SqliteConnection, + block_num: BlockNumber, +) -> Result<(), DatabaseError> { + let block_num_val = conversions::block_num_to_i64(block_num); + diesel::update( + schema::chain_state::table.filter(schema::chain_state::id.eq(0)).filter( + schema::chain_state::store_sync_checkpoint + .is_null() + .or(schema::chain_state::store_sync_checkpoint.lt(block_num_val)), + ), + ) + .set(schema::chain_state::store_sync_checkpoint.eq(Some(block_num_val))) + .execute(conn)?; Ok(()) } diff --git a/crates/ntx-builder/src/db/models/queries/mod.rs b/crates/ntx-builder/src/db/models/queries/mod.rs index 3d71f3a94..7f2389beb 100644 --- a/crates/ntx-builder/src/db/models/queries/mod.rs +++ b/crates/ntx-builder/src/db/models/queries/mod.rs @@ -6,7 +6,6 @@ use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::Nullifier; use miden_protocol::transaction::TransactionId; use miden_protocol::utils::serde::Serializable; @@ -177,7 +176,13 @@ pub fn add_transaction( Ok(()) } -/// Handles a `BlockCommitted` event by committing transaction effects. +/// Handles the per-transaction effects of a `BlockCommitted` event without writing to the +/// `chain_state` row. +/// +/// The chain-tip and store-sync-checkpoint writes are the caller's responsibility (see +/// [`upsert_chain_state`] and [`set_store_sync_checkpoint`]). Splitting them out lets the builder +/// gate the store-sync-checkpoint advance during startup catch-up so a partial-catch-up crash +/// can't leave the watermark inflated. 
/// /// # Raw SQL /// /// ```sql @@ -200,13 +205,9 @@ pub fn add_transaction( /// -- Promote inflight-created notes to committed /// UPDATE notes SET created_by = NULL WHERE created_by = ?1 /// ``` -/// -/// Finally updates chain state (see [`upsert_chain_state`]). -pub fn commit_block( +pub fn commit_block_effects( conn: &mut SqliteConnection, tx_ids: &[TransactionId], - block_num: BlockNumber, - block_header: &BlockHeader, ) -> Result<Vec<NetworkAccountId>, DatabaseError> { let mut affected_accounts = HashSet::new(); @@ -261,12 +262,22 @@ pub fn commit_block( .execute(conn)?; } - // Update chain state. - upsert_chain_state(conn, block_num, block_header)?; - Ok(affected_accounts.into_iter().collect()) } +/// Convenience wrapper that runs [`commit_block_effects`] and then updates the chain state. +#[cfg(test)] +pub fn commit_block( + conn: &mut SqliteConnection, + tx_ids: &[TransactionId], + block_num: miden_protocol::block::BlockNumber, + block_header: &miden_protocol::block::BlockHeader, +) -> Result<Vec<NetworkAccountId>, DatabaseError> { + let affected = commit_block_effects(conn, tx_ids)?; + upsert_chain_state(conn, block_num, block_header)?; + Ok(affected) +} + /// Handles a `TransactionsReverted` event by undoing transaction effects. /// /// Returns all affected account IDs (for notification).
Accounts whose creation was fully diff --git a/crates/ntx-builder/src/db/models/queries/tests.rs b/crates/ntx-builder/src/db/models/queries/tests.rs index 48ec57341..f21ade93a 100644 --- a/crates/ntx-builder/src/db/models/queries/tests.rs +++ b/crates/ntx-builder/src/db/models/queries/tests.rs @@ -251,6 +251,76 @@ fn block_committed_promotes_inflight_account_to_committed() { assert_eq!(count_inflight_accounts(conn), 0); } +// LIST COMMITTED / INFLIGHT ACCOUNT IDS +// ================================================================================================ + +#[test] +fn list_committed_account_ids_returns_only_committed_accounts() { + let (conn, _dir) = &mut test_conn(); + + let committed_a = mock_network_account_id(); + let committed_b = mock_network_account_id_seeded(1); + let inflight_only = mock_network_account_id_seeded(2); + + upsert_committed_account(conn, committed_a, &mock_account(committed_a)).unwrap(); + upsert_committed_account(conn, committed_b, &mock_account(committed_b)).unwrap(); + + // Insert an inflight-only account (no committed row). 
+ let tx_id = mock_tx_id(1); + let row = AccountInsert { + account_id: conversions::network_account_id_to_bytes(inflight_only), + transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), + account_data: conversions::account_to_bytes(&mock_account(inflight_only)), + }; + diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); + + let listed: std::collections::HashSet<_> = + list_committed_account_ids(conn).unwrap().into_iter().collect(); + let expected: std::collections::HashSet<_> = [committed_a, committed_b].into_iter().collect(); + assert_eq!(listed, expected, "should return only committed account IDs"); +} + +#[test] +fn list_inflight_account_ids_returns_distinct_inflight_accounts() { + let (conn, _dir) = &mut test_conn(); + + let account_a = mock_network_account_id(); + let account_b = mock_network_account_id_seeded(1); + let committed_only = mock_network_account_id_seeded(2); + + // Account A has two inflight rows (different tx_ids) — should be returned once. + let tx_id_1 = mock_tx_id(1); + let tx_id_2 = mock_tx_id(2); + for tx_id in [&tx_id_1, &tx_id_2] { + let row = AccountInsert { + account_id: conversions::network_account_id_to_bytes(account_a), + transaction_id: Some(conversions::transaction_id_to_bytes(tx_id)), + account_data: conversions::account_to_bytes(&mock_account(account_a)), + }; + diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); + } + + // Account B has one inflight row. + let tx_id_3 = mock_tx_id(3); + let row_b = AccountInsert { + account_id: conversions::network_account_id_to_bytes(account_b), + transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id_3)), + account_data: conversions::account_to_bytes(&mock_account(account_b)), + }; + diesel::insert_into(schema::accounts::table) + .values(&row_b) + .execute(conn) + .unwrap(); + + // committed_only has only a committed row. 
+ upsert_committed_account(conn, committed_only, &mock_account(committed_only)).unwrap(); + + let listed: std::collections::HashSet<_> = + list_inflight_account_ids(conn).unwrap().into_iter().collect(); + let expected: std::collections::HashSet<_> = [account_a, account_b].into_iter().collect(); + assert_eq!(listed, expected, "should return distinct inflight account IDs"); +} + // HANDLE TRANSACTIONS REVERTED TESTS // ================================================================================================ @@ -493,6 +563,92 @@ fn get_note_error_returns_none_for_unknown_note() { // CHAIN STATE TESTS // ================================================================================================ +#[test] +fn read_store_sync_checkpoint_returns_none_on_empty_db() { + let (conn, _dir) = &mut test_conn(); + + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert!(checkpoint.is_none(), "fresh DB should report no checkpoint"); +} + +#[test] +fn read_store_sync_checkpoint_returns_none_when_chain_state_exists_but_checkpoint_unset() { + let (conn, _dir) = &mut test_conn(); + + let block_num = BlockNumber::from(7u32); + let header = mock_block_header(block_num); + upsert_chain_state(conn, block_num, &header).unwrap(); + + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert!( + checkpoint.is_none(), + "checkpoint should be NULL after only chain_state has been written" + ); +} + +#[test] +fn set_store_sync_checkpoint_persists_value() { + let (conn, _dir) = &mut test_conn(); + + // Seed chain_state so the row exists. 
+ let block_num = BlockNumber::from(0u32); + let header = mock_block_header(block_num); + upsert_chain_state(conn, block_num, &header).unwrap(); + + let target = BlockNumber::from(42u32); + set_store_sync_checkpoint(conn, target).unwrap(); + + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert_eq!(checkpoint, Some(target)); +} + +#[test] +fn set_store_sync_checkpoint_does_not_regress() { + let (conn, _dir) = &mut test_conn(); + + let block_num = BlockNumber::from(0u32); + let header = mock_block_header(block_num); + upsert_chain_state(conn, block_num, &header).unwrap(); + + set_store_sync_checkpoint(conn, BlockNumber::from(100u32)).unwrap(); + + // Attempt to regress to a lower block — should be a no-op. + set_store_sync_checkpoint(conn, BlockNumber::from(50u32)).unwrap(); + + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert_eq!(checkpoint, Some(BlockNumber::from(100u32)), "checkpoint must not regress"); + + // Equal value — also a no-op. + set_store_sync_checkpoint(conn, BlockNumber::from(100u32)).unwrap(); + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert_eq!(checkpoint, Some(BlockNumber::from(100u32))); + + // Higher value — advances. + set_store_sync_checkpoint(conn, BlockNumber::from(150u32)).unwrap(); + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert_eq!(checkpoint, Some(BlockNumber::from(150u32))); +} + +#[test] +fn upsert_chain_state_preserves_store_sync_checkpoint() { + let (conn, _dir) = &mut test_conn(); + + let block_num_1 = BlockNumber::from(10u32); + upsert_chain_state(conn, block_num_1, &mock_block_header(block_num_1)).unwrap(); + set_store_sync_checkpoint(conn, BlockNumber::from(5u32)).unwrap(); + + // Mempool advances chain tip; checkpoint must NOT be clobbered. 
+ let block_num_2 = BlockNumber::from(20u32); + upsert_chain_state(conn, block_num_2, &mock_block_header(block_num_2)).unwrap(); + + let checkpoint = read_store_sync_checkpoint(conn).unwrap(); + assert_eq!( + checkpoint, + Some(BlockNumber::from(5u32)), + "upsert_chain_state must preserve store_sync_checkpoint" + ); +} + #[test] fn upsert_chain_state_updates_singleton() { let (conn, _dir) = &mut test_conn(); diff --git a/crates/ntx-builder/src/db/schema.rs b/crates/ntx-builder/src/db/schema.rs index c52ca5f53..712f7c098 100644 --- a/crates/ntx-builder/src/db/schema.rs +++ b/crates/ntx-builder/src/db/schema.rs @@ -14,6 +14,7 @@ diesel::table! { id -> Integer, block_num -> BigInt, block_header -> Binary, + store_sync_checkpoint -> Nullable<BigInt>, } } diff --git a/crates/ntx-builder/src/lib.rs b/crates/ntx-builder/src/lib.rs index 4eb8c6386..f08e90dba 100644 --- a/crates/ntx-builder/src/lib.rs +++ b/crates/ntx-builder/src/lib.rs @@ -5,7 +5,7 @@ use std::time::Duration; use actor::AccountActorContext; use anyhow::Context; -use builder::MempoolEventStream; +use builder::{CatchUpState, MempoolEventStream}; use chain_state::SharedChainState; use clients::{BlockProducerClient, StoreClient, ValidatorClient}; use coordinator::Coordinator; @@ -54,6 +54,13 @@ const DEFAULT_MAX_BLOCK_COUNT: usize = 4; /// Default channel capacity for account loading from the store. const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000; +/// Default cap on concurrent in-flight startup-hydration RPCs to the store. +/// +/// Bounds the burst of `GetNetworkAccountDetailsById` and `GetUnconsumedNetworkNotes` calls the +/// ntx-builder fires when catching up after a restart. +const DEFAULT_STORE_HYDRATION_CONCURRENCY: NonZeroUsize = + NonZeroUsize::new(4).expect("literal is non-zero"); + /// Default maximum number of attempts to execute a failing note before dropping it.
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30; @@ -114,6 +121,12 @@ pub struct NtxBuilderConfig { /// Channel capacity for loading accounts from the store during startup. pub account_channel_capacity: usize, + /// Maximum number of concurrent in-flight startup-hydration RPCs to the store. + /// + /// Caps the burst of account-details and unconsumed-notes calls the ntx-builder fires when + /// catching up missing state after a restart. Does not affect steady-state actor RPCs. + pub store_hydration_concurrency: NonZeroUsize, + /// Duration after which an idle network account will deactivate. /// /// An account is considered idle once it has no viable notes to consume. @@ -153,6 +166,7 @@ impl NtxBuilderConfig { max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS, max_block_count: DEFAULT_MAX_BLOCK_COUNT, account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY, + store_hydration_concurrency: DEFAULT_STORE_HYDRATION_CONCURRENCY, idle_timeout: DEFAULT_IDLE_TIMEOUT, max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES, max_cycles: DEFAULT_MAX_TX_CYCLES, @@ -221,6 +235,13 @@ impl NtxBuilderConfig { self } + /// Sets the cap on concurrent in-flight startup-hydration RPCs to the store. + #[must_use] + pub fn with_store_hydration_concurrency(mut self, concurrency: NonZeroUsize) -> Self { + self.store_hydration_concurrency = concurrency; + self + } + /// Sets the idle timeout for actors. /// /// Actors that remain idle (no viable notes) for this duration will be deactivated. @@ -259,6 +280,19 @@ impl NtxBuilderConfig { // Set up the database (bootstrap + connection pool). let db = Db::setup(self.database_filepath.clone()).await?; + // Snapshot the previous store-sync watermark and any inflight-affected accounts BEFORE + // we purge inflight state. Those accounts need to be reconciled against the store at + // startup because their inflight tx may have landed in a block during downtime, and the + // mempool replay only carries currently-inflight (uncommitted) txs. 
+ let prev_local_block = db + .read_store_sync_checkpoint() + .await + .context("failed to read store sync checkpoint")?; + let inflight_affected = db + .list_inflight_account_ids() + .await + .context("failed to list inflight-affected accounts")?; + // Purge inflight state from previous run. db.purge_inflight().await.context("failed to purge inflight state")?; @@ -266,7 +300,7 @@ impl NtxBuilderConfig { let coordinator = Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone()); - let store = StoreClient::new(self.store_url.clone()); + let store = StoreClient::new(self.store_url.clone(), self.store_hydration_concurrency); let block_producer = BlockProducerClient::new(self.block_producer_url.clone()); let validator = ValidatorClient::new(self.validator_url.clone()); let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new); @@ -285,7 +319,8 @@ impl NtxBuilderConfig { .await? .context("store should contain a latest block")?; - // Store the chain tip in the DB. + // Store the chain tip in the DB. This is the mempool-driven tip, which is distinct from + // `store_sync_checkpoint`, which only advances after a successful catch-up. db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone()) .await .context("failed to upsert chain state")?; @@ -318,6 +353,7 @@ impl NtxBuilderConfig { actor_context, mempool_events, actor_request_rx, + CatchUpState::new(prev_local_block, inflight_affected), )) } }