diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f085684442b..85ccb96f693 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3957,7 +3957,7 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, block_data) = signed_block.deconstruct(); - match self.get_blobs_or_columns_store_op(block_root, block_data) { + match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) { Ok(Some(blobs_or_columns_store_op)) => { ops.push(blobs_or_columns_store_op); } @@ -7163,6 +7163,7 @@ impl BeaconChain { pub(crate) fn get_blobs_or_columns_store_op( &self, block_root: Hash256, + block_slot: Slot, block_data: AvailableBlockData, ) -> Result>, String> { match block_data { @@ -7175,7 +7176,15 @@ impl BeaconChain { ); Ok(Some(StoreOp::PutBlobs(block_root, blobs))) } - AvailableBlockData::DataColumns(data_columns) => { + AvailableBlockData::DataColumns(mut data_columns) => { + let columns_to_custody = self.custody_columns_for_epoch(Some( + block_slot.epoch(T::EthSpec::slots_per_epoch()), + )); + // Supernodes need to persist all sampled custody columns + if columns_to_custody.len() != self.spec.number_of_custody_groups as usize { + data_columns + .retain(|data_column| columns_to_custody.contains(&data_column.index)); + } debug!( %block_root, count = data_columns.len(), diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 8b9fb5e3549..15e0a55cf5a 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -140,7 +140,7 @@ impl BeaconChain { // Store the blobs or data columns too if let Some(op) = self - .get_blobs_or_columns_store_op(block_root, block_data) + .get_blobs_or_columns_store_op(block_root, block.slot(), block_data) .map_err(|e| { 
HistoricalBlockError::StoreError(StoreError::DBError { message: format!("get_blobs_or_columns_store_op error {e:?}"), diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index b27295751ec..47f5be02cb4 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -42,7 +42,10 @@ enum DataSidecars { } async fn get_chain_segment() -> (Vec>, Vec>>) { - let harness = get_harness(VALIDATOR_COUNT); + // The assumption that you can re-import a block based on what you have in your DB + // is no longer true, as fullnodes store less than what they sample. + // We use a supernode here to build a chain segment. + let harness = get_harness(VALIDATOR_COUNT, true); harness .extend_chain( @@ -101,7 +104,10 @@ async fn get_chain_segment() -> (Vec>, Vec BeaconChainHarness> { +fn get_harness( + validator_count: usize, + supernode: bool, +) -> BeaconChainHarness> { let harness = BeaconChainHarness::builder(MainnetEthSpec) .default_spec() .chain_config(ChainConfig { @@ -109,6 +115,7 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness( #[tokio::test] async fn chain_segment_full_segment() { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) .into_iter() @@ -290,7 +297,7 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) .into_iter() @@ -322,7 +329,7 @@ async fn chain_segment_varying_chunk_size() { 
#[tokio::test] async fn chain_segment_non_linear_parent_roots() { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; harness @@ -379,7 +386,7 @@ async fn chain_segment_non_linear_parent_roots() { #[tokio::test] async fn chain_segment_non_linear_slots() { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; harness .chain @@ -521,7 +528,7 @@ async fn assert_invalid_signature( async fn get_invalid_sigs_harness( chain_segment: &[BeaconSnapshot], ) -> BeaconChainHarness> { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); harness .chain .slot_clock @@ -979,7 +986,7 @@ fn unwrap_err(result: Result) -> U { #[tokio::test] async fn block_gossip_verification() { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; @@ -1382,7 +1389,7 @@ async fn verify_block_for_gossip_slashing_detection() { #[tokio::test] async fn verify_block_for_gossip_doppelganger_detection() { - let harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT, false); let state = harness.get_current_state(); let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index cd4032f55d9..449b5dd0434 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2735,6 +2735,14 @@ async fn weak_subjectivity_sync_test( .rng(Box::new(StdRng::seed_from_u64(42))) .build() .expect("should build"); + beacon_chain + .data_availability_checker + .custody_context() + 
.init_ordered_data_columns_from_custody_groups( + (0..spec.number_of_custody_groups).collect(), + &spec, + ) + .unwrap(); let beacon_chain = Arc::new(beacon_chain); let wss_block_root = wss_block.canonical_root(); @@ -4137,6 +4145,88 @@ async fn replay_from_split_state() { assert_eq!(state.slot(), split.slot); } +/// Test that regular nodes filter and store only custody columns when processing blocks with data columns. +#[tokio::test] +async fn test_custody_column_filtering_regular_node() { + // Skip test if PeerDAS is not scheduled + if !test_spec::().is_peer_das_scheduled() { + return; + } + + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Generate a block with data columns + harness.execution_block_generator().set_min_blob_count(1); + let current_slot = harness.get_current_slot(); + let block_root = harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Get custody columns for this epoch - regular nodes only store a subset + let expected_custody_columns: HashSet<_> = harness + .chain + .custody_columns_for_epoch(Some(current_slot.epoch(E::slots_per_epoch()))) + .iter() + .copied() + .collect(); + + // Check what actually got stored in the database + let stored_column_indices: HashSet<_> = store + .get_data_column_keys(block_root) + .expect("should get stored column keys") + .into_iter() + .collect(); + + assert_eq!( + stored_column_indices, expected_custody_columns, + "Regular node should only store custody columns" + ); +} + +/// Test that supernodes store all data columns when processing blocks with data columns. 
+#[tokio::test] +async fn test_custody_column_filtering_supernode() { + // Skip test if PeerDAS is not scheduled + if !test_spec::().is_peer_das_scheduled() { + return; + } + + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT); + + // Generate a block with data columns + harness.execution_block_generator().set_min_blob_count(1); + let block_root = harness + .extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Supernodes are expected to store all data columns + let expected_custody_columns: HashSet<_> = (0..E::number_of_columns() as u64).collect(); + + // Check what actually got stored in the database + let stored_column_indices: HashSet<_> = store + .get_data_column_keys(block_root) + .expect("should get stored column keys") + .into_iter() + .collect(); + + assert_eq!( + stored_column_indices, expected_custody_columns, + "Supernode should store all custody columns" + ); +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store).