Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chain-signatures/node/src/backlog/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ pub async fn select_checkpoints(
tracing::warn!("threshold must be greater than 0");
return HashMap::new();
}
if mesh_state.active.participants.is_empty() {
if mesh_state.active().participants.is_empty() {
tracing::warn!("no active participants available for checkpoint recovery");
return HashMap::new();
}

tracing::info!(
participant_count = mesh_state.active.participants.len(),
participant_count = mesh_state.active().participants.len(),
?chains,
threshold,
"starting checkpoint selection recovery"
Expand Down Expand Up @@ -84,7 +84,7 @@ async fn fetch_latest(
let mut all_checkpoints = HashMap::new();

let mut tasks = JoinSet::new();
for (participant_id, info) in &mesh_state.active.participants {
for (participant_id, info) in &mesh_state.active().participants {
let client = node_client.clone();
let participant = *participant_id;
let node_url = info.url.clone();
Expand Down
2 changes: 1 addition & 1 deletion chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum NodeStatus {
/// each node only tracks it one directional.
///
/// Example: Node A only cares about IDs it owns. Hence, a peer node B is
/// considered stable after A sent SyncUpdate and B responded with a
/// considered active after A sent SyncUpdate and B responded with a
/// SyncView. This is all node A needs to know to make decisions about
/// protocols it initiates.
///
Expand Down
105 changes: 33 additions & 72 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::BTreeSet;
use std::time::Duration;

use crate::mesh::connection::NodeStatus;
Expand All @@ -12,6 +11,8 @@ use near_account_id::AccountId;
use tokio::sync::{mpsc, watch};

pub mod connection;
mod state;
pub use state::MeshState;

#[derive(Debug, Clone, clap::Parser)]
#[group(id = "mesh_options")]
Expand All @@ -31,39 +32,6 @@ impl Options {
}
}

#[derive(Clone, Debug, Default, PartialEq)]
pub struct MeshState {
/// Participants that are active in the network; as in they respond when pinged.
pub active: Participants,

/// Participants that are currently out-of-sync, they will become active
/// once we finished synchronization.
pub need_sync: Participants,

/// Participants that can be selected for a new protocol invocation.
pub stable: BTreeSet<Participant>,
}

impl MeshState {
pub fn update(&mut self, participant: Participant, status: NodeStatus, info: ParticipantInfo) {
match status {
NodeStatus::Active => {
self.active.insert(&participant, info);
self.need_sync.remove(&participant);
self.stable.insert(participant);
}
NodeStatus::Syncing => {
self.need_sync.insert(&participant, info);
}
NodeStatus::Inactive | NodeStatus::Offline => {
self.active.remove(&participant);
self.need_sync.remove(&participant);
self.stable.remove(&participant);
}
}
}
}

/// Set of connections to participants in the network. Each participant is pinged at regular
/// intervals to check their aliveness. The connections can be dropped and reconnected at any time.
pub struct Mesh {
Expand Down Expand Up @@ -132,18 +100,14 @@ impl Mesh {
// if the previous me is different from the current me, remove the
// previous me from the MeshState.
if let Some(previous_me) = previous_me.filter(|old| *old != participant) {
state.active.remove(&previous_me);
state.need_sync.remove(&previous_me);
state.stable.remove(&previous_me);
state.remove(previous_me);
}
state.update(participant, new_status, info);
});
} else {
tracing::warn!(?previous_me, ?contract, "we are no longer part of the MPC network");
self.state_tx.send_modify(|state| {
state.active.clear();
state.need_sync.clear();
state.stable.clear();
state.clear();
});
}
}
Expand Down Expand Up @@ -180,7 +144,7 @@ impl Mesh {

pub async fn wait_threshold_active(mesh_state: &mut watch::Receiver<MeshState>, threshold: usize) {
loop {
if mesh_state.borrow().active.len() >= threshold {
if mesh_state.borrow().active().len() >= threshold {
return;
}
let _ = mesh_state.changed().await;
Expand Down Expand Up @@ -303,8 +267,7 @@ mod tests {
{
tokio::time::sleep(PING_INTERVAL * 3).await;
let state = mesh_state.borrow();
assert!(state.active.contains_key(&me));
assert!(state.stable.contains(&me));
assert!(state.active().contains_key(&me));
drop(state);

for idx in 0..num_nodes {
Expand All @@ -313,13 +276,13 @@ mod tests {
tokio::time::sleep(PING_INTERVAL * 3).await;

let state = mesh_state.borrow();
assert_eq!(state.active.len(), num_nodes);
assert_eq!(state.active, expected_participants);
assert!(state.need_sync.is_empty());
assert_eq!(state.active().len(), num_nodes);
assert_eq!(state.active(), &expected_participants);
assert!(state.need_sync().is_empty());
for idx in 0..num_nodes {
assert!(state.active.contains_key(&servers[idx].id()));
assert!(state.active().contains_key(&servers[idx].id()));
}
assert!(state.active.contains_key(&me));
assert!(state.active().contains_key(&me));
}

// check that the mesh state is updated when a participant goes offline
Expand All @@ -328,32 +291,34 @@ mod tests {
tokio::time::sleep(PING_INTERVAL * 3).await;

let state = mesh_state.borrow();
assert_eq!(state.active.len(), num_nodes - 1);
assert!(state.active.contains_key(&me));
assert!(state.active.contains_key(&servers[0].id()));
assert!(!state.active.contains_key(&servers[1].id()));
assert!(state.active.contains_key(&servers[2].id()));
assert_eq!(state.active().len(), num_nodes - 1);
assert!(state.active().contains_key(&me));
assert!(state.active().contains_key(&servers[0].id()));
assert!(!state.active().contains_key(&servers[1].id()));
assert!(state.active().contains_key(&servers[2].id()));
}

// check that the mesh state is updated when a participant goes back online.
{
servers[1].make_online().await;
tokio::time::sleep(PING_INTERVAL * 3).await;

// Node is now syncing: should be in need_sync but not in active yet.
let state = mesh_state.borrow_and_update().clone();
assert_eq!(state.active.len(), num_nodes - 1);
assert_eq!(state.active().len(), num_nodes - 1);
assert!(!state.active().contains_key(&servers[1].id()));
assert!(state.need_sync().contains_key(&servers[1].id()));

sync_tx.send(servers[1].id()).await.unwrap();
tokio::time::sleep(PING_INTERVAL).await;

let state = mesh_state.borrow_and_update().clone();
assert_eq!(state.active.len(), num_nodes);
assert!(state.need_sync.is_empty());
assert_eq!(state.active().len(), num_nodes);
assert!(state.need_sync().is_empty());
for idx in 0..num_nodes {
assert!(state.active.contains_key(&servers[idx].id()));
assert!(state.stable.contains(&servers[idx].id()));
assert!(state.active().contains_key(&servers[idx].id()));
}
assert!(state.active.contains_key(&me));
assert!(state.stable.contains(&me));
assert!(state.active().contains_key(&me));
}

mesh_task.abort();
Expand Down Expand Up @@ -402,8 +367,6 @@ mod tests {

// Wait for the mesh to process the contract update and connect the new participant
let expected_participants = servers.participants();
let expected_stable: BTreeSet<_> = expected_participants.keys().copied().collect();

tokio::time::sleep(PING_INTERVAL * 3).await;
for i in 0..num_nodes {
sync_tx.send(servers[i].id()).await.unwrap();
Expand All @@ -412,16 +375,16 @@ mod tests {
tokio::time::sleep(PING_INTERVAL * 3).await;
let state = mesh_state.borrow();

assert!(state.active.len() == num_nodes);
assert!(state.need_sync.is_empty());
assert!(state.active().len() == num_nodes);
assert!(state.need_sync().is_empty());
for i in 0..num_nodes {
assert!(
state.active.contains_key(&servers[i].id()),
state.active().contains_key(&servers[i].id()),
"missing {:?}",
servers[i].id(),
);
}
assert_eq!(state.stable, expected_stable);
assert_eq!(state.active(), &expected_participants);
}

// check on node deletion with contract change.
Expand All @@ -438,21 +401,19 @@ mod tests {

// Wait for the mesh to process the contract update and remove the participant
let expected_participants = servers.participants();
let expected_stable: BTreeSet<_> = expected_participants.keys().copied().collect();

tokio::time::sleep(PING_INTERVAL * 3).await;
let state = mesh_state.borrow();

assert!(state.need_sync.is_empty());
assert!(state.active.len() == num_nodes);
assert!(state.need_sync().is_empty());
assert!(state.active().len() == num_nodes);
for i in 0..num_nodes {
assert!(
state.active.contains_key(&servers[i].id()),
state.active().contains_key(&servers[i].id()),
"missing {:?}",
servers[i].id(),
);
}
assert_eq!(state.stable, expected_stable);
assert_eq!(state.active(), &expected_participants);
}

mesh_task.abort();
Expand Down
76 changes: 76 additions & 0 deletions chain-signatures/node/src/mesh/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use cait_sith::protocol::Participant;

use crate::mesh::connection::NodeStatus;
use crate::protocol::contract::primitives::Participants;
use crate::protocol::ParticipantInfo;

#[derive(Clone, Debug, Default, PartialEq)]
pub struct MeshState {
/// Participants that are active in the network; synced and responsive to pings.
active: Participants,

/// Participants that are currently out-of-sync, they will become active
/// once we finished synchronization.
need_sync: Participants,
}

impl MeshState {
pub fn active(&self) -> &Participants {
&self.active
}

pub fn need_sync(&self) -> &Participants {
&self.need_sync
}

pub fn update(&mut self, participant: Participant, status: NodeStatus, info: ParticipantInfo) {
match status {
NodeStatus::Active => {
self.active.insert(&participant, info);
self.need_sync.remove(&participant);
}
NodeStatus::Syncing => {
self.active.remove(&participant);
self.need_sync.insert(&participant, info);
}
NodeStatus::Inactive | NodeStatus::Offline => {
self.active.remove(&participant);
self.need_sync.remove(&participant);
}
}
}

pub fn remove(&mut self, participant: Participant) {
self.active.remove(&participant);
self.need_sync.remove(&participant);
}

pub fn clear(&mut self) {
self.active.clear();
self.need_sync.clear();
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn syncing_moves_participant_out_of_active_until_reactivated() {
let participant = Participant::from(7u32);
let info = ParticipantInfo::new(7);
let mut state = MeshState::default();

state.update(participant, NodeStatus::Active, info.clone());
assert!(state.active().contains_key(&participant));
assert!(!state.need_sync().contains_key(&participant));

state.update(participant, NodeStatus::Syncing, info.clone());
assert!(!state.active().contains_key(&participant));
assert!(state.need_sync().contains_key(&participant));

state.update(participant, NodeStatus::Active, info);
assert!(state.active().contains_key(&participant));
assert!(!state.need_sync().contains_key(&participant));
}
}
4 changes: 2 additions & 2 deletions chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl CryptographicProtocol for GeneratingState {
let participants = self.participants.keys_vec();
tracing::info!(
?participants,
active = ?mesh_state.active,
active = ?mesh_state.active(),
"generating: progressing key generation",
);
loop {
Expand Down Expand Up @@ -174,7 +174,7 @@ impl CryptographicProtocol for WaitingForConsensusState {

impl CryptographicProtocol for ResharingState {
async fn progress(mut self, ctx: &mut MpcSignProtocol, mesh_state: MeshState) -> NodeState {
tracing::info!(active = ?mesh_state.active.keys_vec(), "progressing key reshare");
tracing::info!(active = ?mesh_state.active().keys_vec(), "progressing key reshare");

let mut resharing = match self.phase {
ResharingPhase::Resharing(resharing) => resharing,
Expand Down
4 changes: 2 additions & 2 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ impl PresignatureSpawner {
let mut posits = self.msg.subscribe_presignature_posit().await;

let mut protocol = cfg.borrow().protocol.clone();
let mut active = mesh_state.borrow().active.keys_vec();
let mut active = mesh_state.borrow().active().keys_vec();

loop {
tokio::select! {
Expand Down Expand Up @@ -740,7 +740,7 @@ impl PresignatureSpawner {
protocol = cfg.borrow().protocol.clone();
}
Ok(()) = mesh_state.changed() => {
active = mesh_state.borrow().active.keys_vec();
active = mesh_state.borrow().active().keys_vec();
}
}
}
Expand Down
Loading
Loading