Skip to content

Commit ac1d6fb

Browse files
mariusae, meta-codesync[bot]
authored and committed
store rust type in message headers for diagnostics (#1669)
Summary: Pull Request resolved: #1669 This will be useful for debugging undelivered messages in particular. We now store the undelivered envelope directly and render all of its headers. ghstack-source-id: 318994665 exported-using-ghexport Reviewed By: vidhyav, shayne-fletcher Differential Revision: D85572863 fbshipit-source-id: 0b68368465396d8c851b612423ce5c3026148453
1 parent 3a5a9b2 commit ac1d6fb

File tree

9 files changed

+48
-45
lines changed

9 files changed

+48
-45
lines changed

docs/source/books/hyperactor-book/src/actors/actor.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub trait Actor: Sized + Send + Debug + 'static {
4949
) -> Result<(), anyhow::Error> {
5050
assert_eq!(envelope.sender(), this.self_id());
5151

52-
anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
52+
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
5353
}
5454
}
5555
```
@@ -159,7 +159,7 @@ async fn handle_undeliverable_message(
159159
) -> Result<(), anyhow::Error> {
160160
assert_eq!(envelope.sender(), this.self_id());
161161

162-
anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
162+
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
163163
}
164164
```
165165
This method is called when a message sent by this actor fails to be delivered.

hyperactor/src/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ pub fn handle_undeliverable_message<A: Actor>(
142142
) -> Result<(), anyhow::Error> {
143143
assert_eq!(envelope.sender(), cx.self_id());
144144

145-
anyhow::bail!(UndeliverableMessageError::delivery_failure(&envelope));
145+
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
146146
}
147147

148148
/// An actor that does nothing. It is used to represent "client only" actors,

hyperactor/src/mailbox.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,15 @@ impl MessageEnvelope {
428428
impl fmt::Display for MessageEnvelope {
429429
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430430
match &self.error_msg() {
431-
None => write!(f, "{} > {}: {}", self.sender, self.dest, self.data),
431+
None => write!(
432+
f,
433+
"{} > {}: {} {{{}}}",
434+
self.sender, self.dest, self.data, self.headers
435+
),
432436
Some(err) => write!(
433437
f,
434-
"{} > {}: {}: delivery error: {}",
435-
self.sender, self.dest, self.data, err
438+
"{} > {}: {} {{{}}}: delivery error: {}",
439+
self.sender, self.dest, self.data, self.headers, err
436440
),
437441
}
438442
}
@@ -1623,6 +1627,7 @@ impl<M: Message> PortHandle<M> {
16231627
let mut headers = Attrs::new();
16241628

16251629
crate::mailbox::headers::set_send_timestamp(&mut headers);
1630+
crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
16261631

16271632
self.sender.send(headers, message).map_err(|err| {
16281633
MailboxSenderError::new_unbound::<M>(
@@ -3074,7 +3079,7 @@ mod tests {
30743079

30753080
assert_eq!(
30763081
format!("{}", envelope),
3077-
r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"}"#
3082+
r#"source[0].actor[0] > dest[1].actor[0][123]: MyTest{"a":123,"b":"hello"} {}"#
30783083
);
30793084
}
30803085

hyperactor/src/mailbox/headers.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! This module provides header attributes and utilities for message metadata,
1212
//! including latency tracking timestamps used to measure message processing times.
1313
14+
use std::any::type_name;
1415
use std::time::SystemTime;
1516

1617
use crate::attrs::Attrs;
@@ -23,6 +24,9 @@ use crate::metrics::MESSAGE_LATENCY_MICROS;
2324
declare_attrs! {
2425
/// Send timestamp for message latency tracking
2526
pub attr SEND_TIMESTAMP: SystemTime;
27+
28+
/// The rust type of the message.
29+
pub attr RUST_MESSAGE_TYPE: String;
2630
}
2731

2832
/// Set the send timestamp for latency tracking if timestamp not already set.
@@ -33,6 +37,11 @@ pub fn set_send_timestamp(headers: &mut Attrs) {
3337
}
3438
}
3539

40+
/// Set the `RUST_MESSAGE_TYPE` header to the Rust type name of the message type `M`.
41+
pub fn set_rust_message_type<M>(headers: &mut Attrs) {
42+
headers.set(RUST_MESSAGE_TYPE, type_name::<M>().to_string());
43+
}
44+
3645
/// This function checks the configured sampling rate and, if the random sample passes,
3746
/// calculates the latency between the send timestamp and the current time, then records
3847
/// the latency metric with the associated actor ID.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -103,47 +103,30 @@ pub(crate) fn return_undeliverable(
103103
/// Errors that occur during message delivery and return.
104104
pub enum UndeliverableMessageError {
105105
/// Delivery of a message to its destination failed.
106-
#[error("a message from {from} to {to} was undeliverable and returned: {error:?}")]
106+
#[error(
107+
"a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
108+
.envelope.sender(),
109+
.envelope.dest(),
110+
.envelope.error_msg()
111+
)]
107112
DeliveryFailure {
108-
/// The sender of the message.
109-
from: ActorId,
110-
/// The destination of the message.
111-
to: PortId,
112-
/// Details of why the message couldn't be delivered.
113-
error: Option<String>,
113+
/// The undelivered message.
114+
envelope: MessageEnvelope,
114115
},
115116

116117
/// Delivery of an undeliverable message back to its sender
117118
/// failed.
118-
#[error("returning an undeliverable message to sender {sender} failed: {error:?}")]
119+
#[error(
120+
"returning an undeliverable message to sender {} failed: {:?}: {envelope}",
121+
.envelope.sender(),
122+
.envelope.error_msg()
123+
)]
119124
ReturnFailure {
120-
/// The actor the message was to be returned to.
121-
sender: ActorId,
122-
123-
/// Details of why the return failed.
124-
error: Option<String>,
125+
/// The undelivered message.
126+
envelope: MessageEnvelope,
125127
},
126128
}
127129

128-
impl UndeliverableMessageError {
129-
/// Constructs `DeliveryFailure` from a failed delivery attempt.
130-
pub fn delivery_failure(envelope: &MessageEnvelope) -> Self {
131-
UndeliverableMessageError::DeliveryFailure {
132-
from: envelope.sender().clone(),
133-
to: envelope.dest().clone(),
134-
error: envelope.error_msg(),
135-
}
136-
}
137-
138-
/// Constructs a `ReturnFailure` from a failed return attempt.
139-
pub fn return_failure(envelope: &MessageEnvelope) -> Self {
140-
UndeliverableMessageError::ReturnFailure {
141-
sender: envelope.sender().clone(),
142-
error: envelope.error_msg(),
143-
}
144-
}
145-
}
146-
147130
/// Drain undeliverables and convert them into
148131
/// `ActorSupervisionEvent`, using a caller-provided resolver to
149132
/// obtain the (possibly late) sink. If the resolver returns `None`,

hyperactor/src/reference.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ impl<M: RemoteMessage> PortRef<M> {
10511051
message: Serialized,
10521052
) {
10531053
crate::mailbox::headers::set_send_timestamp(&mut headers);
1054+
crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
10541055
cx.post(self.port_id.clone(), headers, message);
10551056
}
10561057

hyperactor/src/test_utils/pingpong.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ impl Actor for PingPongActor {
9090
match &self.params.undeliverable_port_ref {
9191
Some(port) => port.send(cx, undelivered).unwrap(),
9292
None => {
93-
let Undeliverable(envelope) = &undelivered;
94-
anyhow::bail!(UndeliverableMessageError::delivery_failure(envelope));
93+
let Undeliverable(envelope) = undelivered;
94+
anyhow::bail!(UndeliverableMessageError::DeliveryFailure { envelope });
9595
}
9696
}
9797

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,6 +1508,7 @@ mod tests {
15081508
let serialized = Serialized::serialize(pay).unwrap();
15091509
let mut headers = Attrs::new();
15101510
hyperactor::mailbox::headers::set_send_timestamp(&mut headers);
1511+
hyperactor::mailbox::headers::set_rust_message_type::<Payload>(&mut headers);
15111512
let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers);
15121513
let frame = Frame::Message(0u64, envelope);
15131514
let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap();
@@ -1547,7 +1548,7 @@ mod tests {
15471548

15481549
// Message sized to exactly max frame length.
15491550
let payload = Payload {
1550-
part: Part::from(Bytes::from(vec![0u8; 698])),
1551+
part: Part::from(Bytes::from(vec![0u8; 588])),
15511552
reply_port: reply_handle.bind(),
15521553
};
15531554
let frame_len = frame_length(
@@ -1567,7 +1568,7 @@ mod tests {
15671568

15681569
// Message sized to max frame length + 1.
15691570
let payload = Payload {
1570-
part: Part::from(Bytes::from(vec![0u8; 699])),
1571+
part: Part::from(Bytes::from(vec![0u8; 589])),
15711572
reply_port: reply_handle.bind(),
15721573
};
15731574
let frame_len = frame_length(

hyperactor_mesh/src/comm.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ impl Actor for CommActor {
202202
err,
203203
));
204204
message_envelope.set_error(error);
205-
UndeliverableMessageError::return_failure(&message_envelope)
205+
UndeliverableMessageError::ReturnFailure {
206+
envelope: message_envelope,
207+
}
206208
})?;
207209
return Ok(());
208210
}
@@ -226,7 +228,9 @@ impl Actor for CommActor {
226228
err,
227229
));
228230
message_envelope.set_error(error);
229-
UndeliverableMessageError::return_failure(&message_envelope)
231+
UndeliverableMessageError::ReturnFailure {
232+
envelope: message_envelope,
233+
}
230234
})?;
231235
return Ok(());
232236
}

0 commit comments

Comments
 (0)