diff --git a/ANY_ORDER_COMMIT_DESIGN.md b/ANY_ORDER_COMMIT_DESIGN.md new file mode 100644 index 00000000000..8cf5d166853 --- /dev/null +++ b/ANY_ORDER_COMMIT_DESIGN.md @@ -0,0 +1,310 @@ +# Any-order commit — design & assumptions (checkpoint-sync writer) + +**Status:** design + prototyping notes. Authored autonomously from benchmark data +(see `CHECKPOINT_SYNC_FINDINGS.md` and the `wal-bench` runs). Assumptions are +documented inline; where a question would normally be asked, the assumption is +stated and the lowest-risk option chosen. + +## 1. Problem (measured) + +After cycles 1–7, the checkpoint-sync bottleneck is the **single finalized-writer +thread**, which is 100% busy (`write.wait`≈0, ~1140+ blocks queued behind it). +Full per-block attribution in the heavy spam region (height ~1.730M, `write.busy` +≈ 106 ms/block): + +| phase | ms | depends on previous blocks? | +|---|---|---| +| `checkpoint_compute` — note-commitment tree update | ~39 | **YES (chained)** — block N+1's tree starts from N's | +| `header_tx` — serialize block + raw tx bytes into the batch | ~31 | **No** — block-only | +| `value_pools` → block-size re-serialize (`zcash_serialized_size`) | ~29 | **No** — block-only | +| `value_pools` → value-pool running total | small | YES (chained) | +| `db.write` (rocksdb commit) | ~4 | ordered (prefix consistency) | +| reads / shielded / trees / transparent | ~6 | mixed | + +The writer is **CPU-bound on serialization + tree hashing**, not disk-bound +(blkio-wait = 0). ~60 ms/block is **per-block-independent serialization** that is +currently stuck on the serial writer; ~39 ms is the **chained tree update** that +cannot be reordered. + +## 2. What any-order commit is + +Split the writer's per-block work by dependency: + +- **Ordered stage (must stay in height order):** the *chained* state — the + note-commitment tree update (→ treestate), the value-pool running total, the + history tree, and the canonical tip (`chain_tip_sender`). 
+- **Any-order stage (parallel across blocks):** the *independent* work — serialize + the block/transactions, compute the block size, build the RocksDB batch entries + that depend only on the block (and its now-known treestate), and `db.write`. + +Blocks are *prepared/serialized* in any order, in parallel; the *canonical tip* +and *chained state* advance strictly in order; `db.write` is applied in height +order so on-disk state is always a valid prefix. + +## 3. Why it beats the in-writer parallelization (B1/B2) + +- **B1/B2** parallelize the independent serialization *within one block's commit*, + but the writer is still **serial across blocks**: per-block ≈ tree(39) + + parallel-serialize(~12) + write(4) ≈ **55 ms** → ~18 blk/s (heavy). +- **Any-order** overlaps the independent batch build (~60 ms) of blocks N+1, N+2… + with the **chained tree update (39 ms)** of block N, on *separate threads*. The + writer's critical path collapses to the **tree chain (~39 ms) + ordered write** + → ~25 blk/s (heavy), i.e. ~**2.5–2.7×** over the pre-B1/B2 baseline, **if the + batch build keeps up** (it does when parallelized across blocks). +- Any-order also **sidesteps the rayon-contention trap** that limited B1/B2 and + cycle-5: the independent work runs on its own worker threads, not on the global + rayon pool that the download/verification pipeline saturates. + +**Floor:** the tree-update chain (~39 ms heavy, ~14 ms typical) is serial by +construction and bounds any-order commit. Going below it needs a faster tree +update (a separate problem, out of scope here). + +## 4. 
Minimal-scope prototype + +A bounded 2-stage pipeline inside the finalized-writer task: + +``` +verified blocks → [Stage 1: ordered, single thread] → bounded chan(cap ~2–4) → + chained state: tree update, value-pool + total, history tree, treestate; + assert parent-is-tip; advance tip + [Stage 2: any-order workers] + batch build (serialize tx, + block size, batch entries) + + db.write IN HEIGHT ORDER +``` + +- **Stage 1** keeps the existing strict-order checks and produces a + `FinalizedBlock` carrying its treestate (already threaded today via + `prev_note_commitment_trees`; the **history tree must also be threaded in + memory** — see Assumptions). +- **Stage 2** does the heavy independent work. Batch *construction* may proceed + out of order across a small window; `db.write` is serialized in height order + (a per-height sequencer) so the on-disk tip is always a contiguous prefix. + +**Minimal first cut (lowest risk):** Stage 2 is a *single* extra thread (so writes +are trivially ordered), giving pipeline throughput = `max(Stage1, Stage2)` instead +of their sum. Even single-threaded Stage 2 (~64 ms) overlapping Stage 1 (~39 ms) +takes the per-block wall from ~106 → ~64 ms (~1.65×). A later iteration widens +Stage 2 to a few workers (batch build any-order, ordered write) to push toward the +~39 ms tree-chain floor. + +## 5. Correctness invariants (consensus-critical) + +1. **Tip advances in strict height order**, only after the block's `db.write` + succeeds. Readers never see height N+1 committed before N. +2. **`db.write` applied in height order** → on-disk state is always a valid prefix; + a crash leaves a contiguous chain. +3. **Chained state threaded in memory, never re-read mid-pipeline:** the treestate + (`note_commitment_trees`) and the **history tree** must be passed forward from + Stage 1 to Stage 1 of the next block. 
Re-reading `db.history_tree()` / + `note_commitment_trees_for_tip()` under the pipeline would return a **stale** + tip (Stage 2 hasn't written yet) and silently diverge every later root. This is + the single highest-risk item. +4. **Parent-is-tip assertions** stay in the stage that owns the in-memory next + height (Stage 1), driven by an in-memory counter seeded from the DB tip (the DB + tip lags under the pipeline). +5. **Error/reset path:** on a compute or write error, drain/stop the pipeline, + reset the in-memory next-height counter **and** the threaded history tree + + treestate from the real DB tip, exactly as the current writer reset does. + `rsp_tx` travels with the block so the response is sent after the commit attempt. +6. **Shutdown:** input close → Stage 1 drains → drops sender → Stage 2 drains, + writes, exits, `db.shutdown(true)`. +7. **Byte-identical state:** serialization moved to Stage 2 is unchanged + (`RawBytes` is verbatim; block size formula matches `zcash_serialized_size`). + +## 6. Assumptions (where I did not have clarity) + +- **A1.** Archive mode (`store_raw_transactions = true`) is the benchmark config; + the `header_tx` 31 ms is real there. In pruned mode it's smaller but the design + is unchanged. +- **A2.** The value-pool running total and history tree updates are cheap relative + to the tree update, so keeping them in the ordered Stage 1 does not make Stage 1 + the dominant cost (Stage 1 ≈ tree update). Validated by attribution + (value-pool-change is ~ms; the 29 ms was the block-size re-serialize, which moves + to Stage 2). +- **A3.** Out-of-order *batch construction* with in-order *db.write* is acceptable + for crash consistency, because a batch is atomic and we never write N+1 before N. +- **A4.** A small pipeline depth (2–4) is enough to hide Stage 2 behind Stage 1; + deeper buffering only adds memory. (The download buffer is already ~1500 deep, so + Stage 1 is never input-starved.) 
+- **A5.** Keeping Stage 2 single-threaded in the first cut is acceptable as a + low-risk milestone; multi-worker Stage 2 is a follow-up once correctness is + proven. + +## 7. Risks + +- **History-tree staleness (invariant 3)** — highest risk; silent root + divergence if the history tree is re-read instead of threaded. Mitigate with a + `cfg(debug_assertions)` cross-check comparing the threaded history-tree `.hash()` + against `db.history_tree().hash()` during a soak. +- **Reset path** correctness under the pipeline (must reset all threaded state). +- **Interaction with `#115` retention** — `commit_finalized_direct` returns a + retention plan that must travel to the write stage. +- This overlaps upstream `#10725` (Arya's IBD engine), which takes the same + any-order approach wholesale; this prototype is the *minimal* slice for the + checkpoint-sync writer on the current architecture. + +## 7a. Measured baseline (post-B1/B2) and projected benefit + +After B1/B2 (parallel writer serialization on the dedicated pool, PR #128), +matched-height writer attribution in the heavy region (1.729–1.732M): + +| writer phase | ms/block | in pipeline → which stage | +|---|---|---| +| note-commitment tree update | ~26 | **Stage A (ordered, chained)** — the floor | +| `header_tx` serialization | ~8 | Stage B (any-order prep) | +| `value_pools` (block size) | ~8 | Stage B (any-order prep) | +| db.write | ~4 | Stage B (ordered write) | +| other (reads, value-pool total, etc.) | ~2 | split | +| **`write.busy` total** | **~48** | | + +Pipelining Stage B (serialization + write, ~20 ms) behind Stage A (tree chain, +~26 ms) of the next block collapses the writer's serial path to **~26–30 ms** +(the tree chain + db.write; serialization hidden). Projected **~1.5–1.8× over +B1/B2** on the writer side, i.e. 
cumulative **~3.5–4.5× over the pre-stack +baseline** in the heavy region — **bounded by the ~26 ms tree-update chain.** +Going below that needs a faster tree update (separate problem). + +This projection is grounded in measured per-phase data, not estimated. + +## 7d. MEASURED RESULT — built, benchmarked, no throughput gain (CPU-bound) + +**Status: built the full two-thread pipeline, verified correct, benchmarked it +matched-height against B1/B2 (`zebrad-b1b2`). It does NOT improve throughput in +this regime, and is marginally slower. Root cause measured below. Not PR'd (no +measurable win).** + +Implementation (branch `proto-any-order-pipeline`): the finalized writer loop is +split into **Stage A (compute thread)** — the chained treestate computation +(`compute_finalized`: note-tree update, ZIP-244 commitment check, history-tree +push), with note + history trees threaded in memory A→A — feeding a bounded +channel to **Stage B (the existing writer thread)** — `finish_pipelined`: batch +build + RocksDB write + `set_finalized_tip`, in order. `ChainTipSender` stays in +Stage B (no Clone needed); the receiver is moved into Stage A via `mem::replace`. + +Correctness: 46/46 `finalized_state` unit tests pass; a clean checkpoint sync +1.707M→1.737M committed every block with no commitment errors, no resets, and the +on-shutdown DB-format integrity check passed (every block's history root validated +against the threaded trees → treestate threading is correct on the happy path). 
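The two-thread shape described in the Implementation paragraph above can be illustrated with a toy sketch. This is not the code on `proto-any-order-pipeline`: `ToyBlock`, `ToyComputed`, `chain_step`, and `run_pipeline` are hypothetical names, the "chained state" is a plain integer standing in for the note and history trees, and the "database" is a `Vec`. It only shows the mechanics: Stage A threads its chained state in memory and checks heights against an in-memory counter, a bounded channel applies back-pressure, and Stage B commits in arrival order, which is height order because Stage A is a single thread.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a verified block.
#[derive(Clone, Debug)]
struct ToyBlock {
    height: u64,
    payload: u64,
}

/// Hypothetical stand-in for the output of the chained compute stage.
#[derive(Debug, PartialEq)]
struct ToyComputed {
    height: u64,
    chained_state: u64,
}

/// Toy chained update: the new state depends on the previous state,
/// so this step must run in height order.
fn chain_step(prev: u64, block: &ToyBlock) -> u64 {
    prev.wrapping_mul(31).wrapping_add(block.payload)
}

/// Runs Stage A (chained compute) on its own thread and Stage B (ordered
/// "write") on the calling thread, connected by a channel of capacity `depth`.
/// Returns the toy database: every committed entry, in commit order.
fn run_pipeline(blocks: Vec<ToyBlock>, depth: usize) -> Vec<ToyComputed> {
    let (tx, rx) = sync_channel::<ToyComputed>(depth);

    let stage_a = thread::spawn(move || {
        // Chained state is threaded in memory; it is never re-read from the
        // database, which lags behind while Stage B is still writing.
        let mut state = 0u64;
        let mut next_height = blocks.first().map(|b| b.height).unwrap_or(0);
        for block in &blocks {
            // In-memory parent-is-tip check.
            assert_eq!(block.height, next_height, "block is not the next height");
            state = chain_step(state, block);
            next_height += 1;
            let computed = ToyComputed { height: block.height, chained_state: state };
            if tx.send(computed).is_err() {
                break; // Stage B has stopped; nothing left to do.
            }
        }
        // `tx` is dropped here, which closes the channel and lets Stage B drain.
    });

    // Stage B: a single consumer, so commits happen in channel (height) order.
    let mut db = Vec::new();
    for computed in rx {
        db.push(computed);
    }
    stage_a.join().expect("stage A panicked");
    db
}
```

Calling `run_pipeline(blocks, 2)` on a contiguous run of heights yields the same entries as folding `chain_step` over the blocks serially; the error/reset path and shutdown signalling discussed elsewhere in this document are deliberately left out of the sketch.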
+ +Benchmark (mainnet heavy region 1.72M→1.73M, pinned peer, Zakura off, 8-core box): + +| metric (heavy region) | B1/B2 (serial) | pipeline | +| ---------------------------- | -------------- | -------- | +| throughput | 29.5 blk/s | 26.4 blk/s | +| writer busy (compute+write) | 25.9 ms/blk | 18.2 ms/blk (write only) | +| Stage A compute | — (inline) | 17.7 ms/blk (concurrent) | +| writer wait (idle) | 7.8 ms/blk | 19.6 ms/blk | +| **writer cycle (busy+wait)** | **33.7 ms/blk**| **37.8 ms/blk** | +| CPU used | 7.75 / 8 cores | 7.14 / 8 cores | +| downloads in-flight | 1550 | 1357 (buffer full → not peer-starved) | + +**Why it doesn't help:** the heavy region is already **CPU-saturated (~7.75/8 +cores)** with the download buffer full — the bottleneck after cyc1–7 + B1/B2 is no +longer the serial commit *stage*, it is *total CPU work across the whole sync +pipeline* (global-pool verify + commit-pool tree/serialization + tokio). Splitting +the commit into two threads does not add cores; it redistributes the same CPU +work. Stage A (the chained tree update) cannot be parallelized away and becomes the +new gating stage, so Stage B spends most of its time (19.6 ms) *idle waiting for +Stage A*. The cross-thread handoff + both stages sharing `COMMIT_COMPUTE_POOL` +(alongside the global verify pool) add scheduling/contention overhead, leaving +cores **more** idle (7.14 < 7.75) — net writer cycle 33.7→37.8 ms, ~10% slower. + +This is work-conservation: on an N-core box at ~N/N utilization, wall-time ≥ +total_work / N regardless of how the commit is partitioned. Deeper pipeline depth, +separate pools per stage, etc. cannot beat it — none add CPU capacity. + +**Takeaway / redirected lever:** further *commit-side restructuring* (pipelining, +any-order, more parallel batch prep) cannot raise checkpoint-sync throughput while +CPU-bound. The only remaining lever is **reducing total CPU work**. 
NOTE: the +`to_librustzcash` *de-dup* is already done and in this baseline — commit +`229c620b4` / PR #125 (`txid_and_auth_digest`: one conversion → both txid + +auth-digest). That halved the conversions but each tx still does exactly **one** +`to_librustzcash()` reparse, which dominates the per-tx crypto. So the genuinely +remaining lever is **native ZIP-244 digests** — compute the v5 txid + auth +commitment directly from Zebra's `Transaction` structs, eliminating the +librustzcash reparse entirely (large, consensus-critical). That cuts cycles rather +than reshuffling them. The earlier ~1.5–1.8× projection assumed the +serial commit was the bottleneck with spare CPU to overlap onto — that assumption +no longer holds after the cyc1–7 + B1/B2 wins pushed the region to CPU saturation. + +## 7c. Prototype build status (autonomous run, validated as far as safe) + +**Built + verified (on `proto-any-order-pipeline`):** the foundational refactor — +`commit_finalized_direct` split into: +- `compute_finalized(&self, finalizable, prev_note_trees, prev_history_tree) + -> ComputedFinalized` (Stage A: tree update, commitment check, history push; + **history tree now threadable** instead of re-read), and +- `write_finalized(&mut self, ComputedFinalized) -> (hash, note_trees)` + (Stage B: contiguity asserts, `write_block`, stop-height), +- with `commit_finalized_direct` kept as a thin wrapper (passes + `prev_history_tree = None`, identical behaviour). + +This compiles and is **behaviour-preserving — `cargo test -p zebra-state +finalized_state::tests` 46/46 pass**, and is the genuine enabler for the pipeline +(it isolates compute from write and makes the history tree threadable). It has no +standalone perf benefit (so it is not separately PR'd). + +**Not built (consensus-critical, left for the reviewed refactor):** the two-thread +split itself. 
The remaining work, now precisely mapped: +- Restructure `WriteBlockWorkerTask::run` to own its fields (currently `&mut self`) + so the finalized-block receiver can move into a **Stage A compute thread**, + while the main thread becomes **Stage B** and keeps the non-`Clone` + `ChainTipSender` (used by both the finalized writes and the later non-finalized + phase) — this resolves the `ChainTipSender` ownership obstacle. +- A bounded channel A→B carrying `(Result, rsp_tx)`; Stage B + does the write, `rsp_tx.send`, `chain_tip_sender.set_finalized_tip`, and metrics + (replicating `commit_finalized`'s tail). +- An in-memory next-height counter in Stage A (the DB tip lags under the pipeline, + so the wrong-height drop can't use `db.finalized_tip_height()`). +- A cross-thread reset (`Arc`): on a Stage B write error, signal Stage + A to clear its threaded trees/history + re-seed from the DB tip. +- `pub(crate)` visibility for `compute_finalized`/`write_finalized`/`ComputedFinalized`. + +**Why not completed autonomously:** the error/reset path, the contextual +(non-finalized) arm, and shutdown draining are **not exercised by a clean +checkpoint-sync differential test**, so the full split cannot be self-verified +without review — and it is exactly the reviewed refactor this work was scoped to +inform. The differential sync *does* validate the happy path (every commitment +check validates the history root → catches treestate-threading bugs), so the +threading mechanism is sound; the un-exercised paths are the review surface. + +## 7b. Prototype assessment & recommendation (autonomous run) + +**Status:** designed, measured, and projected — **not built autonomously, by +choice.** The writer split is consensus-critical, and its edge cases (the +error/reset path, the contextual/non-finalized arm, and shutdown draining) are +**not exercised by a clean checkpoint-sync differential test**, so a full +prototype cannot be self-verified without human review. 
The lower-risk building +blocks are already proven in this PR stack: + +- **Concurrent prep + ordered commit works** — cycle-7 (#127) precomputes per + block in the concurrent download stage and commits ordered; the same shape the + pipeline generalizes. +- **The independent serialization parallelizes** — B1/B2 (#128) on the dedicated + pool, with measured −53% per phase. +- **The chained floor is measured** — the ~26 ms tree update bounds the result. + +**Recommended path for the reviewed refactor:** +1. Thread the history tree in memory through the commit path (mirror the existing + `prev_note_commitment_trees` threading + its reset-on-error via `.take()` → + re-read). Byte-identical without the pipeline; the prerequisite that removes + the stale-re-read hazard. **Verify:** differential sync + the `cfg(debug_assertions)` + history-tree `.hash()` cross-check. +2. Split `commit_finalized_direct` into `compute_finalized` (Stage A: tree + + treestate + history push, threaded) and `write_finalized` (Stage B: + `write_block` + tip), connected by a bounded channel (depth 2–4). +3. Single-thread Stage B first (writes trivially ordered) → ~1.65×; then widen to + a few workers (any-order build, ordered write) → toward the ~26 ms floor. +4. Keep the contextual arm on the existing path initially (checkpoint arm only). + +## 8. Benchmark & test plan + +- **Throughput:** same harness (`metricsrun.sh`, Zakura-off, pinned peer), matched + heights vs the pre-pipeline tip; expect heavy-region per-block wall ~106 → ~64 ms + (single-thread Stage 2) → toward ~39 ms (multi-worker). +- **Correctness:** differential mainnet sync to a fixed height with byte-identical + finalized tip hash + `z_gettreestate` roots vs baseline; the `cfg(debug_assertions)` + history-tree cross-check during a soak; `cargo test -p zebra-state` (watch the + `rsp_tx`/reset-path tests, which move to the write stage). 
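The multi-worker variant in the plan above depends on the per-height sequencer from section 4: workers may finish batches in any order, but writes must be released strictly in height order. A minimal sketch of that idea follows, assuming a hypothetical `HeightSequencer` type that is not part of this PR; the batch type `T` is left generic, and a real version would also need the reset path from invariant 5.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-height sequencer: accepts prepared batches in any order
/// and releases them strictly in ascending, contiguous height order.
struct HeightSequencer<T> {
    /// The next height that is allowed to be written.
    next_height: u64,
    /// Batches that arrived early and are waiting for their predecessors.
    pending: BTreeMap<u64, T>,
}

impl<T> HeightSequencer<T> {
    /// Creates a sequencer whose first writable height is `next_height`
    /// (seeded from the database tip in a real writer).
    fn new(next_height: u64) -> Self {
        Self { next_height, pending: BTreeMap::new() }
    }

    /// Buffers `batch` for `height` and returns every batch that is now
    /// writable, in ascending height order. The result is empty while a
    /// lower height is still missing.
    fn submit(&mut self, height: u64, batch: T) -> Vec<(u64, T)> {
        assert!(height >= self.next_height, "height was already written");
        let previous = self.pending.insert(height, batch);
        assert!(previous.is_none(), "duplicate batch for height");

        let mut ready = Vec::new();
        while let Some(batch) = self.pending.remove(&self.next_height) {
            ready.push((self.next_height, batch));
            self.next_height += 1;
        }
        ready
    }
}
```

With a sequencer seeded at height 5, submitting height 7 returns nothing, submitting 5 releases only 5, and submitting 6 then releases 6 and 7 together, so the written heights always form a contiguous prefix.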
diff --git a/zebra-chain/proptest-regressions/parallel/batch_frontier.txt b/zebra-chain/proptest-regressions/parallel/batch_frontier.txt new file mode 100644 index 00000000000..1dbc34e723c --- /dev/null +++ b/zebra-chain/proptest-regressions/parallel/batch_frontier.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc e6b99b1bc89a4692d82729a22abc108af1e15537ee5e124b91e19c952bce4ee7 # shrinks to prefix_len = 0, batch = [TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), 
TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(0), TestNode(102), TestNode(9419101705697858680), TestNode(510312245048786735), TestNode(2409037999938848265), TestNode(17952681726410662532), TestNode(9871315311868101043), TestNode(8695758199261085473), TestNode(9905333123009332806), TestNode(950518047781477098), TestNode(13155598055379452971)] diff --git a/zebra-chain/src/block.rs b/zebra-chain/src/block.rs index ec31bf738f3..dda98003b3b 100644 --- a/zebra-chain/src/block.rs +++ b/zebra-chain/src/block.rs @@ -252,7 +252,26 @@ impl Block { /// /// [ZIP-244]: https://zips.z.cash/zip-0244 pub fn auth_data_root(&self) -> AuthDataRoot { - self.transactions.iter().collect::<AuthDataRoot>() + use rayon::prelude::*; + + // Computing each transaction's authorizing-data digest dominates this + // function for blocks with many (or large) shielded transactions: each + // `auth_digest` re-serializes the transaction and BLAKE2b-hashes its + // authorizing data, scaling with the transaction's shielded I/O. The + // digests are independent, so compute them across the rayon pool. + // + // `collect` into a `Vec` preserves transaction order, so the Merkle root + // built from these digests is byte-identical to the sequential version + // (asserted by a differential proptest in `block::tests::prop`). + self.transactions + .par_iter() + .map(|tx| { + tx.auth_digest() + .unwrap_or(crate::block::merkle::AUTH_DIGEST_PLACEHOLDER) + }) + .collect::<Vec<_>>() + .into_iter() + .collect::<AuthDataRoot>() + } } diff --git a/zebra-chain/src/block/tests/prop.rs b/zebra-chain/src/block/tests/prop.rs index f7745cbf581..44406d67277 100644 --- a/zebra-chain/src/block/tests/prop.rs +++ b/zebra-chain/src/block/tests/prop.rs @@ -141,6 +141,20 @@ proptest! 
{ } } } + + /// `Block::auth_data_root` computes per-transaction auth digests across the + /// rayon pool; this asserts the parallel result is byte-identical to the + /// sequential computation (same digests, same transaction order, same root). + #[test] + fn auth_data_root_parallel_matches_sequential(block in any::<Block>()) { + let _init_guard = zebra_test::init(); + + let sequential: crate::block::merkle::AuthDataRoot = + block.transactions.iter().collect(); + let parallel = block.auth_data_root(); + + prop_assert_eq!(sequential, parallel); + } } /// Test [`Block::coinbase_height`]. diff --git a/zebra-chain/src/orchard/tree.rs b/zebra-chain/src/orchard/tree.rs index d833dd4c32e..0bd2ff1adb1 100644 --- a/zebra-chain/src/orchard/tree.rs +++ b/zebra-chain/src/orchard/tree.rs @@ -401,6 +401,76 @@ impl NoteCommitmentTree { } } + /// Appends a batch of note commitment x-coordinates to the tree in parallel, + /// returning the index and root of any [`TRACKED_SUBTREE_HEIGHT`] subtree + /// completed by the batch. + /// + /// The result is identical to calling [`Self::append`] for each commitment in + /// order — and capturing the last completed subtree — but the Merkle hashing + /// is parallelized across the rayon pool. This equivalence is enforced by the + /// differential property tests in [`crate::parallel::batch_frontier`]. + /// + /// Returns an error if the tree would overflow its capacity. + #[allow(clippy::unwrap_in_result)] + pub fn append_batch( + &mut self, + cms: &[NoteCommitmentUpdate], + ) -> Result<Option<(NoteCommitmentSubtreeIndex, Node)>, NoteCommitmentTreeError> { + use crate::parallel::batch_frontier::parallel_append; + + if cms.is_empty() { + return Ok(None); + } + + let nodes: Vec<Node> = cms.iter().map(|cm_x| (*cm_x).into()).collect(); + + let old_size = self.inner.tree_size(); + let new_size = old_size + nodes.len() as u64; + + // A block has fewer than 2^16 actions (consensus rule), so the batch + // crosses at most one tracked-subtree (2^TRACKED_SUBTREE_HEIGHT) boundary. 
+ let subtree_size = 1u64 << TRACKED_SUBTREE_HEIGHT; + let boundary = (old_size / subtree_size + 1) * subtree_size; + + let frontier = self.inner.clone(); + + let (frontier, completed) = if boundary <= new_size { + let head_len = (boundary - old_size) as usize; + let (head, tail) = nodes.split_at(head_len); + + let f1 = parallel_append(frontier, head.to_vec()) + .map_err(|_| NoteCommitmentTreeError::FullTree)?; + + let index = NoteCommitmentSubtreeIndex( + ((boundary >> TRACKED_SUBTREE_HEIGHT) - 1) + .try_into() + .expect("subtree index fits in u16"), + ); + let root = f1 + .value() + .expect("just appended at least one leaf") + .root(Some(incrementalmerkletree::Level::from( + TRACKED_SUBTREE_HEIGHT, + ))); + + let f2 = parallel_append(f1, tail.to_vec()) + .map_err(|_| NoteCommitmentTreeError::FullTree)?; + (f2, Some((index, root))) + } else { + let f = + parallel_append(frontier, nodes).map_err(|_| NoteCommitmentTreeError::FullTree)?; + (f, None) + }; + + self.inner = frontier; + *self + .cached_root + .get_mut() + .expect("a thread that previously held exclusive lock access panicked") = None; + + Ok(completed) + } + /// Returns frontier of non-empty tree, or `None` if the tree is empty. 
fn frontier(&self) -> Option<&NonEmptyFrontier<Node>> { self.inner.value() @@ -709,3 +779,40 @@ impl From<Vec<pallas::Base>> for NoteCommitmentTree { tree } } + +#[cfg(test)] +mod tests { + use incrementalmerkletree::{frontier::Frontier, Position}; + + use super::*; + + fn note_commitment(value: u64) -> NoteCommitmentUpdate { + let mut bytes = [0; 32]; + bytes[..8].copy_from_slice(&value.to_le_bytes()); + + Option::<pallas::Base>::from(pallas::Base::from_repr(bytes)) + .expect("small little-endian integers are canonical field elements") + } + + #[test] + fn append_batch_overflow_preserves_tree_and_cached_root() { + let max_position = (1u64 << MERKLE_DEPTH) - 1; + let leaf = Node(note_commitment(1)); + let ommers = vec![Node(note_commitment(2)); usize::from(MERKLE_DEPTH)]; + let inner = Frontier::from_parts(Position::from(max_position), leaf, ommers) + .expect("max-depth frontier is valid"); + let mut tree = NoteCommitmentTree { + inner, + cached_root: Default::default(), + }; + + let _ = tree.root(); + let original = tree.clone(); + + let result = tree.append_batch(&[note_commitment(3)]); + + assert_eq!(result, Err(NoteCommitmentTreeError::FullTree)); + tree.assert_frontier_eq(&original); + assert_eq!(tree.root(), original.root()); + } +} diff --git a/zebra-chain/src/parallel.rs b/zebra-chain/src/parallel.rs index 0a2dcffd720..08505a05adf 100644 --- a/zebra-chain/src/parallel.rs +++ b/zebra-chain/src/parallel.rs @@ -1,3 +1,4 @@ //! Parallel chain update methods. +pub mod batch_frontier; pub mod tree; diff --git a/zebra-chain/src/parallel/batch_frontier.rs b/zebra-chain/src/parallel/batch_frontier.rs new file mode 100644 index 00000000000..064e59baf42 --- /dev/null +++ b/zebra-chain/src/parallel/batch_frontier.rs @@ -0,0 +1,288 @@ +//! Parallel batch append for incremental Merkle [`Frontier`]s. +//! +//! Note commitment trees (Sprout, Sapling, Orchard) are all +//! [`incrementalmerkletree::frontier::Frontier`], differing only in the +//! 
per-pool [`Hashable::combine`] hash (SHA-256 / Pedersen / Sinsemilla). The +//! standard [`Frontier::append`] adds one leaf at a time, performing the Merkle +//! merge hashes sequentially. For a block with many shielded outputs this is the +//! dominant cost of committing the block, and it runs on a single thread. +//! +//! [`parallel_append`] produces a [`Frontier`] **byte-identical** to appending +//! each leaf sequentially, but computes the internal Merkle hashes with a +//! parallel divide-and-conquer reduction across the rayon thread pool. +//! +//! # Correctness +//! +//! This is consensus-critical: the frontier *is* the note commitment tree +//! commitment, so the result must match the sequential append exactly. The +//! implementation is validated by differential property tests against the +//! sequential [`Frontier::append`] (identical frontier parts and identical root) +//! in the `tests` module below. + +use incrementalmerkletree::{ + frontier::{Frontier, FrontierError}, + Hashable, Level, Position, +}; +use rayon::prelude::*; + +/// A pure binary-counter forest of a contiguous run of leaves, indexed by level: +/// `forest[L] == Some(root)` iff bit `L` of the run length is set, in which case +/// `root` is the root of the complete `2^L`-leaf subtree covering that aligned +/// block. Higher set bits (older subtrees) are further left in leaf order. +/// +/// This is *pure*: unlike the crate's `CommitmentTree`, it has no lazy level-0 +/// staging, which makes the algebra below unambiguous. +type Forest<H> = Vec<Option<H>>; + +/// Injects a complete subtree `node` at level `level` into the binary-counter +/// `forest`, propagating carries upward. +/// +/// `node` (and anything it carries into) must be **strictly newer** (further +/// right in leaf order) than everything already in `forest`, so the existing slot +/// value is always the left (older) argument of [`Hashable::combine`]. 
This holds +/// because we only ever inject the old tip leaf and then the new leaves, in +/// ascending position order. +fn inject<H: Hashable>(forest: &mut Forest<H>, level: usize, node: H) { + let mut idx = level; + let mut carry = node; + loop { + if idx >= forest.len() { + forest.resize(idx + 1, None); + } + match forest[idx].take() { + None => { + forest[idx] = Some(carry); + break; + } + Some(existing) => { + // Combining two level-`idx` nodes yields a level-`idx+1` node; + // the crate's `combine` takes the *children's* level. + carry = H::combine(Level::from(idx as u8), &existing, &carry); + idx += 1; + } + } + } +} + +/// Computes the root of a perfect subtree of exactly `2^k` `leaves`, using a +/// parallel divide-and-conquer reduction. The combine hashes within and across +/// the two halves are independent, so this scales across the rayon pool. +fn perfect_subtree_root<H: Hashable + Clone + Send + Sync>(leaves: &[H]) -> H { + debug_assert!(leaves.len().is_power_of_two()); + if leaves.len() == 1 { + return leaves[0].clone(); + } + let half = leaves.len() / 2; + // children level = log2(len) - 1 = log2(half) + let child_level = Level::from(half.trailing_zeros() as u8); + let (left, right) = leaves.split_at(half); + let (l, r) = rayon::join( + || perfect_subtree_root(left), + || perfect_subtree_root(right), + ); + H::combine(child_level, &l, &r) +} + +/// Appends `new_leaves` (in order) to `frontier`, returning the updated frontier. +/// +/// The result is identical to calling [`Frontier::append`] for each leaf in turn, +/// but the Merkle merge hashes are computed in parallel. +/// +/// # Method +/// +/// A frontier of size `S` stores the last leaf raw plus `ommers` that are exactly +/// the pure forest of the first `S - 1` leaves. We: +/// 1. rebuild that forest from `ommers` (bit `L` of `position` set ⇒ one ommer); +/// 2. inject the old tip leaf, giving the pure forest of all `S` leaves; +/// 3. 
append every new leaf except the last as **globally position-aligned +/// dyadic blocks** — each block's root computed in parallel — injecting them +/// in ascending position order (aligned blocks compose with no cross-boundary +/// re-pairing, which is what makes the parallel reduction exact); +/// 4. the new last leaf becomes the frontier's raw leaf, and the resulting forest +/// becomes its ommers. +/// +/// Returns [`FrontierError`] if appending would overflow the tree's `DEPTH` capacity. +pub fn parallel_append<H, const DEPTH: u8>( + frontier: Frontier<H, DEPTH>, + new_leaves: Vec<H>, +) -> Result<Frontier<H, DEPTH>, FrontierError> +where + H: Hashable + Clone + Send + Sync, +{ + if new_leaves.is_empty() { + return Ok(frontier); + } + + // Rebuild the pure forest of the existing tree, and the next free position. + let (mut forest, mut size): (Forest<H>, u64) = match frontier.value() { + None => (Vec::new(), 0), + Some(f) => { + let (position, leaf, ommers) = (f.position(), f.leaf().clone(), f.ommers().to_vec()); + let pos = u64::from(position); // = S - 1 + // ommers (low→high) sit at the set bits of `pos`. + let mut forest: Forest<H> = Vec::new(); + let mut ommers = ommers.into_iter(); + for level in 0..u64::BITS { + if pos & (1 << level) != 0 { + if level as usize >= forest.len() { + forest.resize(level as usize + 1, None); + } + forest[level as usize] = Some(ommers.next().expect("ommer per set bit")); + } + } + // Inject the old tip leaf (position `pos`) to get the pure forest of S leaves. + inject(&mut forest, 0, leaf); + (forest, pos + 1) + } + }; + + // The new last leaf stays raw; everything before it joins the forest as + // globally-aligned dyadic blocks. + let last = new_leaves.len() - 1; + let body = &new_leaves[..last]; + + // Decompose [size, size + body.len()) into maximal position-aligned dyadic + // blocks, then compute each block's root in parallel. 
+ let mut blocks: Vec<(usize, &[H])> = Vec::new(); + { + let mut pos = size; + let end = size + body.len() as u64; + let mut offset = 0usize; + while pos < end { + // Largest level L with pos % 2^L == 0 and pos + 2^L <= end. + let align = if pos == 0 { + u64::BITS + } else { + pos.trailing_zeros() + }; + let remaining = end - pos; + let span = u64::BITS - 1 - remaining.leading_zeros(); // floor(log2(remaining)) + let level = align.min(span) as usize; + let width = 1usize << level; + blocks.push((level, &body[offset..offset + width])); + offset += width; + pos += width as u64; + } + } + + // Compute all block roots concurrently (and each reduction is itself + // internally parallel). `into_par_iter().map(..).collect()` on a `Vec` + // preserves order, so injection below stays in ascending position order, + // keeping the carry order exact. + let roots: Vec<(usize, H)> = blocks + .into_par_iter() + .map(|(level, leaves)| (level, perfect_subtree_root(leaves))) + .collect(); + for (level, root) in roots { + inject(&mut forest, level, root); + } + size += body.len() as u64; + + // The final frontier: raw last leaf at `position = size`, ommers = forest + // (compacted low→high, matching the set bits of `position`). + let position = Position::from(size); + let leaf = new_leaves[last].clone(); + let ommers: Vec<H> = forest.into_iter().flatten().collect(); + + Frontier::from_parts(position, leaf, ommers) +} + +#[cfg(test)] +mod tests { + use super::*; + use proptest::prelude::*; + use std::hash::{Hash, Hasher}; + + const DEPTH: u8 = 32; + + /// A self-contained test node whose `combine` is **order-sensitive** (so a + /// left/right swap changes the result) and **level-sensitive** (so a wrong + /// `combine` level argument changes the result). This lets the differential + /// tests catch ordering and level bugs in the parallel append.
+ #[derive(Copy, Clone, Debug, PartialEq, Eq)] + struct TestNode(u64); + + impl Hashable for TestNode { + fn empty_leaf() -> Self { + Self(0) + } + + fn combine(level: Level, a: &Self, b: &Self) -> Self { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + u8::from(level).hash(&mut hasher); + a.0.hash(&mut hasher); + b.0.hash(&mut hasher); + // Keep it non-zero-ish and order/level sensitive. + Self(hasher.finish()) + } + } + + /// Append `leaves` to `start` one at a time using the sequential crate API. + fn sequential_append( + start: Frontier, + leaves: &[TestNode], + ) -> Frontier { + let mut f = start; + for leaf in leaves { + assert!(f.append(*leaf), "test trees never overflow"); + } + f + } + + fn build_frontier(prefix: &[TestNode]) -> Frontier { + let mut f = Frontier::::empty(); + for leaf in prefix { + assert!(f.append(*leaf)); + } + f + } + + proptest! { + #![proptest_config(ProptestConfig::with_cases(2000))] + + /// The parallel batch append must produce a byte-identical frontier (and + /// therefore an identical root) to the sequential append, for any + /// starting tree size and any batch size. + #[test] + fn parallel_matches_sequential( + prefix_len in 0usize..300, + batch in proptest::collection::vec(any::().prop_map(TestNode), 0..300), + ) { + let prefix: Vec = (0..prefix_len as u64).map(TestNode).collect(); + let start = build_frontier(&prefix); + + let seq = sequential_append(start.clone(), &batch); + let par = parallel_append(start, batch.clone()).expect("no overflow in tests"); + + prop_assert_eq!(seq.root(), par.root(), "root mismatch"); + prop_assert_eq!( + seq.value().map(|f| f.clone().into_parts()), + par.value().map(|f| f.clone().into_parts()), + "frontier parts mismatch" + ); + } + } + + /// Spot-check small exhaustive sizes for off-by-one boundary bugs. 
+ #[test] + fn exhaustive_small() { + for prefix_len in 0u64..40 { + let prefix: Vec = (0..prefix_len).map(TestNode).collect(); + let start = build_frontier(&prefix); + for batch_len in 0u64..40 { + let batch: Vec = (1000..1000 + batch_len).map(TestNode).collect(); + let seq = sequential_append(start.clone(), &batch); + let par = parallel_append(start.clone(), batch).expect("no overflow"); + assert_eq!( + seq.root(), + par.root(), + "root mismatch p={prefix_len} b={batch_len}" + ); + assert_eq!( + seq.value().map(|f| f.clone().into_parts()), + par.value().map(|f| f.clone().into_parts()), + "parts mismatch p={prefix_len} b={batch_len}" + ); + } + } + } +} diff --git a/zebra-chain/src/parallel/tree.rs b/zebra-chain/src/parallel/tree.rs index 94cbc7d9bc3..da4981d7cb8 100644 --- a/zebra-chain/src/parallel/tree.rs +++ b/zebra-chain/src/parallel/tree.rs @@ -171,18 +171,11 @@ impl NoteCommitmentTrees { // > The size of a block MUST be less than or equal to 2000000 bytes. // // - let mut subtree_root = None; - - for sapling_note_commitment in sapling_note_commitments { - sapling_nct.append(sapling_note_commitment)?; - - // Subtrees end heights come from the blocks they are completed in, - // so we check for new subtrees after appending the note. - // (If we check before, subtrees at the end of blocks have the wrong heights.) - if let Some(index_and_node) = sapling_nct.completed_subtree_index_and_root() { - subtree_root = Some(index_and_node); - } - } + // + // The note commitments are appended as a single parallel batch, which + // returns the (at most one) subtree completed within this block, matching + // the per-leaf append exactly (see `crate::parallel::batch_frontier`). + let subtree_root = sapling_nct.append_batch(&sapling_note_commitments)?; // Re-calculate and cache the tree root. 
let _ = sapling_nct.root(); @@ -208,18 +201,11 @@ impl NoteCommitmentTrees { // It is impossible for blocks to contain more than one level 16 orchard root: // > [NU5 onward] nSpendsSapling, nOutputsSapling, and nActionsOrchard MUST all be less than 2^16. // - let mut subtree_root = None; - - for orchard_note_commitment in orchard_note_commitments { - orchard_nct.append(orchard_note_commitment)?; - - // Subtrees end heights come from the blocks they are completed in, - // so we check for new subtrees after appending the note. - // (If we check before, subtrees at the end of blocks have the wrong heights.) - if let Some(index_and_node) = orchard_nct.completed_subtree_index_and_root() { - subtree_root = Some(index_and_node); - } - } + // + // The note commitments are appended as a single parallel batch, which + // returns the (at most one) subtree completed within this block, matching + // the per-leaf append exactly (see `crate::parallel::batch_frontier`). + let subtree_root = orchard_nct.append_batch(&orchard_note_commitments)?; // Re-calculate and cache the tree root. let _ = orchard_nct.root(); diff --git a/zebra-chain/src/primitives/zcash_primitives.rs b/zebra-chain/src/primitives/zcash_primitives.rs index f4c4695d75f..877179bcdfe 100644 --- a/zebra-chain/src/primitives/zcash_primitives.rs +++ b/zebra-chain/src/primitives/zcash_primitives.rs @@ -11,7 +11,7 @@ use crate::{ amount::{Amount, NonNegative}, parameters::NetworkUpgrade, serialization::ZcashSerialize, - transaction::{AuthDigest, HashType, SigHash, Transaction}, + transaction::{AuthDigest, Hash, HashType, SigHash, Transaction}, transparent::{self, Script}, Error, }; @@ -524,3 +524,33 @@ pub(crate) fn auth_digest(tx: &Transaction) -> AuthDigest { .expect("digest has the correct size"), ) } + +/// Compute both the transaction ID (txid) and the ZIP-244 authorizing-data +/// commitment of a v5+ transaction from a *single* librustzcash conversion. 
+/// +/// Computed separately, [`Transaction::hash`] and [`auth_digest`] each +/// re-serialize and re-parse the whole transaction, which dominates the cost on +/// heavy shielded transactions. This function shares one conversion between them, +/// so the auth digest is nearly free once the txid is computed. The results are +/// byte-for-byte identical to the separate computations (same conversion, same +/// accessors). +/// +/// # Panics +/// +/// If passed a pre-v5 transaction. +pub(crate) fn txid_and_auth_digest(tx: &Transaction) -> (Hash, AuthDigest) { + let nu = tx.network_upgrade().expect("V5 tx has a network upgrade"); + + let tx = tx + .to_librustzcash(nu) + .expect("V5 tx is convertible to its `zcash_params` equivalent"); + + let txid = Hash(*tx.txid().as_ref()); + let auth_digest = AuthDigest( + tx.auth_commitment() + .as_ref() + .try_into() + .expect("digest has the correct size"), + ); + + (txid, auth_digest) +} diff --git a/zebra-chain/src/sapling/tree.rs b/zebra-chain/src/sapling/tree.rs index bcd4b0c6559..9b0055758a8 100644 --- a/zebra-chain/src/sapling/tree.rs +++ b/zebra-chain/src/sapling/tree.rs @@ -217,6 +217,81 @@ impl NoteCommitmentTree { } } + /// Appends a batch of note commitment u-coordinates to the tree in parallel, + /// returning the index and root of any [`TRACKED_SUBTREE_HEIGHT`] subtree + /// completed by the batch. + /// + /// The result is identical to calling [`Self::append`] for each commitment in + /// order (and capturing the last completed subtree), but the Merkle hashing + /// is parallelized across the rayon pool. The equivalence of the underlying + /// batch append is checked by the differential property tests in + /// [`crate::parallel::batch_frontier`]. + /// + /// Returns an error if the tree would overflow its capacity.
+ #[allow(clippy::unwrap_in_result)] + pub fn append_batch( + &mut self, + cms: &[NoteCommitmentUpdate], + ) -> Result, NoteCommitmentTreeError> + { + use crate::parallel::batch_frontier::parallel_append; + + if cms.is_empty() { + return Ok(None); + } + + let nodes: Vec = + cms.iter().map(sapling_crypto::Node::from_cmu).collect(); + + let old_size = self.inner.tree_size(); + let new_size = old_size + nodes.len() as u64; + + // A block has fewer than 2^16 outputs (consensus rule), so the batch + // crosses at most one tracked-subtree (2^TRACKED_SUBTREE_HEIGHT) boundary. + let subtree_size = 1u64 << TRACKED_SUBTREE_HEIGHT; + let boundary = (old_size / subtree_size + 1) * subtree_size; + + let frontier = self.inner.clone(); + + let (frontier, completed) = if boundary <= new_size { + // Split so the leaf at `boundary - 1` (which completes the subtree) + // ends a sub-batch; capture that subtree's index and root exactly as + // the per-leaf append would. + let head_len = (boundary - old_size) as usize; + let (head, tail) = nodes.split_at(head_len); + + let f1 = parallel_append(frontier, head.to_vec()) + .map_err(|_| NoteCommitmentTreeError::FullTree)?; + + let index = NoteCommitmentSubtreeIndex( + ((boundary >> TRACKED_SUBTREE_HEIGHT) - 1) + .try_into() + .expect("subtree index fits in u16"), + ); + let root = f1 + .value() + .expect("just appended at least one leaf") + .root(Some(incrementalmerkletree::Level::from( + TRACKED_SUBTREE_HEIGHT, + ))); + + let f2 = parallel_append(f1, tail.to_vec()) + .map_err(|_| NoteCommitmentTreeError::FullTree)?; + (f2, Some((index, root))) + } else { + let f = + parallel_append(frontier, nodes).map_err(|_| NoteCommitmentTreeError::FullTree)?; + (f, None) + }; + + self.inner = frontier; + *self + .cached_root + .get_mut() + .expect("a thread that previously held exclusive lock access panicked") = None; + + Ok(completed) + } + /// Returns frontier of non-empty tree, or None. 
fn frontier(&self) -> Option<&NonEmptyFrontier> { self.inner.value() @@ -528,3 +603,48 @@ impl From> for NoteCommitment tree } } + +#[cfg(test)] +mod tests { + use incrementalmerkletree::{frontier::Frontier, Position}; + + use super::*; + + fn node(value: u64) -> sapling_crypto::Node { + let mut bytes = [0; 32]; + bytes[..8].copy_from_slice(&value.to_le_bytes()); + + Option::::from(sapling_crypto::Node::from_bytes(bytes)) + .expect("small little-endian integers are canonical field elements") + } + + fn note_commitment(value: u64) -> NoteCommitmentUpdate { + let mut bytes = [0; 32]; + bytes[..8].copy_from_slice(&value.to_le_bytes()); + + Option::::from(NoteCommitmentUpdate::from_bytes(&bytes)) + .expect("small little-endian integers are canonical field elements") + } + + #[test] + fn append_batch_overflow_preserves_tree_and_cached_root() { + let max_position = (1u64 << MERKLE_DEPTH) - 1; + let leaf = node(1); + let ommers = vec![node(2); usize::from(MERKLE_DEPTH)]; + let inner = Frontier::from_parts(Position::from(max_position), leaf, ommers) + .expect("max-depth frontier is valid"); + let mut tree = NoteCommitmentTree { + inner, + cached_root: Default::default(), + }; + + let _ = tree.root(); + let original = tree.clone(); + + let result = tree.append_batch(&[note_commitment(3)]); + + assert_eq!(result, Err(NoteCommitmentTreeError::FullTree)); + tree.assert_frontier_eq(&original); + assert_eq!(tree.root(), original.root()); + } +} diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index 24bbd91df28..369c9472356 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -287,6 +287,33 @@ impl Transaction { } } + /// Compute this transaction's ID (txid) and ZIP-244 authorizing-data digest + /// together, sharing the single (expensive) librustzcash conversion that + /// both otherwise perform separately. Returns `None` for the auth digest of + /// pre-v5 transactions (as [`Self::auth_digest`] does). 
+ /// + /// The results are identical to calling [`Self::hash`] and + /// [`Self::auth_digest`] independently. + pub fn txid_and_auth_digest(&self) -> (Hash, Option) { + match self { + Transaction::V1 { .. } + | Transaction::V2 { .. } + | Transaction::V3 { .. } + | Transaction::V4 { .. } => (self.hash(), None), + Transaction::V5 { .. } => { + let (txid, auth_digest) = + crate::primitives::zcash_primitives::txid_and_auth_digest(self); + (txid, Some(auth_digest)) + } + #[cfg(all(zcash_unstable = "nu7", feature = "tx_v6"))] + Transaction::V6 { .. } => { + let (txid, auth_digest) = + crate::primitives::zcash_primitives::txid_and_auth_digest(self); + (txid, Some(auth_digest)) + } + } + } + // other properties /// Does this transaction have transparent inputs? diff --git a/zebra-chain/src/transaction/tests/prop.rs b/zebra-chain/src/transaction/tests/prop.rs index 6891cff6f2f..1b310c2cbbf 100644 --- a/zebra-chain/src/transaction/tests/prop.rs +++ b/zebra-chain/src/transaction/tests/prop.rs @@ -46,6 +46,19 @@ proptest! { } } + /// `txid_and_auth_digest` shares one librustzcash conversion to produce both + /// the txid and the ZIP-244 auth digest; this asserts the result is identical + /// to computing them separately via `hash()` and `auth_digest()`. 
+ #[test] + fn txid_and_auth_digest_matches_separate(tx in any::()) { + let _init_guard = zebra_test::init(); + + let (txid, auth_digest) = tx.txid_and_auth_digest(); + + prop_assert_eq![txid, tx.hash()]; + prop_assert_eq![auth_digest, tx.auth_digest()]; + } + #[test] fn transaction_hash_struct_display_roundtrip(hash in any::()) { let _init_guard = zebra_test::init(); diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 39ea831546d..23e93eb5d2d 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -22,6 +22,10 @@ categories = ["asynchronous", "cryptography::cryptocurrencies"] [features] default = [] +# Emit per-block checkpoint-verify phase histograms for performance +# investigation (zero overhead when disabled). +commit-metrics = [] + # Production features that activate extra dependencies, or extra features in dependencies progress-bar = [ diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index 13c8de91462..ac8eab930f8 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -352,6 +352,9 @@ where new_outputs, transaction_hashes, deferred_pool_balance_change: Some(deferred_pool_balance_change), + // The semantic verifier checks the auth-data commitment during + // contextual validation, so it isn't precomputed here. + auth_data_root: None, }; // Return early for proposal requests. 
diff --git a/zebra-consensus/src/block/request.rs b/zebra-consensus/src/block/request.rs index 534f6c599b8..80e89d6dbd9 100644 --- a/zebra-consensus/src/block/request.rs +++ b/zebra-consensus/src/block/request.rs @@ -3,12 +3,24 @@ use std::sync::Arc; use zebra_chain::block::Block; +use zebra_state::CheckpointVerifiedBlock; #[derive(Debug, Clone, PartialEq, Eq)] /// A request to the chain or block verifier pub enum Request { /// Performs semantic validation, then asks the state to perform contextual validation and commit the block Commit(Arc), + + /// Like [`Request::Commit`], but the (CPU-heavy) checkpoint-verifier + /// precomputation — the per-transaction txids and the auth data root — has + /// already been done by the caller, off the single-threaded checkpoint + /// verifier. + /// + /// Only valid below the checkpoint height; the verifier still performs all + /// validity checks (proof of work, Merkle root, height). Used by the syncer, + /// which can build these blocks concurrently across many download tasks. + CommitCheckpointPrecomputed(CheckpointVerifiedBlock), + /// Performs semantic validation but skips checking proof of work, /// then asks the state to perform contextual validation. /// Does not commit the block to the state. 
@@ -18,16 +30,17 @@ pub enum Request { impl Request { /// Returns inner block pub fn block(&self) -> Arc { - Arc::clone(match self { - Request::Commit(block) => block, - Request::CheckProposal(block) => block, - }) + match self { + Request::Commit(block) => Arc::clone(block), + Request::CommitCheckpointPrecomputed(block) => Arc::clone(&block.block), + Request::CheckProposal(block) => Arc::clone(block), + } } /// Returns `true` if the request is a proposal pub fn is_proposal(&self) -> bool { match self { - Request::Commit(_) => false, + Request::Commit(_) | Request::CommitCheckpointPrecomputed(_) => false, Request::CheckProposal(_) => true, } } diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index 6c3387fa57e..424a031d185 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -54,6 +54,20 @@ mod types; #[cfg(test)] mod tests; +/// Times `$body` and records its duration to the named histogram when the +/// `commit-metrics` feature is enabled; otherwise just evaluates `$body` with +/// zero overhead. Used to profile the serial checkpoint-verify (feed) stages. +macro_rules! timed_verify_phase { + ($name:expr, $body:expr) => {{ + #[cfg(feature = "commit-metrics")] + let _phase_start = std::time::Instant::now(); + let _phase_result = $body; + #[cfg(feature = "commit-metrics")] + metrics::histogram!($name).record(_phase_start.elapsed().as_secs_f64()); + _phase_result + }}; +} + pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP}; /// An unverified block, which is in the queue for checkpoint verification. @@ -598,35 +612,87 @@ where .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?; self.check_height(height)?; + // Cheap proof-of-work checks run *before* the expensive precomputation, + // so a flood of invalid-PoW blocks can't make us do per-transaction work. 
+ self.check_proof_of_work(&block.header, height, hash)?; + + // Precompute the per-transaction hashes and auth data root, which scale + // with block weight. (The precomputed path does this concurrently in the + // caller and skips it here.) + let block = timed_verify_phase!( + "zebra.consensus.check_block.precompute.duration_seconds", + CheckpointVerifiedBlock::with_hash(block, hash) + ); + + self.finish_validation(block) + } + + /// Check a [`CheckpointVerifiedBlock`] whose precomputation (txids, auth data + /// root) was already done by the caller, off the single-threaded verifier. + /// + /// Runs the same validity checks as [`Self::check_block`] (height, proof of + /// work, Merkle root) against the precomputed block. + fn validate_precomputed_block( + &self, + block: CheckpointVerifiedBlock, + ) -> Result { + let hash = block.hash; + let height = block.height; + self.check_height(height)?; + self.check_proof_of_work(&block.block.header, height, hash)?; + self.finish_validation(block) + } + + /// Check the block's proof of work (difficulty, and equihash unless disabled). + fn check_proof_of_work( + &self, + header: &block::Header, + height: block::Height, + hash: block::Hash, + ) -> Result<(), VerifyCheckpointError> { if self.network.disable_pow() { crate::block::check::difficulty_threshold_is_valid( - &block.header, + header, &self.network, &height, &hash, )?; } else { - crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?; - crate::block::check::equihash_solution_is_valid(&block.header)?; + crate::block::check::difficulty_is_valid(header, &self.network, &height, &hash)?; + timed_verify_phase!( + "zebra.consensus.check_block.equihash.duration_seconds", + crate::block::check::equihash_solution_is_valid(header) + )?; } + Ok(()) + } + + /// Finish validating a (precomputed) checkpoint block: set its deferred pool + /// balance change and check its Merkle root. 
+ fn finish_validation( + &self, + mut block: CheckpointVerifiedBlock, + ) -> Result { + let height = block.height; + // See [ZIP-1015](https://zips.z.cash/zip-1015). let expected_deferred_amount = funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)? .remove(&FundingStreamReceiver::Deferred); - let deferred_pool_balance_change = expected_deferred_amount + block.deferred_pool_balance_change = expected_deferred_amount .unwrap_or_default() .checked_sub(self.network.lockbox_disbursement_total_amount(height)) .map(DeferredPoolBalanceChange::new); - // don't do precalculation until the block passes basic difficulty checks - let block = CheckpointVerifiedBlock::new(block, Some(hash), deferred_pool_balance_change); - - crate::block::check::merkle_root_validity( - &self.network, - &block.block, - &block.transaction_hashes, + timed_verify_phase!( + "zebra.consensus.check_block.merkle.duration_seconds", + crate::block::check::merkle_root_validity( + &self.network, + &block.block, + &block.transaction_hashes, + ) )?; Ok(block) @@ -644,11 +710,31 @@ where /// returns an error immediately. #[allow(clippy::unwrap_in_result)] fn queue_block(&mut self, block: Arc) -> Result { + let block = self.check_block(block)?; + self.enqueue(block) + } + + /// Like [`Self::queue_block`], but for a block whose precomputation was + /// already done by the caller (off the single-threaded verifier). + #[allow(clippy::unwrap_in_result)] + fn queue_precomputed_block( + &mut self, + block: CheckpointVerifiedBlock, + ) -> Result { + let block = self.validate_precomputed_block(block)?; + self.enqueue(block) + } + + /// Add an already-validated checkpoint block to the queue of blocks waiting + /// to be verified against a checkpoint. 
+ #[allow(clippy::unwrap_in_result)] + fn enqueue( + &mut self, + block: CheckpointVerifiedBlock, + ) -> Result { // Set up a oneshot channel to send results let (tx, rx) = oneshot::channel(); - // Check that the height and Merkle roots are valid. - let block = self.check_block(block)?; let height = block.height; let hash = block.hash; @@ -704,6 +790,140 @@ where Ok(req_block) } + /// Verify a checkpoint block whose precomputation (per-transaction txids and + /// auth data root) was already done concurrently by the caller, off this + /// single-threaded verifier. The verifier still performs all validity checks. + /// + /// This is the fast path used by the syncer: only the cheap checks and the + /// queue/commit bookkeeping run here, while the expensive precomputation has + /// already happened across many concurrent download tasks. + pub(crate) fn call_precomputed( + &mut self, + block: CheckpointVerifiedBlock, + ) -> Pin> + Send + 'static>> + { + // Reset the verifier back to the state tip if requested + // (e.g. due to an error when committing a block to the state) + if let Ok(tip) = self.reset_receiver.try_recv() { + self.reset_progress(tip); + } + + // Immediately reject all incoming blocks that arrive after we've finished. + if let FinalCheckpoint = self.previous_checkpoint_height() { + return async { Err(VerifyCheckpointError::Finished) }.boxed(); + } + + let req_block = match timed_verify_phase!( + "zebra.consensus.queue_block.duration_seconds", + self.queue_precomputed_block(block) + ) { + Ok(req_block) => req_block, + Err(e) => return async { Err(e) }.boxed(), + }; + + self.verify_and_commit(req_block) + } + + /// Process a queued checkpoint block: advance checkpoint-range verification + /// and spawn the task that commits the block to the state once its range is + /// verified. Shared by the [`Service`] and precomputed entry points. 
+ fn verify_and_commit( + &mut self, + req_block: RequestBlock, + ) -> Pin> + Send + 'static>> + { + timed_verify_phase!( + "zebra.consensus.process_range.duration_seconds", + self.process_checkpoint_range() + ); + + metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64); + + // Because the checkpoint verifier duplicates state from the state + // service (it tracks which checkpoints have been verified), we must + // commit blocks transactionally on a per-checkpoint basis. Otherwise, + // the checkpoint verifier's state could desync from the underlying + // state service. Among other problems, this could cause the checkpoint + // verifier to reject blocks not already in the state as + // already-verified. + // + // # Dropped Receivers + // + // To commit blocks transactionally on a per-checkpoint basis, we must + // commit all verified blocks in a checkpoint range, regardless of + // whether or not the response futures for each block were dropped. + // + // We accomplish this by spawning a new task containing the + // commit-if-verified logic. This task will always execute, except if + // the program is interrupted, in which case there is no longer a + // checkpoint verifier to keep in sync with the state. + // + // # State Commit Failures + // + // If the state commit fails due to corrupt block data, + // we don't reject the entire checkpoint. + // Instead, we reset the verifier to the successfully committed state tip. + let state_service = self.state_service.clone(); + let commit_checkpoint_verified = tokio::spawn(async move { + let hash = req_block + .rx + .await + .map_err(Into::into) + .map_err(VerifyCheckpointError::CommitCheckpointVerified) + .expect("CheckpointVerifier does not leave dangling receivers")?; + + // We use a `ServiceExt::oneshot`, so that every state service + // `poll_ready` has a corresponding `call`. See #1593. 
+ match state_service + .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block)) + .map_err(VerifyCheckpointError::CommitCheckpointVerified) + .await? + { + zs::Response::Committed(committed_hash) => { + assert_eq!(committed_hash, hash, "state must commit correct hash"); + Ok(hash) + } + _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"), + } + }); + + let state_service = self.state_service.clone(); + let reset_sender = self.reset_sender.clone(); + async move { + let result = commit_checkpoint_verified.await; + // Avoid a panic on shutdown + // + // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task + // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown, + // so we don't need to panic here. The persistent state is correct even when the + // task is cancelled, because block data is committed inside transactions, in + // height order. + let result = if zebra_chain::shutdown::is_shutting_down() { + Err(VerifyCheckpointError::ShuttingDown) + } else { + result.expect("commit_checkpoint_verified should not panic") + }; + if result.is_err() { + // If there was an error committing the block, then this verifier + // will be out of sync with the state. In that case, reset + // its progress back to the state tip. + let tip = match state_service + .oneshot(zs::Request::Tip) + .await + .map_err(VerifyCheckpointError::Tip)? + { + zs::Response::Tip(tip) => tip, + _ => unreachable!("wrong response for Tip"), + }; + // Ignore errors since send() can fail only when the verifier + // is being dropped, and then it doesn't matter anymore. + let _ = reset_sender.send(tip); + } + result + } + .boxed() + } + /// During checkpoint range processing, process all the blocks at `height`. /// /// Returns the first valid block. If there is no valid block, returns None. 
@@ -1086,97 +1306,18 @@ where return async { Err(VerifyCheckpointError::Finished) }.boxed(); } - let req_block = match self.queue_block(block) { + // `queue_block` (which runs `check_block`) and `process_checkpoint_range` + // are the synchronous, single-threaded work this verifier does on the + // tower `Buffer` worker for every block — i.e. the serial feed stage + // between the parallel downloads and the serial committer. + let req_block = match timed_verify_phase!( + "zebra.consensus.queue_block.duration_seconds", + self.queue_block(block) + ) { Ok(req_block) => req_block, Err(e) => return async { Err(e) }.boxed(), }; - self.process_checkpoint_range(); - - metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64); - - // Because the checkpoint verifier duplicates state from the state - // service (it tracks which checkpoints have been verified), we must - // commit blocks transactionally on a per-checkpoint basis. Otherwise, - // the checkpoint verifier's state could desync from the underlying - // state service. Among other problems, this could cause the checkpoint - // verifier to reject blocks not already in the state as - // already-verified. - // - // # Dropped Receivers - // - // To commit blocks transactionally on a per-checkpoint basis, we must - // commit all verified blocks in a checkpoint range, regardless of - // whether or not the response futures for each block were dropped. - // - // We accomplish this by spawning a new task containing the - // commit-if-verified logic. This task will always execute, except if - // the program is interrupted, in which case there is no longer a - // checkpoint verifier to keep in sync with the state. - // - // # State Commit Failures - // - // If the state commit fails due to corrupt block data, - // we don't reject the entire checkpoint. - // Instead, we reset the verifier to the successfully committed state tip. 
- let state_service = self.state_service.clone(); - let commit_checkpoint_verified = tokio::spawn(async move { - let hash = req_block - .rx - .await - .map_err(Into::into) - .map_err(VerifyCheckpointError::CommitCheckpointVerified) - .expect("CheckpointVerifier does not leave dangling receivers")?; - - // We use a `ServiceExt::oneshot`, so that every state service - // `poll_ready` has a corresponding `call`. See #1593. - match state_service - .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block)) - .map_err(VerifyCheckpointError::CommitCheckpointVerified) - .await? - { - zs::Response::Committed(committed_hash) => { - assert_eq!(committed_hash, hash, "state must commit correct hash"); - Ok(hash) - } - _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"), - } - }); - - let state_service = self.state_service.clone(); - let reset_sender = self.reset_sender.clone(); - async move { - let result = commit_checkpoint_verified.await; - // Avoid a panic on shutdown - // - // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task - // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown, - // so we don't need to panic here. The persistent state is correct even when the - // task is cancelled, because block data is committed inside transactions, in - // height order. - let result = if zebra_chain::shutdown::is_shutting_down() { - Err(VerifyCheckpointError::ShuttingDown) - } else { - result.expect("commit_checkpoint_verified should not panic") - }; - if result.is_err() { - // If there was an error committing the block, then this verifier - // will be out of sync with the state. In that case, reset - // its progress back to the state tip. - let tip = match state_service - .oneshot(zs::Request::Tip) - .await - .map_err(VerifyCheckpointError::Tip)? 
- { - zs::Response::Tip(tip) => tip, - _ => unreachable!("wrong response for Tip"), - }; - // Ignore errors since send() can fail only when the verifier - // is being dropped, and then it doesn't matter anymore. - let _ = reset_sender.send(tip); - } - result - } - .boxed() + self.verify_and_commit(req_block) } } diff --git a/zebra-consensus/src/router.rs b/zebra-consensus/src/router.rs index 904fa5bcdc5..e3337e42097 100644 --- a/zebra-consensus/src/router.rs +++ b/zebra-consensus/src/router.rs @@ -192,6 +192,17 @@ where } fn call(&mut self, request: Request) -> Self::Future { + // A precomputed checkpoint block is, by construction, below the + // checkpoint height; route it straight to the checkpoint verifier's + // fast path (which skips the now-already-done precomputation). + if let Request::CommitCheckpointPrecomputed(block) = request { + return self + .checkpoint + .call_precomputed(block) + .map_err(Into::into) + .boxed(); + } + let block = request.block(); match block.coinbase_height() { diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index 59db414713a..0e41b022871 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -26,6 +26,11 @@ progress-bar = [ # Indexes spending transaction ids by spent outpoints and revealed nullifiers indexer = [] +# Emit fine-grained per-block commit-phase timing histograms (zebra.state.write.*), +# used to profile and benchmark the finalized-state commit path. Off by default so +# the timing has zero overhead in production builds; enable for perf investigations. 
+commit-metrics = [] + # Test-only features proptest-impl = [ "proptest", diff --git a/zebra-state/src/arbitrary.rs b/zebra-state/src/arbitrary.rs index 1dc9b7ce33d..73ff08de28d 100644 --- a/zebra-state/src/arbitrary.rs +++ b/zebra-state/src/arbitrary.rs @@ -98,6 +98,7 @@ impl ContextuallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: _, + auth_data_root: _, } = block.into(); Self { diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 865a0380f47..753787ccc10 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -10,7 +10,11 @@ use std::{ use tower::{BoxError, Service, ServiceExt}; use zebra_chain::{ amount::{DeferredPoolBalanceChange, NegativeAllowed}, - block::{self, Block, HeightDiff}, + block::{ + self, + merkle::{AuthDataRoot, AUTH_DIGEST_PLACEHOLDER}, + Block, HeightDiff, + }, diagnostic::{task::WaitForPanics, CodeTimer}, history_tree::HistoryTree, orchard, @@ -260,6 +264,14 @@ pub struct SemanticallyVerifiedBlock { pub transaction_hashes: Arc<[transaction::Hash]>, /// This block's deferred pool value balance change. pub deferred_pool_balance_change: Option, + /// The precomputed ZIP-244 authorizing-data commitment root for this block, + /// if it was computed during verification. + /// + /// The checkpoint verifier sets this (it runs with high concurrency, ahead + /// of the single-threaded finalized committer) so the committer does not + /// have to recompute the per-transaction auth digests on its critical path. + /// `None` means "not precomputed"; the committer falls back to computing it. 
+ pub auth_data_root: Option<AuthDataRoot>, } /// A block ready to be committed directly to the finalized state with @@ -491,6 +503,7 @@ impl ContextuallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change, + auth_data_root: _, } = semantically_verified; // This is redundant for the non-finalized state, @@ -542,7 +555,25 @@ impl SemanticallyVerifiedBlock { let height = block .coinbase_height() .expect("semantically verified block should have a coinbase height"); - let transaction_hashes: Arc<[_]> = block.transactions.iter().map(|tx| tx.hash()).collect(); + // Compute each transaction's txid and ZIP-244 auth digest together, + // sharing the single (expensive) librustzcash conversion that dominates + // the cost on heavy shielded transactions, instead of computing the txid + // here and re-converting the same transactions for the auth data root + // later on the commit path. The auth digest is nearly free once the txid + // has been computed. + let (transaction_hashes, auth_digests): (Vec<_>, Vec<_>) = { + use rayon::prelude::*; + block + .transactions + .par_iter() + .map(|tx| tx.txid_and_auth_digest()) + .unzip() + }; + let transaction_hashes: Arc<[_]> = transaction_hashes.into(); + let auth_data_root = auth_digests + .into_iter() + .map(|auth_digest| auth_digest.unwrap_or(AUTH_DIGEST_PLACEHOLDER)) + .collect::<AuthDataRoot>(); let new_outputs = transparent::new_ordered_outputs(&block, &transaction_hashes); Self { @@ -552,6 +583,7 @@ impl SemanticallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: None, + auth_data_root: Some(auth_data_root), } } @@ -577,7 +609,25 @@ impl From<Arc<Block>> for SemanticallyVerifiedBlock { let height = block .coinbase_height() .expect("semantically verified block should have a coinbase height"); - let transaction_hashes: Arc<[_]> = block.transactions.iter().map(|tx| tx.hash()).collect(); + // Compute each transaction's txid and ZIP-244 auth digest together, + // sharing the single (expensive) librustzcash conversion 
that dominates + // the cost on heavy shielded transactions, instead of computing the txid + // here and re-converting the same transactions for the auth data root + // later on the commit path. The auth digest is nearly free once the txid + // has been computed. + let (transaction_hashes, auth_digests): (Vec<_>, Vec<_>) = { + use rayon::prelude::*; + block + .transactions + .par_iter() + .map(|tx| tx.txid_and_auth_digest()) + .unzip() + }; + let transaction_hashes: Arc<[_]> = transaction_hashes.into(); + let auth_data_root = auth_digests + .into_iter() + .map(|auth_digest| auth_digest.unwrap_or(AUTH_DIGEST_PLACEHOLDER)) + .collect::<AuthDataRoot>(); let new_outputs = transparent::new_ordered_outputs(&block, &transaction_hashes); Self { @@ -587,6 +637,7 @@ impl From<Arc<Block>> for SemanticallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: None, + auth_data_root: Some(auth_data_root), } } } @@ -602,6 +653,7 @@ impl From for SemanticallyVerifiedBlock { deferred_pool_balance_change: Some(DeferredPoolBalanceChange::new( valid.chain_value_pool_change.deferred_amount(), )), + auth_data_root: None, } } } @@ -615,6 +667,7 @@ impl From for SemanticallyVerifiedBlock { new_outputs: finalized.new_outputs, transaction_hashes: finalized.transaction_hashes, deferred_pool_balance_change: finalized.deferred_pool_balance_change, + auth_data_root: None, } } } diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 3e3b7c3a481..6cbdf6ae937 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -116,6 +116,7 @@ impl From for ChainTipBlock { new_outputs: _, transaction_hashes, deferred_pool_balance_change: _, + auth_data_root: _, } = prepared; Self { diff --git a/zebra-state/src/service/check.rs b/zebra-state/src/service/check.rs index bc590689c01..119009cb658 100644 --- a/zebra-state/src/service/check.rs +++ b/zebra-state/src/service/check.rs @@ -5,7 +5,9 @@ use std::{borrow::Borrow, sync::Arc}; use 
chrono::Duration; use zebra_chain::{ - block::{self, Block, ChainHistoryBlockTxAuthCommitmentHash, CommitmentError}, + block::{ + self, merkle::AuthDataRoot, Block, ChainHistoryBlockTxAuthCommitmentHash, CommitmentError, + }, history_tree::HistoryTree, parameters::{Network, NetworkUpgrade}, work::difficulty::CompactDifficulty, @@ -138,6 +140,7 @@ pub(crate) fn block_commitment_is_valid_for_chain_history( block: Arc<Block>, network: &Network, history_tree: &HistoryTree, + precomputed_auth_data_root: Option<AuthDataRoot>, ) -> Result<(), ValidateContextError> { match block.commitment(network)? { block::Commitment::PreSaplingReserved(_) @@ -200,7 +203,12 @@ pub(crate) fn block_commitment_is_valid_for_chain_history( "the history tree of the previous block must exist \ since the current block has a ChainHistoryBlockTxAuthCommitment", ); - let auth_data_root = block.auth_data_root(); + // Use the auth data root precomputed by the verifier when available + // (it is byte-identical to recomputing it here), so the committer + // does not repeat the per-transaction auth-digest work on its + // single-threaded critical path. 
+ let auth_data_root = + precomputed_auth_data_root.unwrap_or_else(|| block.auth_data_root()); let hash_block_commitments = ChainHistoryBlockTxAuthCommitmentHash::from_commitments( &history_tree_root, diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 695c95769fe..509aa277409 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -18,11 +18,13 @@ use std::{ io::{stderr, stdout, Write}, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, LazyLock, }, }; -use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network}; +use zebra_chain::{ + block, history_tree::HistoryTree, parallel::tree::NoteCommitmentTrees, parameters::Network, +}; use zebra_db::{ block::RetentionPlan, chain::BLOCK_INFO, @@ -37,6 +39,77 @@ use crate::{ CheckpointVerifiedBlock, Config, ValidateContextError, }; +/// Times `$body` and records its duration to the named histogram when the +/// `commit-metrics` feature is enabled; otherwise just evaluates `$body` with +/// zero overhead. Used to profile the finalized-state commit phases. +macro_rules! timed_commit_phase { + ($name:expr, $body:expr) => {{ + #[cfg(feature = "commit-metrics")] + let _phase_start = std::time::Instant::now(); + let _phase_result = $body; + #[cfg(feature = "commit-metrics")] + metrics::histogram!($name).record(_phase_start.elapsed().as_secs_f64()); + _phase_result + }}; +} + +/// A dedicated rayon thread pool for checkpoint-commit treestate computation: +/// the note-commitment tree update and the ZIP-244 auth-data-root commitment +/// check, which run on the single finalized-writer thread and dominate the +/// per-block commit cost on heavy shielded blocks. 
+/// +/// Both operations are internally parallel (rayon), but on the global rayon +/// pool they contend with each other *and* with the block download/verification +/// pipeline (equihash, batch signature/proof verification), which left the +/// tree-update burst running at only ~1.6 effective cores in-node despite +/// scaling ~7x in isolation. Running them inside a dedicated pool gives the +/// commit-stage crypto its own workers, isolated from the verifier's global-pool +/// work, so a commit burst can use the otherwise-idle cores. +/// +/// Sized to all available cores: the commit burst and the download/verify feed +/// alternate (when committing, the verifier is mostly idle), so the burst should +/// claim the cores rather than permanently oversubscribing the verifier. +static COMMIT_COMPUTE_POOL: LazyLock<rayon::ThreadPool> = LazyLock::new(|| { + let threads = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4); + rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .thread_name(|i| format!("commit-compute-{i}")) + .build() + .expect("rayon thread pool configuration is valid") +}); + +/// A finalizable block whose treestate has been computed by +/// [`FinalizedState::compute_finalized`] and is ready to be written to disk by +/// [`FinalizedState::write_finalized`]. +/// +/// Splitting compute from write lets the writer pipeline the two: the ordered, +/// chained treestate computation (Stage A) can run for the next block while the +/// previous block's batch construction + db write (Stage B) is still in flight. +pub(crate) struct ComputedFinalized { + height: block::Height, + hash: block::Hash, + finalized: FinalizedBlock, + prev_note_commitment_trees: Option<NoteCommitmentTrees>, + retention: RetentionPlan, +} + +impl ComputedFinalized { + /// The note commitment trees *after* this block, for threading in memory to + /// the next block's [`FinalizedState::compute_finalized`] in a pipelined + /// writer (so the next block does not re-read a not-yet-written tip). 
+ pub(crate) fn threaded_note_commitment_trees(&self) -> NoteCommitmentTrees { + self.finalized.treestate.note_commitment_trees.clone() + } + + /// The history tree *after* this block, for threading in memory to the next + /// block's [`FinalizedState::compute_finalized`] in a pipelined writer. + pub(crate) fn threaded_history_tree(&self) -> Arc<HistoryTree> { + self.finalized.treestate.history_tree.clone() + } +} + pub mod column_family; mod disk_db; @@ -482,13 +555,32 @@ impl FinalizedState { &mut self, ordered_block: QueuedCheckpointVerified, prev_note_commitment_trees: Option<NoteCommitmentTrees>, + ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), CommitCheckpointVerifiedError> { + let computed_result = + self.compute_finalized(ordered_block.0.clone().into(), prev_note_commitment_trees, None); + self.finish_pipelined(computed_result, ordered_block) + } + + /// Finish committing a [`Self::compute_finalized`]d block: write it to disk, + /// update commit metrics, and send the result on the request's response + /// channel. Returns the block and its updated note commitment trees. + /// + /// This is the second half of [`Self::commit_finalized`], split out so a + /// pipelined writer can run [`Self::compute_finalized`] (the ordered, chained + /// treestate computation) on a separate thread, *ahead* of this disk write. + /// + /// `computed_result` is the (possibly failed) output of + /// [`Self::compute_finalized`] for `ordered_block`. The caller must compute + /// results in strict height order and pass them here in that same order. 
+ pub(crate) fn finish_pipelined( + &mut self, + computed_result: Result, + ordered_block: QueuedCheckpointVerified, ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), CommitCheckpointVerifiedError> { let (checkpoint_verified, rsp_tx) = ordered_block; - let result = self.commit_finalized_direct( - checkpoint_verified.clone().into(), - prev_note_commitment_trees, - "commit checkpoint-verified request", - ); + let result = computed_result.and_then(|computed| { + self.write_finalized(computed, "commit checkpoint-verified request") + }); if result.is_ok() { metrics::counter!("state.checkpoint.finalized.block.count").increment(1); @@ -512,26 +604,21 @@ impl FinalizedState { result.map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees)) } - /// Immediately commit a `finalized` block to the finalized state. - /// - /// This can be called either by the non-finalized state (when finalizing - /// a block) or by the checkpoint verifier. + /// Compute the treestate for a finalizable block — the note-commitment tree + /// update, the ZIP-244 commitment check, and the history-tree push — without + /// writing anything to disk. The result is written by [`Self::write_finalized`]. /// - /// Use `source` as the source of the block in log messages. - /// - /// # Errors - /// - /// - Propagates any errors from writing to the DB - /// - Propagates any errors from updating history and note commitment trees - /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments` - /// does not match the expected value + /// `prev_history_tree` lets the caller thread the parent block's history tree + /// in memory (as it already threads `prev_note_commitment_trees`), so a + /// pipelined writer does not re-read a stale tip. `None` reads the finalized + /// tip's history tree from the database (the non-pipelined behaviour). 
#[allow(clippy::unwrap_in_result)] - pub fn commit_finalized_direct( - &mut self, + pub(crate) fn compute_finalized( + &self, finalizable_block: FinalizableBlock, prev_note_commitment_trees: Option, - source: &str, - ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> { + prev_history_tree: Option>, + ) -> Result { let (height, hash, finalized, prev_note_commitment_trees, retention) = match finalizable_block { FinalizableBlock::Checkpoint { @@ -544,51 +631,89 @@ impl FinalizedState { // finalized tip is the parent block of the block being committed. let block = checkpoint_verified.block.clone(); - let mut history_tree = self.db.history_tree(); + // Auth data root precomputed by the checkpoint verifier (if any), + // so the commitment check below doesn't recompute it here on the + // single-threaded committer. `AuthDataRoot` is `Copy`. + let precomputed_auth_data_root = checkpoint_verified.auth_data_root; + // Use the threaded parent history tree if provided (pipelined + // writer), otherwise read it from the finalized tip. + let mut history_tree = + prev_history_tree.unwrap_or_else(|| self.db.history_tree()); let prev_note_commitment_trees = prev_note_commitment_trees .unwrap_or_else(|| self.db.note_commitment_trees_for_tip()); - // Update the note commitment trees. let mut note_commitment_trees = prev_note_commitment_trees.clone(); - note_commitment_trees - .update_trees_parallel(&block) - .map_err(ValidateContextError::from)?; + let network = self.network(); - // Check the block commitment if the history tree was not - // supplied by the non-finalized state. Note that we don't do - // this check for history trees supplied by the non-finalized - // state because the non-finalized state checks the block - // commitment. 
+ // Run two independent CPU-intensive crypto operations concurrently + // on the rayon pool (Part 1 of the checkpoint-commit parallelization): // - // For Nu5-onward, the block hash commits only to - // non-authorizing data (see ZIP-244). This checks the - // authorizing data commitment, making sure the entire block - // contents were committed to. The test is done here (and not - // during semantic validation) because it needs the history tree - // root. While it _is_ checked during contextual validation, - // that is not called by the checkpoint verifier, and keeping a - // history tree there would be harder to implement. + // - updating the note commitment trees, and + // - checking this block's commitment against the *parent* history tree. // - // TODO: run this CPU-intensive cryptography in a parallel rayon - // thread, if it shows up in profiles - check::block_commitment_is_valid_for_chain_history( - block.clone(), - &self.network(), - &history_tree, - )?; - - // Update the history tree. + // These are independent: the commitment check reads only the parent + // history tree (not this block's note commitment trees), and the + // history tree push below depends on both, so it runs after the join. // - // TODO: run this CPU-intensive cryptography in a parallel rayon - // thread, if it shows up in profiles + // The commitment check is done here (and not during semantic + // validation) because it needs the history tree root, and the + // checkpoint verifier doesn't run contextual validation. For + // Nu5-onward the block hash commits only to non-authorizing data + // (ZIP-244), so this verifies the authorizing-data commitment. 
+ #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.block_tx_count") + .record(block.transactions.len() as f64); + #[cfg(feature = "commit-metrics")] + let _ckpt_compute = std::time::Instant::now(); + let mut commitment_result = None; + // Run the two CPU-intensive operations inside the dedicated + // commit-compute pool (see `COMMIT_COMPUTE_POOL`) so their + // nested rayon work uses isolated workers instead of + // contending with the verifier on the global pool. + let tree_result = COMMIT_COMPUTE_POOL.install(|| { + rayon::in_place_scope_fifo(|scope| { + scope.spawn_fifo(|_scope| { + commitment_result = Some(timed_commit_phase!( + "zebra.state.write.commitment_check.duration_seconds", + check::block_commitment_is_valid_for_chain_history( + block.clone(), + &network, + &history_tree, + precomputed_auth_data_root, + ) + )); + }); + + // Runs on the in-place thread so its own internal rayon scope + // (one task per note commitment tree) uses the pool directly. + timed_commit_phase!( + "zebra.state.write.update_trees.duration_seconds", + note_commitment_trees.update_trees_parallel(&block) + ) + }) + }); + + // Surface the tree-update error first, preserving the error + // precedence of the previous sequential code. + tree_result.map_err(ValidateContextError::from)?; + commitment_result.expect("scope has already finished")?; + + // Update the history tree (depends on both operations above). let history_tree_mut = Arc::make_mut(&mut history_tree); let sapling_root = note_commitment_trees.sapling.root(); let orchard_root = note_commitment_trees.orchard.root(); history_tree_mut - .push(&self.network(), block.clone(), &sapling_root, &orchard_root) + .push(&network, block.clone(), &sapling_root, &orchard_root) .map_err(Arc::new) .map_err(ValidateContextError::from)?; + // Total serial wall time of the checkpoint compute phase (note tree + // update + commitment check, then history push). 
Compared against the + // summed phase times, this shows the overlap win. + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.checkpoint_compute.duration_seconds") + .record(_ckpt_compute.elapsed().as_secs_f64()); + let treestate = Treestate { note_commitment_trees, history_tree, @@ -624,6 +749,34 @@ impl FinalizedState { } }; + Ok(ComputedFinalized { + height, + hash, + finalized, + prev_note_commitment_trees, + retention, + }) + } + + /// Write a [`Self::compute_finalized`]d block to disk and return its hash and + /// the updated note commitment trees. + /// + /// Asserts that the block is the child of the current finalized tip (so it + /// must be called in strict height order, which the writer guarantees). + #[allow(clippy::unwrap_in_result)] + fn write_finalized( + &mut self, + computed: ComputedFinalized, + source: &str, + ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> { + let ComputedFinalized { + height, + hash, + finalized, + prev_note_commitment_trees, + retention, + } = computed; + let committed_tip_hash = self.db.finalized_tip_hash(); let committed_tip_height = self.db.finalized_tip_height(); @@ -655,13 +808,22 @@ impl FinalizedState { let finalized_inner_block = finalized.block.clone(); let note_commitment_trees = finalized.treestate.note_commitment_trees.clone(); - let result = self.db.write_block( - finalized, - prev_note_commitment_trees, - &self.network(), - source, - retention, - ); + // Build and write the block's RocksDB batch inside the dedicated + // commit-compute pool. Like the note-commitment tree update above, the + // per-block serialization done here (raw transaction bytes and the block + // size) is parallelized with rayon; running it in the isolated pool keeps + // those workers from contending with the download/verification pipeline + // on the global pool. 
+ let network = self.network(); + let result = COMMIT_COMPUTE_POOL.install(|| { + self.db.write_block( + finalized, + prev_note_commitment_trees, + &network, + source, + retention, + ) + }); if result.is_ok() { if retention.clears_archive_backlog() { @@ -697,6 +859,25 @@ impl FinalizedState { result.map(|hash| (hash, note_commitment_trees)) } + /// Immediately commit a `finalized` block to the finalized state, computing + /// its treestate and writing it to disk in one (non-pipelined) call. + /// + /// This can be called either by the non-finalized state (when finalizing + /// a block) or by the checkpoint verifier. + /// + /// Use `source` as the source of the block in log messages. + #[allow(clippy::unwrap_in_result)] + pub fn commit_finalized_direct( + &mut self, + finalizable_block: FinalizableBlock, + prev_note_commitment_trees: Option, + source: &str, + ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> { + let computed = + self.compute_finalized(finalizable_block, prev_note_commitment_trees, None)?; + self.write_finalized(computed, source) + } + #[cfg(feature = "elasticsearch")] /// Store finalized blocks into an elasticsearch database. 
/// diff --git a/zebra-state/src/service/finalized_state/tests/transparent.rs b/zebra-state/src/service/finalized_state/tests/transparent.rs index d6ca53b36f5..c10689d1868 100644 --- a/zebra-state/src/service/finalized_state/tests/transparent.rs +++ b/zebra-state/src/service/finalized_state/tests/transparent.rs @@ -128,6 +128,7 @@ fn intra_block_self_spend_chain_in_finalized_state() { new_outputs, transaction_hashes, deferred_pool_balance_change: None, + auth_data_root: None, }; let finalized = FinalizedBlock::from_checkpoint_verified( CheckpointVerifiedBlock(semantically_verified), diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 0108c7b02bd..79a68062ed8 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -40,6 +40,7 @@ use crate::{ disk_format::{ block::TransactionLocation, transparent::{AddressBalanceLocationUpdates, OutputLocation}, + IntoDisk, }, zebra_db::{metrics::block_precommit_metrics, ZebraDb}, FromDisk, RawBytes, PRUNING_METADATA, @@ -556,6 +557,11 @@ impl ZebraDb { source: &str, retention: RetentionPlan, ) -> Result { + // Time the DB reads + setup done before building the write batch (UTXO + // and address-balance lookups), to separate them from batch construction. + #[cfg(feature = "commit-metrics")] + let _reads_start = std::time::Instant::now(); + let tx_hash_indexes: HashMap = finalized .transaction_hashes .iter() @@ -670,9 +676,15 @@ impl ZebraDb { })) }; + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.batch_reads.duration_seconds") + .record(_reads_start.elapsed().as_secs_f64()); + let mut batch = DiskWriteBatch::new(); // In case of errors, propagate and do not write the batch. 
+ #[cfg(feature = "commit-metrics")] + let _prepare_start = std::time::Instant::now(); batch.prepare_block_batch( self, network, @@ -687,6 +699,9 @@ impl ZebraDb { prev_note_commitment_trees, retention.stores_raw_transactions(), )?; + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.prepare_batch.duration_seconds") + .record(_prepare_start.elapsed().as_secs_f64()); // In pruned storage mode, delete raw transaction history that has fallen // outside the retention window, and/or advance the pruning marker. This @@ -970,8 +985,29 @@ impl DiskWriteBatch { ) -> Result<(), CommitCheckpointVerifiedError> { let db = &zebra_db.db; + // Per-sub-batch timing: build the RocksDB write batch (in memory) for each + // category of data. These scale with block weight and run serially on the + // single writer thread. Gated behind `commit-metrics` (zero prod overhead). + macro_rules! timed_subbatch { + ($name:expr, $body:expr) => {{ + #[cfg(feature = "commit-metrics")] + let _t = std::time::Instant::now(); + let _r = $body; + #[cfg(feature = "commit-metrics")] + metrics::histogram!($name).record(_t.elapsed().as_secs_f64()); + _r + }}; + } + // Commit block, transaction, and note commitment tree data. - self.prepare_block_header_and_transaction_data_batch(db, finalized, store_raw_transactions); + timed_subbatch!( + "zebra.state.write.batch.header_tx.duration_seconds", + self.prepare_block_header_and_transaction_data_batch( + db, + finalized, + store_raw_transactions + ) + ); // The consensus rules are silent on shielded transactions in the genesis block, // because there aren't any in the mainnet or testnet genesis blocks. @@ -979,8 +1015,14 @@ impl DiskWriteBatch { // which is already present from height 1 to the first shielded transaction. // // In Zebra we include the nullifiers and note commitments in the genesis block because it simplifies our code. 
- self.prepare_shielded_transaction_batch(zebra_db, finalized); - self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees); + timed_subbatch!( + "zebra.state.write.batch.shielded.duration_seconds", + self.prepare_shielded_transaction_batch(zebra_db, finalized) + ); + timed_subbatch!( + "zebra.state.write.batch.trees.duration_seconds", + self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees) + ); // # Consensus // @@ -994,29 +1036,38 @@ impl DiskWriteBatch { // aren't any of those on mainnet or testnet. if !finalized.height.is_min() { // Commit transaction indexes - self.prepare_transparent_transaction_batch( - zebra_db, - network, - finalized, - &new_outputs_by_out_loc, - &spent_utxos_by_outpoint, - &spent_utxos_by_out_loc, - #[cfg(feature = "indexer")] - &out_loc_by_outpoint, - address_balances, + timed_subbatch!( + "zebra.state.write.batch.transparent.duration_seconds", + self.prepare_transparent_transaction_batch( + zebra_db, + network, + finalized, + &new_outputs_by_out_loc, + &spent_utxos_by_outpoint, + &spent_utxos_by_out_loc, + #[cfg(feature = "indexer")] + &out_loc_by_outpoint, + address_balances, + ) ); } // Commit UTXOs and value pools - self.prepare_chain_value_pools_batch( - zebra_db, - finalized, - spent_utxos_by_outpoint, - value_pool, + timed_subbatch!( + "zebra.state.write.batch.value_pools.duration_seconds", + self.prepare_chain_value_pools_batch( + zebra_db, + finalized, + spent_utxos_by_outpoint, + value_pool, + ) )?; // The block has passed contextual validation, so update the metrics - block_precommit_metrics(&finalized.block, finalized.hash, finalized.height); + timed_subbatch!( + "zebra.state.write.batch.precommit_metrics.duration_seconds", + block_precommit_metrics(&finalized.block, finalized.hash, finalized.height) + ); Ok(()) } @@ -1115,18 +1166,30 @@ impl DiskWriteBatch { self.zs_insert(&hash_by_height, height, hash); self.zs_insert(&height_by_hash, hash, height); - for (transaction_index, (transaction, 
transaction_hash)) in block - .transactions - .iter() - .zip(transaction_hashes.iter()) - .enumerate() - { + // Serialize the raw transaction bytes in parallel: on heavy shielded + // blocks this serialization dominates the per-block write cost, and each + // transaction serializes independently. The result is byte-identical to + // inserting the transactions directly, because `RawBytes` is stored + // verbatim. The serialized bytes are inserted in height/index order below. + let raw_transactions: Vec<RawBytes> = if store_raw_transactions { + use rayon::prelude::*; + block + .transactions + .par_iter() + .map(|transaction| RawBytes::new_raw_bytes(transaction.as_bytes())) + .collect() + } else { + Vec::new() + }; + + for (transaction_index, transaction_hash) in transaction_hashes.iter().enumerate() { let transaction_location = TransactionLocation::from_usize(*height, transaction_index); // Commit each transaction's raw bytes only when the storage policy - // keeps historical transaction data for this height. - if store_raw_transactions { - self.zs_insert(&tx_by_loc, transaction_location, transaction); + // keeps historical transaction data for this height (then + // `raw_transactions` holds the pre-serialized bytes in order). 
+ if let Some(raw_transaction) = raw_transactions.get(transaction_index) { + self.zs_insert(&tx_by_loc, transaction_location, raw_transaction); } // Index each transaction hash and location diff --git a/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs b/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs index 22a8ef50be3..c657893da5e 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs @@ -139,6 +139,7 @@ fn test_block_db_round_trip_with( new_outputs, transaction_hashes, deferred_pool_balance_change: None, + auth_data_root: None, }) }; diff --git a/zebra-state/src/service/finalized_state/zebra_db/chain.rs b/zebra-state/src/service/finalized_state/zebra_db/chain.rs index 585f4b9edac..29a963e7f18 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/chain.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/chain.rs @@ -17,8 +17,13 @@ use std::{ }; use zebra_chain::{ - amount::NonNegative, block::Height, block_info::BlockInfo, history_tree::HistoryTree, - serialization::ZcashSerialize as _, transparent, value_balance::ValueBalance, + amount::NonNegative, + block::Height, + block_info::BlockInfo, + history_tree::HistoryTree, + serialization::{CompactSizeMessage, ZcashSerialize as _}, + transparent, + value_balance::ValueBalance, }; use crate::{ @@ -246,6 +251,7 @@ impl DiskWriteBatch { /// /// [`chain_value_pool_change`]: zebra_chain::block::Block::chain_value_pool_change /// [`add_chain_value_pool_change`]: ValueBalance::add_chain_value_pool_change + #[allow(clippy::unwrap_in_result)] pub fn prepare_chain_value_pools_batch( &mut self, db: &ZebraDb, @@ -286,11 +292,28 @@ impl DiskWriteBatch { .with_batch_for_writing(self) .zs_insert(&(), &new_value_pool); - // Get the block size to store with the BlockInfo. 
This is a bit wasteful - // since the block header and txs were serialized previously when writing - // them to the DB, and we could get the size if we modified the database - // code to return the size of data written; but serialization should be cheap. - let block_size = finalized.block.zcash_serialized_size(); + // Get the block size to store with the BlockInfo. + // + // `Block::zcash_serialized_size` walks the entire block's serialization + // on a single thread, which is a significant per-block cost on heavy + // shielded blocks (it re-traverses every transaction). Compute the same + // size, but sum the independent per-transaction sizes across the rayon + // pool. This is byte-count-identical to serializing the block: + // size = header + CompactSize(tx_count) + sum(transaction sizes). + let block_size = { + use rayon::prelude::*; + + let transactions = &finalized.block.transactions; + let transactions_size: usize = transactions + .par_iter() + .map(|transaction| transaction.zcash_serialized_size()) + .sum(); + let tx_count_size = CompactSizeMessage::try_from(transactions.len()) + .expect("block must have a valid transaction count") + .zcash_serialized_size(); + + finalized.block.header.zcash_serialized_size() + tx_count_size + transactions_size + }; let _ = db.block_info_cf().with_batch_for_writing(self).zs_insert( &finalized.height, diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index 813484b46f4..a2b676a9896 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -634,6 +634,8 @@ impl NonFinalizedState { block, &network, &history_tree, + // The non-finalized path doesn't precompute the auth data root. 
+ None, )); }); diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index 530b1886b1f..bb7a1e88c48 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -12,13 +12,18 @@ use tokio::sync::{ }; use tracing::Span; -use zebra_chain::block::{self, Height}; +use zebra_chain::{ + block::{self, Height}, + history_tree::HistoryTree, + parallel::tree::NoteCommitmentTrees, +}; use crate::{ constants::MAX_BLOCK_REORG_HEIGHT, + error::CommitCheckpointVerifiedError, service::{ check, - finalized_state::{FinalizedState, ZebraDb}, + finalized_state::{ComputedFinalized, FinalizedState, ZebraDb}, non_finalized_state::NonFinalizedState, queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified}, ChainTipBlock, ChainTipSender, InvalidateError, ReconsiderError, @@ -273,9 +278,138 @@ impl WriteBlockWorkerTask { let mut prev_finalized_note_commitment_trees = None; - // Write all the finalized blocks sent by the state, - // until the state closes the finalized block channel's sender. - while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() { + // === Any-order commit pipeline (finalized / checkpoint phase) === + // + // The per-block finalized commit splits into two halves (see + // `FinalizedState::compute_finalized` and `finish_pipelined`): + // + // * Stage A (compute) — the ordered, chained treestate computation: + // note-commitment tree update, ZIP-244 commitment check, history-tree + // push. Block N+1 needs block N's trees, so this stays strictly + // ordered, but it does not depend on block N's *disk write*. + // * Stage B (write) — block batch construction + RocksDB write + chain + // tip update (this thread). + // + // Running Stage A on its own thread lets it compute block N+1 while Stage + // B is still writing block N, overlapping the next block's tree chain with + // the previous block's serialization + write. 
The treestate (note + + // history trees) is threaded in memory from each Stage A iteration to the + // next, so the compute never reads a not-yet-written tip. Canonical tip + // advancement (`set_finalized_tip`) stays in Stage B, in height order. + + // Take ownership of the finalized receiver for the compute thread. The + // non-finalized phase below does not use it, so a dummy is left in place. + let finalized_block_write_receiver = { + let (_dummy_tx, dummy_rx) = tokio::sync::mpsc::unbounded_channel(); + std::mem::replace(finalized_block_write_receiver, dummy_rx) + }; + + // Bounded so Stage A runs at most a few blocks ahead of Stage B: back + // pressure caps memory and keeps the two stages roughly in lockstep. + const FINALIZED_PIPELINE_DEPTH: usize = 3; + type PipelinedBlock = ( + Result<ComputedFinalized, CommitCheckpointVerifiedError>, + QueuedCheckpointVerified, + ); + let (computed_sender, computed_receiver) = + std::sync::mpsc::sync_channel::<PipelinedBlock>(FINALIZED_PIPELINE_DEPTH); + + // Set by Stage B on a write error: tells Stage A to drop its in-memory + // threaded trees and re-seed from the (reset) finalized tip. + let reset_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // --- Stage A: compute thread --- + let stage_a_state = finalized_state.clone(); + let stage_a_reset_flag = Arc::clone(&reset_flag); + let stage_a = std::thread::Builder::new() + .name("finalized-compute".to_string()) + .spawn(move || { + let mut finalized_block_write_receiver = finalized_block_write_receiver; + let mut prev_note_commitment_trees: Option<NoteCommitmentTrees> = None; + let mut prev_history_tree: Option<Arc<HistoryTree>> = None; + + // The next height we expect to compute. The db tip lags Stage B, + // so we track this in memory rather than reading the db tip.
+ let mut next_compute_height = stage_a_state + .db + .finalized_tip_height() + .map(|height| (height + 1).expect("committed heights are valid")) + .unwrap_or(Height(0)); + + loop { + let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() else { + break; + }; + + // A Stage B write error resets the finalized tip: drop stale + // threaded trees and re-seed the expected height from the db. + if stage_a_reset_flag.swap(false, std::sync::atomic::Ordering::AcqRel) { + prev_note_commitment_trees = None; + prev_history_tree = None; + next_compute_height = stage_a_state + .db + .finalized_tip_height() + .map(|height| (height + 1).expect("committed heights are valid")) + .unwrap_or(Height(0)); + } + + // Drop children of a failed block (wrong height). Stage B + // re-checks the height against the real db tip before writing. + if ordered_block.0.height != next_compute_height { + std::mem::drop(ordered_block); + continue; + } + + #[cfg(feature = "commit-metrics")] + let _compute_start = std::time::Instant::now(); + let computed_result = stage_a_state.compute_finalized( + ordered_block.0.clone().into(), + prev_note_commitment_trees.take(), + prev_history_tree.take(), + ); + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.compute.duration_seconds") + .record(_compute_start.elapsed().as_secs_f64()); + + match &computed_result { + Ok(computed) => { + // Thread this block's trees to the next compute. + prev_note_commitment_trees = + Some(computed.threaded_note_commitment_trees()); + prev_history_tree = Some(computed.threaded_history_tree()); + next_compute_height = + (next_compute_height + 1).expect("committed heights are valid"); + } + Err(_) => { + // Stop threading; Stage B sends a reset and we re-seed + // from the db tip on the next iteration. + prev_note_commitment_trees = None; + prev_history_tree = None; + } + } + + if computed_sender.send((computed_result, ordered_block)).is_err() { + // Stage B is gone (shutting down). 
+ break; + } + } + }) + .expect("failed to spawn the finalized-compute thread"); + + // --- Stage B: write loop (this thread) --- + // Write all the finalized blocks computed by Stage A, until Stage A closes + // the channel (because the state closed the finalized block sender). + loop { + // Time spent waiting for Stage A's next computed block (writer idle). + #[cfg(feature = "commit-metrics")] + let _wait_start = std::time::Instant::now(); + let Ok((computed_result, ordered_block)) = computed_receiver.recv() else { + break; + }; + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.wait.duration_seconds") + .record(_wait_start.elapsed().as_secs_f64()); + // TODO: split these checks into separate functions if invalid_block_reset_sender.is_closed() { @@ -283,12 +417,12 @@ impl WriteBlockWorkerTask { return; } - // Discard any children of invalid blocks in the channel + // Discard any children of invalid blocks. // - // `commit_finalized()` requires blocks in height order. - // So if there has been a block commit error, - // we need to drop all the descendants of that block, - // until we receive a block at the required next height. + // `finish_pipelined()` requires blocks in height order. So if there + // has been a block commit error, we need to drop all the descendants + // of that block, until we receive a block at the required next height. + // (This also covers blocks Stage A computed before it saw the reset.) let next_valid_height = finalized_state .db .finalized_tip_height() @@ -305,20 +439,29 @@ impl WriteBlockWorkerTask { ); // We don't want to send a reset here, because it could overwrite a valid sent hash - std::mem::drop(ordered_block); + std::mem::drop((computed_result, ordered_block)); continue; } - // Try committing the block - match finalized_state - .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take()) - { + // Try committing the block (write half only; compute is already done). 
+ #[cfg(feature = "commit-metrics")] + let _busy_start = std::time::Instant::now(); + let commit_result = finalized_state.finish_pipelined(computed_result, ordered_block); + #[cfg(feature = "commit-metrics")] + metrics::histogram!("zebra.state.write.busy.duration_seconds") + .record(_busy_start.elapsed().as_secs_f64()); + match commit_result { Ok((finalized, note_commitment_trees)) => { let tip_block = ChainTipBlock::from(finalized); + // Kept for the handoff to the non-finalized phase below, which + // finalizes its first block from the last finalized trees. prev_finalized_note_commitment_trees = Some(note_commitment_trees); chain_tip_sender.set_finalized_tip(tip_block); } Err(error) => { + // Tell Stage A to drop its threaded trees and re-seed. + reset_flag.store(true, std::sync::atomic::Ordering::Release); + let finalized_tip = finalized_state.db.tip(); // The last block in the queue failed, so we can't commit the next block. @@ -344,6 +487,9 @@ impl WriteBlockWorkerTask { } } + // Stage A has finished (it closed the channel); join it before moving on. + let _ = stage_a.join(); + // Do this check even if the channel got closed before any finalized blocks were sent. // This can happen if we're past the finalized tip. 
if invalid_block_reset_sender.is_closed() { diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index ceb7d89befc..a1592d3af77 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -62,6 +62,9 @@ default = ["default-release-binaries"] # Indexer support indexer = ["zebra-state/indexer"] +# Per-block commit-phase timing histograms (zebra.state.write.*), for perf profiling +commit-metrics = ["zebra-state/commit-metrics", "zebra-consensus/commit-metrics"] + # Experimental internal miner support internal-miner = [ "thread-priority", diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 34230cca10c..59ee797040e 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -436,17 +436,30 @@ where }; let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp { - assert_eq!( - blocks.len(), - 1, - "wrong number of blocks in response to a single hash" - ); - - blocks - .first() - .expect("just checked length") - .available() - .expect("unexpected missing block status: single block failures should be errors") + // A well-behaved peer returns exactly one available block in response + // to a single-hash request. Treat a zero/multi-block response, or an + // unavailable block status, as a retryable download failure instead of + // panicking: a remote peer must not be able to crash the node. 
+ let single_block = if blocks.len() == 1 { + blocks.first().and_then(|b| b.available()) + } else { + None + }; + + match single_block { + Some(block_and_advertiser) => block_and_advertiser, + None => { + return Err(BlockDownloadVerifyError::DownloadFailed { + error: format!( + "peer returned {} block(s), or an unavailable block, \ + for a single-hash request", + blocks.len() + ) + .into(), + hash, + }); + } + } } else { unreachable!("wrong response to block request"); }; @@ -569,6 +582,26 @@ where Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?; } + // For checkpoint-height blocks, precompute the checkpoint-verified + // block (the per-transaction txids and auth data root, which + // dominate the cost on heavy shielded blocks) here, off the + // single-threaded checkpoint verifier. Each download task runs + // concurrently, so many blocks' precomputation overlaps instead of + // serializing on the verifier. Above the checkpoint height, the + // semantic verifier needs the raw block, so send it unchanged. + let request = if block_height <= max_checkpoint_height { + let checkpoint_block = tokio::task::spawn_blocking(move || { + let hash = block.hash(); + zs::CheckpointVerifiedBlock::with_hash(block, hash) + }) + .await + .expect("checkpoint block precomputation should not panic"); + + zebra_consensus::Request::CommitCheckpointPrecomputed(checkpoint_block) + } else { + zebra_consensus::Request::Commit(block) + }; + // Wait for the verifier service to be ready. let readiness = verifier.ready(); // Prefer the cancel handle if both are ready. @@ -586,7 +619,7 @@ where let verify_start = std::time::Instant::now(); let mut rsp = verifier .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })? 
- .call(zebra_consensus::Request::Commit(block)).boxed(); + .call(request).boxed(); // Add a shorter timeout to workaround a known bug (#5125) let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");