Skip to content

Commit 2c328e3

Browse files
pawanjay176jimmygchenmichaelsproul
authored
Persist only custody columns in db (#8188)
* Only persist custody columns * Get claude to write tests * lint * Address review comments and fix tests. * Use supernode only when building chain segments * Clean up * Rewrite tests. * Fix tests * Clippy --------- Co-authored-by: Jimmy Chen <[email protected]> Co-authored-by: Michael Sproul <[email protected]>
1 parent 178df7a commit 2c328e3

File tree

4 files changed

+118
-12
lines changed

4 files changed

+118
-12
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3957,7 +3957,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
39573957
// See https://github.com/sigp/lighthouse/issues/2028
39583958
let (_, signed_block, block_data) = signed_block.deconstruct();
39593959

3960-
match self.get_blobs_or_columns_store_op(block_root, block_data) {
3960+
match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) {
39613961
Ok(Some(blobs_or_columns_store_op)) => {
39623962
ops.push(blobs_or_columns_store_op);
39633963
}
@@ -7163,6 +7163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
71637163
pub(crate) fn get_blobs_or_columns_store_op(
71647164
&self,
71657165
block_root: Hash256,
7166+
block_slot: Slot,
71667167
block_data: AvailableBlockData<T::EthSpec>,
71677168
) -> Result<Option<StoreOp<'_, T::EthSpec>>, String> {
71687169
match block_data {
@@ -7175,7 +7176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
71757176
);
71767177
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
71777178
}
7178-
AvailableBlockData::DataColumns(data_columns) => {
7179+
AvailableBlockData::DataColumns(mut data_columns) => {
7180+
let columns_to_custody = self.custody_columns_for_epoch(Some(
7181+
block_slot.epoch(T::EthSpec::slots_per_epoch()),
7182+
));
7183+
// Supernodes need to persist all sampled custody columns, so only filter for other nodes
7184+
if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
7185+
data_columns
7186+
.retain(|data_column| columns_to_custody.contains(&data_column.index));
7187+
}
71797188
debug!(
71807189
%block_root,
71817190
count = data_columns.len(),

beacon_node/beacon_chain/src/historical_blocks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
140140

141141
// Store the blobs or data columns too
142142
if let Some(op) = self
143-
.get_blobs_or_columns_store_op(block_root, block_data)
143+
.get_blobs_or_columns_store_op(block_root, block.slot(), block_data)
144144
.map_err(|e| {
145145
HistoricalBlockError::StoreError(StoreError::DBError {
146146
message: format!("get_blobs_or_columns_store_op error {e:?}"),

beacon_node/beacon_chain/tests/block_verification.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ enum DataSidecars<E: EthSpec> {
4242
}
4343

4444
async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars<E>>>) {
45-
let harness = get_harness(VALIDATOR_COUNT);
45+
// The assumption that you can re-import a block based on what you have in your DB
46+
// is no longer true, as full nodes store less than what they sample.
47+
// We use a supernode here to build a chain segment.
48+
let harness = get_harness(VALIDATOR_COUNT, true);
4649

4750
harness
4851
.extend_chain(
@@ -101,14 +104,18 @@ async fn get_chain_segment() -> (Vec<BeaconSnapshot<E>>, Vec<Option<DataSidecars
101104
(segment, segment_sidecars)
102105
}
103106

104-
fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {
107+
fn get_harness(
108+
validator_count: usize,
109+
supernode: bool,
110+
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
105111
let harness = BeaconChainHarness::builder(MainnetEthSpec)
106112
.default_spec()
107113
.chain_config(ChainConfig {
108114
reconstruct_historic_states: true,
109115
..ChainConfig::default()
110116
})
111117
.keypairs(KEYPAIRS[0..validator_count].to_vec())
118+
.import_all_data_columns(supernode)
112119
.fresh_ephemeral_store()
113120
.mock_execution_layer()
114121
.build();
@@ -252,7 +259,7 @@ fn update_data_column_signed_header<E: EthSpec>(
252259

253260
#[tokio::test]
254261
async fn chain_segment_full_segment() {
255-
let harness = get_harness(VALIDATOR_COUNT);
262+
let harness = get_harness(VALIDATOR_COUNT, false);
256263
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
257264
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
258265
.into_iter()
@@ -290,7 +297,7 @@ async fn chain_segment_full_segment() {
290297
#[tokio::test]
291298
async fn chain_segment_varying_chunk_size() {
292299
for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] {
293-
let harness = get_harness(VALIDATOR_COUNT);
300+
let harness = get_harness(VALIDATOR_COUNT, false);
294301
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
295302
let blocks: Vec<RpcBlock<E>> = chain_segment_blocks(&chain_segment, &chain_segment_blobs)
296303
.into_iter()
@@ -322,7 +329,7 @@ async fn chain_segment_varying_chunk_size() {
322329

323330
#[tokio::test]
324331
async fn chain_segment_non_linear_parent_roots() {
325-
let harness = get_harness(VALIDATOR_COUNT);
332+
let harness = get_harness(VALIDATOR_COUNT, false);
326333
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
327334

328335
harness
@@ -379,7 +386,7 @@ async fn chain_segment_non_linear_parent_roots() {
379386

380387
#[tokio::test]
381388
async fn chain_segment_non_linear_slots() {
382-
let harness = get_harness(VALIDATOR_COUNT);
389+
let harness = get_harness(VALIDATOR_COUNT, false);
383390
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
384391
harness
385392
.chain
@@ -521,7 +528,7 @@ async fn assert_invalid_signature(
521528
async fn get_invalid_sigs_harness(
522529
chain_segment: &[BeaconSnapshot<E>],
523530
) -> BeaconChainHarness<EphemeralHarnessType<E>> {
524-
let harness = get_harness(VALIDATOR_COUNT);
531+
let harness = get_harness(VALIDATOR_COUNT, false);
525532
harness
526533
.chain
527534
.slot_clock
@@ -979,7 +986,7 @@ fn unwrap_err<T, U>(result: Result<T, U>) -> U {
979986

980987
#[tokio::test]
981988
async fn block_gossip_verification() {
982-
let harness = get_harness(VALIDATOR_COUNT);
989+
let harness = get_harness(VALIDATOR_COUNT, false);
983990
let (chain_segment, chain_segment_blobs) = get_chain_segment().await;
984991

985992
let block_index = CHAIN_SEGMENT_LENGTH - 2;
@@ -1382,7 +1389,7 @@ async fn verify_block_for_gossip_slashing_detection() {
13821389

13831390
#[tokio::test]
13841391
async fn verify_block_for_gossip_doppelganger_detection() {
1385-
let harness = get_harness(VALIDATOR_COUNT);
1392+
let harness = get_harness(VALIDATOR_COUNT, false);
13861393

13871394
let state = harness.get_current_state();
13881395
let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await;

beacon_node/beacon_chain/tests/store_tests.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2735,6 +2735,14 @@ async fn weak_subjectivity_sync_test(
27352735
.rng(Box::new(StdRng::seed_from_u64(42)))
27362736
.build()
27372737
.expect("should build");
2738+
beacon_chain
2739+
.data_availability_checker
2740+
.custody_context()
2741+
.init_ordered_data_columns_from_custody_groups(
2742+
(0..spec.number_of_custody_groups).collect(),
2743+
&spec,
2744+
)
2745+
.unwrap();
27382746

27392747
let beacon_chain = Arc::new(beacon_chain);
27402748
let wss_block_root = wss_block.canonical_root();
@@ -4137,6 +4145,88 @@ async fn replay_from_split_state() {
41374145
assert_eq!(state.slot(), split.slot);
41384146
}
41394147

4148+
/// Test that regular nodes filter and store only custody columns when processing blocks with data columns.
4149+
#[tokio::test]
4150+
async fn test_custody_column_filtering_regular_node() {
4151+
// Skip test if PeerDAS is not scheduled
4152+
if !test_spec::<E>().is_peer_das_scheduled() {
4153+
return;
4154+
}
4155+
4156+
let db_path = tempdir().unwrap();
4157+
let store = get_store(&db_path);
4158+
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
4159+
4160+
// Generate a block with data columns
4161+
harness.execution_block_generator().set_min_blob_count(1);
4162+
let current_slot = harness.get_current_slot();
4163+
let block_root = harness
4164+
.extend_chain(
4165+
1,
4166+
BlockStrategy::OnCanonicalHead,
4167+
AttestationStrategy::AllValidators,
4168+
)
4169+
.await;
4170+
4171+
// Get custody columns for this epoch - regular nodes only store a subset
4172+
let expected_custody_columns: HashSet<_> = harness
4173+
.chain
4174+
.custody_columns_for_epoch(Some(current_slot.epoch(E::slots_per_epoch())))
4175+
.iter()
4176+
.copied()
4177+
.collect();
4178+
4179+
// Check what actually got stored in the database
4180+
let stored_column_indices: HashSet<_> = store
4181+
.get_data_column_keys(block_root)
4182+
.expect("should get stored column keys")
4183+
.into_iter()
4184+
.collect();
4185+
4186+
assert_eq!(
4187+
stored_column_indices, expected_custody_columns,
4188+
"Regular node should only store custody columns"
4189+
);
4190+
}
4191+
4192+
/// Test that supernodes store all data columns when processing blocks with data columns.
4193+
#[tokio::test]
4194+
async fn test_custody_column_filtering_supernode() {
4195+
// Skip test if PeerDAS is not scheduled
4196+
if !test_spec::<E>().is_peer_das_scheduled() {
4197+
return;
4198+
}
4199+
4200+
let db_path = tempdir().unwrap();
4201+
let store = get_store(&db_path);
4202+
let harness = get_harness_import_all_data_columns(store.clone(), LOW_VALIDATOR_COUNT);
4203+
4204+
// Generate a block with data columns
4205+
harness.execution_block_generator().set_min_blob_count(1);
4206+
let block_root = harness
4207+
.extend_chain(
4208+
1,
4209+
BlockStrategy::OnCanonicalHead,
4210+
AttestationStrategy::AllValidators,
4211+
)
4212+
.await;
4213+
4214+
// Supernodes are expected to store all data columns
4215+
let expected_custody_columns: HashSet<_> = (0..E::number_of_columns() as u64).collect();
4216+
4217+
// Check what actually got stored in the database
4218+
let stored_column_indices: HashSet<_> = store
4219+
.get_data_column_keys(block_root)
4220+
.expect("should get stored column keys")
4221+
.into_iter()
4222+
.collect();
4223+
4224+
assert_eq!(
4225+
stored_column_indices, expected_custody_columns,
4226+
"Supernode should store all custody columns"
4227+
);
4228+
}
4229+
41404230
/// Checks that two chains are the same, for the purpose of these tests.
41414231
///
41424232
/// Several fields that are hard/impossible to check are ignored (e.g., the store).

0 commit comments

Comments
 (0)