Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c8ab5fb
perf(state): parallelize per-block serialization in the finalized blo…
p0mvn Jun 19, 2026
23b6d3c
perf(state): parallelize and de-duplicate the committer's UTXO/addres…
p0mvn Jun 19, 2026
e5ed735
perf(network): add the tree_aux stream wire codec and serving Request…
p0mvn Jun 22, 2026
353ca09
perf(network): exchange commitment roots between two peers over tree_…
p0mvn Jun 22, 2026
9f7cf56
perf(network): add the tree_aux client driver (fetch_roots) with a tw…
p0mvn Jun 22, 2026
991fed2
perf(zebrad): fast-sync commitment roots from peers via tree_aux (#188)
p0mvn Jun 22, 2026
87ac698
test(zebrad): integration-test tree_aux serving over the wire; add a …
p0mvn Jun 22, 2026
3496845
add response and message bounds
p0mvn Jun 22, 2026
10e8c15
fix(state): roll back the Zakura header store with the body chain (#198)
p0mvn Jun 22, 2026
b919b40
test(state): cover delete_zakura_headers_above truncation; fix its ru…
p0mvn Jun 22, 2026
f6ba763
fix(zebrad): start tree_aux root fetch at the verified tip, not genes…
p0mvn Jun 22, 2026
2f8e081
fix(network): enforce tree_aux response message cap (#240)
p0mvn Jun 23, 2026
acfcda6
fix(zebrad): serve the aligned tree-aux root prefix when roots lag he…
evan-forbes Jun 25, 2026
5506b00
refactor!: remove the separate tree_aux fetch stream
evan-forbes Jun 25, 2026
a4ff07e
fix(zakura): handle incomplete header roots
evan-forbes Jun 25, 2026
6da753e
propagate and debug log errors for observability
p0mvn Jun 25, 2026
a66631c
fix(zakura): request header roots through checkpoint handoff
evan-forbes Jun 25, 2026
c6555be
feat!: enforce ranged header requests have roots
evan-forbes Jun 25, 2026
3a27ed3
perf(network): pack block sync ranges by size hint
evan-forbes Jun 26, 2026
5dddfac
fix(zakura): align root coverage test after rebase
p0mvn Jun 27, 2026
36c2bae
perf(network): reduce block sync request bookkeeping
evan-forbes Jun 27, 2026
8b800c3
Merge branch 'feat/pre-release-main' into review/blocksync-request-bo…
p0mvn Jun 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions zebra-network/src/zakura/block_sync/peer_routine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! [`PeerRegistry`]) and that inbound now arrives as a decoded frame from this
//! task's own `FramedRecv` rather than a `PeerInput` channel.

use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;

use tokio::sync::{futures::Notified, mpsc, watch};
use tokio_util::sync::CancellationToken;
Expand All @@ -37,9 +37,9 @@ use super::{
reactor::{
block_sync_message_label, bs_insert_height, bs_insert_peer, bs_insert_u64, tolerated_bytes,
},
request::BlockRangeRequest,
request::{BlockRangeRequest, ExpectedBlock},
sequencer_task::{SequencedBody, SequencerView},
state::{DownloadWindow, OutstandingBlockRange, ThroughputMeter},
state::{DownloadWindow, OutstandingBlockRange, ReceivedBlockTracker, ThroughputMeter},
work_queue::{WorkItem, WorkQueue},
BlockSyncAction, BlockSyncMessage, BlockSyncMisbehavior, BlockSyncPeerSession, BlockSyncStatus,
ZakuraBlockSyncConfig, ZakuraPeerId, ZakuraTrace, MSG_BS_BLOCK,
Expand Down Expand Up @@ -472,7 +472,8 @@ impl PeerRoutine {
// dropped successor heights. Return our unreceived outstanding to
// `work.pending` (a no-op for heights already dropped from `in_flight` by
// `reset_above`) and release their reservations exactly once.
for outstanding in self.window.outstanding.drain(..).collect::<Vec<_>>() {
let outstanding = std::mem::take(&mut self.window.outstanding);
for outstanding in outstanding {
self.budget.release(outstanding.reserved_bytes());
self.work.return_items(unreceived_heights(&outstanding));
}
Expand Down Expand Up @@ -638,15 +639,15 @@ impl PeerRoutine {
count,
anchor_hash: items[0].1.hash,
// The reserved worst-case total (released on a send failure
// below); distinct from the size estimates in `expected_bytes`.
// below); distinct from the size estimates in `expected_blocks`.
estimated_bytes: reserved_bytes,
expected_hashes: items
expected_blocks: items
.iter()
.map(|(height, item)| (*height, item.hash))
.collect(),
expected_bytes: items
.iter()
.map(|(height, item)| (*height, item.estimated_bytes))
.map(|(height, item)| ExpectedBlock {
height: *height,
hash: item.hash,
estimated_bytes: item.estimated_bytes,
})
.collect(),
};

Expand Down Expand Up @@ -675,17 +676,20 @@ impl PeerRoutine {
let deadline = queued_at + self.config.request_timeout;
metrics::counter!("sync.block.request.sent").increment(1);
self.window.record_outbound_request_scheduled();
let request_start_height = request.start_height;
let request_count = request.count;
let request_estimated_bytes = request.estimated_bytes;
self.window.outstanding.push(OutstandingBlockRange {
request: request.clone(),
request,
queued_at,
deadline,
received: HashSet::new(),
received: ReceivedBlockTracker::default(),
});
self.publish_outstanding();
self.trace_get_blocks_sent(
request.start_height,
request.count,
request.estimated_bytes,
request_start_height,
request_count,
request_estimated_bytes,
);
}

Expand Down Expand Up @@ -1222,16 +1226,16 @@ impl PeerRoutine {
/// Publish this peer's current *unreceived* in-flight height→hash set to the
/// registry, so the producer's `!has_outstanding_request` filter and the
/// low-water `total_unreceived` gate read the same per-request-granularity
/// count the previous reactor used (`expected_hashes.len() − received.len()`).
/// count the previous reactor used (`expected_blocks.len() − received.len()`).
/// Received-but-uncommitted heights are excluded here because they are held in
/// `work.in_flight` instead — the producer's `!in_flight_contains` clause
/// already keeps them out of `pending`.
fn publish_outstanding(&self) {
let mut map: BTreeMap<block::Height, block::Hash> = BTreeMap::new();
for outstanding in &self.window.outstanding {
for (height, hash) in &outstanding.request.expected_hashes {
if !outstanding.has_received(*height) {
map.insert(*height, *hash);
for expected in &outstanding.request.expected_blocks {
if !outstanding.has_received(expected.height) {
map.insert(expected.height, expected.hash);
}
}
}
Expand Down Expand Up @@ -1453,10 +1457,10 @@ fn unreceived_heights(
) -> impl Iterator<Item = block::Height> + '_ {
outstanding
.request
.expected_hashes
.expected_blocks
.iter()
.filter(move |(height, _)| !outstanding.has_received(*height))
.map(|(height, _)| *height)
.filter(move |expected| !outstanding.has_received(expected.height))
.map(|expected| expected.height)
}

impl Drop for PeerRoutine {
Expand All @@ -1479,10 +1483,10 @@ impl Drop for PeerRoutine {
self.work.return_items(
outstanding
.request
.expected_hashes
.expected_blocks
.iter()
.filter(|(height, _)| !outstanding.has_received(*height))
.map(|(height, _)| *height),
.filter(|expected| !outstanding.has_received(expected.height))
.map(|expected| expected.height),
);
}
self.registry.clear_outstanding(&self.peer, self.generation);
Expand Down
4 changes: 2 additions & 2 deletions zebra-network/src/zakura/block_sync/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ impl BlockSyncReactor {
// registry to precompute the two peer-derived halves of the reset
// decision. Received-and-buffered heights are caught by the Sequencer's own
// reorder/applying predicates, so reading only unreceived heights here is a
// benign (correct) narrowing of the original `expected_hashes` scan.
// benign (correct) narrowing of the original expected-blocks scan.
let peer_has_successor_after = next_height(tip)
.map(|next| self.registry.any_outstanding_at_or_above(next))
.unwrap_or(false);
Expand Down Expand Up @@ -1131,7 +1131,7 @@ impl BlockSyncReactor {
// The unreceived in-flight heights now live in the routines, mirrored into
// the registry's per-peer outstanding set (per-request granularity: each
// entry is one still-unreceived requested height). `total_unreceived` sums
// them — the same count the old per-peer `expected_hashes − received`
// them — the same count the old per-peer `expected_blocks − received`
// produced.
let outstanding = self.registry.total_unreceived();

Expand Down
27 changes: 20 additions & 7 deletions zebra-network/src/zakura/block_sync/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ pub(super) struct BlockRangeRequest {
pub(super) anchor_hash: block::Hash,
/// The reserved worst-case byte total for this request (released on
/// timeout/disconnect/send-failure). Distinct from the per-height size
/// estimates in `expected_bytes`.
/// estimates in `expected_blocks`.
pub(super) estimated_bytes: u64,
pub(super) expected_blocks: Vec<ExpectedBlock>,
}

/// Per-height metadata needed to validate one body in a range request.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) struct ExpectedBlock {
pub(super) height: block::Height,
pub(super) hash: block::Hash,
pub(super) estimated_bytes: u64,
pub(super) expected_hashes: Vec<(block::Height, block::Hash)>,
pub(super) expected_bytes: Vec<(block::Height, u64)>,
}

impl BlockRangeRequest {
Expand All @@ -47,15 +54,21 @@ impl BlockRangeRequest {
self.start_height <= height && height <= self.end_height()
}

pub(super) fn offset_for_height(&self, height: block::Height) -> Option<u32> {
self.contains(height)
.then(|| height.0.checked_sub(self.start_height.0))
.flatten()
}

pub(super) fn expected_hash(&self, height: block::Height) -> Option<block::Hash> {
self.expected_hashes
self.expected_blocks
.iter()
.find_map(|(known_height, hash)| (*known_height == height).then_some(*hash))
.find_map(|expected| (expected.height == height).then_some(expected.hash))
}

pub(super) fn estimated_bytes_for_height(&self, height: block::Height) -> Option<u64> {
self.expected_bytes
self.expected_blocks
.iter()
.find_map(|(known_height, bytes)| (*known_height == height).then_some(*bytes))
.find_map(|expected| (expected.height == height).then_some(expected.estimated_bytes))
}
}
56 changes: 49 additions & 7 deletions zebra-network/src/zakura/block_sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ pub(super) struct OutstandingBlockRange {
pub(super) request: BlockRangeRequest,
pub(super) queued_at: Instant,
pub(super) deadline: Instant,
pub(super) received: HashSet<block::Height>,
pub(super) received: ReceivedBlockTracker,
}

impl OutstandingBlockRange {
Expand All @@ -465,7 +465,7 @@ impl OutstandingBlockRange {
pub(super) fn reserved_bytes(&self) -> u64 {
let outstanding = self
.request
.expected_hashes
.expected_blocks
.len()
.saturating_sub(self.received.len());
// `outstanding` is a count bounded by `MAX_BS_BLOCKS_PER_REQUEST`, so the
Expand All @@ -478,11 +478,15 @@ impl OutstandingBlockRange {
}

pub(super) fn has_received(&self, height: block::Height) -> bool {
self.received.contains(&height)
self.request
.offset_for_height(height)
.is_some_and(|offset| self.received.contains_offset(offset))
}

pub(super) fn mark_received(&mut self, height: block::Height) {
self.received.insert(height);
if let Some(offset) = self.request.offset_for_height(height) {
self.received.insert_offset(offset);
}
}

/// Mark every requested height at or below `tip` as received and return the
Expand All @@ -491,16 +495,54 @@ impl OutstandingBlockRange {
pub(super) fn mark_received_through(&mut self, tip: block::Height) -> u64 {
let newly_received = self
.request
.expected_hashes
.expected_blocks
.iter()
.filter(|(height, _)| *height <= tip && self.received.insert(*height))
.filter(|expected| {
expected.height <= tip
&& self
.request
.offset_for_height(expected.height)
.is_some_and(|offset| self.received.insert_offset(offset))
})
.count();
// Bounded by `MAX_BS_BLOCKS_PER_REQUEST`; cannot overflow `u64`.
BS_PER_BLOCK_WORST_CASE_BYTES.saturating_mul(newly_received as u64)
}

pub(super) fn is_complete(&self) -> bool {
self.received.len() == self.request.expected_hashes.len()
self.received.len() == self.request.expected_blocks.len()
}
}

#[derive(Clone, Debug, Default)]
pub(super) struct ReceivedBlockTracker {
bits: u128,
count: usize,
}

impl ReceivedBlockTracker {
pub(super) fn len(&self) -> usize {
self.count
}

fn contains_offset(&self, offset: u32) -> bool {
Self::bit_for_offset(offset).is_some_and(|bit| self.bits & bit != 0)
}

fn insert_offset(&mut self, offset: u32) -> bool {
let Some(bit) = Self::bit_for_offset(offset) else {
return false;
};
if self.bits & bit != 0 {
return false;
}
self.bits |= bit;
self.count = self.count.saturating_add(1);
true
}

fn bit_for_offset(offset: u32) -> Option<u128> {
1u128.checked_shl(offset)
}
}

Expand Down
45 changes: 29 additions & 16 deletions zebra-network/src/zakura/block_sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,15 @@ fn window_request(height: u32) -> OutstandingBlockRange {
count: 1,
anchor_hash: block::Hash([byte; 32]),
estimated_bytes: 1,
expected_hashes: vec![(block::Height(height), block::Hash([byte; 32]))],
expected_bytes: vec![(block::Height(height), 1)],
expected_blocks: vec![ExpectedBlock {
height: block::Height(height),
hash: block::Hash([byte; 32]),
estimated_bytes: 1,
}],
},
queued_at: Instant::now(),
deadline: Instant::now(),
received: HashSet::new(),
received: ReceivedBlockTracker::default(),
}
}

Expand Down Expand Up @@ -2166,24 +2169,31 @@ fn outstanding_three_block_range(budget: &mut ByteBudget) -> OutstandingBlockRan
anchor_hash: block::Hash([1; 32]),
// Worst-case reservation: three blocks each reserve one worst-case share.
estimated_bytes: worst * 3,
expected_hashes: vec![
(block::Height(1), block::Hash([1; 32])),
(block::Height(2), block::Hash([2; 32])),
(block::Height(3), block::Hash([3; 32])),
],
// Size hints below the worst case; the reservation does not depend on them.
expected_bytes: vec![
(block::Height(1), 1_000),
(block::Height(2), 1_000),
(block::Height(3), 1_000),
expected_blocks: vec![
ExpectedBlock {
height: block::Height(1),
hash: block::Hash([1; 32]),
estimated_bytes: 1_000,
},
ExpectedBlock {
height: block::Height(2),
hash: block::Hash([2; 32]),
estimated_bytes: 1_000,
},
ExpectedBlock {
height: block::Height(3),
hash: block::Hash([3; 32]),
estimated_bytes: 1_000,
},
],
};
assert!(budget.try_reserve(request.estimated_bytes));
OutstandingBlockRange {
request,
queued_at: Instant::now(),
deadline: Instant::now(),
received: HashSet::new(),
received: ReceivedBlockTracker::default(),
}
}

Expand Down Expand Up @@ -2302,15 +2312,18 @@ fn underestimated_body_is_buffered_without_budget_drop() {
count: 1,
anchor_hash: block::Hash([1; 32]),
estimated_bytes: worst,
expected_hashes: vec![(block::Height(1), block::Hash([1; 32]))],
expected_bytes: vec![(block::Height(1), hint)],
expected_blocks: vec![ExpectedBlock {
height: block::Height(1),
hash: block::Hash([1; 32]),
estimated_bytes: hint,
}],
};
assert!(budget.try_reserve(request.estimated_bytes));
let mut outstanding = OutstandingBlockRange {
request,
queued_at: Instant::now(),
deadline: Instant::now(),
received: HashSet::new(),
received: ReceivedBlockTracker::default(),
};
assert_eq!(budget.reserved(), worst);

Expand Down
4 changes: 2 additions & 2 deletions zebra-state/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1496,8 +1496,8 @@ pub enum ReadRequest {

/// Returns scheduling-only body-size hints for a contiguous height range.
///
/// Confirmed committed block sizes win over untrusted advertised header
/// hints. Unknown advertised sizes are returned as `None`.
/// Only confirmed committed block sizes are returned. Unknown sizes are
/// returned as `None`.
BlockSizeHints {
/// First height to read.
from: block::Height,
Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ async fn header_only_service_requests_preserve_body_boundary() -> std::result::R
count: 2,
})
.await?,
ReadResponse::BlockSizeHints(vec![(Height(1), Some(999_999)), (Height(2), None)]),
ReadResponse::BlockSizeHints(vec![(Height(1), None), (Height(2), None)]),
);

assert_eq!(
Expand Down
Loading