diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index d11fbec7..4eef9cb6 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -258,10 +258,13 @@ impl SignQueue { .with_label_values(&[indexed.chain.as_str(), my_account_id.as_str()]) .inc(); - let request = self.organize_request(stable, participants, indexed, 0); + let mut request = self.organize_request(stable, participants, indexed, 0); + if request.indexed.timestamp_sign_queue.is_none() { + request.indexed.timestamp_sign_queue = Some(Instant::now()); + } let is_mine = request.proposer == self.me; if is_mine { - self.my_requests.push_back(sign_id); + self.my_requests.push_front(sign_id); crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) .inc(); @@ -286,26 +289,31 @@ impl SignQueue { participants: &Participants, my_account_id: &AccountId, ) { - while let Some(id) = self.failed_requests.pop_front() { - let Some(request) = self.requests.remove(&id) else { + let now = Instant::now(); + let len = self.failed_requests.len(); + for _ in 0..len { + let Some(id) = self.failed_requests.pop_front() else { + break; + }; + + let Some(mut request) = self.requests.remove(&id) else { continue; }; - let (reorganized, request) = if &request.stable == stable { - // just use the same request if the participants are the same - (false, request) - } else { - let request = + let mut reorganized = false; + if &request.stable != stable { + request = self.organize_request(stable, participants, request.indexed, request.round); - (true, request) - }; + reorganized = true; + } + + if request.indexed.timestamp_sign_queue.is_none() { + request.indexed.timestamp_sign_queue = Some(now); + } - // NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures. - // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of - // when the request made it into the NEAR network. - // issue: https://github.com/near/mpc-recovery/issues/596 if request.proposer == self.me { - self.my_requests.push_front(request.indexed.id); + // Older retries are appended to the back so newly indexed requests run first. + self.my_requests.push_back(request.indexed.id); if reorganized { crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) @@ -318,7 +326,13 @@ impl SignQueue { } pub fn push_failed(&mut self, sign_id: SignId) { - self.failed_requests.push_back(sign_id); + if self.requests.contains_key(&sign_id) { + if !self.failed_requests.contains(&sign_id) { + self.failed_requests.push_back(sign_id); + } + } else { + tracing::warn!(?sign_id, "failed sign request missing from queue"); + } } pub fn take_mine(&mut self) -> Option { @@ -336,42 +350,22 @@ impl SignQueue { } } - pub fn expire(&mut self, cfg: &ProtocolConfig) { - self.requests.retain(|_, request| { - request.indexed.timestamp_sign_queue.is_none_or(|t| { - t.elapsed() < Duration::from_millis(cfg.signature.generation_timeout_total) - }) - }); - self.my_requests.retain(|id| { - let Some(request) = self.requests.get(id) else { - // if we are unable to find the corresponding request, we can remove it. - return false; - }; - crate::util::duration_between_unix( - request.indexed.unix_timestamp_indexed, - crate::util::current_unix_timestamp(), - ) < request.indexed.total_timeout - }); - self.failed_requests.retain(|id| { - let Some(request) = self.requests.get(id) else { - // if we are unable to find the corresponding request, we can remove it. - return false; - }; - crate::util::duration_between_unix( - request.indexed.unix_timestamp_indexed, - crate::util::current_unix_timestamp(), - ) < request.indexed.total_timeout - }); + pub fn expire(&mut self, _cfg: &ProtocolConfig) { + self.my_requests.retain(|id| self.requests.contains_key(id)); + self.failed_requests + .retain(|id| self.requests.contains_key(id)); } pub fn remove(&mut self, sign_id: SignId) -> Option { + self.pending.remove(&sign_id); + self.my_requests.retain(|id| id != &sign_id); + self.failed_requests.retain(|id| id != &sign_id); self.requests.remove(&sign_id) } } enum SignError { Retry, - TotalTimeout, Aborted, } @@ -380,11 +374,11 @@ struct SignatureGenerator { protocol: SignatureProtocol, dropper: PresignatureTakenDropper, participants: Vec, + me: Participant, request: SignRequest, public_key: PublicKey, created: Instant, timeout: Duration, - timeout_total: Duration, inbox: mpsc::Receiver, msg: MessageChannel, rpc: RpcChannel, @@ -457,11 +451,11 @@ impl SignatureGenerator { protocol, dropper, participants, + me, request, public_key, created: Instant::now(), timeout: Duration::from_millis(cfg.signature.generation_timeout), - timeout_total: Duration::from_millis(cfg.signature.generation_timeout_total), inbox, msg, rpc, @@ -474,16 +468,6 @@ impl SignatureGenerator { }) } - fn timeout_total(&self) -> bool { - let timestamp = self - .request - .indexed - .timestamp_sign_queue - .as_ref() - .unwrap_or(&self.created); - timestamp.elapsed() >= self.timeout_total - } - /// Receive the next message for the signature protocol; error out on the timeout being reached /// or the channel having been closed (aborted). async fn recv(&mut self) -> Result { @@ -497,13 +481,21 @@ impl SignatureGenerator { { Ok(Some(msg)) => Ok(msg), Ok(None) => { - tracing::warn!(?sign_id, presignature_id, "signature generation aborted"); + tracing::warn!( + ?sign_id, + presignature_id, + proposer = ?self.request.proposer, + me = ?self.me, + "signature generation aborted", + ); Err(SignError::Aborted) } Err(_err) => { tracing::warn!( ?sign_id, presignature_id, + proposer = ?self.request.proposer, + me = ?self.me, "signature generation timeout, retrying..." ); Err(SignError::Retry) @@ -523,8 +515,6 @@ impl SignatureGenerator { crate::metrics::SIGNATURE_POKES_CNT.with_label_values(&[my_account_id.as_str()]); let signature_generator_failures_metric = crate::metrics::SIGNATURE_GENERATOR_FAILURES .with_label_values(&[my_account_id.as_str()]); - let signature_failures_metric = - crate::metrics::SIGNATURE_FAILURES.with_label_values(&[my_account_id.as_str()]); let poke_latency = crate::metrics::SIGNATURE_POKE_CPU_TIME.with_label_values(&[my_account_id.as_str()]); @@ -539,19 +529,6 @@ impl SignatureGenerator { .observe(self.created.elapsed().as_millis() as f64); loop { - if self.timeout_total() { - tracing::warn!( - ?sign_id, - presignature_id, - "signature generation timeout, exhausted all attempts" - ); - if self.request.proposer == me { - signature_generator_failures_metric.inc(); - signature_failures_metric.inc(); - } - break Err(SignError::TotalTimeout); - } - let poke_start_time = Instant::now(); let action = match self.protocol.poke() { Ok(action) => action, @@ -561,6 +538,9 @@ impl SignatureGenerator { ?err, "signature generation failed on protocol advancement", ); + if self.request.proposer == me { + signature_generator_failures_metric.inc(); + } break Err(SignError::Retry); } }; @@ -1088,9 +1068,12 @@ impl SignatureSpawner { match result { Err(SignError::Retry) => { + crate::metrics::SIGNATURE_FAILURES + .with_label_values(&[self.my_account_id.as_str()]) + .inc(); self.sign_queue.push_failed(sign_id); } - Ok(()) | Err(SignError::TotalTimeout) | Err(SignError::Aborted) => { + Ok(()) | Err(SignError::Aborted) => { self.sign_queue.remove(sign_id); } } @@ -1208,3 +1191,83 @@ impl PendingPresignature { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::ParticipantInfo; + use k256::Scalar; + use near_account_id::AccountId; + use std::str::FromStr; + use std::sync::Arc; + use tokio::sync::RwLock; + + fn make_indexed_request(id: SignId) -> IndexedSignRequest { + IndexedSignRequest { + id, + args: SignArgs { + entropy: id.request_id, + epsilon: Scalar::ONE, + payload: Scalar::ONE, + path: "test".to_string(), + key_version: 0, + }, + chain: Chain::Ethereum, + unix_timestamp_indexed: 0, + timestamp_sign_queue: None, + total_timeout: Duration::from_secs(60), + sign_request_type: SignRequestType::Sign, + participants: None, + } + } + + fn setup_queue() -> ( + Participant, + BTreeSet, + Participants, + SignQueue, + AccountId, + mpsc::Sender, + ) { + let (tx, rx) = SignQueue::channel(); + let me = Participant::from(0u32); + let other = Participant::from(1u32); + let mut stable = BTreeSet::new(); + stable.insert(me); + stable.insert(other); + + let mut participants_map = Participants::default(); + participants_map.insert(&me, ParticipantInfo::new(0)); + participants_map.insert(&other, ParticipantInfo::new(1)); + + let account_id = AccountId::from_str("me.near").unwrap(); + let queue = SignQueue::new(me, Arc::new(RwLock::new(rx))); + + (me, stable, participants_map, queue, account_id, tx) + } + + #[tokio::test] + async fn prioritizes_new_requests_over_old_retries() { + let (me, stable, participants, mut queue, account_id, tx) = setup_queue(); + + let old_id = SignId::new([5; 32]); + let mut old_indexed = make_indexed_request(old_id); + old_indexed.participants = Some(vec![me]); + let old_request = queue.organize_request(&stable, &participants, old_indexed, 0); + queue.requests.insert(old_id, old_request); + queue.push_failed(old_id); + + let new_id = SignId::new([9; 32]); + let mut new_indexed = make_indexed_request(new_id); + new_indexed.participants = Some(vec![me]); + tx.try_send(new_indexed).unwrap(); + + queue.organize(&stable, &participants, &account_id).await; + + let first = queue.take_mine().expect("new request present"); + assert_eq!(first.indexed.id, new_id); + let second = queue.take_mine().expect("old request present"); + assert_eq!(second.indexed.id, old_id); + assert!(queue.my_requests.is_empty()); + } +} diff --git a/chain-signatures/node/src/util.rs b/chain-signatures/node/src/util.rs index b236c430..91214b24 100644 --- a/chain-signatures/node/src/util.rs +++ b/chain-signatures/node/src/util.rs @@ -1,6 +1,5 @@ use mpc_crypto::{near_public_key_to_affine_point, PublicKey}; -use chrono::{DateTime, LocalResult, TimeZone, Utc}; use k256::elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint}; use k256::{AffinePoint, EncodedPoint}; use tokio::task::{AbortHandle, JoinSet}; @@ -69,20 +68,6 @@ impl AffinePointExt for AffinePoint { } } -pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool { - if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) { - let timeout = Duration::from_millis(timeout); - let now_datetime: DateTime = Utc::now(); - // Calculate the difference in seconds - let elapsed_duration = now_datetime.signed_duration_since(msg_timestamp); - let timeout = chrono::Duration::seconds(timeout.as_secs() as i64) - + chrono::Duration::nanoseconds(timeout.subsec_nanos() as i64); - elapsed_duration > timeout - } else { - false - } -} - pub fn duration_between_unix(from_timestamp: u64, to_timestamp: u64) -> Duration { Duration::from_secs(to_timestamp - from_timestamp) } diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 4f4870dc..4c5f89d5 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -89,7 +89,7 @@ struct NodeMessagingBuilder { outbox: MessageOutbox, /// allows dropping specific messages sent by this node - filter: MessageFilter, + filter: Vec, } impl Default for MpcFixtureBuilder { @@ -318,8 +318,9 @@ impl MpcFixtureBuilder { } /// Specify a method that acts as message filter for all sent messages the given node. + /// if the filter returns true, the message is dropped, otherwise it is sent. pub fn with_outgoing_message_filter(mut self, node_idx: usize, filter: MessageFilter) -> Self { - self.prepared_nodes[node_idx].messaging.filter = filter; + self.prepared_nodes[node_idx].messaging.filter.push(filter); self } @@ -390,7 +391,7 @@ impl MpcFixtureNodeBuilder { inbox, outbox, channel, - filter: Box::new(|_| true), + filter: vec![Box::new(|_| false)], }; MpcFixtureNodeBuilder { @@ -440,7 +441,7 @@ impl MpcFixtureNodeBuilder { // We have to start the inbox job before calling // `MpcSignProtocol::new_test` or else subscribing to messages will // await the subscription response forever. - let _inbox_handle = tokio::spawn( + let inbox_handle = tokio::spawn( self.messaging .inbox .run(config_rx.clone(), context.contract_state.clone()), @@ -458,7 +459,7 @@ impl MpcFixtureNodeBuilder { let account_id = protocol.my_account_id().clone(); let node = protocol::Node::new(); let node_state = node.watch(); - let _protocol_handle = tokio::spawn(protocol.run( + let protocol_handle = tokio::spawn(protocol.run( node, MockGovernance { me: account_id.clone(), @@ -471,7 +472,7 @@ impl MpcFixtureNodeBuilder { // handle outbox messages manually, we want them before they are // encrypted and we want to send them directly to other node's inboxes - let _mock_network_handle = fixture_tasks::test_mock_network( + let mock_network_handle = fixture_tasks::test_mock_network( context.routing_table, shared_output, self.messaging.outbox, @@ -491,6 +492,7 @@ impl MpcFixtureNodeBuilder { triple_storage, presignature_storage, web_handle: None, + tasks: vec![inbox_handle, protocol_handle, mock_network_handle], }; node.start_web_interface(self.participant_info.account_id); diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 213807aa..4ccab3c1 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Sender; use tokio::sync::{watch, Mutex}; +use tokio::task::JoinHandle; pub struct MpcFixture { pub nodes: Vec, @@ -36,6 +37,7 @@ pub struct MpcFixtureNode { pub presignature_storage: PresignatureStorage, pub web_handle: Option>, + pub(crate) tasks: Vec>, } /// Logs for reading outputs after a test run for assertions and debugging. @@ -109,6 +111,18 @@ impl MpcFixtureNode { ); self.web_handle = Some(tokio::spawn(task)); } + + pub async fn shutdown(&mut self) { + let tasks = std::mem::take(&mut self.tasks); + + for handle in &tasks { + handle.abort(); + } + + for handle in tasks { + let _ = handle.await; + } + } } impl std::ops::Index for MpcFixture { @@ -119,6 +133,14 @@ impl std::ops::Index for MpcFixture { } } +impl Drop for MpcFixtureNode { + fn drop(&mut self) { + for handle in std::mem::take(&mut self.tasks) { + handle.abort(); + } + } +} + impl std::ops::IndexMut for MpcFixture { fn index_mut(&mut self, index: usize) -> &mut Self::Output { &mut self.nodes[index] diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index 91712f3b..8d3ebc1e 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -24,7 +24,7 @@ pub(super) fn test_mock_network( mut rpc_rx: Receiver, mesh: watch::Sender, config: watch::Sender, - mut filter: MessageFilter, + mut filters: Vec, ) -> JoinHandle<()> { let msg_log = Arc::clone(&shared_output.msg_log); let rpc_actions = Arc::clone(&shared_output.rpc_actions); @@ -49,8 +49,17 @@ pub(super) fn test_mock_network( }; msg_log.lock().await.push(format!("{log_msg} from {from:?} to {to:?}")); - if !filter(&send_message) { - tracing::info!("Dropping a message because it didn't pass the test's filter"); + // Default to forwarding the message unless a filter votes to drop it. + let mut should_filter = false; + for filter in filters.iter_mut() { + if filter(&send_message) { + tracing::info!(?from, ?to, log_msg, "Dropping a message because it didn't pass the test's filter"); + should_filter = true; + break; + } + } + + if should_filter { continue; } @@ -90,7 +99,7 @@ pub(super) fn test_mock_network( ) }, }; - tracing::error!(target: "mock_network", ?action_str, "Received RPC action"); + tracing::info!(target: "mock_network", ?action_str, "Received RPC action"); let mut actions_log = rpc_actions.lock().await; actions_log.insert(action_str); } diff --git a/integration-tests/src/mpc_fixture/message_filters.rs b/integration-tests/src/mpc_fixture/message_filters.rs new file mode 100644 index 00000000..c1c9379e --- /dev/null +++ b/integration-tests/src/mpc_fixture/message_filters.rs @@ -0,0 +1,82 @@ +use crate::mpc_fixture::fixture_tasks::MessageFilter; +use cait_sith::protocol::Participant; +use mpc_node::protocol; +use mpc_node::protocol::message::SendMessage; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; + +/// Helper to drop signature messages sent by a specific participant. +/// +/// Tests can install the returned [`MessageFilter`] on a node via +/// [`MpcFixtureBuilder::with_outgoing_message_filter`](crate::mpc_fixture::MpcFixtureBuilder::with_outgoing_message_filter) +/// and keep the [`SignatureDropper`] handle around to toggle the behaviour at +/// runtime. This keeps the test code expressive about which leader is being +/// suppressed during retries. +#[derive(Clone)] +pub struct SignatureDropper { + participant: Participant, + enabled: Arc, + dropped: Arc, +} + +impl SignatureDropper { + /// Create a dropper for `participant` and the corresponding message filter. + pub fn new(participant: Participant) -> (Self, MessageFilter) { + let enabled = Arc::new(AtomicBool::new(false)); + let dropped = Arc::new(AtomicUsize::new(0)); + let filter_participant = participant; + let filter_enabled = Arc::clone(&enabled); + let filter_dropped = Arc::clone(&dropped); + + let filter: MessageFilter = Box::new(move |send_message: &SendMessage| { + if !filter_enabled.load(Ordering::SeqCst) { + return false; + } + + let (message, (from, _, _)) = send_message; + if *from != filter_participant { + return false; + } + + if matches!(message, protocol::Message::Signature(_)) { + filter_dropped.fetch_add(1, Ordering::SeqCst); + true + } else { + false + } + }); + + ( + SignatureDropper { + participant, + enabled, + dropped, + }, + filter, + ) + } + + /// Enable dropping signature messages for the configured participant. + pub fn enable(&self) { + self.enabled.store(true, Ordering::SeqCst); + } + + /// Disable dropping signature messages for the configured participant. + pub fn disable(&self) { + self.enabled.store(false, Ordering::SeqCst); + } + + /// Returns the participant whose messages are affected by this dropper. + pub fn participant(&self) -> Participant { + self.participant + } + + /// Returns whether the dropper is currently active. + pub fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::SeqCst) + } + + pub fn dropped(&self) -> usize { + self.dropped.load(Ordering::SeqCst) + } +} diff --git a/integration-tests/src/mpc_fixture/mod.rs b/integration-tests/src/mpc_fixture/mod.rs index 16bedc65..60c03c39 100644 --- a/integration-tests/src/mpc_fixture/mod.rs +++ b/integration-tests/src/mpc_fixture/mod.rs @@ -6,7 +6,9 @@ pub mod builder; pub mod fixture_interface; pub mod fixture_tasks; pub mod input; +pub mod message_filters; pub mod mock_governance; pub use builder::MpcFixtureBuilder; pub use fixture_interface::{MpcFixture, MpcFixtureNode}; +pub use message_filters::SignatureDropper; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index a3c5db97..5896af8a 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -1,6 +1,7 @@ +use cait_sith::protocol::Participant; use deadpool_redis::redis::AsyncCommands; use integration_tests::mpc_fixture::fixture_tasks::MessageFilter; -use integration_tests::mpc_fixture::MpcFixtureBuilder; +use integration_tests::mpc_fixture::{MpcFixtureBuilder, SignatureDropper}; use mpc_node::protocol::presignature::Presignature; use mpc_node::protocol::triple::Triple; use mpc_node::protocol::SignRequestType; @@ -184,6 +185,195 @@ async fn test_basic_sign() { ); } +#[tokio::test(flavor = "multi_thread")] +async fn test_sign_request_retries_after_failure() { + const NUM_NODES: usize = 3; + + let mut droppers = Vec::with_capacity(NUM_NODES); + + let mut builder = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_signature_timeout_ms(1_000); + + for idx in 0..NUM_NODES { + let participant = Participant::from(idx as u32); + let (dropper, filter) = SignatureDropper::new(participant); + builder = builder.with_outgoing_message_filter(idx, filter); + droppers.push(dropper); + } + + let network = builder.build().await; + + for dropper in &droppers { + dropper.enable(); + } + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + let request = sign_request(1); + for node in &network.nodes { + node.sign_tx.send(request.clone()).await.unwrap(); + } + + let start = std::time::Instant::now(); + + // Wait until at least one signature attempt is dropped to prove a retry occurred. + tokio::time::timeout(Duration::from_secs(5), async { + loop { + let dropped = droppers + .iter() + .map(SignatureDropper::dropped) + .sum::(); + + if dropped > 0 { + break; + } + + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect("expected to drop at least one signature message while filters enabled"); + + if start.elapsed() < Duration::from_secs(2) { + tokio::time::sleep(Duration::from_secs(2) - start.elapsed()).await; + } + + for dropper in &droppers { + dropper.disable(); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let actions = tokio::time::timeout(Duration::from_secs(20), network.wait_for_actions(1)) + .await + .expect("should publish RPC action eventually"); + + let dropped_messages: usize = droppers.iter().map(SignatureDropper::dropped).sum(); + assert!( + actions + .iter() + .any(|action| action.contains("RpcAction::Publish")), + "unexpected rpc actions: {actions:?}" + ); + assert!( + dropped_messages > 0, + "expected test filter to drop at least one signature message" + ); + assert!( + start.elapsed() >= Duration::from_secs(2), + "signature completed too quickly, expected a retry delay" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_sign_request_retries_multiple_times() { + const NUM_NODES: usize = 3; + const TARGET_ATTEMPTS: usize = 10; + + let mut signature_droppers = Vec::with_capacity(NUM_NODES); + + let mut builder = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_signature_timeout_ms(200); + + for idx in 0..NUM_NODES { + let participant = Participant::from(idx as u32); + let (dropper, filter) = SignatureDropper::new(participant); + builder = builder.with_outgoing_message_filter(idx, filter); + signature_droppers.push(dropper); + } + + let network = builder.build().await; + + tokio::time::timeout( + Duration::from_millis(300), + network.wait_for_presignatures(2), + ) + .await + .expect("should start with enough presignatures"); + + { + let mut log = network.output.msg_log.lock().await; + log.clear(); + } + + for dropper in &signature_droppers { + dropper.enable(); + } + + let request = sign_request(3); + for node in &network.nodes { + node.sign_tx.send(request.clone()).await.unwrap(); + } + + let required_messages = TARGET_ATTEMPTS * (network.nodes.len() - 1); + + tokio::time::timeout(Duration::from_secs(10), async { + loop { + let signature_messages = { + let log = network.output.msg_log.lock().await; + log.iter() + .filter(|entry| entry.starts_with("Signature")) + .count() + }; + + if signature_messages >= required_messages { + break signature_messages; + } + + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await + .expect("expected at least ten signature retries before timeout"); + + for dropper in &signature_droppers { + dropper.disable(); + } + + tokio::time::sleep(Duration::from_millis(50)).await; + + let dropped_messages: usize = signature_droppers + .iter() + .map(SignatureDropper::dropped) + .sum(); + + assert!( + dropped_messages > 0, + "expected to drop at least one signature message before retries succeeded" + ); + + let actions = tokio::time::timeout(Duration::from_secs(10), network.wait_for_actions(1)) + .await + .expect("signature should eventually publish after retries"); + + assert!( + actions + .iter() + .any(|action| action.contains("RpcAction::Publish")), + "unexpected rpc actions: {actions:?}" + ); + + let signature_messages = { + let log = network.output.msg_log.lock().await; + log.iter() + .filter(|entry| entry.starts_with("Signature")) + .count() + }; + + let attempts = signature_messages / (network.nodes.len() - 1); + assert!( + attempts >= TARGET_ATTEMPTS, + "expected at least {TARGET_ATTEMPTS} retries, observed {attempts}" + ); +} + fn sign_request(seed: u8) -> IndexedSignRequest { IndexedSignRequest { id: SignId::new([seed; 32]), @@ -216,15 +406,14 @@ async fn test_presignature_timeout() { fn create_filter() -> MessageFilter { let mut drop_counter = 20; Box::new(move |(msg, _)| { - let pass = match msg { - mpc_node::protocol::Message::Presignature(_) => drop_counter == 0, - _ => true, - }; + let should_drop = + matches!(msg, mpc_node::protocol::Message::Presignature(_)) && drop_counter > 0; - if !pass { + if should_drop { drop_counter -= 1; } - pass + + should_drop }) }