From e138af34a67acdd101a8e9e34b74fa2d948e59aa Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Mon, 27 Oct 2025 08:54:47 -0700 Subject: [PATCH 1/3] [hyperactor] store rust type in message headers for diagnostics This will be useful for debugging undelivered messages in particular. We now store the undelivered envelope directly and render all of its headers. Differential Revision: [D85572863](https://our.internmc.facebook.com/intern/diff/D85572863/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85572863/)! [ghstack-poisoned] --- .../books/hyperactor-book/src/actors/actor.md | 4 +- hyperactor/src/actor.rs | 2 +- hyperactor/src/mailbox.rs | 13 +++-- hyperactor/src/mailbox/headers.rs | 9 ++++ hyperactor/src/mailbox/undeliverable.rs | 47 ++++++------------- hyperactor/src/reference.rs | 1 + hyperactor/src/test_utils/pingpong.rs | 4 +- hyperactor_mesh/src/comm.rs | 8 +++- 8 files changed, 45 insertions(+), 43 deletions(-) diff --git a/docs/source/books/hyperactor-book/src/actors/actor.md b/docs/source/books/hyperactor-book/src/actors/actor.md index 11edc0e4a..11e35cf74 100644 --- a/docs/source/books/hyperactor-book/src/actors/actor.md +++ b/docs/source/books/hyperactor-book/src/actors/actor.md @@ -49,7 +49,7 @@ pub trait Actor: Sized + Send + Debug + 'static { ) -> Result<(), anyhow::Error> { assert_eq!(envelope.sender(), this.self_id()); - anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope)); + anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope }); } } ``` @@ -159,7 +159,7 @@ async fn handle_undeliverable_message( ) -> Result<(), anyhow::Error> { assert_eq!(envelope.sender(), this.self_id()); - anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope)); + anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope }); } ``` This method is called when a message sent by this actor fails to be delivered. diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 5d711f459..4a58539e1 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -142,7 +142,7 @@ pub fn handle_undeliverable_message( ) -> Result<(), anyhow::Error> { assert_eq!(envelope.sender(), cx.self_id()); - anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope)); + anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope }); } /// An actor that does nothing. It is used to represent "client only" actors, diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index 2b31b8124..3eaef43ec 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -428,11 +428,15 @@ impl MessageEnvelope { impl fmt::Display for MessageEnvelope { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.error_msg() { - None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data), + None => write!( + f, + "{} > {}: {} {{{}}}", + self.sender, self.dest, self.data, self.headers + ), Some(err) => write!( f, - "{} > {}: {}: delivery error: {}", - self.sender, self.dest, self.data, err + "{} > {}: {} {{{}}}: delivery error: {}", + self.sender, self.dest, self.data, self.headers, err ), } } @@ -1610,6 +1614,7 @@ impl PortHandle { let mut headers = Attrs::new(); crate::mailbox::headers::set_send_timestamp(&mut headers); + crate::mailbox::headers::set_rust_message_type::(&mut headers); self.sender.send(headers, message).map_err(|err| { MailboxSenderError::new_unbound::( @@ -3060,7 +3065,7 @@ mod tests { assert_eq!( format!("{}", envelope), - r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"# + r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"# ); } diff --git a/hyperactor/src/mailbox/headers.rs b/hyperactor/src/mailbox/headers.rs index a99de34d1..f7f927f20 100644 --- a/hyperactor/src/mailbox/headers.rs +++ b/hyperactor/src/mailbox/headers.rs @@ -11,6 +11,7 @@ //! This module provides header attributes and utilities for message metadata, //! including latency tracking timestamps used to measure message processing times. +use std::any::type_name; use std::time::SystemTime; use crate::attrs::Attrs; @@ -23,6 +24,9 @@ use crate::metrics::MESSAGE_LATENCY_MICROS; declare_attrs! { /// Send timestamp for message latency tracking pub attr SEND_TIMESTAMP: SystemTime; + + /// The rust type of the message. + pub attr RUST_MESSAGE_TYPE: String; } /// Set the send timestamp for latency tracking if timestamp not already set. @@ -33,6 +37,11 @@ pub fn set_send_timestamp(headers: &mut Attrs) { } } +/// Set the send timestamp for latency tracking if timestamp not already set. +pub fn set_rust_message_type(headers: &mut Attrs) { + headers.set(RUST_MESSAGE_TYPE, type_name::().to_string()); +} + /// This function checks the configured sampling rate and, if the random sample passes, /// calculates the latency between the send timestamp and the current time, then records /// the latency metric with the associated actor ID. diff --git a/hyperactor/src/mailbox/undeliverable.rs b/hyperactor/src/mailbox/undeliverable.rs index cff25d173..56a57cf98 100644 --- a/hyperactor/src/mailbox/undeliverable.rs +++ b/hyperactor/src/mailbox/undeliverable.rs @@ -103,47 +103,30 @@ pub(crate) fn return_undeliverable( /// Errors that occur during message delivery and return. pub enum UndeliverableMessageError { /// Delivery of a message to its destination failed. - #[error("a message from {from} to {to} was undeliverable and returned: {error:?}")] + #[error( + "a message from {} to {} was undeliverable and returned: {:?}: {envelope}", + .envelope.sender(), + .envelope.dest(), + .envelope.error_msg() + )] DeliveryFailure { - /// The sender of the message. - from: ActorId, - /// The destination of the message. - to: PortId, - /// Details of why the message couldn't be delivered. - error: Option, + /// The undelivered message. + envelope: MessageEnvelope, }, /// Delivery of an undeliverable message back to its sender /// failed. - #[error("returning an undeliverable message to sender {sender} failed: {error:?}")] + #[error( + "returning an undeliverable message to sender {} failed: {:?}: {envelope}", + .envelope.sender(), + .envelope.error_msg() + )] ReturnFailure { - /// The actor the message was to be returned to. - sender: ActorId, - - /// Details of why the return failed. - error: Option, + /// The undelivered message. + envelope: MessageEnvelope, }, } -impl UndeliverableMessageError { - /// Constructs `DeliveryFailure` from a failed delivery attempt. - pub fn delivery_failure(envelope: &MessageEnvelope) -> Self { - UndeliverableMessageError::DeliveryFailure { - from: envelope.sender().clone(), - to: envelope.dest().clone(), - error: envelope.error_msg(), - } - } - - /// Constructs a `ReturnFailure` from a failed return attempt. - pub fn return_failure(envelope: &MessageEnvelope) -> Self { - UndeliverableMessageError::ReturnFailure { - sender: envelope.sender().clone(), - error: envelope.error_msg(), - } - } -} - /// Drain undeliverables and convert them into /// `ActorSupervisionEvent`, using a caller-provided resolver to /// obtain the (possibly late) sink. If the resolver returns `None`, diff --git a/hyperactor/src/reference.rs b/hyperactor/src/reference.rs index ad3ea04de..9ff34aa0b 100644 --- a/hyperactor/src/reference.rs +++ b/hyperactor/src/reference.rs @@ -1051,6 +1051,7 @@ impl PortRef { message: Serialized, ) { crate::mailbox::headers::set_send_timestamp(&mut headers); + crate::mailbox::headers::set_rust_message_type::(&mut headers); cx.post(self.port_id.clone(), headers, message); } diff --git a/hyperactor/src/test_utils/pingpong.rs b/hyperactor/src/test_utils/pingpong.rs index 30c32cc45..da50eace7 100644 --- a/hyperactor/src/test_utils/pingpong.rs +++ b/hyperactor/src/test_utils/pingpong.rs @@ -90,8 +90,8 @@ impl Actor for PingPongActor { match &self.params.undeliverable_port_ref { Some(port) => port.send(cx, undelivered).unwrap(), None => { - let Undeliverable(envelope) = &undelivered; - anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope)); + let Undeliverable(envelope) = undelivered; + anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope }); } } diff --git a/hyperactor_mesh/src/comm.rs b/hyperactor_mesh/src/comm.rs index 3d1283d48..34b32822e 100644 --- a/hyperactor_mesh/src/comm.rs +++ b/hyperactor_mesh/src/comm.rs @@ -202,7 +202,9 @@ impl Actor for CommActor { err, )); message_envelope.set_error(error); - UndeliverableMessageError::return_failure(&message_envelope) + UndeliverableMessageError::ReturnFailure { + envelope: message_envelope, + } })?; return Ok(()); } @@ -226,7 +228,9 @@ impl Actor for CommActor { err, )); message_envelope.set_error(error); - UndeliverableMessageError::return_failure(&message_envelope) + UndeliverableMessageError::ReturnFailure { + envelope: message_envelope, + } })?; return Ok(()); } From 58e6dc3ebfe2277867496de675155e25bdcd6373 Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Mon, 27 Oct 2025 11:43:28 -0700 Subject: [PATCH 2/3] Update on "[hyperactor] store rust type in message headers for diagnostics" This will be useful for debugging undelivered messages in particular. We now store the undelivered envelope directly and render all of its headers. Differential Revision: [D85572863](https://our.internmc.facebook.com/intern/diff/D85572863/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85572863/)! [ghstack-poisoned] --- hyperactor/src/mailbox/headers.rs | 1 + hyperactor_mesh/src/actor_mesh.rs | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/hyperactor/src/mailbox/headers.rs b/hyperactor/src/mailbox/headers.rs index f7f927f20..5cd8c3cf9 100644 --- a/hyperactor/src/mailbox/headers.rs +++ b/hyperactor/src/mailbox/headers.rs @@ -39,6 +39,7 @@ pub fn set_send_timestamp(headers: &mut Attrs) { /// Set the send timestamp for latency tracking if timestamp not already set. pub fn set_rust_message_type(headers: &mut Attrs) { + eprintln!("rust message type: {}", type_name::()); headers.set(RUST_MESSAGE_TYPE, type_name::().to_string()); } diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 4ea59a057..a68a57c59 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -1508,6 +1508,7 @@ mod tests { let serialized = Serialized::serialize(pay).unwrap(); let mut headers = Attrs::new(); hyperactor::mailbox::headers::set_send_timestamp(&mut headers); + hyperactor::mailbox::headers::set_rust_message_type::(&mut headers); let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers); let frame = Frame::Message(0u64, envelope); let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap(); @@ -1547,7 +1548,7 @@ mod tests { // Message sized to exactly max frame length. let payload = Payload { - part: Part::from(Bytes::from(vec![0u8; 698])), + part: Part::from(Bytes::from(vec![0u8; 588])), reply_port: reply_handle.bind(), }; let frame_len = frame_length( @@ -1567,7 +1568,7 @@ mod tests { // Message sized to max frame length + 1. let payload = Payload { - part: Part::from(Bytes::from(vec![0u8; 699])), + part: Part::from(Bytes::from(vec![0u8; 589])), reply_port: reply_handle.bind(), }; let frame_len = frame_length( From 06d5702237dd50617221bc3605cae5fee79800ee Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Mon, 27 Oct 2025 12:37:23 -0700 Subject: [PATCH 3/3] Update on "[hyperactor] store rust type in message headers for diagnostics" This will be useful for debugging undelivered messages in particular. We now store the undelivered envelope directly and render all of its headers. Differential Revision: [D85572863](https://our.internmc.facebook.com/intern/diff/D85572863/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85572863/)! [ghstack-poisoned] --- hyperactor/src/mailbox/headers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/hyperactor/src/mailbox/headers.rs b/hyperactor/src/mailbox/headers.rs index 5cd8c3cf9..f7f927f20 100644 --- a/hyperactor/src/mailbox/headers.rs +++ b/hyperactor/src/mailbox/headers.rs @@ -39,7 +39,6 @@ pub fn set_send_timestamp(headers: &mut Attrs) { /// Set the send timestamp for latency tracking if timestamp not already set. pub fn set_rust_message_type(headers: &mut Attrs) { - eprintln!("rust message type: {}", type_name::()); headers.set(RUST_MESSAGE_TYPE, type_name::().to_string()); }