Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 136 additions & 73 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,13 @@ impl SignQueue {
.with_label_values(&[indexed.chain.as_str(), my_account_id.as_str()])
.inc();

let request = self.organize_request(stable, participants, indexed, 0);
let mut request = self.organize_request(stable, participants, indexed, 0);
if request.indexed.timestamp_sign_queue.is_none() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this field optional in the first place? Can we make it non-optional and ensure the timestamp is added at the indexer level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may have had a purpose once upon a time — @ppca can probably explain why — but I've since removed it in #587

request.indexed.timestamp_sign_queue = Some(Instant::now());
}
let is_mine = request.proposer == self.me;
if is_mine {
self.my_requests.push_back(sign_id);
self.my_requests.push_front(sign_id);
crate::metrics::NUM_SIGN_REQUESTS_MINE
.with_label_values(&[my_account_id.as_str()])
.inc();
Expand All @@ -286,26 +289,31 @@ impl SignQueue {
participants: &Participants,
my_account_id: &AccountId,
) {
while let Some(id) = self.failed_requests.pop_front() {
let Some(request) = self.requests.remove(&id) else {
let now = Instant::now();
let len = self.failed_requests.len();
for _ in 0..len {
let Some(id) = self.failed_requests.pop_front() else {
break;
};

let Some(mut request) = self.requests.remove(&id) else {
continue;
};

let (reorganized, request) = if &request.stable == stable {
// just use the same request if the participants are the same
(false, request)
} else {
let request =
let mut reorganized = false;
if &request.stable != stable {
request =
self.organize_request(stable, participants, request.indexed, request.round);
(true, request)
};
reorganized = true;
}

if request.indexed.timestamp_sign_queue.is_none() {
request.indexed.timestamp_sign_queue = Some(now);
}

// NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures.
// TODO: we need to decide how to prioritize certain requests over others such as with gas or time of
// when the request made it into the NEAR network.
// issue: https://github.com/near/mpc-recovery/issues/596
if request.proposer == self.me {
self.my_requests.push_front(request.indexed.id);
// Older retries are appended to the back so newly indexed requests run first.
self.my_requests.push_back(request.indexed.id);
if reorganized {
crate::metrics::NUM_SIGN_REQUESTS_MINE
.with_label_values(&[my_account_id.as_str()])
Expand All @@ -318,7 +326,13 @@ impl SignQueue {
}

pub fn push_failed(&mut self, sign_id: SignId) {
self.failed_requests.push_back(sign_id);
if self.requests.contains_key(&sign_id) {
if !self.failed_requests.contains(&sign_id) {
self.failed_requests.push_back(sign_id);
}
} else {
tracing::warn!(?sign_id, "failed sign request missing from queue");
}
}

pub fn take_mine(&mut self) -> Option<SignRequest> {
Expand All @@ -336,42 +350,22 @@ impl SignQueue {
}
}

pub fn expire(&mut self, cfg: &ProtocolConfig) {
self.requests.retain(|_, request| {
request.indexed.timestamp_sign_queue.is_none_or(|t| {
t.elapsed() < Duration::from_millis(cfg.signature.generation_timeout_total)
})
});
self.my_requests.retain(|id| {
let Some(request) = self.requests.get(id) else {
// if we are unable to find the corresponding request, we can remove it.
return false;
};
crate::util::duration_between_unix(
request.indexed.unix_timestamp_indexed,
crate::util::current_unix_timestamp(),
) < request.indexed.total_timeout
});
self.failed_requests.retain(|id| {
let Some(request) = self.requests.get(id) else {
// if we are unable to find the corresponding request, we can remove it.
return false;
};
crate::util::duration_between_unix(
request.indexed.unix_timestamp_indexed,
crate::util::current_unix_timestamp(),
) < request.indexed.total_timeout
});
pub fn expire(&mut self, _cfg: &ProtocolConfig) {
self.my_requests.retain(|id| self.requests.contains_key(id));
self.failed_requests
.retain(|id| self.requests.contains_key(id));
}

pub fn remove(&mut self, sign_id: SignId) -> Option<SignRequest> {
self.pending.remove(&sign_id);
self.my_requests.retain(|id| id != &sign_id);
self.failed_requests.retain(|id| id != &sign_id);
self.requests.remove(&sign_id)
}
}

enum SignError {
Retry,
TotalTimeout,
Aborted,
}

Expand All @@ -380,11 +374,11 @@ struct SignatureGenerator {
protocol: SignatureProtocol,
dropper: PresignatureTakenDropper,
participants: Vec<Participant>,
me: Participant,
request: SignRequest,
public_key: PublicKey,
created: Instant,
timeout: Duration,
timeout_total: Duration,
inbox: mpsc::Receiver<SignatureMessage>,
msg: MessageChannel,
rpc: RpcChannel,
Expand Down Expand Up @@ -457,11 +451,11 @@ impl SignatureGenerator {
protocol,
dropper,
participants,
me,
request,
public_key,
created: Instant::now(),
timeout: Duration::from_millis(cfg.signature.generation_timeout),
timeout_total: Duration::from_millis(cfg.signature.generation_timeout_total),
inbox,
msg,
rpc,
Expand All @@ -474,16 +468,6 @@ impl SignatureGenerator {
})
}

fn timeout_total(&self) -> bool {
let timestamp = self
.request
.indexed
.timestamp_sign_queue
.as_ref()
.unwrap_or(&self.created);
timestamp.elapsed() >= self.timeout_total
}

/// Receive the next message for the signature protocol; error out on the timeout being reached
/// or the channel having been closed (aborted).
async fn recv(&mut self) -> Result<SignatureMessage, SignError> {
Expand All @@ -497,13 +481,21 @@ impl SignatureGenerator {
{
Ok(Some(msg)) => Ok(msg),
Ok(None) => {
tracing::warn!(?sign_id, presignature_id, "signature generation aborted");
tracing::warn!(
?sign_id,
presignature_id,
proposer = ?self.request.proposer,
me = ?self.me,
"signature generation aborted",
);
Err(SignError::Aborted)
}
Err(_err) => {
tracing::warn!(
?sign_id,
presignature_id,
proposer = ?self.request.proposer,
me = ?self.me,
"signature generation timeout, retrying..."
);
Err(SignError::Retry)
Expand All @@ -523,8 +515,6 @@ impl SignatureGenerator {
crate::metrics::SIGNATURE_POKES_CNT.with_label_values(&[my_account_id.as_str()]);
let signature_generator_failures_metric = crate::metrics::SIGNATURE_GENERATOR_FAILURES
.with_label_values(&[my_account_id.as_str()]);
let signature_failures_metric =
crate::metrics::SIGNATURE_FAILURES.with_label_values(&[my_account_id.as_str()]);
let poke_latency =
crate::metrics::SIGNATURE_POKE_CPU_TIME.with_label_values(&[my_account_id.as_str()]);

Expand All @@ -539,19 +529,6 @@ impl SignatureGenerator {
.observe(self.created.elapsed().as_millis() as f64);

loop {
if self.timeout_total() {
tracing::warn!(
?sign_id,
presignature_id,
"signature generation timeout, exhausted all attempts"
);
if self.request.proposer == me {
signature_generator_failures_metric.inc();
signature_failures_metric.inc();
}
break Err(SignError::TotalTimeout);
}

let poke_start_time = Instant::now();
let action = match self.protocol.poke() {
Ok(action) => action,
Expand All @@ -561,6 +538,9 @@ impl SignatureGenerator {
?err,
"signature generation failed on protocol advancement",
);
if self.request.proposer == me {
signature_generator_failures_metric.inc();
}
break Err(SignError::Retry);
}
};
Expand Down Expand Up @@ -1088,9 +1068,12 @@ impl SignatureSpawner {

match result {
Err(SignError::Retry) => {
crate::metrics::SIGNATURE_FAILURES
.with_label_values(&[self.my_account_id.as_str()])
.inc();
self.sign_queue.push_failed(sign_id);
}
Ok(()) | Err(SignError::TotalTimeout) | Err(SignError::Aborted) => {
Ok(()) | Err(SignError::Aborted) => {
self.sign_queue.remove(sign_id);
}
}
Expand Down Expand Up @@ -1208,3 +1191,83 @@ impl PendingPresignature {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::ParticipantInfo;
use k256::Scalar;
use near_account_id::AccountId;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;

fn make_indexed_request(id: SignId) -> IndexedSignRequest {
IndexedSignRequest {
id,
args: SignArgs {
entropy: id.request_id,
epsilon: Scalar::ONE,
payload: Scalar::ONE,
path: "test".to_string(),
key_version: 0,
},
chain: Chain::Ethereum,
unix_timestamp_indexed: 0,
timestamp_sign_queue: None,
total_timeout: Duration::from_secs(60),
sign_request_type: SignRequestType::Sign,
participants: None,
}
}

fn setup_queue() -> (
Participant,
BTreeSet<Participant>,
Participants,
SignQueue,
AccountId,
mpsc::Sender<IndexedSignRequest>,
) {
let (tx, rx) = SignQueue::channel();
let me = Participant::from(0u32);
let other = Participant::from(1u32);
let mut stable = BTreeSet::new();
stable.insert(me);
stable.insert(other);

let mut participants_map = Participants::default();
participants_map.insert(&me, ParticipantInfo::new(0));
participants_map.insert(&other, ParticipantInfo::new(1));

let account_id = AccountId::from_str("me.near").unwrap();
let queue = SignQueue::new(me, Arc::new(RwLock::new(rx)));

(me, stable, participants_map, queue, account_id, tx)
}

#[tokio::test]
async fn prioritizes_new_requests_over_old_retries() {
let (me, stable, participants, mut queue, account_id, tx) = setup_queue();

let old_id = SignId::new([5; 32]);
let mut old_indexed = make_indexed_request(old_id);
old_indexed.participants = Some(vec![me]);
let old_request = queue.organize_request(&stable, &participants, old_indexed, 0);
queue.requests.insert(old_id, old_request);
queue.push_failed(old_id);

let new_id = SignId::new([9; 32]);
let mut new_indexed = make_indexed_request(new_id);
new_indexed.participants = Some(vec![me]);
tx.try_send(new_indexed).unwrap();

queue.organize(&stable, &participants, &account_id).await;

let first = queue.take_mine().expect("new request present");
assert_eq!(first.indexed.id, new_id);
let second = queue.take_mine().expect("old request present");
assert_eq!(second.indexed.id, old_id);
assert!(queue.my_requests.is_empty());
}
}
15 changes: 0 additions & 15 deletions chain-signatures/node/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use mpc_crypto::{near_public_key_to_affine_point, PublicKey};

use chrono::{DateTime, LocalResult, TimeZone, Utc};
use k256::elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint};
use k256::{AffinePoint, EncodedPoint};
use tokio::task::{AbortHandle, JoinSet};
Expand Down Expand Up @@ -69,20 +68,6 @@ impl AffinePointExt for AffinePoint {
}
}

pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool {
if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) {
let timeout = Duration::from_millis(timeout);
let now_datetime: DateTime<Utc> = Utc::now();
// Calculate the difference in seconds
let elapsed_duration = now_datetime.signed_duration_since(msg_timestamp);
let timeout = chrono::Duration::seconds(timeout.as_secs() as i64)
+ chrono::Duration::nanoseconds(timeout.subsec_nanos() as i64);
elapsed_duration > timeout
} else {
false
}
}

pub fn duration_between_unix(from_timestamp: u64, to_timestamp: u64) -> Duration {
Duration::from_secs(to_timestamp - from_timestamp)
}
Expand Down
Loading
Loading