Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3957,7 +3957,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, block_data) = signed_block.deconstruct();

match self.get_blobs_or_columns_store_op(block_root, block_data) {
match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
Expand Down Expand Up @@ -7163,6 +7163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_slot: Slot,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> {
match block_data {
Expand All @@ -7175,7 +7176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
}
AvailableBlockData::DataColumns(data_columns) => {
AvailableBlockData::DataColumns(mut data_columns) => {
let columns_to_custody = self.custody_columns_for_epoch(Some(
block_slot.epoch(T::EthSpec::slots_per_epoch()),
));
// Supernodes need to persist all sampled custody columns
if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
data_columns
.retain(|data_column| columns_to_custody.contains(&data_column.index));
}
debug!(
%block_root,
count = data_columns.len(),
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Store the blobs or data columns too
if let Some(op) = self
.get_blobs_or_columns_store_op(block_root, block_data)
.get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
.map_err(|e| {
HistoricalBlockError::StoreError(StoreError::DBError {
message: format!("get_blobs_or_columns_store_op error {e:?}"),
Expand Down
25 changes: 16 additions & 9 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ enum DataSidecars<E: EthSpec> {
}

async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars<E>>>) {
let harness = get_harness(VALIDATOR_COUNT);
// The assumption that you can re-import a block based on what you have in your DB
// is no longer true, as full nodes store less than what they sample.
// We use a supernode here to build a chain segment.
let harness = get_harness(VALIDATOR_COUNT, true);

harness
.extend_chain(
Expand Down Expand Up @@ -101,14 +104,18 @@ async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars
(segment, segment_sidecars)
}

fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {
fn get_harness(
validator_count: usize,
supernode: bool,
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.chain_config(ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.import_all_data_columns(supernode)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();
Expand Down Expand Up @@ -252,7 +259,7 @@ fn update_data_column_signed_header<E: EthSpec>(

#[tokio::test]
async fn chain_segment_full_segment() {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
.into_iter()
Expand Down Expand Up @@ -290,7 +297,7 @@ async fn chain_segment_full_segment() {
#[tokio::test]
async fn chain_segment_varying_chunk_size() {
for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
.into_iter()
Expand Down Expand Up @@ -322,7 +329,7 @@ async fn chain_segment_varying_chunk_size() {

#[tokio::test]
async fn chain_segment_non_linear_parent_roots() {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;

harness
Expand Down Expand Up @@ -379,7 +386,7 @@ async fn chain_segment_non_linear_parent_roots() {

#[tokio::test]
async fn chain_segment_non_linear_slots() {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
harness
.chain
Expand Down Expand Up @@ -521,7 +528,7 @@ async fn assert_invalid_signature(
async fn get_invalid_sigs_harness(
chain_segment: &[BeaconSnapshot<E>],
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
harness
.chain
.slot_clock
Expand Down Expand Up @@ -979,7 +986,7 @@ fn unwrap_err<T, U>(result: Result<T, U>) -> U {

#[tokio::test]
async fn block_gossip_verification() {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;

let block_index = CHAIN_SEGMENT_LENGTH - 2;
Expand Down Expand Up @@ -1382,7 +1389,7 @@ async fn verify_block_for_gossip_slashing_detection() {

#[tokio::test]
async fn verify_block_for_gossip_doppelganger_detection() {
let harness = get_harness(VALIDATOR_COUNT);
let harness = get_harness(VALIDATOR_COUNT, false);

let state = harness.get_current_state();
let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await;
Expand Down
90 changes: 90 additions & 0 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2735,6 +2735,14 @@ async fn weak_subjectivity_sync_test(
.rng(Box::new(StdRng::seed_from_u64(42)))
.build()
.expect("should build");
beacon_chain
.data_availability_checker
.custody_context()
.init_ordered_data_columns_from_custody_groups(
(0..spec.number_of_custody_groups).collect(),
&spec,
)
.unwrap();

let beacon_chain = Arc::new(beacon_chain);
let wss_block_root = wss_block.canonical_root();
Expand Down Expand Up @@ -4137,6 +4145,88 @@ async fn replay_from_split_state() {
assert_eq!(state.slot(), split.slot);
}

/// Test that regular nodes filter and store only custody columns when processing blocks with data columns.
#[tokio::test]
async fn test_custody_column_filtering_regular_node() {
    // PeerDAS must be scheduled for data columns to exist at all.
    if !test_spec::<E>().is_peer_das_scheduled() {
        return;
    }

    let temp_dir = tempdir().unwrap();
    let store = get_store(&temp_dir);
    let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);

    // Force at least one blob so the produced block carries data columns.
    harness.execution_block_generator().set_min_blob_count(1);
    let slot_before_extension = harness.get_current_slot();
    let block_root = harness
        .extend_chain(
            1,
            BlockStrategy::OnCanonicalHead,
            AttestationStrategy::AllValidators,
        )
        .await;

    // A regular (non-super) node custodies only a subset of columns for the epoch.
    let epoch = slot_before_extension.epoch(E::slots_per_epoch());
    let expected_custody_columns = harness
        .chain
        .custody_columns_for_epoch(Some(epoch))
        .iter()
        .copied()
        .collect::<HashSet<_>>();

    // Read back the column indices that were actually persisted to disk.
    let stored_column_indices = store
        .get_data_column_keys(block_root)
        .expect("should get stored column keys")
        .into_iter()
        .collect::<HashSet<_>>();

    assert_eq!(
        stored_column_indices, expected_custody_columns,
        "Regular node should only store custody columns"
    );
}

/// Test that supernodes store all data columns when processing blocks with data columns.
#[tokio::test]
async fn test_custody_column_filtering_supernode() {
    // PeerDAS must be scheduled for data columns to exist at all.
    if !test_spec::<E>().is_peer_das_scheduled() {
        return;
    }

    let temp_dir = tempdir().unwrap();
    let store = get_store(&temp_dir);
    let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);

    // Force at least one blob so the produced block carries data columns.
    harness.execution_block_generator().set_min_blob_count(1);
    let block_root = harness
        .extend_chain(
            1,
            BlockStrategy::OnCanonicalHead,
            AttestationStrategy::AllValidators,
        )
        .await;

    // A supernode custodies the full set of columns, so every index must be present.
    let expected_custody_columns = (0..E::number_of_columns() as u64).collect::<HashSet<_>>();

    // Read back the column indices that were actually persisted to disk.
    let stored_column_indices = store
        .get_data_column_keys(block_root)
        .expect("should get stored column keys")
        .into_iter()
        .collect::<HashSet<_>>();

    assert_eq!(
        stored_column_indices, expected_custody_columns,
        "Supernode should store all custody columns"
    );
}

/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).
Expand Down
Loading