Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/source/books/hyperactor-book/src/actors/actor.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait Actor: Sized + Send + Debug + 'static {
) -> Result<(), anyhow::Error> {
assert_eq!(envelope.sender(), this.self_id());

anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
}
}
```
Expand Down Expand Up @@ -159,7 +159,7 @@ async fn handle_undeliverable_message(
) -> Result<(), anyhow::Error> {
assert_eq!(envelope.sender(), this.self_id());

anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
}
```
This method is called when a message sent by this actor fails to be delivered.
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub fn handle_undeliverable_message<A: Actor>(
) -> Result<(), anyhow::Error> {
assert_eq!(envelope.sender(), cx.self_id());

anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
}

/// An actor that does nothing. It is used to represent "client only" actors,
Expand Down
13 changes: 9 additions & 4 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,15 @@ impl MessageEnvelope {
impl fmt::Display for MessageEnvelope {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.error_msg() {
None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
None => write!(
f,
"{} > {}: {} {{{}}}",
self.sender, self.dest, self.data, self.headers
),
Some(err) => write!(
f,
"{} > {}: {}: delivery error: {}",
self.sender, self.dest, self.data, err
"{} > {}: {} {{{}}}: delivery error: {}",
self.sender, self.dest, self.data, self.headers, err
),
}
}
Expand Down Expand Up @@ -1610,6 +1614,7 @@ impl<M: Message> PortHandle<M> {
let mut headers = Attrs::new();

crate::mailbox::headers::set_send_timestamp(&mut headers);
crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);

self.sender.send(headers, message).map_err(|err| {
MailboxSenderError::new_unbound::<M>(
Expand Down Expand Up @@ -3060,7 +3065,7 @@ mod tests {

assert_eq!(
format!("{}", envelope),
r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
);
}

Expand Down
9 changes: 9 additions & 0 deletions hyperactor/src/mailbox/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! This module provides header attributes and utilities for message metadata,
//! including latency tracking timestamps used to measure message processing times.

use std::any::type_name;
use std::time::SystemTime;

use crate::attrs::Attrs;
Expand All @@ -23,6 +24,9 @@ use crate::metrics::MESSAGE_LATENCY_MICROS;
declare_attrs! {
/// Send timestamp for message latency tracking
pub attr SEND_TIMESTAMP: SystemTime;

/// The rust type of the message.
pub attr RUST_MESSAGE_TYPE: String;
}

/// Set the send timestamp for latency tracking if timestamp not already set.
Expand All @@ -33,6 +37,11 @@ pub fn set_send_timestamp(headers: &mut Attrs) {
}
}

/// Set the send timestamp for latency tracking if timestamp not already set.
pub fn set_rust_message_type<M>(headers: &mut Attrs) {
headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
}

/// This function checks the configured sampling rate and, if the random sample passes,
/// calculates the latency between the send timestamp and the current time, then records
/// the latency metric with the associated actor ID.
Expand Down
47 changes: 15 additions & 32 deletions hyperactor/src/mailbox/undeliverable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,47 +103,30 @@ pub(crate) fn return_undeliverable(
/// Errors that occur during message delivery and return.
pub enum UndeliverableMessageError {
/// Delivery of a message to its destination failed.
#[error("a message from {from} to {to} was undeliverable and returned: {error:?}")]
#[error(
"a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
.envelope.sender(),
.envelope.dest(),
.envelope.error_msg()
)]
DeliveryFailure {
/// The sender of the message.
from: ActorId,
/// The destination of the message.
to: PortId,
/// Details of why the message couldn't be delivered.
error: Option<String>,
/// The undelivered message.
envelope: MessageEnvelope,
},

/// Delivery of an undeliverable message back to its sender
/// failed.
#[error("returning an undeliverable message to sender {sender} failed: {error:?}")]
#[error(
"returning an undeliverable message to sender {} failed: {:?}: {envelope}",
.envelope.sender(),
.envelope.error_msg()
)]
ReturnFailure {
/// The actor the message was to be returned to.
sender: ActorId,

/// Details of why the return failed.
error: Option<String>,
/// The undelivered message.
envelope: MessageEnvelope,
},
}

impl UndeliverableMessageError {
/// Constructs `DeliveryFailure` from a failed delivery attempt.
pub fn delivery_failure(envelope: &MessageEnvelope) -> Self {
UndeliverableMessageError::DeliveryFailure {
from: envelope.sender().clone(),
to: envelope.dest().clone(),
error: envelope.error_msg(),
}
}

/// Constructs a `ReturnFailure` from a failed return attempt.
pub fn return_failure(envelope: &MessageEnvelope) -> Self {
UndeliverableMessageError::ReturnFailure {
sender: envelope.sender().clone(),
error: envelope.error_msg(),
}
}
}

/// Drain undeliverables and convert them into
/// `ActorSupervisionEvent`, using a caller-provided resolver to
/// obtain the (possibly late) sink. If the resolver returns `None`,
Expand Down
1 change: 1 addition & 0 deletions hyperactor/src/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,7 @@ impl<M: RemoteMessage> PortRef<M> {
message: Serialized,
) {
crate::mailbox::headers::set_send_timestamp(&mut headers);
crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
cx.post(self.port_id.clone(), headers, message);
}

Expand Down
4 changes: 2 additions & 2 deletions hyperactor/src/test_utils/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ impl Actor for PingPongActor {
match &self.params.undeliverable_port_ref {
Some(port) => port.send(cx, undelivered).unwrap(),
None => {
let Undeliverable(envelope) = &undelivered;
anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope));
let Undeliverable(envelope) = undelivered;
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
}
}

Expand Down
5 changes: 3 additions & 2 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,7 @@ mod tests {
let serialized = Serialized::serialize(pay).unwrap();
let mut headers = Attrs::new();
hyperactor::mailbox::headers::set_send_timestamp(&mut headers);
hyperactor::mailbox::headers::set_rust_message_type::<Payload>(&mut headers);
let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers);
let frame = Frame::Message(0u64, envelope);
let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap();
Expand Down Expand Up @@ -1547,7 +1548,7 @@ mod tests {

// Message sized to exactly max frame length.
let payload = Payload {
part: Part::from(Bytes::from(vec![0u8; 698])),
part: Part::from(Bytes::from(vec![0u8; 588])),
reply_port: reply_handle.bind(),
};
let frame_len = frame_length(
Expand All @@ -1567,7 +1568,7 @@ mod tests {

// Message sized to max frame length + 1.
let payload = Payload {
part: Part::from(Bytes::from(vec![0u8; 699])),
part: Part::from(Bytes::from(vec![0u8; 589])),
reply_port: reply_handle.bind(),
};
let frame_len = frame_length(
Expand Down
8 changes: 6 additions & 2 deletions hyperactor_mesh/src/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ impl Actor for CommActor {
err,
));
message_envelope.set_error(error);
UndeliverableMessageError::return_failure(&message_envelope)
UndeliverableMessageError::ReturnFailure {
envelope: message_envelope,
}
})?;
return Ok(());
}
Expand All @@ -226,7 +228,9 @@ impl Actor for CommActor {
err,
));
message_envelope.set_error(error);
UndeliverableMessageError::return_failure(&message_envelope)
UndeliverableMessageError::ReturnFailure {
envelope: message_envelope,
}
})?;
return Ok(());
}
Expand Down