From 1bcc6fa9b76b7d06b2b4f7db8e1093c3b8b81ff5 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Tue, 14 Oct 2025 04:05:00 +0000 Subject: [PATCH 1/8] Remove total timeout from signature and add test --- .../node/src/protocol/signature.rs | 382 ++++++++++++++---- chain-signatures/node/src/util.rs | 15 - integration-tests/src/mpc_fixture/builder.rs | 13 +- .../src/mpc_fixture/fixture_interface.rs | 22 + .../src/mpc_fixture/fixture_tasks.rs | 12 +- .../src/mpc_fixture/message_filters.rs | 82 ++++ integration-tests/src/mpc_fixture/mod.rs | 2 + integration-tests/tests/cases/mpc.rs | 222 +++++++++- 8 files changed, 649 insertions(+), 101 deletions(-) create mode 100644 integration-tests/src/mpc_fixture/message_filters.rs diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index d11fbec7c..5c8301506 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -33,6 +33,9 @@ use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task::{JoinHandle, JoinSet}; +#[cfg(feature = "test-feature")] +use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; + use near_account_id::AccountId; /// This is the maximum amount of sign requests that we can accept in the network. @@ -101,6 +104,7 @@ pub struct SignRequest { pub proposer: Participant, pub stable: BTreeSet, pub round: usize, + attempts: Attempts, } pub struct SignQueue { @@ -119,6 +123,54 @@ pub struct SignQueue { pending: HashMap>, } +#[derive(Debug, Clone, PartialEq)] +pub struct Attempts { + attempts: u32, + next_retry_at: Instant, +} + +impl Default for Attempts { + fn default() -> Self { + Self::new() + } +} + +impl Attempts { + pub fn new() -> Self { + Self { + attempts: 0, + next_retry_at: Instant::now(), + } + } + + pub fn schedule_next_attempt(&mut self) { + self.attempts = self.attempts.saturating_add(1); + self.next_retry_at = Instant::now() + self.compute_retry_delay(); + } + + pub fn is_ready(&self, now: Instant) -> bool { + self.next_retry_at <= now + } + + pub fn is_ready_now(&self) -> bool { + self.is_ready(Instant::now()) + } + + pub fn mark_ready(&mut self, now: Instant) { + self.next_retry_at = now; + } + + fn compute_retry_delay(&self) -> Duration { + let base_ms = retry_backoff_base_ms(); + let exponent = self.attempts.saturating_sub(1).min(6); + let multiplier = 1u32 << exponent; + let delay_ms = base_ms + .saturating_mul(u64::from(multiplier)) + .min(retry_backoff_max_ms()); + Duration::from_millis(delay_ms) + } +} + impl SignQueue { pub fn channel() -> ( mpsc::Sender, @@ -226,6 +278,7 @@ impl SignQueue { proposer, stable: stable.clone(), round, + attempts: Attempts::new(), } } @@ -258,10 +311,13 @@ impl SignQueue { .with_label_values(&[indexed.chain.as_str(), my_account_id.as_str()]) .inc(); - let request = self.organize_request(stable, participants, indexed, 0); + let mut request = self.organize_request(stable, participants, indexed, 0); + if request.indexed.timestamp_sign_queue.is_none() { + request.indexed.timestamp_sign_queue = Some(Instant::now()); + } let is_mine = request.proposer == self.me; if is_mine { - self.my_requests.push_back(sign_id); + self.my_requests.push_front(sign_id); crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) .inc(); @@ -286,26 +342,43 @@ impl SignQueue { participants: &Participants, my_account_id: &AccountId, ) { - while let Some(id) = self.failed_requests.pop_front() { - let Some(request) = self.requests.remove(&id) else { + let now = Instant::now(); + let len = self.failed_requests.len(); + for _ in 0..len { + let Some(id) = self.failed_requests.pop_front() else { + break; + }; + + let Some(mut request) = self.requests.remove(&id) else { continue; }; - let (reorganized, request) = if &request.stable == stable { - // just use the same request if the participants are the same - (false, request) - } else { - let request = + if !request.attempts.is_ready(now) { + self.requests.insert(id, request); + self.failed_requests.push_back(id); + continue; + } + + let mut reorganized = false; + if &request.stable != stable { + let attempts = request.attempts; + request = self.organize_request(stable, participants, request.indexed, request.round); - (true, request) - }; + request.attempts = attempts; + reorganized = true; + } + + // Delay information is reset since we're ready to try again now that the + // backoff window has elapsed. + request.attempts.mark_ready(now); + + if request.indexed.timestamp_sign_queue.is_none() { + request.indexed.timestamp_sign_queue = Some(now); + } - // NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures. - // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of - // when the request made it into the NEAR network. - // issue: https://github.com/near/mpc-recovery/issues/596 if request.proposer == self.me { - self.my_requests.push_front(request.indexed.id); + // Older retries are appended to the back so newly indexed requests run first. + self.my_requests.push_back(request.indexed.id); if reorganized { crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) @@ -318,7 +391,15 @@ impl SignQueue { } pub fn push_failed(&mut self, sign_id: SignId) { - self.failed_requests.push_back(sign_id); + if let Some(request) = self.requests.get_mut(&sign_id) { + request.attempts.schedule_next_attempt(); + + if !self.failed_requests.contains(&sign_id) { + self.failed_requests.push_back(sign_id); + } + } else { + tracing::warn!(?sign_id, "failed sign request missing from queue"); + } } pub fn take_mine(&mut self) -> Option { @@ -336,42 +417,22 @@ impl SignQueue { } } - pub fn expire(&mut self, cfg: &ProtocolConfig) { - self.requests.retain(|_, request| { - request.indexed.timestamp_sign_queue.is_none_or(|t| { - t.elapsed() < Duration::from_millis(cfg.signature.generation_timeout_total) - }) - }); - self.my_requests.retain(|id| { - let Some(request) = self.requests.get(id) else { - // if we are unable to find the corresponding request, we can remove it. - return false; - }; - crate::util::duration_between_unix( - request.indexed.unix_timestamp_indexed, - crate::util::current_unix_timestamp(), - ) < request.indexed.total_timeout - }); - self.failed_requests.retain(|id| { - let Some(request) = self.requests.get(id) else { - // if we are unable to find the corresponding request, we can remove it. - return false; - }; - crate::util::duration_between_unix( - request.indexed.unix_timestamp_indexed, - crate::util::current_unix_timestamp(), - ) < request.indexed.total_timeout - }); + pub fn expire(&mut self, _cfg: &ProtocolConfig) { + self.my_requests.retain(|id| self.requests.contains_key(id)); + self.failed_requests + .retain(|id| self.requests.contains_key(id)); } pub fn remove(&mut self, sign_id: SignId) -> Option { + self.pending.remove(&sign_id); + self.my_requests.retain(|id| id != &sign_id); + self.failed_requests.retain(|id| id != &sign_id); self.requests.remove(&sign_id) } } enum SignError { Retry, - TotalTimeout, Aborted, } @@ -380,11 +441,11 @@ struct SignatureGenerator { protocol: SignatureProtocol, dropper: PresignatureTakenDropper, participants: Vec, + me: Participant, request: SignRequest, public_key: PublicKey, created: Instant, timeout: Duration, - timeout_total: Duration, inbox: mpsc::Receiver, msg: MessageChannel, rpc: RpcChannel, @@ -457,11 +518,11 @@ impl SignatureGenerator { protocol, dropper, participants, + me, request, public_key, created: Instant::now(), timeout: Duration::from_millis(cfg.signature.generation_timeout), - timeout_total: Duration::from_millis(cfg.signature.generation_timeout_total), inbox, msg, rpc, @@ -474,16 +535,6 @@ impl SignatureGenerator { }) } - fn timeout_total(&self) -> bool { - let timestamp = self - .request - .indexed - .timestamp_sign_queue - .as_ref() - .unwrap_or(&self.created); - timestamp.elapsed() >= self.timeout_total - } - /// Receive the next message for the signature protocol; error out on the timeout being reached /// or the channel having been closed (aborted). async fn recv(&mut self) -> Result { @@ -497,13 +548,21 @@ impl SignatureGenerator { { Ok(Some(msg)) => Ok(msg), Ok(None) => { - tracing::warn!(?sign_id, presignature_id, "signature generation aborted"); + tracing::warn!( + ?sign_id, + presignature_id, + proposer = ?self.request.proposer, + me = ?self.me, + "signature generation aborted", + ); Err(SignError::Aborted) } Err(_err) => { tracing::warn!( ?sign_id, presignature_id, + proposer = ?self.request.proposer, + me = ?self.me, "signature generation timeout, retrying..." ); Err(SignError::Retry) @@ -523,8 +582,6 @@ impl SignatureGenerator { crate::metrics::SIGNATURE_POKES_CNT.with_label_values(&[my_account_id.as_str()]); let signature_generator_failures_metric = crate::metrics::SIGNATURE_GENERATOR_FAILURES .with_label_values(&[my_account_id.as_str()]); - let signature_failures_metric = - crate::metrics::SIGNATURE_FAILURES.with_label_values(&[my_account_id.as_str()]); let poke_latency = crate::metrics::SIGNATURE_POKE_CPU_TIME.with_label_values(&[my_account_id.as_str()]); @@ -539,19 +596,6 @@ impl SignatureGenerator { .observe(self.created.elapsed().as_millis() as f64); loop { - if self.timeout_total() { - tracing::warn!( - ?sign_id, - presignature_id, - "signature generation timeout, exhausted all attempts" - ); - if self.request.proposer == me { - signature_generator_failures_metric.inc(); - signature_failures_metric.inc(); - } - break Err(SignError::TotalTimeout); - } - let poke_start_time = Instant::now(); let action = match self.protocol.poke() { Ok(action) => action, @@ -561,6 +605,9 @@ impl SignatureGenerator { ?err, "signature generation failed on protocol advancement", ); + if self.request.proposer == me { + signature_generator_failures_metric.inc(); + } break Err(SignError::Retry); } }; @@ -1088,9 +1135,12 @@ impl SignatureSpawner { match result { Err(SignError::Retry) => { + crate::metrics::SIGNATURE_FAILURES + .with_label_values(&[self.my_account_id.as_str()]) + .inc(); self.sign_queue.push_failed(sign_id); } - Ok(()) | Err(SignError::TotalTimeout) | Err(SignError::Aborted) => { + Ok(()) | Err(SignError::Aborted) => { self.sign_queue.remove(sign_id); } } @@ -1208,3 +1258,189 @@ impl PendingPresignature { } } } + +#[cfg(feature = "test-feature")] +static RETRY_BACKOFF_BASE_MS: AtomicU64 = AtomicU64::new(0); +#[cfg(feature = "test-feature")] +static RETRY_BACKOFF_MAX_MS: AtomicU64 = AtomicU64::new(0); + +#[cfg(feature = "test-feature")] +pub fn set_signature_retry_backoff(base_ms: u64, max_ms: u64) { + RETRY_BACKOFF_BASE_MS.store(base_ms, AtomicOrdering::SeqCst); + RETRY_BACKOFF_MAX_MS.store(max_ms, AtomicOrdering::SeqCst); +} + +#[cfg(feature = "test-feature")] +pub fn reset_signature_retry_backoff() { + RETRY_BACKOFF_BASE_MS.store(0, AtomicOrdering::SeqCst); + RETRY_BACKOFF_MAX_MS.store(0, AtomicOrdering::SeqCst); +} + +fn retry_backoff_base_ms() -> u64 { + #[cfg(feature = "test-feature")] + match RETRY_BACKOFF_BASE_MS.load(AtomicOrdering::SeqCst) { + 0 => 1_000, + value => value, + } + + #[cfg(not(feature = "test-feature"))] + 1_000 +} + +fn retry_backoff_max_ms() -> u64 { + #[cfg(feature = "test-feature")] + match RETRY_BACKOFF_MAX_MS.load(AtomicOrdering::SeqCst) { + 0 => 60_000, + value => value, + } + #[cfg(not(feature = "test-feature"))] + 60_000 +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::ParticipantInfo; + use k256::Scalar; + use near_account_id::AccountId; + use std::str::FromStr; + use std::sync::Arc; + use tokio::sync::RwLock; + + fn make_indexed_request(id: SignId) -> IndexedSignRequest { + IndexedSignRequest { + id, + args: SignArgs { + entropy: id.request_id, + epsilon: Scalar::ONE, + payload: Scalar::ONE, + path: "test".to_string(), + key_version: 0, + }, + chain: Chain::Ethereum, + unix_timestamp_indexed: 0, + timestamp_sign_queue: None, + total_timeout: Duration::from_secs(60), + sign_request_type: SignRequestType::Sign, + participants: None, + } + } + + fn setup_queue() -> ( + Participant, + BTreeSet, + Participants, + SignQueue, + AccountId, + mpsc::Sender, + ) { + let (tx, rx) = SignQueue::channel(); + let me = Participant::from(0u32); + let other = Participant::from(1u32); + let mut stable = BTreeSet::new(); + stable.insert(me); + stable.insert(other); + + let mut participants_map = Participants::default(); + participants_map.insert(&me, ParticipantInfo::new(0)); + participants_map.insert(&other, ParticipantInfo::new(1)); + + let account_id = AccountId::from_str("me.near").unwrap(); + let queue = SignQueue::new(me, Arc::new(RwLock::new(rx))); + + (me, stable, participants_map, queue, account_id, tx) + } + + #[tokio::test] + async fn retries_wait_until_ready() { + let (me, stable, participants, mut queue, account_id, _tx) = setup_queue(); + + let sign_id = SignId::new([1; 32]); + let mut indexed = make_indexed_request(sign_id); + indexed.participants = Some(vec![me]); + let request = queue.organize_request(&stable, &participants, indexed, 0); + queue.requests.insert(sign_id, request); + queue.push_failed(sign_id); + + { + let request = queue.requests.get_mut(&sign_id).unwrap(); + request.attempts.next_retry_at = Instant::now() + Duration::from_secs(10); + } + + queue.organize_failed(&stable, &participants, &account_id); + assert!(queue.my_requests.is_empty()); + assert!(queue.failed_requests.contains(&sign_id)); + + { + let request = queue.requests.get_mut(&sign_id).unwrap(); + request.attempts.next_retry_at = Instant::now() - Duration::from_secs(1); + } + + queue.organize_failed(&stable, &participants, &account_id); + assert_eq!(queue.my_requests.len(), 1); + assert_eq!(queue.my_requests.front(), Some(&sign_id)); + assert!(queue + .requests + .get(&sign_id) + .unwrap() + .attempts + .is_ready_now()); + assert!(!queue.failed_requests.contains(&sign_id)); + } + + #[tokio::test] + async fn prioritizes_new_requests_over_old_retries() { + let (me, stable, participants, mut queue, account_id, tx) = setup_queue(); + + let old_id = SignId::new([5; 32]); + let mut old_indexed = make_indexed_request(old_id); + old_indexed.participants = Some(vec![me]); + let old_request = queue.organize_request(&stable, &participants, old_indexed, 0); + queue.requests.insert(old_id, old_request); + queue.push_failed(old_id); + queue + .requests + .get_mut(&old_id) + .unwrap() + .attempts + .next_retry_at = Instant::now() - Duration::from_millis(100); + + let new_id = SignId::new([9; 32]); + let mut new_indexed = make_indexed_request(new_id); + new_indexed.participants = Some(vec![me]); + tx.try_send(new_indexed).unwrap(); + + queue.organize(&stable, &participants, &account_id).await; + + let first = queue.take_mine().expect("new request present"); + assert_eq!(first.indexed.id, new_id); + let second = queue.take_mine().expect("old request present"); + assert_eq!(second.indexed.id, old_id); + assert!(queue.my_requests.is_empty()); + } + + #[test] + fn push_failed_respects_backoff() { + let (me, stable, participants, mut queue, _account_id, _tx) = setup_queue(); + + let sign_id = SignId::new([3; 32]); + let mut indexed = make_indexed_request(sign_id); + indexed.participants = Some(vec![me]); + let request = queue.organize_request(&stable, &participants, indexed, 0); + queue.requests.insert(sign_id, request); + + let before = Instant::now(); + queue.push_failed(sign_id); + assert_eq!(queue.failed_requests.len(), 1); + let attempts = &queue.requests.get(&sign_id).unwrap().attempts; + assert_eq!(attempts.attempts, 1); + assert!(attempts.next_retry_at > before); + + queue.push_failed(sign_id); + let attempts = &queue.requests.get(&sign_id).unwrap().attempts; + assert_eq!(attempts.attempts, 2); + assert_eq!(queue.failed_requests.len(), 1); + assert!(attempts.next_retry_at > before); + assert!(!attempts.is_ready(Instant::now())); + } +} diff --git a/chain-signatures/node/src/util.rs b/chain-signatures/node/src/util.rs index b236c4306..91214b242 100644 --- a/chain-signatures/node/src/util.rs +++ b/chain-signatures/node/src/util.rs @@ -1,6 +1,5 @@ use mpc_crypto::{near_public_key_to_affine_point, PublicKey}; -use chrono::{DateTime, LocalResult, TimeZone, Utc}; use k256::elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint}; use k256::{AffinePoint, EncodedPoint}; use tokio::task::{AbortHandle, JoinSet}; @@ -69,20 +68,6 @@ impl AffinePointExt for AffinePoint { } } -pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool { - if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) { - let timeout = Duration::from_millis(timeout); - let now_datetime: DateTime = Utc::now(); - // Calculate the difference in seconds - let elapsed_duration = now_datetime.signed_duration_since(msg_timestamp); - let timeout = chrono::Duration::seconds(timeout.as_secs() as i64) - + chrono::Duration::nanoseconds(timeout.subsec_nanos() as i64); - elapsed_duration > timeout - } else { - false - } -} - pub fn duration_between_unix(from_timestamp: u64, to_timestamp: u64) -> Duration { Duration::from_secs(to_timestamp - from_timestamp) } diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 4f4870dcb..561a6116c 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -89,7 +89,7 @@ struct NodeMessagingBuilder { outbox: MessageOutbox, /// allows dropping specific messages sent by this node - filter: MessageFilter, + filter: Vec, } impl Default for MpcFixtureBuilder { @@ -319,7 +319,7 @@ impl MpcFixtureBuilder { /// Specify a method that acts as message filter for all sent messages the given node. pub fn with_outgoing_message_filter(mut self, node_idx: usize, filter: MessageFilter) -> Self { - self.prepared_nodes[node_idx].messaging.filter = filter; + self.prepared_nodes[node_idx].messaging.filter.push(filter); self } @@ -390,7 +390,7 @@ impl MpcFixtureNodeBuilder { inbox, outbox, channel, - filter: Box::new(|_| true), + filter: vec![Box::new(|_| true)], }; MpcFixtureNodeBuilder { @@ -440,7 +440,7 @@ impl MpcFixtureNodeBuilder { // We have to start the inbox job before calling // `MpcSignProtocol::new_test` or else subscribing to messages will // await the subscription response forever. - let _inbox_handle = tokio::spawn( + let inbox_handle = tokio::spawn( self.messaging .inbox .run(config_rx.clone(), context.contract_state.clone()), @@ -458,7 +458,7 @@ impl MpcFixtureNodeBuilder { let account_id = protocol.my_account_id().clone(); let node = protocol::Node::new(); let node_state = node.watch(); - let _protocol_handle = tokio::spawn(protocol.run( + let protocol_handle = tokio::spawn(protocol.run( node, MockGovernance { me: account_id.clone(), @@ -471,7 +471,7 @@ impl MpcFixtureNodeBuilder { // handle outbox messages manually, we want them before they are // encrypted and we want to send them directly to other node's inboxes - let _mock_network_handle = fixture_tasks::test_mock_network( + let mock_network_handle = fixture_tasks::test_mock_network( context.routing_table, shared_output, self.messaging.outbox, @@ -491,6 +491,7 @@ impl MpcFixtureNodeBuilder { triple_storage, presignature_storage, web_handle: None, + tasks: vec![inbox_handle, protocol_handle, mock_network_handle], }; node.start_web_interface(self.participant_info.account_id); diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 213807aaf..4ccab3c10 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Sender; use tokio::sync::{watch, Mutex}; +use tokio::task::JoinHandle; pub struct MpcFixture { pub nodes: Vec, @@ -36,6 +37,7 @@ pub struct MpcFixtureNode { pub presignature_storage: PresignatureStorage, pub web_handle: Option>, + pub(crate) tasks: Vec>, } /// Logs for reading outputs after a test run for assertions and debugging. @@ -109,6 +111,18 @@ impl MpcFixtureNode { ); self.web_handle = Some(tokio::spawn(task)); } + + pub async fn shutdown(&mut self) { + let tasks = std::mem::take(&mut self.tasks); + + for handle in &tasks { + handle.abort(); + } + + for handle in tasks { + let _ = handle.await; + } + } } impl std::ops::Index for MpcFixture { @@ -119,6 +133,14 @@ impl std::ops::Index for MpcFixture { } } +impl Drop for MpcFixtureNode { + fn drop(&mut self) { + for handle in std::mem::take(&mut self.tasks) { + handle.abort(); + } + } +} + impl std::ops::IndexMut for MpcFixture { fn index_mut(&mut self, index: usize) -> &mut Self::Output { &mut self.nodes[index] diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index 91712f3b4..ad9d4fdc7 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -24,7 +24,7 @@ pub(super) fn test_mock_network( mut rpc_rx: Receiver, mesh: watch::Sender, config: watch::Sender, - mut filter: MessageFilter, + mut filters: Vec, ) -> JoinHandle<()> { let msg_log = Arc::clone(&shared_output.msg_log); let rpc_actions = Arc::clone(&shared_output.rpc_actions); @@ -49,9 +49,11 @@ pub(super) fn test_mock_network( }; msg_log.lock().await.push(format!("{log_msg} from {from:?} to {to:?}")); - if !filter(&send_message) { - tracing::info!("Dropping a message because it didn't pass the test's filter"); - continue; + for filter in filters.iter_mut() { + if !filter(&send_message) { + tracing::info!(?from, ?to, log_msg, "Dropping a message because it didn't pass the test's filter"); + continue; + } } @@ -90,7 +92,7 @@ pub(super) fn test_mock_network( ) }, }; - tracing::error!(target: "mock_network", ?action_str, "Received RPC action"); + tracing::info!(target: "mock_network", ?action_str, "Received RPC action"); let mut actions_log = rpc_actions.lock().await; actions_log.insert(action_str); } diff --git a/integration-tests/src/mpc_fixture/message_filters.rs b/integration-tests/src/mpc_fixture/message_filters.rs new file mode 100644 index 000000000..323c737f4 --- /dev/null +++ b/integration-tests/src/mpc_fixture/message_filters.rs @@ -0,0 +1,82 @@ +use crate::mpc_fixture::fixture_tasks::MessageFilter; +use cait_sith::protocol::Participant; +use mpc_node::protocol; +use mpc_node::protocol::message::SendMessage; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; + +/// Helper to drop signature messages sent by a specific participant. +/// +/// Tests can install the returned [`MessageFilter`] on a node via +/// [`MpcFixtureBuilder::with_outgoing_message_filter`](crate::mpc_fixture::MpcFixtureBuilder::with_outgoing_message_filter) +/// and keep the [`SignatureDropper`] handle around to toggle the behaviour at +/// runtime. This keeps the test code expressive about which leader is being +/// suppressed during retries. +#[derive(Clone)] +pub struct SignatureDropper { + participant: Participant, + enabled: Arc, + dropped: Arc, +} + +impl SignatureDropper { + /// Create a dropper for `participant` and the corresponding message filter. + pub fn new(participant: Participant) -> (Self, MessageFilter) { + let enabled = Arc::new(AtomicBool::new(false)); + let dropped = Arc::new(AtomicUsize::new(0)); + let filter_participant = participant; + let filter_enabled = Arc::clone(&enabled); + let filter_dropped = Arc::clone(&dropped); + + let filter: MessageFilter = Box::new(move |send_message: &SendMessage| { + if !filter_enabled.load(Ordering::SeqCst) { + return true; + } + + let (message, (from, _, _)) = send_message; + if *from != filter_participant { + return true; + } + + if matches!(message, protocol::Message::Signature(_)) { + filter_dropped.fetch_add(1, Ordering::SeqCst); + false + } else { + true + } + }); + + ( + SignatureDropper { + participant, + enabled, + dropped, + }, + filter, + ) + } + + /// Enable dropping signature messages for the configured participant. + pub fn enable(&self) { + self.enabled.store(true, Ordering::SeqCst); + } + + /// Disable dropping signature messages for the configured participant. + pub fn disable(&self) { + self.enabled.store(false, Ordering::SeqCst); + } + + /// Returns the participant whose messages are affected by this dropper. + pub fn participant(&self) -> Participant { + self.participant + } + + /// Returns whether the dropper is currently active. + pub fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::SeqCst) + } + + pub fn dropped(&self) -> usize { + self.dropped.load(Ordering::SeqCst) + } +} diff --git a/integration-tests/src/mpc_fixture/mod.rs b/integration-tests/src/mpc_fixture/mod.rs index 16bedc654..1585a3c4e 100644 --- a/integration-tests/src/mpc_fixture/mod.rs +++ b/integration-tests/src/mpc_fixture/mod.rs @@ -7,6 +7,8 @@ pub mod fixture_interface; pub mod fixture_tasks; pub mod input; pub mod mock_governance; +pub mod message_filters; pub use builder::MpcFixtureBuilder; pub use fixture_interface::{MpcFixture, MpcFixtureNode}; +pub use message_filters::SignatureDropper; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index a3c5db973..f81ec9d60 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -1,12 +1,15 @@ +use cait_sith::protocol::Participant; use deadpool_redis::redis::AsyncCommands; use integration_tests::mpc_fixture::fixture_tasks::MessageFilter; -use integration_tests::mpc_fixture::MpcFixtureBuilder; +use integration_tests::mpc_fixture::{MpcFixtureBuilder, SignatureDropper}; use mpc_node::protocol::presignature::Presignature; +use mpc_node::protocol::signature::{reset_signature_retry_backoff, set_signature_retry_backoff}; use mpc_node::protocol::triple::Triple; use mpc_node::protocol::SignRequestType; use mpc_node::protocol::{Chain, IndexedSignRequest, ProtocolState}; use mpc_primitives::{SignArgs, SignId, LATEST_MPC_KEY_VERSION}; -use std::collections::BTreeMap; +use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; +use std::collections::{BTreeMap, BTreeSet}; use std::fs; use std::time::Duration; @@ -184,6 +187,190 @@ async fn test_basic_sign() { ); } +#[tokio::test(flavor = "multi_thread")] +async fn test_sign_request_retries_after_failure() { + const NUM_NODES: usize = 3; + + let mut droppers = Vec::with_capacity(NUM_NODES); + + let mut builder = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_signature_timeout_ms(1_000); + + for idx in 0..NUM_NODES { + let participant = Participant::from(idx as u32); + let (dropper, filter) = SignatureDropper::new(participant); + builder = builder.with_outgoing_message_filter(idx, filter); + droppers.push(dropper); + } + + let network = builder.build().await; + + for dropper in &droppers { + dropper.enable(); + } + + // Signature timeout should abort the task in 1seconds. + let drop_handle = { + let droppers = droppers.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + for dropper in &droppers { + dropper.disable(); + } + }) + }; + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + let request = sign_request(1); + for node in &network.nodes { + node.sign_tx.send(request.clone()).await.unwrap(); + } + + let start = std::time::Instant::now(); + let actions = tokio::time::timeout(Duration::from_secs(20), network.wait_for_actions(1)) + .await + .expect("should publish RPC action eventually"); + + drop_handle.await.unwrap(); + + let dropped_messages: usize = droppers.iter().map(SignatureDropper::dropped).sum(); + assert!( + actions + .iter() + .any(|action| action.contains("RpcAction::Publish")), + "unexpected rpc actions: {actions:?}" + ); + assert!( + dropped_messages > 0, + "expected test filter to drop at least one signature message" + ); + assert!( + start.elapsed() >= Duration::from_secs(2), + "signature completed too quickly, expected a retry delay" + ); +} + +struct RetryBackoffGuard; + +impl Drop for RetryBackoffGuard { + fn drop(&mut self) { + reset_signature_retry_backoff(); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sign_request_retries_multiple_times() { + let _guard = RetryBackoffGuard; + set_signature_retry_backoff(10, 100); + + const NUM_NODES: usize = 3; + const TARGET_ATTEMPTS: usize = 10; + + let mut signature_droppers = Vec::with_capacity(NUM_NODES); + + let mut builder = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_signature_timeout_ms(200); + + for idx in 0..NUM_NODES { + let participant = Participant::from(idx as u32); + let (dropper, filter) = SignatureDropper::new(participant); + builder = builder.with_outgoing_message_filter(idx, filter); + signature_droppers.push(dropper); + } + + let network = builder.build().await; + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + { + let mut log = network.output.msg_log.lock().await; + log.clear(); + } + + for dropper in &signature_droppers { + dropper.enable(); + } + + let request = sign_request(3); + for node in &network.nodes { + node.sign_tx.send(request.clone()).await.unwrap(); + } + + let required_messages = TARGET_ATTEMPTS * (network.nodes.len() - 1); + + tokio::time::timeout(Duration::from_secs(10), async { + loop { + let signature_messages = { + let log = network.output.msg_log.lock().await; + log.iter() + .filter(|entry| entry.starts_with("Signature")) + .count() + }; + + if signature_messages >= required_messages { + break signature_messages; + } + + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect("expected at least ten signature retries before timeout"); + + for dropper in &signature_droppers { + dropper.disable(); + } + + tokio::time::sleep(Duration::from_millis(50)).await; + + let dropped_messages: usize = signature_droppers + .iter() + .map(SignatureDropper::dropped) + .sum(); + + assert!( + dropped_messages > 0, + "expected to drop at least one signature message before retries succeeded" + ); + + let actions = tokio::time::timeout(Duration::from_secs(10), network.wait_for_actions(1)) + .await + .expect("signature should eventually publish after retries"); + + assert!( + actions + .iter() + .any(|action| action.contains("RpcAction::Publish")), + "unexpected rpc actions: {actions:?}" + ); + + let signature_messages = { + let log = network.output.msg_log.lock().await; + log.iter() + .filter(|entry| entry.starts_with("Signature")) + .count() + }; + + let attempts = signature_messages / (network.nodes.len() - 1); + assert!( + attempts >= TARGET_ATTEMPTS, + "expected at least {TARGET_ATTEMPTS} retries, observed {attempts}" + ); +} + fn sign_request(seed: u8) -> IndexedSignRequest { IndexedSignRequest { id: SignId::new([seed; 32]), @@ -197,6 +384,37 @@ fn sign_request(seed: u8) -> IndexedSignRequest { } } +fn initial_proposer_for_request( + request: &IndexedSignRequest, + participants: &[Participant], + stable: &BTreeSet, +) -> Participant { + assert!( + !participants.is_empty(), + "expected at least one participant in the mesh snapshot" + ); + + let mut ordered_participants = participants.to_vec(); + ordered_participants.sort(); + + let entropy = request.args.entropy; + let offset = entropy[0] as usize; + + for round in 0..512 { + let idx = (offset + round) % ordered_participants.len(); + let candidate = ordered_participants[idx]; + if stable.contains(&candidate) { + return candidate; + } + } + + let mut rng = StdRng::from_seed(entropy); + *stable + .iter() + .choose(&mut rng) + .expect("stable set should not be empty") +} + fn sign_arg(seed: u8) -> SignArgs { let mut entropy = [1; 32]; entropy[0] = seed; From 3780843bb1a76e9514a4e03ae66a86410e7dd578 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Fri, 17 Oct 2025 21:49:47 +0000 Subject: [PATCH 2/8] Fix clippy & fmt --- integration-tests/src/mpc_fixture/mod.rs | 2 +- integration-tests/tests/cases/mpc.rs | 31 ------------------------ 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/integration-tests/src/mpc_fixture/mod.rs b/integration-tests/src/mpc_fixture/mod.rs index 1585a3c4e..60c03c39c 100644 --- a/integration-tests/src/mpc_fixture/mod.rs +++ b/integration-tests/src/mpc_fixture/mod.rs @@ -6,8 +6,8 @@ pub mod builder; pub mod fixture_interface; pub mod fixture_tasks; pub mod input; -pub mod mock_governance; pub mod message_filters; +pub mod mock_governance; pub use builder::MpcFixtureBuilder; pub use fixture_interface::{MpcFixture, MpcFixtureNode}; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index f81ec9d60..343eb4a88 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -384,37 +384,6 @@ fn sign_request(seed: u8) -> IndexedSignRequest { } } -fn initial_proposer_for_request( - request: &IndexedSignRequest, - participants: &[Participant], - stable: &BTreeSet, -) -> Participant { - assert!( - !participants.is_empty(), - "expected at least one participant in the mesh snapshot" - ); - - let mut ordered_participants = participants.to_vec(); - ordered_participants.sort(); - - let entropy = request.args.entropy; - let offset = entropy[0] as usize; - - for round in 0..512 { - let idx = (offset + round) % ordered_participants.len(); - let candidate = ordered_participants[idx]; - if stable.contains(&candidate) { - return candidate; - } - } - - let mut rng = StdRng::from_seed(entropy); - *stable - .iter() - .choose(&mut rng) - .expect("stable set should not be empty") -} - fn sign_arg(seed: u8) -> SignArgs { let mut entropy = [1; 32]; entropy[0] = seed; From 95dfb18f63f56a755dcd410a0adb5329398c28bd Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Tue, 21 Oct 2025 21:59:42 +0000 Subject: [PATCH 3/8] Remove exponential backoff --- .../node/src/protocol/signature.rs | 62 +++---------------- integration-tests/tests/cases/mpc.rs | 15 +---- 2 files changed, 8 insertions(+), 69 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 5c8301506..140aaaef0 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -33,9 +33,6 @@ use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task::{JoinHandle, JoinSet}; -#[cfg(feature = "test-feature")] -use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering}; - use near_account_id::AccountId; /// This is the maximum amount of sign requests that we can accept in the network. @@ -161,13 +158,7 @@ impl Attempts { } fn compute_retry_delay(&self) -> Duration { - let base_ms = retry_backoff_base_ms(); - let exponent = self.attempts.saturating_sub(1).min(6); - let multiplier = 1u32 << exponent; - let delay_ms = base_ms - .saturating_mul(u64::from(multiplier)) - .min(retry_backoff_max_ms()); - Duration::from_millis(delay_ms) + Duration::ZERO } } @@ -368,8 +359,7 @@ impl SignQueue { reorganized = true; } - // Delay information is reset since we're ready to try again now that the - // backoff window has elapsed. + // Ensure the request is marked ready before requeueing it. request.attempts.mark_ready(now); if request.indexed.timestamp_sign_queue.is_none() { @@ -1259,44 +1249,6 @@ impl PendingPresignature { } } -#[cfg(feature = "test-feature")] -static RETRY_BACKOFF_BASE_MS: AtomicU64 = AtomicU64::new(0); -#[cfg(feature = "test-feature")] -static RETRY_BACKOFF_MAX_MS: AtomicU64 = AtomicU64::new(0); - -#[cfg(feature = "test-feature")] -pub fn set_signature_retry_backoff(base_ms: u64, max_ms: u64) { - RETRY_BACKOFF_BASE_MS.store(base_ms, AtomicOrdering::SeqCst); - RETRY_BACKOFF_MAX_MS.store(max_ms, AtomicOrdering::SeqCst); -} - -#[cfg(feature = "test-feature")] -pub fn reset_signature_retry_backoff() { - RETRY_BACKOFF_BASE_MS.store(0, AtomicOrdering::SeqCst); - RETRY_BACKOFF_MAX_MS.store(0, AtomicOrdering::SeqCst); -} - -fn retry_backoff_base_ms() -> u64 { - #[cfg(feature = "test-feature")] - match RETRY_BACKOFF_BASE_MS.load(AtomicOrdering::SeqCst) { - 0 => 1_000, - value => value, - } - - #[cfg(not(feature = "test-feature"))] - 1_000 -} - -fn retry_backoff_max_ms() -> u64 { - #[cfg(feature = "test-feature")] - match RETRY_BACKOFF_MAX_MS.load(AtomicOrdering::SeqCst) { - 0 => 60_000, - value => value, - } - #[cfg(not(feature = "test-feature"))] - 60_000 -} - #[cfg(test)] mod tests { use super::*; @@ -1420,7 +1372,7 @@ mod tests { } #[test] - fn push_failed_respects_backoff() { + fn push_failed_is_ready_immediately() { let (me, stable, participants, mut queue, _account_id, _tx) = setup_queue(); let sign_id = SignId::new([3; 32]); @@ -1429,18 +1381,18 @@ mod tests { let request = queue.organize_request(&stable, &participants, indexed, 0); queue.requests.insert(sign_id, request); - let before = Instant::now(); queue.push_failed(sign_id); assert_eq!(queue.failed_requests.len(), 1); let attempts = &queue.requests.get(&sign_id).unwrap().attempts; assert_eq!(attempts.attempts, 1); - assert!(attempts.next_retry_at > before); + assert!(attempts.next_retry_at <= Instant::now()); + assert!(attempts.is_ready(Instant::now())); queue.push_failed(sign_id); let attempts = &queue.requests.get(&sign_id).unwrap().attempts; assert_eq!(attempts.attempts, 2); assert_eq!(queue.failed_requests.len(), 1); - assert!(attempts.next_retry_at > before); - assert!(!attempts.is_ready(Instant::now())); + assert!(attempts.next_retry_at <= Instant::now()); + assert!(attempts.is_ready(Instant::now())); } } diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 343eb4a88..8d497a12c 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -3,13 +3,11 @@ use deadpool_redis::redis::AsyncCommands; use integration_tests::mpc_fixture::fixture_tasks::MessageFilter; use integration_tests::mpc_fixture::{MpcFixtureBuilder, SignatureDropper}; use mpc_node::protocol::presignature::Presignature; -use mpc_node::protocol::signature::{reset_signature_retry_backoff, set_signature_retry_backoff}; use mpc_node::protocol::triple::Triple; use mpc_node::protocol::SignRequestType; use mpc_node::protocol::{Chain, IndexedSignRequest, ProtocolState}; use mpc_primitives::{SignArgs, SignId, LATEST_MPC_KEY_VERSION}; -use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fs; use std::time::Duration; @@ -257,19 +255,8 @@ async fn test_sign_request_retries_after_failure() { ); } -struct RetryBackoffGuard; - -impl Drop for RetryBackoffGuard { - fn drop(&mut self) { - reset_signature_retry_backoff(); - } -} - #[tokio::test(flavor = "multi_thread")] async fn test_sign_request_retries_multiple_times() { - let _guard = RetryBackoffGuard; - set_signature_retry_backoff(10, 100); - const NUM_NODES: usize = 3; const TARGET_ATTEMPTS: usize = 10; From 452b551748e78455be7997066d616ee2cca34a91 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Tue, 21 Oct 2025 22:03:48 +0000 Subject: [PATCH 4/8] Remove attempts --- .../node/src/protocol/signature.rs | 127 +----------------- 1 file changed, 1 insertion(+), 126 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 140aaaef0..4eef9cb6d 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -101,7 +101,6 @@ pub struct SignRequest { pub proposer: Participant, pub stable: BTreeSet, pub round: usize, - attempts: Attempts, } pub struct SignQueue { @@ -120,48 +119,6 @@ pub struct SignQueue { pending: HashMap>, } -#[derive(Debug, Clone, PartialEq)] -pub struct Attempts { - attempts: u32, - next_retry_at: Instant, -} - -impl Default for Attempts { - fn default() -> Self { - Self::new() - } -} - -impl Attempts { - pub fn new() -> Self { - Self { - attempts: 0, - next_retry_at: Instant::now(), - } - } - - pub fn schedule_next_attempt(&mut self) { - self.attempts = self.attempts.saturating_add(1); - self.next_retry_at = Instant::now() + self.compute_retry_delay(); - } - - pub fn is_ready(&self, now: Instant) -> bool { - self.next_retry_at <= now - } - - pub fn is_ready_now(&self) -> bool { - self.is_ready(Instant::now()) - } - - pub fn mark_ready(&mut self, now: Instant) { - self.next_retry_at = now; - } - - fn compute_retry_delay(&self) -> Duration { - Duration::ZERO - } -} - impl SignQueue { pub fn channel() -> ( mpsc::Sender, @@ -269,7 +226,6 @@ impl SignQueue { proposer, stable: stable.clone(), round, - attempts: Attempts::new(), } } @@ -344,24 +300,13 @@ impl SignQueue { continue; }; - if !request.attempts.is_ready(now) { - self.requests.insert(id, request); - self.failed_requests.push_back(id); - continue; - } - let mut reorganized = false; if &request.stable != stable { - let attempts = request.attempts; request = self.organize_request(stable, participants, request.indexed, request.round); - request.attempts = attempts; reorganized = true; } - // Ensure the request is marked ready before requeueing it. - request.attempts.mark_ready(now); - if request.indexed.timestamp_sign_queue.is_none() { request.indexed.timestamp_sign_queue = Some(now); } @@ -381,9 +326,7 @@ impl SignQueue { } pub fn push_failed(&mut self, sign_id: SignId) { - if let Some(request) = self.requests.get_mut(&sign_id) { - request.attempts.schedule_next_attempt(); - + if self.requests.contains_key(&sign_id) { if !self.failed_requests.contains(&sign_id) { self.failed_requests.push_back(sign_id); } @@ -1303,43 +1246,6 @@ mod tests { (me, stable, participants_map, queue, account_id, tx) } - #[tokio::test] - async fn retries_wait_until_ready() { - let (me, stable, participants, mut queue, account_id, _tx) = setup_queue(); - - let sign_id = SignId::new([1; 32]); - let mut indexed = make_indexed_request(sign_id); - indexed.participants = Some(vec![me]); - let request = queue.organize_request(&stable, &participants, indexed, 0); - queue.requests.insert(sign_id, request); - queue.push_failed(sign_id); - - { - let request = queue.requests.get_mut(&sign_id).unwrap(); - request.attempts.next_retry_at = Instant::now() + Duration::from_secs(10); - } - - queue.organize_failed(&stable, &participants, &account_id); - assert!(queue.my_requests.is_empty()); - assert!(queue.failed_requests.contains(&sign_id)); - - { - let request = queue.requests.get_mut(&sign_id).unwrap(); - request.attempts.next_retry_at = Instant::now() - Duration::from_secs(1); - } - - queue.organize_failed(&stable, &participants, &account_id); - assert_eq!(queue.my_requests.len(), 1); - assert_eq!(queue.my_requests.front(), Some(&sign_id)); - assert!(queue - .requests - .get(&sign_id) - .unwrap() - .attempts - .is_ready_now()); - assert!(!queue.failed_requests.contains(&sign_id)); - } - #[tokio::test] async fn prioritizes_new_requests_over_old_retries() { let (me, stable, participants, mut queue, account_id, tx) = setup_queue(); @@ -1350,12 +1256,6 @@ mod tests { let old_request = queue.organize_request(&stable, &participants, old_indexed, 0); queue.requests.insert(old_id, old_request); queue.push_failed(old_id); - queue - .requests - .get_mut(&old_id) - .unwrap() - .attempts - .next_retry_at = Instant::now() - Duration::from_millis(100); let new_id = SignId::new([9; 32]); let mut new_indexed = make_indexed_request(new_id); @@ -1370,29 +1270,4 @@ mod tests { assert_eq!(second.indexed.id, old_id); assert!(queue.my_requests.is_empty()); } - - #[test] - fn push_failed_is_ready_immediately() { - let (me, stable, participants, mut queue, _account_id, _tx) = setup_queue(); - - let sign_id = SignId::new([3; 32]); - let mut indexed = make_indexed_request(sign_id); - indexed.participants = Some(vec![me]); - let request = queue.organize_request(&stable, &participants, indexed, 0); - queue.requests.insert(sign_id, request); - - queue.push_failed(sign_id); - assert_eq!(queue.failed_requests.len(), 1); - let attempts = &queue.requests.get(&sign_id).unwrap().attempts; - assert_eq!(attempts.attempts, 1); - assert!(attempts.next_retry_at <= Instant::now()); - assert!(attempts.is_ready(Instant::now())); - - queue.push_failed(sign_id); - let attempts = &queue.requests.get(&sign_id).unwrap().attempts; - assert_eq!(attempts.attempts, 2); - assert_eq!(queue.failed_requests.len(), 1); - assert!(attempts.next_retry_at <= Instant::now()); - assert!(attempts.is_ready(Instant::now())); - } } From 93dd5bdc703b1b78472de5ab55550eae30e1b2d3 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Wed, 22 Oct 2025 07:39:55 +0000 Subject: [PATCH 5/8] Fix test --- integration-tests/src/mpc_fixture/fixture_tasks.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index ad9d4fdc7..8e787ab1a 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -49,13 +49,19 @@ pub(super) fn test_mock_network( }; msg_log.lock().await.push(format!("{log_msg} from {from:?} to {to:?}")); + let mut should_filter = true; for filter in filters.iter_mut() { if !filter(&send_message) { tracing::info!(?from, ?to, log_msg, "Dropping a message because it didn't pass the test's filter"); - continue; + should_filter = false; + break; } } + if !should_filter { + continue; + } + // directly send out single message, no batching // (might want to add MessageOutbox, too, but for now this is easier) From 8f261608607f0ee13f28aee46be7a38e028f845f Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Wed, 29 Oct 2025 00:45:47 +0000 Subject: [PATCH 6/8] Fix test --- integration-tests/tests/cases/mpc.rs | 42 +++++++++++++++++++--------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 8d497a12c..ebc43d894 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -208,17 +208,6 @@ async fn test_sign_request_retries_after_failure() { dropper.enable(); } - // Signature timeout should abort the task in 1seconds. - let drop_handle = { - let droppers = droppers.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(5)).await; - for dropper in &droppers { - dropper.disable(); - } - }) - }; - tokio::time::timeout( Duration::from_millis(300), network.wait_for_presignatures(2), @@ -232,12 +221,39 @@ async fn test_sign_request_retries_after_failure() { } let start = std::time::Instant::now(); + + // Wait until at least one signature attempt is dropped to prove a retry occurred. + tokio::time::timeout(Duration::from_secs(5), async { + loop { + let dropped = droppers + .iter() + .map(SignatureDropper::dropped) + .sum::(); + + if dropped > 0 { + break; + } + + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect("expected to drop at least one signature message while filters enabled"); + + if start.elapsed() < Duration::from_secs(2) { + tokio::time::sleep(Duration::from_secs(2) - start.elapsed()).await; + } + + for dropper in &droppers { + dropper.disable(); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + let actions = tokio::time::timeout(Duration::from_secs(20), network.wait_for_actions(1)) .await .expect("should publish RPC action eventually"); - drop_handle.await.unwrap(); - let dropped_messages: usize = droppers.iter().map(SignatureDropper::dropped).sum(); assert!( actions From e42046eb46b510836eb3e0e510151f38904f55cc Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Wed, 29 Oct 2025 00:51:45 +0000 Subject: [PATCH 7/8] Correct filter naming --- integration-tests/src/mpc_fixture/builder.rs | 3 ++- integration-tests/src/mpc_fixture/fixture_tasks.rs | 9 +++++---- integration-tests/src/mpc_fixture/message_filters.rs | 8 ++++---- integration-tests/tests/cases/mpc.rs | 11 +++++------ 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 561a6116c..4c5f89d50 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -318,6 +318,7 @@ impl MpcFixtureBuilder { } /// Specify a method that acts as message filter for all sent messages the given node. + /// if the filter returns true, the message is dropped, otherwise it is sent. pub fn with_outgoing_message_filter(mut self, node_idx: usize, filter: MessageFilter) -> Self { self.prepared_nodes[node_idx].messaging.filter.push(filter); self @@ -390,7 +391,7 @@ impl MpcFixtureNodeBuilder { inbox, outbox, channel, - filter: vec![Box::new(|_| true)], + filter: vec![Box::new(|_| false)], }; MpcFixtureNodeBuilder { diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index 8e787ab1a..8d3ebc1e5 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -49,16 +49,17 @@ pub(super) fn test_mock_network( }; msg_log.lock().await.push(format!("{log_msg} from {from:?} to {to:?}")); - let mut should_filter = true; + // Default to forwarding the message unless a filter votes to drop it. + let mut should_filter = false; for filter in filters.iter_mut() { - if !filter(&send_message) { + if filter(&send_message) { tracing::info!(?from, ?to, log_msg, "Dropping a message because it didn't pass the test's filter"); - should_filter = false; + should_filter = true; break; } } - if !should_filter { + if should_filter { continue; } diff --git a/integration-tests/src/mpc_fixture/message_filters.rs b/integration-tests/src/mpc_fixture/message_filters.rs index 323c737f4..c1c9379ed 100644 --- a/integration-tests/src/mpc_fixture/message_filters.rs +++ b/integration-tests/src/mpc_fixture/message_filters.rs @@ -30,19 +30,19 @@ impl SignatureDropper { let filter: MessageFilter = Box::new(move |send_message: &SendMessage| { if !filter_enabled.load(Ordering::SeqCst) { - return true; + return false; } let (message, (from, _, _)) = send_message; if *from != filter_participant { - return true; + return false; } if matches!(message, protocol::Message::Signature(_)) { filter_dropped.fetch_add(1, Ordering::SeqCst); - false - } else { true + } else { + false } }); diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index ebc43d894..9291c208f 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -406,15 +406,14 @@ async fn test_presignature_timeout() { fn create_filter() -> MessageFilter { let mut drop_counter = 20; Box::new(move |(msg, _)| { - let pass = match msg { - mpc_node::protocol::Message::Presignature(_) => drop_counter == 0, - _ => true, - }; + let should_drop = matches!(msg, mpc_node::protocol::Message::Presignature(_)) + && drop_counter > 0; - if !pass { + if should_drop { drop_counter -= 1; } - pass + + should_drop }) } From 0d05805886172a585993a605bd51a4f055f79aa7 Mon Sep 17 00:00:00 2001 From: "Phuong N." Date: Wed, 29 Oct 2025 05:08:46 +0000 Subject: [PATCH 8/8] cargo fmt --- integration-tests/tests/cases/mpc.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 9291c208f..5896af8ac 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -406,8 +406,8 @@ async fn test_presignature_timeout() { fn create_filter() -> MessageFilter { let mut drop_counter = 20; Box::new(move |(msg, _)| { - let should_drop = matches!(msg, mpc_node::protocol::Message::Presignature(_)) - && drop_counter > 0; + let should_drop = + matches!(msg, mpc_node::protocol::Message::Presignature(_)) && drop_counter > 0; if should_drop { drop_counter -= 1;