Skip to content

Commit 17de318

Browse files
samluryemeta-codesync[bot]
authored andcommitted
Add configurable stacktrace on unawaited pytokio PythonTask (#1679)
Summary: Pull Request resolved: #1679 This diff adds the environment variable `MONARCH_HYPERACTOR_UNAWAITED_PYTOKIO_TRACEBACK`, which is 0 by default. When it is set to 1, errors in unawaited pytokio `PythonTask` instances will print a traceback indicating where the `PythonTask` was created in addition to the error itself. This should only be used for development/debugging, because it adds overhead to compute the traceback for each new `PythonTask`. This diff also makes it so that the panic trace isn't printed anymore, and the new warning makes it clear that the unawaited task is *not* the cause of a crash. This should ideally make it less scary and less likely to cause people to go down a fruitless rabbit hole. ghstack-source-id: 319407613 exported-using-ghexport Reviewed By: shayne-fletcher Differential Revision: D85614449 fbshipit-source-id: 10b85568becf2470be735790051501ecf7ab0bea
1 parent dd2fe54 commit 17de318

File tree

4 files changed

+139
-33
lines changed

4 files changed

+139
-33
lines changed

monarch_hyperactor/src/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ impl Handler<PythonMessage> for PythonActor {
738738
handle_async_endpoint_panic(
739739
cx.port(),
740740
// self.panic_sender.clone(),
741-
PythonTask::new(future),
741+
PythonTask::new(future)?,
742742
receiver,
743743
)
744744
.instrument(

monarch_hyperactor/src/mailbox.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,9 @@ async fn recv_async(
336336

337337
#[pymethods]
338338
impl PythonPortReceiver {
339-
fn recv_task(&mut self) -> PyPythonTask {
339+
fn recv_task(&mut self) -> PyResult<PyPythonTask> {
340340
let receiver = self.inner.clone();
341-
PythonTask::new(recv_async(receiver)).into()
341+
Ok(PythonTask::new(recv_async(receiver))?.into())
342342
}
343343
}
344344

@@ -516,7 +516,7 @@ impl PythonOncePortReceiver {
516516
.map_err(|err| PyErr::new::<PyEOFError, _>(format!("Port closed: {}", err)))
517517
.and_then(|message| Python::with_gil(|py| message.into_py_any(py)))
518518
};
519-
Ok(PythonTask::new(fut).into())
519+
Ok(PythonTask::new(fut)?.into())
520520
}
521521
}
522522

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl PyProcMesh {
429429
Ok(PythonTask::new(async move {
430430
Self::stop_mesh(inner, proc_events).await?;
431431
Python::with_gil(|py| Ok(py.None()))
432-
})
432+
})?
433433
.into())
434434
}
435435
}

monarch_hyperactor/src/pytokio.rs

Lines changed: 134 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ use std::error::Error;
1010
use std::future::Future;
1111
use std::pin::Pin;
1212

13+
use hyperactor::attrs::declare_attrs;
1314
use hyperactor::clock::Clock;
1415
use hyperactor::clock::RealClock;
16+
use hyperactor::config;
17+
use hyperactor::config::CONFIG;
18+
use hyperactor::config::ConfigAttr;
1519
use monarch_types::SerializablePyErr;
1620
use pyo3::IntoPyObjectExt;
1721
use pyo3::exceptions::PyRuntimeError;
@@ -20,6 +24,7 @@ use pyo3::exceptions::PyTimeoutError;
2024
use pyo3::exceptions::PyValueError;
2125
use pyo3::prelude::*;
2226
use pyo3::types::PyNone;
27+
use pyo3::types::PyString;
2328
use pyo3::types::PyType;
2429
use tokio::sync::Mutex;
2530
use tokio::sync::watch;
@@ -28,20 +33,68 @@ use tokio::task::JoinHandle;
2833
use crate::runtime::get_tokio_runtime;
2934
use crate::runtime::signal_safe_block_on;
3035

36+
declare_attrs! {
37+
/// If true, when a pytokio PythonTask fails, the traceback of the original callsite
38+
/// will be logged.
39+
@meta(CONFIG = ConfigAttr {
40+
env_name: Some("MONARCH_HYPERACTOR_UNAWAITED_PYTOKIO_TRACEBACK".to_string()),
41+
py_name: Some("unawaited_pytokio_traceback".to_string()),
42+
})
43+
pub attr UNAWAITED_PYTOKIO_TRACEBACK: u8 = 0;
44+
}
45+
46+
fn current_traceback() -> PyResult<Option<PyObject>> {
47+
if config::global::get(UNAWAITED_PYTOKIO_TRACEBACK) != 0 {
48+
Python::with_gil(|py| {
49+
Ok(Some(
50+
py.import("traceback")?
51+
.call_method0("extract_stack")?
52+
.unbind(),
53+
))
54+
})
55+
} else {
56+
Ok(None)
57+
}
58+
}
59+
60+
fn format_traceback(py: Python<'_>, traceback: &PyObject) -> PyResult<String> {
61+
let tb = py
62+
.import("traceback")?
63+
.call_method1("format_list", (traceback,))?;
64+
PyString::new(py, "")
65+
.call_method1("join", (tb,))?
66+
.extract::<String>()
67+
}
68+
3169
/// Helper struct to make a Python future passable in an actor message.
3270
///
3371
/// Also so that we don't have to write this massive type signature everywhere
3472
pub(crate) struct PythonTask {
3573
future: Mutex<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send + 'static>>>,
74+
traceback: Option<PyObject>,
3675
}
3776

3877
impl PythonTask {
39-
pub(crate) fn new(fut: impl Future<Output = PyResult<PyObject>> + Send + 'static) -> Self {
78+
fn new_with_traceback(
79+
fut: impl Future<Output = PyResult<PyObject>> + Send + 'static,
80+
traceback: Option<PyObject>,
81+
) -> Self {
4082
Self {
4183
future: Mutex::new(Box::pin(fut)),
84+
traceback,
4285
}
4386
}
4487

88+
pub(crate) fn new(
89+
fut: impl Future<Output = PyResult<PyObject>> + Send + 'static,
90+
) -> PyResult<Self> {
91+
Ok(Self::new_with_traceback(fut, current_traceback()?))
92+
}
93+
94+
fn traceback(&self) -> &Option<PyObject> {
95+
&self.traceback
96+
}
97+
4598
pub(crate) fn take(self) -> Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send + 'static>> {
4699
self.future.into_inner()
47100
}
@@ -101,17 +154,28 @@ impl PythonTaskAwaitIterator {
101154
}
102155

103156
impl PyPythonTask {
104-
pub fn new<F, T>(fut: F) -> PyResult<Self>
157+
fn new_with_traceback<F, T>(fut: F, traceback: Option<PyObject>) -> PyResult<Self>
105158
where
106159
F: Future<Output = PyResult<T>> + Send + 'static,
107160
T: for<'py> IntoPyObject<'py>,
108161
{
109-
Ok(PythonTask::new(async {
110-
fut.await
111-
.and_then(|t| Python::with_gil(|py| t.into_py_any(py)))
112-
})
162+
Ok(PythonTask::new_with_traceback(
163+
async {
164+
fut.await
165+
.and_then(|t| Python::with_gil(|py| t.into_py_any(py)))
166+
},
167+
traceback,
168+
)
113169
.into())
114170
}
171+
172+
pub fn new<F, T>(fut: F) -> PyResult<Self>
173+
where
174+
F: Future<Output = PyResult<T>> + Send + 'static,
175+
T: for<'py> IntoPyObject<'py>,
176+
{
177+
Self::new_with_traceback(fut, current_traceback()?)
178+
}
115179
}
116180

117181
fn to_py_error<T>(e: T) -> PyErr
@@ -131,6 +195,16 @@ impl PyPythonTask {
131195
.ok_or_else(|| PyValueError::new_err("PythonTask already consumed"))
132196
}
133197

198+
fn traceback(&self) -> PyResult<Option<PyObject>> {
199+
if let Some(task) = &self.inner {
200+
Ok(Python::with_gil(|py| {
201+
task.traceback().as_ref().map(|t| t.clone_ref(py))
202+
}))
203+
} else {
204+
Err(PyValueError::new_err("PythonTask already consumed"))
205+
}
206+
}
207+
134208
/// Prefer spawn_abortable over spawn if the future can be safely cancelled
135209
/// when it is dropped.
136210
/// This way any resources it is using will be freed up. This is especially
@@ -141,30 +215,41 @@ impl PyPythonTask {
141215
/// PyShared is dropped.
142216
pub(crate) fn spawn_abortable(&mut self) -> PyResult<PyShared> {
143217
let (tx, rx) = watch::channel(None);
218+
let traceback = self.traceback()?;
219+
let traceback1 = self.traceback()?;
144220
let task = self.take_task()?;
145221
let handle = get_tokio_runtime().spawn(async move {
146-
send_result(tx, task.await);
222+
send_result(tx, task.await, traceback1);
147223
});
148224
Ok(PyShared {
149225
rx,
150226
handle,
151227
abort: true,
228+
traceback,
152229
})
153230
}
154231
}
155232

156233
fn send_result(
157234
tx: tokio::sync::watch::Sender<Option<PyResult<PyObject>>>,
158235
result: PyResult<PyObject>,
236+
traceback: Option<PyObject>,
159237
) {
160238
// a SendErr just means that there are no consumers of the value left.
161239
match tx.send(Some(result)) {
162240
Err(tokio::sync::watch::error::SendError(Some(Err(pyerr)))) => {
163241
Python::with_gil(|py| {
164-
panic!(
165-
"PythonTask errored but is not being awaited: {}",
166-
SerializablePyErr::from(py, &pyerr)
167-
)
242+
let tb = if let Some(tb) = traceback {
243+
format_traceback(py, &tb).unwrap()
244+
} else {
245+
"None (run with `MONARCH_HYPERACTOR_UNAWAITED_PYTOKIO_TRACEBACK=1` to see a traceback here)\n".into()
246+
};
247+
tracing::error!(
248+
"PythonTask errored but is not being awaited; this will not crash your program, but indicates that \
249+
something went wrong.\n{}\nTraceback where the task was created (most recent call last):\n{}",
250+
SerializablePyErr::from(py, &pyerr),
251+
tb
252+
);
168253
});
169254
}
170255
_ => {}
@@ -175,6 +260,7 @@ fn send_result(
175260
impl PyPythonTask {
176261
fn block_on(mut slf: PyRefMut<PyPythonTask>, py: Python<'_>) -> PyResult<PyObject> {
177262
let task = slf.take_task()?;
263+
178264
// mutable references to python objects must be dropped before calling
179265
// signal_safe_block_on. It will release the GIL, and any other thread
180266
// trying to access slf will throw.
@@ -184,14 +270,17 @@ impl PyPythonTask {
184270

185271
pub(crate) fn spawn(&mut self) -> PyResult<PyShared> {
186272
let (tx, rx) = watch::channel(None);
273+
let traceback = self.traceback()?;
274+
let traceback1 = self.traceback()?;
187275
let task = self.take_task()?;
188276
let handle = get_tokio_runtime().spawn(async move {
189-
send_result(tx, task.await);
277+
send_result(tx, task.await, traceback1);
190278
});
191279
Ok(PyShared {
192280
rx,
193281
handle,
194282
abort: false,
283+
traceback,
195284
})
196285
}
197286

@@ -278,26 +367,35 @@ impl PyPythonTask {
278367
}
279368

280369
fn with_timeout(&mut self, seconds: f64) -> PyResult<PyPythonTask> {
370+
let tb = self.traceback()?;
281371
let task = self.take_task()?;
282-
PyPythonTask::new(async move {
283-
RealClock
284-
.timeout(std::time::Duration::from_secs_f64(seconds), task)
285-
.await
286-
.map_err(|_| PyTimeoutError::new_err(()))?
287-
})
372+
PyPythonTask::new_with_traceback(
373+
async move {
374+
RealClock
375+
.timeout(std::time::Duration::from_secs_f64(seconds), task)
376+
.await
377+
.map_err(|_| PyTimeoutError::new_err(()))?
378+
},
379+
tb,
380+
)
288381
}
289382

290383
#[staticmethod]
291384
fn spawn_blocking(f: PyObject) -> PyResult<PyShared> {
292385
let (tx, rx) = watch::channel(None);
386+
let traceback = current_traceback()?;
387+
let traceback1 = traceback
388+
.as_ref()
389+
.map_or_else(|| None, |t| Python::with_gil(|py| Some(t.clone_ref(py))));
293390
let handle = get_tokio_runtime().spawn_blocking(move || {
294391
let result = Python::with_gil(|py| f.call0(py));
295-
send_result(tx, result);
392+
send_result(tx, result, traceback1);
296393
});
297394
Ok(PyShared {
298395
rx,
299396
handle,
300397
abort: false,
398+
traceback,
301399
})
302400
}
303401

@@ -342,6 +440,7 @@ pub struct PyShared {
342440
rx: watch::Receiver<Option<PyResult<PyObject>>>,
343441
handle: JoinHandle<()>,
344442
abort: bool,
443+
traceback: Option<PyObject>,
345444
}
346445

347446
impl Drop for PyShared {
@@ -363,20 +462,27 @@ impl PyShared {
363462
// we can have multiple awaiters get triggered by the same change.
364463
// self.rx will always be in the state where it hasn't see the change yet.
365464
let mut rx = self.rx.clone();
366-
PyPythonTask::new(async move {
367-
rx.changed().await.map_err(to_py_error)?;
368-
let b = rx.borrow();
369-
let r = b.as_ref().unwrap();
370-
Python::with_gil(|py| match r {
371-
Ok(v) => Ok(v.bind(py).clone().unbind()),
372-
Err(err) => Err(err.clone_ref(py)),
373-
})
374-
})
465+
PyPythonTask::new_with_traceback(
466+
async move {
467+
rx.changed().await.map_err(to_py_error)?;
468+
let b = rx.borrow();
469+
let r = b.as_ref().unwrap();
470+
Python::with_gil(|py| match r {
471+
Ok(v) => Ok(v.bind(py).clone().unbind()),
472+
Err(err) => Err(err.clone_ref(py)),
473+
})
474+
},
475+
self.traceback
476+
.as_ref()
477+
.map_or_else(|| None, |t| Python::with_gil(|py| Some(t.clone_ref(py)))),
478+
)
375479
}
480+
376481
fn __await__(&mut self, py: Python<'_>) -> PyResult<PythonTaskAwaitIterator> {
377482
let task = self.task()?;
378483
Ok(PythonTaskAwaitIterator::new(task.into_py_any(py)?))
379484
}
485+
380486
pub fn block_on(mut slf: PyRefMut<PyShared>, py: Python<'_>) -> PyResult<PyObject> {
381487
let task = slf.task()?.take_task()?;
382488
// mutable references to python objects must be dropped before calling

0 commit comments

Comments
 (0)