From c1c32575979d45b0f3e6ad7ae2a04337e39c8ca9 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 23 Feb 2026 18:01:09 +0200 Subject: [PATCH 1/3] Remove stable, incapsulate mesh state, fix bugs --- .../node/src/backlog/selection.rs | 6 +- chain-signatures/node/src/mesh/connection.rs | 2 +- chain-signatures/node/src/mesh/mod.rs | 101 +++++------------- chain-signatures/node/src/mesh/state.rs | 76 +++++++++++++ .../node/src/protocol/cryptography.rs | 4 +- .../node/src/protocol/presignature.rs | 4 +- .../node/src/protocol/signature.rs | 55 +++++----- .../node/src/protocol/sync/mod.rs | 4 +- chain-signatures/node/src/protocol/triple.rs | 4 +- chain-signatures/node/src/stream/ops.rs | 6 +- integration-tests/src/mpc_fixture/builder.rs | 11 +- .../src/mpc_fixture/fixture_tasks.rs | 2 +- 12 files changed, 154 insertions(+), 121 deletions(-) create mode 100644 chain-signatures/node/src/mesh/state.rs diff --git a/chain-signatures/node/src/backlog/selection.rs b/chain-signatures/node/src/backlog/selection.rs index b82feb526..47f0a8ce1 100644 --- a/chain-signatures/node/src/backlog/selection.rs +++ b/chain-signatures/node/src/backlog/selection.rs @@ -36,13 +36,13 @@ pub async fn select_checkpoints( tracing::warn!("threshold must be greater than 0"); return HashMap::new(); } - if mesh_state.active.participants.is_empty() { + if mesh_state.active().participants.is_empty() { tracing::warn!("no active participants available for checkpoint recovery"); return HashMap::new(); } tracing::info!( - participant_count = mesh_state.active.participants.len(), + participant_count = mesh_state.active().participants.len(), ?chains, threshold, "starting checkpoint selection recovery" @@ -84,7 +84,7 @@ async fn fetch_latest( let mut all_checkpoints = HashMap::new(); let mut tasks = JoinSet::new(); - for (participant_id, info) in &mesh_state.active.participants { + for (participant_id, info) in &mesh_state.active().participants { let client = node_client.clone(); let participant = *participant_id; let node_url = info.url.clone(); diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 9ed61ed54..db7180423 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -29,7 +29,7 @@ pub enum NodeStatus { /// each node only tracks it one directional. /// /// Example: Node A only cares about IDs it owns. Hence, a peer node B is - /// considered stable after A sent SyncUpdate and B responded with a + /// considered active after A sent SyncUpdate and B responded with a /// SyncView. This is all node A needs to know to make decisions about /// protocols it initiates. /// diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index c890f6dde..c2ff64eae 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeSet; use std::time::Duration; use crate::mesh::connection::NodeStatus; @@ -12,6 +11,8 @@ use near_account_id::AccountId; use tokio::sync::{mpsc, watch}; pub mod connection; +mod state; +pub use state::MeshState; #[derive(Debug, Clone, clap::Parser)] #[group(id = "mesh_options")] @@ -31,39 +32,6 @@ impl Options { } } -#[derive(Clone, Debug, Default, PartialEq)] -pub struct MeshState { - /// Participants that are active in the network; as in they respond when pinged. - pub active: Participants, - - /// Participants that are currently out-of-sync, they will become active - /// once we finished synchronization. - pub need_sync: Participants, - - /// Participants that can be selected for a new protocol invocation. - pub stable: BTreeSet, -} - -impl MeshState { - pub fn update(&mut self, participant: Participant, status: NodeStatus, info: ParticipantInfo) { - match status { - NodeStatus::Active => { - self.active.insert(&participant, info); - self.need_sync.remove(&participant); - self.stable.insert(participant); - } - NodeStatus::Syncing => { - self.need_sync.insert(&participant, info); - } - NodeStatus::Inactive | NodeStatus::Offline => { - self.active.remove(&participant); - self.need_sync.remove(&participant); - self.stable.remove(&participant); - } - } - } -} - /// Set of connections to participants in the network. Each participant is pinged at regular /// intervals to check their aliveness. The connections can be dropped and reconnected at any time. pub struct Mesh { @@ -132,18 +100,14 @@ impl Mesh { // if the previous me is different from the current me, remove the // previous me from the MeshState. if let Some(previous_me) = previous_me.filter(|old| *old != participant) { - state.active.remove(&previous_me); - state.need_sync.remove(&previous_me); - state.stable.remove(&previous_me); + state.remove(previous_me); } state.update(participant, new_status, info); }); } else { tracing::warn!(?previous_me, ?contract, "we are no longer part of the MPC network"); self.state_tx.send_modify(|state| { - state.active.clear(); - state.need_sync.clear(); - state.stable.clear(); + state.clear(); }); } } @@ -180,7 +144,7 @@ impl Mesh { pub async fn wait_threshold_active(mesh_state: &mut watch::Receiver, threshold: usize) { loop { - if mesh_state.borrow().active.len() >= threshold { + if mesh_state.borrow().active().len() >= threshold { return; } let _ = mesh_state.changed().await; @@ -303,8 +267,7 @@ mod tests { { tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow(); - assert!(state.active.contains_key(&me)); - assert!(state.stable.contains(&me)); + assert!(state.active().contains_key(&me)); drop(state); for idx in 0..num_nodes { @@ -313,13 +276,13 @@ mod tests { tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow(); - assert_eq!(state.active.len(), num_nodes); - assert_eq!(state.active, expected_participants); - assert!(state.need_sync.is_empty()); + assert_eq!(state.active().len(), num_nodes); + assert_eq!(state.active(), &expected_participants); + assert!(state.need_sync().is_empty()); for idx in 0..num_nodes { - assert!(state.active.contains_key(&servers[idx].id())); + assert!(state.active().contains_key(&servers[idx].id())); } - assert!(state.active.contains_key(&me)); + assert!(state.active().contains_key(&me)); } // check that the mesh state is updated when a participant goes offline @@ -328,11 +291,11 @@ mod tests { tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow(); - assert_eq!(state.active.len(), num_nodes - 1); - assert!(state.active.contains_key(&me)); - assert!(state.active.contains_key(&servers[0].id())); - assert!(!state.active.contains_key(&servers[1].id())); - assert!(state.active.contains_key(&servers[2].id())); + assert_eq!(state.active().len(), num_nodes - 1); + assert!(state.active().contains_key(&me)); + assert!(state.active().contains_key(&servers[0].id())); + assert!(!state.active().contains_key(&servers[1].id())); + assert!(state.active().contains_key(&servers[2].id())); } // check that the mesh state is updated when a participant goes back online. @@ -341,19 +304,17 @@ mod tests { tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow_and_update().clone(); - assert_eq!(state.active.len(), num_nodes - 1); + assert_eq!(state.active().len(), num_nodes - 1); sync_tx.send(servers[1].id()).await.unwrap(); tokio::time::sleep(PING_INTERVAL).await; let state = mesh_state.borrow_and_update().clone(); - assert_eq!(state.active.len(), num_nodes); - assert!(state.need_sync.is_empty()); + assert_eq!(state.active().len(), num_nodes); + assert!(state.need_sync().is_empty()); for idx in 0..num_nodes { - assert!(state.active.contains_key(&servers[idx].id())); - assert!(state.stable.contains(&servers[idx].id())); + assert!(state.active().contains_key(&servers[idx].id())); } - assert!(state.active.contains_key(&me)); - assert!(state.stable.contains(&me)); + assert!(state.active().contains_key(&me)); } mesh_task.abort(); @@ -402,8 +363,6 @@ mod tests { // Wait for the mesh to process the contract update and connect the new participant let expected_participants = servers.participants(); - let expected_stable: BTreeSet<_> = expected_participants.keys().copied().collect(); - tokio::time::sleep(PING_INTERVAL * 3).await; for i in 0..num_nodes { sync_tx.send(servers[i].id()).await.unwrap(); @@ -412,16 +371,16 @@ mod tests { tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow(); - assert!(state.active.len() == num_nodes); - assert!(state.need_sync.is_empty()); + assert!(state.active().len() == num_nodes); + assert!(state.need_sync().is_empty()); for i in 0..num_nodes { assert!( - state.active.contains_key(&servers[i].id()), + state.active().contains_key(&servers[i].id()), "missing {:?}", servers[i].id(), ); } - assert_eq!(state.stable, expected_stable); + assert_eq!(state.active(), &expected_participants); } // check on node deletion with contract change. @@ -438,21 +397,19 @@ mod tests { // Wait for the mesh to process the contract update and remove the participant let expected_participants = servers.participants(); - let expected_stable: BTreeSet<_> = expected_participants.keys().copied().collect(); - tokio::time::sleep(PING_INTERVAL * 3).await; let state = mesh_state.borrow(); - assert!(state.need_sync.is_empty()); - assert!(state.active.len() == num_nodes); + assert!(state.need_sync().is_empty()); + assert!(state.active().len() == num_nodes); for i in 0..num_nodes { assert!( - state.active.contains_key(&servers[i].id()), + state.active().contains_key(&servers[i].id()), "missing {:?}", servers[i].id(), ); } - assert_eq!(state.stable, expected_stable); + assert_eq!(state.active(), &expected_participants); } mesh_task.abort(); diff --git a/chain-signatures/node/src/mesh/state.rs b/chain-signatures/node/src/mesh/state.rs new file mode 100644 index 000000000..02b6c07c3 --- /dev/null +++ b/chain-signatures/node/src/mesh/state.rs @@ -0,0 +1,76 @@ +use cait_sith::protocol::Participant; + +use crate::mesh::connection::NodeStatus; +use crate::protocol::contract::primitives::Participants; +use crate::protocol::ParticipantInfo; + +#[derive(Clone, Debug, Default, PartialEq)] +pub struct MeshState { + /// Participants that are active in the network; synced and responsive to pings. + active: Participants, + + /// Participants that are currently out-of-sync, they will become active + /// once we finished synchronization. + need_sync: Participants, +} + +impl MeshState { + pub fn active(&self) -> &Participants { + &self.active + } + + pub fn need_sync(&self) -> &Participants { + &self.need_sync + } + + pub fn update(&mut self, participant: Participant, status: NodeStatus, info: ParticipantInfo) { + match status { + NodeStatus::Active => { + self.active.insert(&participant, info); + self.need_sync.remove(&participant); + } + NodeStatus::Syncing => { + self.active.remove(&participant); + self.need_sync.insert(&participant, info); + } + NodeStatus::Inactive | NodeStatus::Offline => { + self.active.remove(&participant); + self.need_sync.remove(&participant); + } + } + } + + pub fn remove(&mut self, participant: Participant) { + self.active.remove(&participant); + self.need_sync.remove(&participant); + } + + pub fn clear(&mut self) { + self.active.clear(); + self.need_sync.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn syncing_moves_participant_out_of_active_until_reactivated() { + let participant = Participant::from(7u32); + let info = ParticipantInfo::new(7); + let mut state = MeshState::default(); + + state.update(participant, NodeStatus::Active, info.clone()); + assert!(state.active().contains_key(&participant)); + assert!(!state.need_sync().contains_key(&participant)); + + state.update(participant, NodeStatus::Syncing, info.clone()); + assert!(!state.active().contains_key(&participant)); + assert!(state.need_sync().contains_key(&participant)); + + state.update(participant, NodeStatus::Active, info); + assert!(state.active().contains_key(&participant)); + assert!(!state.need_sync().contains_key(&participant)); + } +} diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 874992d7e..3ea793f6c 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -51,7 +51,7 @@ impl CryptographicProtocol for GeneratingState { let participants = self.participants.keys_vec(); tracing::info!( ?participants, - active = ?mesh_state.active, + active = ?mesh_state.active(), "generating: progressing key generation", ); loop { @@ -174,7 +174,7 @@ impl CryptographicProtocol for WaitingForConsensusState { impl CryptographicProtocol for ResharingState { async fn progress(mut self, ctx: &mut MpcSignProtocol, mesh_state: MeshState) -> NodeState { - tracing::info!(active = ?mesh_state.active.keys_vec(), "progressing key reshare"); + tracing::info!(active = ?mesh_state.active().keys_vec(), "progressing key reshare"); let mut resharing = match self.phase { ResharingPhase::Resharing(resharing) => resharing, diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 5c884ccdc..0f3786699 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -687,7 +687,7 @@ impl PresignatureSpawner { let mut posits = self.msg.subscribe_presignature_posit().await; let mut protocol = cfg.borrow().protocol.clone(); - let mut active = mesh_state.borrow().active.keys_vec(); + let mut active = mesh_state.borrow().active().keys_vec(); loop { tokio::select! { @@ -740,7 +740,7 @@ impl PresignatureSpawner { protocol = cfg.borrow().protocol.clone(); } Ok(()) = mesh_state.changed() => { - active = mesh_state.borrow().active.keys_vec(); + active = mesh_state.borrow().active().keys_vec(); } } } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 29d7d6b69..2bccc14c5 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -141,7 +141,7 @@ impl SignState { struct SignPositor { proposer: Participant, - stable: BTreeSet, + active: BTreeSet, presignature_id: PresignatureId, presignature: Option, } @@ -188,8 +188,8 @@ impl SignOrganizer { participants[index % participants.len()] } - /// Waits for threshold stable participants to be present. - async fn wait_stable( + /// Waits for threshold active participants to be present. + async fn wait_active( &self, ctx: &SignTask, state: &mut SignState, @@ -199,20 +199,21 @@ impl SignOrganizer { let mut once = true; loop { - let stable_count = { - let stable = &state.mesh_state.borrow().stable; - if stable.len() >= threshold { - return Some(stable.clone()); + let active_count = { + let active: BTreeSet<_> = + state.mesh_state.borrow().active().keys().copied().collect(); + if active.len() >= threshold { + return Some(active); } - stable.len() + active.len() }; if once { tracing::info!( ?sign_id, - stable_count, + active_count, ?threshold, - "waiting for enough stable participants" + "waiting for enough active participants" ); once = false; } @@ -231,9 +232,9 @@ impl SignOrganizer { let participants = ctx.participants.iter().copied().collect::>(); tracing::info!(?sign_id, round = ?state.round, "entering organizing phase"); - let (stable, proposer) = { - let Some(stable) = self.wait_stable(ctx, state, threshold).await else { - tracing::warn!(?sign_id, round = ?state.round, "no stable participants, reorganizing"); + let (active, proposer) = { + let Some(active) = self.wait_active(ctx, state, threshold).await else { + tracing::warn!(?sign_id, round = ?state.round, "no active participants, reorganizing"); state.bump_round(); return SignPhase::Organizing(self); }; @@ -241,11 +242,11 @@ impl SignOrganizer { let max_rounds = state.round + ROUND_INTERVAL; let (selected_round, proposer) = (state.round..max_rounds) .map(|r| (r, Self::proposer_per_round(r, &participants, &entropy))) - .find(|(_, potential_proposer)| stable.contains(potential_proposer)) + .find(|(_, potential_proposer)| active.contains(potential_proposer)) .unwrap_or_else(|| { ( max_rounds, - *stable + *active .iter() .choose(&mut StdRng::from_seed(entropy)) .unwrap(), @@ -261,23 +262,23 @@ impl SignOrganizer { ?proposer, ?me, is_mine, - stable_count = stable.len(), + active_count = active.len(), "organized: selected proposer" ); - (stable, proposer) + (active, proposer) }; let is_proposer = proposer == ctx.me; - let (presignature_id, presignature, stable) = if is_proposer { + let (presignature_id, presignature, active) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); - let stable = stable.iter().copied().collect::>(); + let active = active.iter().copied().collect::>(); let mut recycle = Vec::new(); let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - let participants = intersect_vec(&[&taken.artifact.participants, &stable]); + let participants = intersect_vec(&[&taken.artifact.participants, &active]); if participants.len() < ctx.threshold { recycle.push(taken); continue; @@ -332,16 +333,16 @@ impl SignOrganizer { .await; } - // Update stable to only include participants that are in both the presignature and stable set - let stable = participants.into_iter().collect::>(); - (presignature_id, Some(taken), stable) + // Update active to only include participants that are in both the presignature and active set + let active = participants.into_iter().collect::>(); + (presignature_id, Some(taken), active) } else { - (PresignatureId::default(), None, stable) + (PresignatureId::default(), None, active) }; SignPhase::Posit(SignPositor { proposer, - stable, + active, presignature_id, presignature, }) @@ -519,7 +520,7 @@ impl SignPositor { ) -> SignPhase { let SignPositor { proposer, - stable, + active, mut presignature_id, presignature, } = self; @@ -552,7 +553,7 @@ impl SignPositor { } // GUARANTEE: at least threshold participants from organizing phase. - let posit_participants = stable.iter().copied().collect::>(); + let posit_participants = active.iter().copied().collect::>(); let mut counter = SinglePositCounter::new(ctx.me, &posit_participants); let remaining = state.budget.remaining(); diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index b503bc24e..a8322bc9b 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -183,7 +183,7 @@ impl SyncTask { continue; } - let need_sync = &self.mesh_state.borrow().need_sync.clone(); + let need_sync = self.mesh_state.borrow().need_sync().clone(); if need_sync.is_empty() { continue; } @@ -211,7 +211,7 @@ impl SyncTask { } let update = self.new_update(me).await; - let active = self.mesh_state.borrow().active.clone(); + let active = self.mesh_state.borrow().active().clone(); let start = Instant::now(); let task = tokio::spawn(broadcast_sync( diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 452e9bbeb..167fca485 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -560,7 +560,7 @@ impl TripleSpawner { let mut expiration_interval = tokio::time::interval(Duration::from_secs(60)); let mut posits = self.msg.subscribe_triple_posit().await; - let mut active = mesh_state.borrow().active.keys_vec(); + let mut active = mesh_state.borrow().active().keys_vec(); let mut protocol = cfg.borrow().protocol.clone(); loop { @@ -611,7 +611,7 @@ impl TripleSpawner { protocol = cfg.borrow().protocol.clone(); } Ok(()) = mesh_state.changed() => { - active = mesh_state.borrow().active.keys_vec(); + active = mesh_state.borrow().active().keys_vec(); } } } diff --git a/chain-signatures/node/src/stream/ops.rs b/chain-signatures/node/src/stream/ops.rs index d40707f8c..05918ab74 100644 --- a/chain-signatures/node/src/stream/ops.rs +++ b/chain-signatures/node/src/stream/ops.rs @@ -620,6 +620,7 @@ pub(crate) fn sender_string(sender: [u8; 32], source_chain: Chain) -> anyhow::Re mod tests { use super::*; use crate::backlog::Backlog; + use crate::mesh::connection::NodeStatus; use crate::mesh::wait_threshold_active; use crate::node_client::NodeClient; use crate::protocol::contract::primitives::{ParticipantInfo, Participants}; @@ -686,10 +687,7 @@ mod tests { let threshold = 1; let mut mesh_state = MeshState::default(); let participant = Participant::from(0u32); - mesh_state - .active - .insert(&participant, ParticipantInfo::new(0)); - mesh_state.stable.insert(participant); + mesh_state.update(participant, NodeStatus::Active, ParticipantInfo::new(0)); let (_mesh_tx, mut mesh_rx) = watch::channel(mesh_state); wait_threshold_active(&mut mesh_rx, threshold).await; diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 07bbd7135..1517559af 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -16,6 +16,7 @@ use mpc_contract::primitives::{ use mpc_keys::hpke::{self, Ciphered}; use mpc_node::backlog::Backlog; use mpc_node::config::{Config, LocalConfig, NetworkConfig}; +use mpc_node::mesh::connection::NodeStatus; use mpc_node::mesh::MeshState; use mpc_node::protocol::contract::primitives::{Candidates, Participants, PkVotes, Votes}; use mpc_node::protocol::contract::{InitializingContractState, RunningContractState}; @@ -235,12 +236,12 @@ impl MpcFixtureBuilder { } fn build_mesh_state(&self) -> MeshState { - // mark all participant as already active and stable when the network starts - MeshState { - active: self.participants.clone(), - need_sync: Default::default(), - stable: self.participants.keys_vec().into_iter().collect(), + // mark all participants as already active when the network starts + let mut mesh_state = MeshState::default(); + for (participant, info) in self.participants.iter() { + mesh_state.update(*participant, NodeStatus::Active, info.clone()); } + mesh_state } pub fn with_preshared_key(mut self) -> Self { diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index 29c9224bb..c740897e4 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -42,7 +42,7 @@ pub(super) fn test_mock_network( // directly send out single message, no batching // (might want to add MessageOutbox, too, but for now this is easier) let config = config.borrow().clone(); - let participants = mesh.borrow().active.clone(); + let participants = mesh.borrow().active().clone(); let (msg, (from, to, _ts)) = &send_message; let receiver_info = participants.get(to).expect("TODO: support sending to non-active participants in tests"); match SignedMessage::encrypt( From 15443b87f37650dc3dc18fb9e3ebb558f8673834 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 23 Feb 2026 18:16:59 +0200 Subject: [PATCH 2/3] extend test_mesh_update test --- chain-signatures/node/src/mesh/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index c2ff64eae..7904e5425 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -303,8 +303,12 @@ mod tests { servers[1].make_online().await; tokio::time::sleep(PING_INTERVAL * 3).await; + // Node is now syncing: should be in need_sync but not in active yet. let state = mesh_state.borrow_and_update().clone(); assert_eq!(state.active().len(), num_nodes - 1); + assert!(!state.active().contains_key(&servers[1].id())); + assert!(state.need_sync().contains_key(&servers[1].id())); + sync_tx.send(servers[1].id()).await.unwrap(); tokio::time::sleep(PING_INTERVAL).await; From 198a58b36951f0e159fce15e5bf0bd9d59a5a5f6 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 23 Feb 2026 18:57:47 +0200 Subject: [PATCH 3/3] fmt --- chain-signatures/node/src/mesh/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 7904e5425..b9a1d3283 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -308,7 +308,7 @@ mod tests { assert_eq!(state.active().len(), num_nodes - 1); assert!(!state.active().contains_key(&servers[1].id())); assert!(state.need_sync().contains_key(&servers[1].id())); - + sync_tx.send(servers[1].id()).await.unwrap(); tokio::time::sleep(PING_INTERVAL).await;