diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 85ccb96f693..a4d5078aeca 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -6913,6 +6913,44 @@ impl BeaconChain { ); } + /// Get the earliest epoch in which the node has met its custody requirements. + /// A `None` response indicates that we've met our custody requirements up to the + /// column data availability window. + pub fn earliest_custodied_data_column_epoch(&self) -> Option { + self.store + .get_data_column_custody_info() + .unwrap_or(None) + .and_then(|info| info.earliest_data_column_slot) + .map(|slot| { + let mut epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + // If the earliest custodied slot isn't the first slot in the epoch, + // the node has only met its custody requirements for the next epoch. + if slot > epoch.start_slot(T::EthSpec::slots_per_epoch()) { + epoch += 1; + } + epoch + }) + } + + /// The data availability boundary for custodying columns. It will just be the + /// regular data availability boundary unless we are near the Fulu fork epoch. + pub fn column_data_availability_boundary(&self) -> Option { + match self.data_availability_boundary() { + Some(da_boundary_epoch) => { + if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch { + if da_boundary_epoch < fulu_fork_epoch { + Some(fulu_fork_epoch) + } else { + Some(da_boundary_epoch) + } + } else { + None // Fulu hasn't been enabled + } + } + None => None, // Deneb hasn't been enabled + } + } + /// This method serves to get a sense of the current chain health. It is used in block proposal 
/// diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 5564c7916fa..d0a5625cc00 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -388,12 +388,6 @@ where .init_blob_info(genesis.beacon_block.slot()) .map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?, ); - self.pending_io_batch.push( - store - .init_data_column_info(genesis.beacon_block.slot()) - .map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?, - ); - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, genesis.clone()) .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; let current_slot = None; @@ -604,11 +598,6 @@ where .init_blob_info(weak_subj_block.slot()) .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?, ); - self.pending_io_batch.push( - store - .init_data_column_info(weak_subj_block.slot()) - .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?, - ); let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 15e0a55cf5a..8973a234a27 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -8,7 +8,6 @@ use state_processing::{ use std::borrow::Cow; use std::iter; use std::time::Duration; -use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::{debug, instrument}; @@ -70,7 +69,6 @@ impl BeaconChain { ) -> Result { let anchor_info = self.store.get_anchor_info(); let blob_info = self.store.get_blob_info(); - let data_column_info = self.store.get_data_column_info(); // Take all blocks with slots less than the oldest block slot. 
let num_relevant = blocks.partition_point(|available_block| { @@ -97,7 +95,6 @@ impl BeaconChain { let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; - let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; let mut blob_batch = Vec::::new(); let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); @@ -133,9 +130,7 @@ impl BeaconChain { AvailableBlockData::Blobs(..) => { new_oldest_blob_slot = Some(block.slot()); } - AvailableBlockData::DataColumns(_) => { - new_oldest_data_column_slot = Some(block.slot()); - } + AvailableBlockData::DataColumns(_) => {} } // Store the blobs or data columns too @@ -250,19 +245,6 @@ impl BeaconChain { ); } - // Update the data column info. - if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot - && let Some(oldest_data_column_slot) = new_oldest_data_column_slot - { - let new_data_column_info = DataColumnInfo { - oldest_data_column_slot: Some(oldest_data_column_slot), - }; - anchor_and_blob_batch.push( - self.store - .compare_and_set_data_column_info(data_column_info, new_data_column_info)?, - ); - } - // Update the anchor. 
let new_anchor = AnchorInfo { oldest_block_slot: prev_block_slot, diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index ddc59783394..0efab9e3fa3 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -5,6 +5,7 @@ mod migration_schema_v25; mod migration_schema_v26; mod migration_schema_v27; mod migration_schema_v28; +mod migration_schema_v29; use crate::beacon_chain::BeaconChainTypes; use std::sync::Arc; @@ -88,6 +89,11 @@ pub fn migrate_schema( let ops = migration_schema_v28::downgrade_from_v28::(db.clone())?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(28), SchemaVersion(29)) => { + let ops = migration_schema_v29::upgrade_to_v29()?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(29), SchemaVersion(28)) => migration_schema_v29::downgrade_from_v29(), // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v29.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v29.rs new file mode 100644 index 00000000000..9b830037a57 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v29.rs @@ -0,0 +1,15 @@ +use store::{DBColumn, Error, KeyValueStoreOp, metadata::DATA_COLUMN_INFO_KEY}; + +/// Remove the now-unused `DataColumnInfo` entry from the on-disk metadata. 
+pub fn upgrade_to_v29() -> Result, Error> { + Ok(vec![KeyValueStoreOp::DeleteKey( + DBColumn::BeaconMeta, + DATA_COLUMN_INFO_KEY.as_slice().to_vec(), + )]) +} + +pub fn downgrade_from_v29() -> Result<(), Error> { + Err(Error::MigrationError( + "Cannot downgrade from v29".to_string(), + )) +} diff --git a/beacon_node/beacon_chain/tests/schema_stability.rs b/beacon_node/beacon_chain/tests/schema_stability.rs index 3b09921c15c..99c049ebe0d 100644 --- a/beacon_node/beacon_chain/tests/schema_stability.rs +++ b/beacon_node/beacon_chain/tests/schema_stability.rs @@ -9,10 +9,8 @@ use operation_pool::PersistedOperationPool; use ssz::Encode; use std::sync::{Arc, LazyLock}; use store::{ - DBColumn, HotColdDB, StoreConfig, StoreItem, - database::interface::BeaconNodeBackend, - hot_cold_store::Split, - metadata::{DataColumnCustodyInfo, DataColumnInfo}, + DBColumn, HotColdDB, StoreConfig, StoreItem, database::interface::BeaconNodeBackend, + hot_cold_store::Split, metadata::DataColumnCustodyInfo, }; use strum::IntoEnumIterator; use tempfile::{TempDir, tempdir}; @@ -133,7 +131,6 @@ fn check_metadata_sizes(store: &Store) { 6 } ); - assert_eq!(DataColumnInfo::default().ssz_bytes_len(), 5); assert_eq!(DataColumnCustodyInfo::default().ssz_bytes_len(), 5); } diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 58e02ffe007..ad897243fd5 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1154,33 +1154,42 @@ impl NetworkBeaconProcessor { let request_start_slot = Slot::from(req.start_slot); - let data_availability_boundary_slot = match self.chain.data_availability_boundary() { - Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), - None => { - debug!("Deneb fork is disabled"); - return Err((RpcErrorResponse::InvalidRequest, "Deneb fork is disabled")); - } - }; + let 
column_data_availability_boundary_slot = + match self.chain.column_data_availability_boundary() { + Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), + None => { + debug!("Deneb fork is disabled"); + return Err((RpcErrorResponse::InvalidRequest, "Deneb fork is disabled")); + } + }; - let oldest_data_column_slot = self - .chain - .store - .get_data_column_info() - .oldest_data_column_slot - .unwrap_or(data_availability_boundary_slot); + let earliest_custodied_data_column_slot = + match self.chain.earliest_custodied_data_column_epoch() { + Some(earliest_custodied_epoch) => { + let earliest_custodied_slot = + earliest_custodied_epoch.start_slot(T::EthSpec::slots_per_epoch()); + // Ensure the earliest columns we serve are within the data availability window + if earliest_custodied_slot < column_data_availability_boundary_slot { + column_data_availability_boundary_slot + } else { + earliest_custodied_slot + } + } + None => column_data_availability_boundary_slot, + }; - if request_start_slot < oldest_data_column_slot { + if request_start_slot < earliest_custodied_data_column_slot { debug!( %request_start_slot, - %oldest_data_column_slot, - %data_availability_boundary_slot, + %earliest_custodied_data_column_slot, + %column_data_availability_boundary_slot, "Range request start slot is older than data availability boundary." 
); - return if data_availability_boundary_slot < oldest_data_column_slot { + return if earliest_custodied_data_column_slot > column_data_availability_boundary_slot { Err(( RpcErrorResponse::ResourceUnavailable, - "blobs pruned within boundary", + "columns pruned within boundary", )) } else { Err(( diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index f62647ae545..63027fe2ecb 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -33,8 +33,6 @@ pub enum Error { AnchorInfoConcurrentMutation, /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied. BlobInfoConcurrentMutation, - /// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied. - DataColumnInfoConcurrentMutation, /// The block or state is unavailable due to weak subjectivity sync. HistoryUnavailable, /// State reconstruction cannot commence because not all historic blocks are known. @@ -94,7 +92,6 @@ pub enum Error { LoadAnchorInfo(Box), LoadSplit(Box), LoadBlobInfo(Box), - LoadDataColumnInfo(Box), LoadConfig(Box), LoadHotStateSummary(Hash256, Box), LoadHotStateSummaryForSplit(Box), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0d8a65e0644..a17180966fd 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -8,8 +8,8 @@ use crate::memory_store::MemoryStore; use crate::metadata::{ ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, AnchorInfo, BLOB_INFO_KEY, BlobInfo, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, CompactionTimestamp, - DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY, DataColumnCustodyInfo, DataColumnInfo, - SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, SchemaVersion, + DATA_COLUMN_CUSTODY_INFO_KEY, DataColumnCustodyInfo, SCHEMA_VERSION_KEY, SPLIT_KEY, + STATE_UPPER_LIMIT_NO_RETAIN, SchemaVersion, }; use 
crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ @@ -57,8 +57,6 @@ pub struct HotColdDB, Cold: ItemStore> { anchor_info: RwLock, /// The starting slots for the range of blobs stored in the database. blob_info: RwLock, - /// The starting slots for the range of data columns stored in the database. - data_column_info: RwLock, pub(crate) config: StoreConfig, pub hierarchy: HierarchyModuli, /// Cold database containing compact historical data. @@ -225,7 +223,6 @@ impl HotColdDB, MemoryStore> { split: RwLock::new(Split::default()), anchor_info: RwLock::new(ANCHOR_UNINITIALIZED), blob_info: RwLock::new(BlobInfo::default()), - data_column_info: RwLock::new(DataColumnInfo::default()), cold_db: MemoryStore::open(), blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), @@ -279,7 +276,6 @@ impl HotColdDB, BeaconNodeBackend> { split: RwLock::new(Split::default()), anchor_info, blob_info: RwLock::new(BlobInfo::default()), - data_column_info: RwLock::new(DataColumnInfo::default()), blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?, cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, @@ -364,35 +360,9 @@ impl HotColdDB, BeaconNodeBackend> { }; db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?; - let data_column_info = db.load_data_column_info()?; - let fulu_fork_slot = db - .spec - .fulu_fork_epoch - .map(|epoch| epoch.start_slot(E::slots_per_epoch())); - let new_data_column_info = match &data_column_info { - Some(data_column_info) => { - // Set the oldest data column slot to the fork slot if it is not yet set. - let oldest_data_column_slot = - data_column_info.oldest_data_column_slot.or(fulu_fork_slot); - DataColumnInfo { - oldest_data_column_slot, - } - } - // First start. - None => DataColumnInfo { - // Set the oldest data column slot to the fork slot if it is not yet set. 
- oldest_data_column_slot: fulu_fork_slot, - }, - }; - db.compare_and_set_data_column_info_with_write( - <_>::default(), - new_data_column_info.clone(), - )?; - info!( path = ?blobs_db_path, oldest_blob_slot = ?new_blob_info.oldest_blob_slot, - oldest_data_column_slot = ?new_data_column_info.oldest_data_column_slot, "Blob DB initialized" ); @@ -2722,24 +2692,6 @@ impl, Cold: ItemStore> HotColdDB self.blob_info.read_recursive().clone() } - /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint. - pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result { - let oldest_data_column_slot = self.spec.fulu_fork_epoch.map(|fork_epoch| { - std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch())) - }); - let data_column_info = DataColumnInfo { - oldest_data_column_slot, - }; - self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info) - } - - /// Get a clone of the store's data column info. - /// - /// To do mutations, use `compare_and_set_data_column_info`. - pub fn get_data_column_info(&self) -> DataColumnInfo { - self.data_column_info.read_recursive().clone() - } - /// Atomically update the blob info from `prev_value` to `new_value`. /// /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other @@ -2787,56 +2739,6 @@ impl, Cold: ItemStore> HotColdDB blob_info.as_kv_store_op(BLOB_INFO_KEY) } - /// Atomically update the data column info from `prev_value` to `new_value`. - /// - /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other - /// values. - /// - /// Return an `DataColumnInfoConcurrentMutation` error if the `prev_value` provided - /// is not correct. 
- pub fn compare_and_set_data_column_info( - &self, - prev_value: DataColumnInfo, - new_value: DataColumnInfo, - ) -> Result { - let mut data_column_info = self.data_column_info.write(); - if *data_column_info == prev_value { - let kv_op = self.store_data_column_info_in_batch(&new_value); - *data_column_info = new_value; - Ok(kv_op) - } else { - Err(Error::DataColumnInfoConcurrentMutation) - } - } - - /// As for `compare_and_set_data_column_info`, but also writes the blob info to disk immediately. - pub fn compare_and_set_data_column_info_with_write( - &self, - prev_value: DataColumnInfo, - new_value: DataColumnInfo, - ) -> Result<(), Error> { - let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?; - self.hot_db.do_atomically(vec![kv_store_op]) - } - - /// Load the blob info from disk, but do not set `self.data_column_info`. - fn load_data_column_info(&self) -> Result, Error> { - self.hot_db - .get(&DATA_COLUMN_INFO_KEY) - .map_err(|e| Error::LoadDataColumnInfo(e.into())) - } - - /// Store the given `data_column_info` to disk. - /// - /// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues - /// with recursive locking. - fn store_data_column_info_in_batch( - &self, - data_column_info: &DataColumnInfo, - ) -> KeyValueStoreOp { - data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY) - } - /// Return the slot-window describing the available historic states. /// /// Returns `(lower_limit, upper_limit)`. diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index cf494684515..21fa15e90d0 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -228,30 +228,3 @@ impl StoreItem for DataColumnCustodyInfo { Ok(DataColumnCustodyInfo::from_ssz_bytes(bytes)?) } } - -/// Database parameters relevant to data column sync. 
-#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)] -pub struct DataColumnInfo { - /// The slot after which data columns are or *will be* available (>=). - /// - /// If this slot is in the future, then it is the first slot of the Fulu fork, from which - /// data columns will be available. - /// - /// If the `oldest_data_column_slot` is `None` then this means that the Fulu fork epoch is - /// not yet known. - pub oldest_data_column_slot: Option, -} - -impl StoreItem for DataColumnInfo { - fn db_column() -> DBColumn { - DBColumn::BeaconMeta - } - - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() - } - - fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(Self::from_ssz_bytes(bytes)?) - } -}