diff --git a/dash-spv/src/storage/disk/filters.rs b/dash-spv/src/storage/disk/filters.rs index 0d6bcf457..f25e6b1cb 100644 --- a/dash-spv/src/storage/disk/filters.rs +++ b/dash-spv/src/storage/disk/filters.rs @@ -5,7 +5,8 @@ use std::ops::Range; use dashcore::hash_types::FilterHeader; use dashcore_hashes::Hash; -use crate::error::StorageResult; +use crate::error::{StorageError, StorageResult}; +use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY; use super::manager::DiskStorageManager; use super::segments::SegmentState; @@ -216,6 +217,17 @@ impl DiskStorageManager { } tokio::fs::create_dir_all(&filters_dir).await?; + // Remove trusted checkpoint predecessor filter header metadata if present + let metadata_path = + self.base_path.join(format!("state/{}.dat", CHECKPOINT_PREV_FILTER_HEADER_KEY)); + if metadata_path.exists() { + if let Err(e) = tokio::fs::remove_file(&metadata_path).await { + if e.kind() != std::io::ErrorKind::NotFound { + return Err(StorageError::Io(e)); + } + } + } + // Restart background worker for future operations self.start_worker().await; diff --git a/dash-spv/src/storage/memory.rs b/dash-spv/src/storage/memory.rs index 839baf5ba..7023836f8 100644 --- a/dash-spv/src/storage/memory.rs +++ b/dash-spv/src/storage/memory.rs @@ -7,6 +7,7 @@ use std::ops::Range; use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid}; use crate::error::{StorageError, StorageResult}; +use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY; use crate::storage::{MasternodeState, StorageManager, StorageStats}; use crate::types::{ChainState, MempoolState, UnconfirmedTransaction}; @@ -310,6 +311,7 @@ impl StorageManager for MemoryStorageManager { async fn clear_filters(&mut self) -> StorageResult<()> { self.filter_headers.clear(); self.filters.clear(); + self.metadata.remove(CHECKPOINT_PREV_FILTER_HEADER_KEY); Ok(()) } diff --git a/dash-spv/src/storage/metadata_keys.rs b/dash-spv/src/storage/metadata_keys.rs new file mode 100644 index 000000000..5ebdcea80 --- /dev/null +++ b/dash-spv/src/storage/metadata_keys.rs @@ -0,0 +1,4 @@ +//! Common metadata keys stored by storage backends. + +/// Metadata key storing the filter header for the block immediately before a trusted checkpoint. +pub const CHECKPOINT_PREV_FILTER_HEADER_KEY: &str = "checkpoint_prev_filter_header_v1"; diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs index f8d7d795b..ec1ffc7cc 100644 --- a/dash-spv/src/storage/mod.rs +++ b/dash-spv/src/storage/mod.rs @@ -2,6 +2,7 @@ pub mod disk; pub mod memory; +pub mod metadata_keys; pub mod sync_state; pub mod sync_storage; pub mod types; diff --git a/dash-spv/src/sync/filters/download.rs b/dash-spv/src/sync/filters/download.rs index 8dac8c5a6..9f980f1e8 100644 --- a/dash-spv/src/sync/filters/download.rs +++ b/dash-spv/src/sync/filters/download.rs @@ -16,6 +16,7 @@ use dashcore::{ BlockHash, }; +use super::manager::TrustedCheckpointFilterHeader; use super::types::*; use crate::error::{SyncError, SyncResult}; use crate::network::NetworkManager; @@ -38,19 +39,50 @@ impl header, + None if self.sync_base_height > 0 && height == self.sync_base_height => { + match self.load_checkpoint_prev_filter_header(storage).await? { + Some(trusted) if trusted.height == prev_height => trusted.header, + Some(trusted) => { + tracing::error!( + "Checkpoint predecessor header height mismatch: expected {}, stored {}", + prev_height, + trusted.height + ); + return Ok(false); + } + None => { + tracing::warn!( + "Missing trusted checkpoint predecessor filter header at height {}", + prev_height + ); + return Ok(false); + } + } + } + None => { + tracing::warn!( + "Missing filter header at height {} required for verifying cfilter at {}", + prev_height, + height + ); + return Ok(false); + } + }; + let expected_header = storage.get_filter_header(height).await.map_err(|e| { SyncError::Storage(format!("Failed to load expected filter header: {}", e)) })?; - let (Some(prev_header), Some(expected_header)) = (prev_header, expected_header) else { - tracing::warn!( - "Missing filter headers in storage for height {} (prev and/or expected)", - height - ); + let Some(expected_header) = expected_header else { + tracing::warn!("Missing filter headers in storage for height {}", height); return Ok(false); }; @@ -582,8 +614,21 @@ impl 0 + // If this is the first batch that begins at the checkpoint base, persist the + // trusted predecessor filter header so we can verify the checkpoint filter. + if self.sync_base_height > 0 && start_height == self.sync_base_height { + if let Some(prev_height) = self.sync_base_height.checked_sub(1) { + let record = TrustedCheckpointFilterHeader { + height: prev_height, + header: cfheaders.previous_filter_header, + }; + self.persist_checkpoint_prev_filter_header(storage, record).await?; + tracing::info!( + "Stored trusted checkpoint predecessor filter header at height {}", + prev_height + ); + } + } else if self.sync_base_height > 0 && start_height == self.sync_base_height + 1 && current_filter_tip < self.sync_base_height { diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index e4f95447f..d85d87c56 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -6,16 +6,51 @@ use dashcore::{hash_types::FilterHeader, network::message_filter::CFHeaders, BlockHash}; use dashcore_hashes::{sha256d, Hash}; use std::collections::{HashMap, HashSet, VecDeque}; +use tokio::sync::Mutex; use crate::client::ClientConfig; use crate::error::{SyncError, SyncResult}; use crate::network::NetworkManager; +use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY; use crate::storage::StorageManager; use crate::types::SharedFilterHeights; // Import types and constants from the types module use super::types::*; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) struct TrustedCheckpointFilterHeader { + pub(super) height: u32, + pub(super) header: FilterHeader, +} + +impl TrustedCheckpointFilterHeader { + fn to_bytes(self) -> [u8; 36] { + let mut buf = [0u8; 36]; + buf[..4].copy_from_slice(&self.height.to_le_bytes()); + buf[4..].copy_from_slice(self.header.as_byte_array()); + buf + } + + fn from_bytes(bytes: &[u8]) -> Option { + if bytes.len() != 36 { + return None; + } + + let mut height_bytes = [0u8; 4]; + height_bytes.copy_from_slice(&bytes[..4]); + let height = u32::from_le_bytes(height_bytes); + + let mut header_bytes = [0u8; 32]; + header_bytes.copy_from_slice(&bytes[4..36]); + + Some(Self { + height, + header: FilterHeader::from_byte_array(header_bytes), + }) + } +} + /// Manages BIP157 compact block filter synchronization. /// /// # Generic Parameters @@ -102,6 +137,8 @@ pub struct FilterSyncManager { pub(super) max_concurrent_cfheader_requests: usize, /// Timeout for CFHeaders requests pub(super) cfheader_request_timeout: std::time::Duration, + /// Trusted predecessor filter header for the configured checkpoint base + checkpoint_prev_filter_header: Mutex>, } impl @@ -148,6 +185,7 @@ impl SyncResult<()> { + storage + .store_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY, &header.to_bytes()) + .await + .map_err(|e| { + SyncError::Storage(format!( + "Failed to persist checkpoint predecessor filter header: {}", + e + )) + })?; + + let mut guard = self.checkpoint_prev_filter_header.lock().await; + *guard = Some(header); + Ok(()) + } + + pub(super) async fn load_checkpoint_prev_filter_header( + &self, + storage: &S, + ) -> SyncResult> { + let mut guard = self.checkpoint_prev_filter_header.lock().await; + if guard.is_none() { + if let Some(bytes) = + storage.load_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY).await.map_err(|e| { + SyncError::Storage(format!( + "Failed to load checkpoint predecessor filter header: {}", + e + )) + })? + { + if let Some(record) = TrustedCheckpointFilterHeader::from_bytes(&bytes) { + *guard = Some(record); + } else { + tracing::warn!( + "Stored checkpoint predecessor filter header has unexpected format ({} bytes)", + bytes.len() + ); + } + } + } + + Ok(*guard) + } + /// Handle overlapping filter headers by skipping already processed ones. pub fn has_pending_downloads(&self) -> bool { !self.pending_block_downloads.is_empty() || !self.downloading_blocks.is_empty() diff --git a/dash-spv/src/sync/masternodes.rs b/dash-spv/src/sync/masternodes.rs index a777330f0..7746e1673 100644 --- a/dash-spv/src/sync/masternodes.rs +++ b/dash-spv/src/sync/masternodes.rs @@ -49,6 +49,20 @@ pub struct MasternodeSyncManager { // Track retry attempts for MnListDiff requests mnlistdiff_retry_count: u8, + + // Checkpoint base height (0 when syncing from genesis) + sync_base_height: u32, + + // Track start-list backfills required before QRInfo can be processed + pending_start_list_backfills: HashMap, + + // Deferred QRInfo that will be retried once missing start lists are backfilled + pending_qrinfo_retry: Option, +} + +enum QrInfoProcessResult { + Completed, + WaitingForStartList(BlockHash), } impl @@ -118,11 +132,113 @@ impl Result<(), String> { + if self.pending_start_list_backfills.contains_key(&missing_block_hash) { + tracing::info!( + "Already waiting for start masternode list at block {}", + missing_block_hash + ); + return Ok(()); + } + + let engine = + self.engine.as_mut().ok_or_else(|| "Masternode engine not initialized".to_string())?; + let (latest_height, latest_block_hash) = engine + .latest_masternode_list() + .map(|list| (list.known_height, list.block_hash)) + .ok_or_else(|| "Masternode engine has no known masternode list".to_string())?; + + let target_height = storage + .get_header_height_by_hash(&missing_block_hash) + .await + .map_err(|e| { + format!( + "Failed to look up height for missing start list block {}: {}", + missing_block_hash, e + ) + })? + .ok_or_else(|| { + format!( + "Height not found in storage for missing start list block {}", + missing_block_hash + ) + })?; + + if engine.masternode_lists.contains_key(&target_height) { + tracing::debug!("Engine already has masternode list at height {}", target_height); + return Ok(()); + } + + engine.feed_block_height(target_height, missing_block_hash); + + let request = dashcore::network::message_sml::GetMnListDiff { + base_block_hash: latest_block_hash, + block_hash: missing_block_hash, + }; + + network + .send_message(NetworkMessage::GetMnListD(request)) + .await + .map_err(|e| format!("Failed to send MnListDiff backfill request: {}", e))?; + + tracing::info!( + "Requested MnListDiff backfill from height {} to {} ({} -> {})", + latest_height, + target_height, + latest_block_hash, + missing_block_hash + ); + + self.pending_start_list_backfills.insert(missing_block_hash, target_height); + + Ok(()) + } + + async fn apply_start_list_backfill( + &mut self, + diff: MnListDiff, + target_height: u32, + ) -> Result<(), String> { + let engine = + self.engine.as_mut().ok_or_else(|| "Masternode engine not initialized".to_string())?; + let block_hash = diff.block_hash; + engine.feed_block_height(target_height, block_hash); + engine + .apply_diff(diff, Some(target_height), false, None) + .map_err(|e| format!("Failed to apply MnListDiff backfill: {}", e))?; + tracing::info!( + "Applied MnListDiff backfill up to height {} (block {})", + target_height, + block_hash + ); + Ok(()) + } + + async fn retry_pending_qrinfo(&mut self, storage: &mut S, network: &mut dyn NetworkManager) { + if let Some(pending) = self.pending_qrinfo_retry.take() { + tracing::info!("Retrying deferred QRInfo after start list backfill"); + self.handle_qrinfo_message(pending, storage, network).await; + } + } + /// Request QRInfo - simplified non-blocking implementation pub async fn request_qrinfo( &mut self, @@ -235,6 +351,47 @@ impl(&self, storage: &T) -> SyncResult + where + T: StorageManager + ?Sized, + { + if let Some(last_qrinfo_hash) = self.last_qrinfo_block_hash { + return Ok(last_qrinfo_hash); + } + + if self.sync_base_height > 0 { + match storage.get_header(self.sync_base_height).await { + Ok(Some(header)) => { + let hash = header.block_hash(); + tracing::debug!( + "Using checkpoint base block at height {} as QRInfo base: {}", + self.sync_base_height, + hash + ); + return Ok(hash); + } + Ok(None) => { + tracing::warn!( + "Checkpoint header at height {} not found in storage, falling back to genesis", + self.sync_base_height + ); + } + Err(e) => { + tracing::warn!( + "Failed to load checkpoint header at height {}: {}. Falling back to genesis", + self.sync_base_height, + e + ); + } + } + } + + self.config + .network + .known_genesis_block_hash() + .ok_or_else(|| SyncError::InvalidState("Genesis hash not available".to_string())) + } + /// Log detailed QRInfo statistics fn log_qrinfo_details(&self, qr_info: &QRInfo, prefix: &str) { let h4c_count = if qr_info.quorum_snapshot_and_mn_list_diff_at_h_minus_4c.is_some() { @@ -411,19 +568,7 @@ impl SyncResult { self.insert_mn_list_diff(&diff, storage).await; + let mut backfill_applied = false; + if let Some(target_height) = self.pending_start_list_backfills.remove(&diff.block_hash) { + match self.apply_start_list_backfill(diff.clone(), target_height).await { + Ok(()) => { + backfill_applied = true; + } + Err(e) => { + tracing::error!( + "❌ Failed to apply start list backfill for block {}: {}", + diff.block_hash, + e + ); + self.error = Some(e); + } + } + } + // Decrement pending request counter if we were expecting this response if self.pending_mnlistdiff_requests > 0 { self.pending_mnlistdiff_requests -= 1; @@ -496,6 +658,10 @@ impl {} + Ok(QrInfoProcessResult::WaitingForStartList(block_hash)) => { + tracing::info!( + "⏳ Waiting for missing start masternode list at block {} before processing QRInfo", + block_hash + ); + self.pending_qrinfo_retry = Some(qr_info); + return; + } + Err(e) => { + tracing::error!("❌ Failed to process QRInfo follow-up diffs: {}", e); + self.error = Some(e); + return; + } } // Cache the QRInfo using the requested block hash as key @@ -727,7 +896,7 @@ impl Result<(), String> { + ) -> Result { tracing::info!( "🔗 Feeding QRInfo to engine and getting additional diffs for quorum validation" ); @@ -754,6 +923,12 @@ impl { tracing::info!("✅ Successfully fed QRInfo to masternode list engine"); } + Err(dashcore::sml::quorum_validation_error::QuorumValidationError::SMLError( + dashcore::sml::error::SmlError::MissingStartMasternodeList(block_hash), + )) => { + self.schedule_start_list_backfill(block_hash, storage, network).await?; + return Ok(QrInfoProcessResult::WaitingForStartList(block_hash)); + } Err(e) => { let error_msg = format!("Failed to feed QRInfo to engine: {}", e); tracing::error!("❌ {}", error_msg); @@ -798,7 +973,7 @@ impl {