Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,11 @@ impl<'de> Deserialize<'de> for Config {
non_zero_config_field.filter(|config_value| config_value > &0).unwrap_or(default_config_value)
});

// Clamp the in-flight byte budget up to the checkpoint-range floor (with a
// warning) rather than rejecting too-small configs, so older configs keep
// starting while checkpoint sync stays deadlock-free.
let mut zakura = zakura;
zakura.block_sync.clamp_inflight_block_bytes_to_floor();
zakura.block_sync.validate().map_err(|error| {
de::Error::custom(format!("invalid zakura.block_sync config: {error}"))
})?;
Expand Down
54 changes: 49 additions & 5 deletions zebra-network/src/zakura/block_sync/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,29 @@ pub const DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BYTES: u64 =
DEFAULT_BS_MAX_INFLIGHT_BLOCK_BYTES - DEFAULT_BS_MAX_RESPONSE_BYTES as u64;
/// Default block-count cap for speculative reorder look-ahead bookkeeping.
pub const DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BLOCKS: u32 = 4096;
/// Default maximum submitted block applies awaiting verifier completion.
/// Minimum submitted block applies required to resolve one checkpoint range.
///
/// The checkpoint verifier resolves a checkpoint window only after the whole
/// window is queued, so this defaults to one maximum checkpoint gap.
pub const DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES: usize =
zebra_chain::parameters::checkpoint::constants::MAX_CHECKPOINT_HEIGHT_GAP;
/// window, including the resolving checkpoint block, is queued. A node that
/// starts one height before a checkpoint-gap boundary can therefore need one
/// maximum checkpoint gap plus the boundary block in flight.
pub const MIN_BS_CHECKPOINT_SUBMITTED_BLOCK_APPLIES: usize =
zebra_chain::parameters::checkpoint::constants::MAX_CHECKPOINT_HEIGHT_GAP + 1;
/// Default maximum submitted block applies awaiting verifier completion.
pub const DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES: usize = MIN_BS_CHECKPOINT_SUBMITTED_BLOCK_APPLIES;
/// The byte budget required to hold one full worst-case checkpoint range in
/// flight.
///
/// The checkpoint verifier resolves a block's commit only once the entire
/// contiguous range to the next checkpoint has been submitted, and every
/// submitted body stays reserved against `max_inflight_block_bytes` until it is
/// durable. A budget that cannot hold a whole worst-case range can never
/// complete one: the verifier never commits, nothing becomes durable, and no
/// bytes are ever released.
pub const BS_CHECKPOINT_RANGE_BYTE_FLOOR: u64 =
// `MIN_BS_CHECKPOINT_SUBMITTED_BLOCK_APPLIES` is `MAX_CHECKPOINT_HEIGHT_GAP + 1`
// (= 401), which fits `u64` losslessly; the product (~802 MB) cannot overflow.
MIN_BS_CHECKPOINT_SUBMITTED_BLOCK_APPLIES as u64 * BS_PER_BLOCK_WORST_CASE_BYTES;
/// Default block-sync request timeout.
pub const DEFAULT_BS_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);
/// Default central floor-watchdog cadence.
Expand Down Expand Up @@ -231,7 +248,8 @@ impl ZakuraBlockSyncConfig {

/// Return the non-zero verifier submission cap.
pub fn submitted_apply_limit(&self) -> usize {
self.max_submitted_block_applies.max(1)
self.max_submitted_block_applies
.max(MIN_BS_CHECKPOINT_SUBMITTED_BLOCK_APPLIES)
}

/// Return the speculative look-ahead byte cap clamped to the global budget.
Expand Down Expand Up @@ -278,6 +296,32 @@ impl ZakuraBlockSyncConfig {
Ok(())
}

/// Raise `max_inflight_block_bytes` up to the checkpoint-range floor when it
/// is configured below it, warning once.
///
/// A positive budget below [`BS_CHECKPOINT_RANGE_BYTE_FLOOR`] cannot hold one
/// full worst-case checkpoint range. The checkpoint verifier only commits a
/// range once the whole range is submitted, and every submitted body stays
/// reserved against the budget until it is durable, so a budget below the
/// floor would deadlock: the verifier never commits, nothing becomes durable,
/// and no bytes are ever released. Rather than refuse to start -- which would
/// break older configs that set a smaller budget -- clamp the budget up to the
/// floor and warn. Zero is left untouched so [`validate`](Self::validate)
/// still rejects it as an explicit misconfiguration.
pub fn clamp_inflight_block_bytes_to_floor(&mut self) {
if self.max_inflight_block_bytes > 0
&& self.max_inflight_block_bytes < BS_CHECKPOINT_RANGE_BYTE_FLOOR
{
tracing::warn!(
configured_max_inflight_block_bytes = self.max_inflight_block_bytes,
checkpoint_range_byte_floor = BS_CHECKPOINT_RANGE_BYTE_FLOOR,
"zakura.block_sync.max_inflight_block_bytes is below the checkpoint-range \
floor; clamping it up so checkpoint sync cannot deadlock",
);
self.max_inflight_block_bytes = BS_CHECKPOINT_RANGE_BYTE_FLOOR;
}
}

/// Build the inert local status used before the block-sync reactor is wired.
pub fn initial_status(&self) -> BlockSyncStatus {
BlockSyncStatus {
Expand Down
12 changes: 12 additions & 0 deletions zebra-network/src/zakura/block_sync/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,17 @@ impl Sequencer {
}
}

fn clear_submitted_applies_through(&mut self, tip: block::Height) {
let heights: Vec<_> = self
.submitted_applies
.range(..=tip)
.map(|(height, _)| *height)
.collect();
for height in heights {
self.submitted_applies.remove(&height);
}
}

// ---- apply finished ----

/// The `(token, hash)` of the body currently applying at `height`, for
Expand Down Expand Up @@ -452,6 +463,7 @@ impl Sequencer {
.range(..=tip)
.map(|(height, _)| *height)
.collect();
self.clear_submitted_applies_through(tip);
let mut released = 0u64;
for height in applied {
if let Some(applying) = self.applying.remove(&height) {
Expand Down
183 changes: 122 additions & 61 deletions zebra-network/src/zakura/block_sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use proptest::{prop_assert, prop_assert_eq};
use super::*;
use super::{
config::{
BS_PER_BLOCK_WORST_CASE_BYTES, DEFAULT_BS_FANOUT, DEFAULT_BS_FLOOR_PEER_AVOID_COOLDOWN,
DEFAULT_BS_FLOOR_WATCHDOG_TICK, DEFAULT_BS_MAX_INFLIGHT_BLOCK_BYTES,
DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BLOCKS, DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BYTES,
DEFAULT_BS_MAX_RESPONSE_BYTES, DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES,
DEFAULT_BS_REQUEST_TIMEOUT, MAX_BS_INFLIGHT_REQUESTS, MAX_BS_RESPONSE_BYTES,
BS_CHECKPOINT_RANGE_BYTE_FLOOR, BS_PER_BLOCK_WORST_CASE_BYTES, DEFAULT_BS_FANOUT,
DEFAULT_BS_FLOOR_PEER_AVOID_COOLDOWN, DEFAULT_BS_FLOOR_WATCHDOG_TICK,
DEFAULT_BS_MAX_INFLIGHT_BLOCK_BYTES, DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BLOCKS,
DEFAULT_BS_MAX_REORDER_LOOKAHEAD_BYTES, DEFAULT_BS_MAX_RESPONSE_BYTES,
DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES, DEFAULT_BS_REQUEST_TIMEOUT,
MAX_BS_INFLIGHT_REQUESTS, MAX_BS_RESPONSE_BYTES,
},
reactor::node_id_from_block_peer_id,
reorder::*,
Expand Down Expand Up @@ -732,6 +733,10 @@ fn block_sync_config_defaults_and_round_trips() {
default.max_submitted_block_applies,
DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES
);
assert_eq!(
default.submitted_apply_limit(),
DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES
);
assert_eq!(default.request_timeout, DEFAULT_BS_REQUEST_TIMEOUT);
assert_eq!(default.fanout, DEFAULT_BS_FANOUT);

Expand All @@ -748,6 +753,10 @@ fn block_sync_config_defaults_and_round_trips() {
)
.expect("nested Zakura block-sync config deserializes");
assert_eq!(config.zakura.block_sync.max_submitted_block_applies, 9);
assert_eq!(
config.zakura.block_sync.submitted_apply_limit(),
DEFAULT_BS_MAX_SUBMITTED_BLOCK_APPLIES,
);
}

#[test]
Expand All @@ -770,11 +779,78 @@ fn config_validate_rejects_degenerate_values() {
};
assert!(config.validate().is_err());

// A positive budget below the checkpoint-range floor is no longer rejected by
// `validate`: it is clamped up to the floor (with a warning) at load instead,
// so older configs keep starting. See
// `config_clamps_below_floor_inflight_block_bytes`.
config = ZakuraBlockSyncConfig {
max_inflight_block_bytes: u64::from(DEFAULT_BS_MAX_RESPONSE_BYTES),
max_inflight_block_bytes: BS_CHECKPOINT_RANGE_BYTE_FLOOR - 1,
..ZakuraBlockSyncConfig::default()
};
assert!(config.validate().is_err());
assert!(config.validate().is_ok());

config = ZakuraBlockSyncConfig {
max_inflight_block_bytes: BS_CHECKPOINT_RANGE_BYTE_FLOOR,
max_reorder_lookahead_bytes: BS_CHECKPOINT_RANGE_BYTE_FLOOR,
..ZakuraBlockSyncConfig::default()
};
assert!(config.validate().is_ok());
}

#[test]
fn config_clamps_below_floor_inflight_block_bytes() {
// A positive budget below the checkpoint-range floor is clamped up to the
// floor so checkpoint sync cannot deadlock (instead of refusing to start).
let mut below = ZakuraBlockSyncConfig {
// 256 MiB, the historical `v4.5.0-zakura-blocksync.toml` value, which is
// below the ~802 MB checkpoint-range floor.
max_inflight_block_bytes: 256 * 1024 * 1024,
..ZakuraBlockSyncConfig::default()
};
assert!(below.max_inflight_block_bytes < BS_CHECKPOINT_RANGE_BYTE_FLOOR);
below.clamp_inflight_block_bytes_to_floor();
assert_eq!(
below.max_inflight_block_bytes,
BS_CHECKPOINT_RANGE_BYTE_FLOOR
);

// A budget at or above the floor is left untouched.
let mut at_floor = ZakuraBlockSyncConfig {
max_inflight_block_bytes: BS_CHECKPOINT_RANGE_BYTE_FLOOR + 1,
..ZakuraBlockSyncConfig::default()
};
at_floor.clamp_inflight_block_bytes_to_floor();
assert_eq!(
at_floor.max_inflight_block_bytes,
BS_CHECKPOINT_RANGE_BYTE_FLOOR + 1
);

// Zero is left untouched so `validate` still rejects it as a misconfiguration.
let mut zero = ZakuraBlockSyncConfig {
max_inflight_block_bytes: 0,
..ZakuraBlockSyncConfig::default()
};
zero.clamp_inflight_block_bytes_to_floor();
assert_eq!(zero.max_inflight_block_bytes, 0);
assert!(zero.validate().is_err());
}

#[test]
fn config_deserialize_clamps_below_floor_inflight_block_bytes() {
// Regression: an older config with a too-small `max_inflight_block_bytes`
// (e.g. the stored `v4.5.0-zakura-blocksync.toml`) must still load -- clamped
// up to the checkpoint-range floor -- rather than being rejected at startup.
let config: crate::Config = toml::from_str(
r#"
[zakura.block_sync]
max_inflight_block_bytes = 268435456
"#,
)
.expect("a below-floor max_inflight_block_bytes config still loads");
assert_eq!(
config.zakura.block_sync.max_inflight_block_bytes,
BS_CHECKPOINT_RANGE_BYTE_FLOOR,
);
}

#[test]
Expand Down Expand Up @@ -2701,6 +2777,31 @@ fn sequencer_records_and_decrements_submitted_applies() {
assert!(!seq.submitted_contains(block::Height(1)));
}

#[test]
fn sequencer_release_applied_through_clears_submitted_applies() {
let mut seq = test_sequencer(0, 1);
let blocks = mainnet_blocks_1_to_3();
for (index, block) in blocks.iter().enumerate() {
let height = block::Height(index as u32 + 1);
seq.accept_body(height, block.hash(), block.clone(), 100, peer(0));
}
seq.drain_ready_into_applying();

let item = seq
.prepare_submit(block::Height(1))
.expect("height 1 is applying");
seq.record_submitted_apply(item.height, item.hash);
assert!(seq.submitted_contains(block::Height(1)));
assert!(
seq.submittable_heights().is_empty(),
"submitted-apply window is full"
);

assert_eq!(seq.release_applied_through(block::Height(1)), 100);
assert!(!seq.submitted_contains(block::Height(1)));
assert_eq!(seq.submittable_heights(), vec![block::Height(2)]);
}

#[test]
fn sequencer_advance_verified_tip_releases_bytes_and_reports_change() {
let mut seq = test_sequencer(0, 8);
Expand Down Expand Up @@ -9372,7 +9473,7 @@ async fn reactor_exchange_reanchor_releases_stale_submitted_bodies() {
}

#[tokio::test]
async fn reactor_caps_submitted_applies_until_completion_releases_slot() {
async fn reactor_clamps_tiny_submitted_apply_config_above_checkpoint_range() {
let blocks = fake_sequential_blocks(4);
let mut config = immediate_body_download_config();
config.max_inflight_block_bytes = u64::MAX;
Expand Down Expand Up @@ -9426,68 +9527,28 @@ async fn reactor_caps_submitted_applies_until_completion_releases_slot() {
}

let mut submitted = Vec::new();
while submitted.len() < 2 {
while submitted.len() < 4 {
match next_action(&mut actions).await {
BlockSyncAction::SubmitBlock { token, block } => submitted.push((
token,
BlockSyncAction::SubmitBlock { block, .. } => submitted.push(
block
.coinbase_height()
.expect("submitted test block has height"),
block.hash(),
)),
),
BlockSyncAction::QueryNeededBlocks { .. } => {}
action => panic!("unexpected action before cap reached: {action:?}"),
action => panic!("unexpected action while collecting submissions: {action:?}"),
}
}
assert_eq!(submitted[0].1, block::Height(1));
assert_eq!(submitted[1].1, block::Height(2));

assert!(
tokio::time::timeout(Duration::from_millis(100), async {
loop {
match actions.recv().await {
Some(BlockSyncAction::SubmitBlock { block, .. }) => {
return block.coinbase_height();
}
Some(BlockSyncAction::QueryNeededBlocks { .. }) => {}
Some(action) => panic!("unexpected action while capped: {action:?}"),
None => return None,
}
}
})
.await
.is_err(),
"third body must wait until an apply completion releases a slot",
assert_eq!(
submitted,
vec![
block::Height(1),
block::Height(2),
block::Height(3),
block::Height(4),
],
"tiny configured submit caps are raised to the checkpoint-safe floor"
);

let (token, height, hash) = submitted[0];
handle
.send(BlockSyncEvent::BlockApplyFinished {
token,
height,
hash,
result: BlockApplyResult::Committed,
local_frontier: None,
})
.await
.expect("apply completion queues");

loop {
match next_action(&mut actions).await {
BlockSyncAction::SubmitBlock { block, .. } => {
assert_eq!(
block
.coinbase_height()
.expect("submitted test block has height"),
block::Height(3)
);
break;
}
BlockSyncAction::QueryNeededBlocks { .. } => {}
action => panic!("unexpected action after slot release: {action:?}"),
}
}

reactor_task.abort();
}

Expand Down
19 changes: 10 additions & 9 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2809,7 +2809,7 @@ mod zakura_header_sync_driver_tests {
}

#[tokio::test]
async fn block_sync_driver_prioritizes_ready_needed_query_over_submit() {
async fn block_sync_driver_prioritizes_ready_submit_over_needed_query() {
let (action_tx, mut action_rx) = mpsc::channel(8);
let block = mainnet_block(&BLOCK_MAINNET_1_BYTES);
action_tx
Expand All @@ -2825,19 +2825,20 @@ mod zakura_header_sync_driver_tests {
.expect("query action queues");

let mut deferred_actions = VecDeque::new();
let action = coalesce_ready_needed_block_queries(&mut action_rx, &mut deferred_actions)
.expect("ready query is prioritized");
assert!(
coalesce_ready_needed_block_queries(&mut action_rx, &mut deferred_actions).is_none()
);

assert!(matches!(
action,
BlockSyncAction::QueryNeededBlocks {
verified_block_tip: block::Height(0),
best_header_tip: block::Height(8),
}
deferred_actions.pop_front(),
Some(BlockSyncAction::SubmitBlock { token: 7, .. })
));
assert!(matches!(
deferred_actions.pop_front(),
Some(BlockSyncAction::SubmitBlock { token: 7, .. })
Some(BlockSyncAction::QueryNeededBlocks {
verified_block_tip: block::Height(0),
best_header_tip: block::Height(8),
})
));
assert!(deferred_actions.is_empty());
assert!(action_rx.try_recv().is_err());
Expand Down
Loading