@@ -10,8 +10,12 @@ use std::error::Error;
1010use  std:: future:: Future ; 
1111use  std:: pin:: Pin ; 
1212
13+ use  hyperactor:: attrs:: declare_attrs; 
1314use  hyperactor:: clock:: Clock ; 
1415use  hyperactor:: clock:: RealClock ; 
16+ use  hyperactor:: config; 
17+ use  hyperactor:: config:: CONFIG ; 
18+ use  hyperactor:: config:: ConfigAttr ; 
1519use  monarch_types:: SerializablePyErr ; 
1620use  pyo3:: IntoPyObjectExt ; 
1721use  pyo3:: exceptions:: PyRuntimeError ; 
@@ -20,6 +24,7 @@ use pyo3::exceptions::PyTimeoutError;
2024use  pyo3:: exceptions:: PyValueError ; 
2125use  pyo3:: prelude:: * ; 
2226use  pyo3:: types:: PyNone ; 
27+ use  pyo3:: types:: PyString ; 
2328use  pyo3:: types:: PyType ; 
2429use  tokio:: sync:: Mutex ; 
2530use  tokio:: sync:: watch; 
@@ -28,20 +33,68 @@ use tokio::task::JoinHandle;
2833use  crate :: runtime:: get_tokio_runtime; 
2934use  crate :: runtime:: signal_safe_block_on; 
3035
36+ declare_attrs !  { 
37+     /// If true, when a pytokio PythonTask fails, the traceback of the original callsite 
38+      /// will be logged. 
39+      @meta( CONFIG  = ConfigAttr  { 
40+         env_name:  Some ( "MONARCH_HYPERACTOR_UNAWAITED_PYTOKIO_TRACEBACK" . to_string( ) ) , 
41+         py_name:  Some ( "unawaited_pytokio_traceback" . to_string( ) ) , 
42+     } ) 
43+     pub  attr UNAWAITED_PYTOKIO_TRACEBACK :  u8  = 0 ; 
44+ } 
45+ 
46+ fn  current_traceback ( )  -> PyResult < Option < PyObject > >  { 
47+     if  config:: global:: get ( UNAWAITED_PYTOKIO_TRACEBACK )  != 0  { 
48+         Python :: with_gil ( |py| { 
49+             Ok ( Some ( 
50+                 py. import ( "traceback" ) ?
51+                     . call_method0 ( "extract_stack" ) ?
52+                     . unbind ( ) , 
53+             ) ) 
54+         } ) 
55+     }  else  { 
56+         Ok ( None ) 
57+     } 
58+ } 
59+ 
60+ fn  format_traceback ( py :  Python < ' _ > ,  traceback :  & PyObject )  -> PyResult < String >  { 
61+     let  tb = py
62+         . import ( "traceback" ) ?
63+         . call_method1 ( "format_list" ,  ( traceback, ) ) ?; 
64+     PyString :: new ( py,  "" ) 
65+         . call_method1 ( "join" ,  ( tb, ) ) ?
66+         . extract :: < String > ( ) 
67+ } 
68+ 
3169/// Helper struct to make a Python future passable in an actor message. 
3270/// 
3371/// Also so that we don't have to write this massive type signature everywhere 
3472pub ( crate )  struct  PythonTask  { 
3573    future :  Mutex < Pin < Box < dyn  Future < Output  = PyResult < PyObject > >  + Send  + ' static > > > , 
74+     traceback :  Option < PyObject > , 
3675} 
3776
3877impl  PythonTask  { 
39-     pub ( crate )  fn  new ( fut :  impl  Future < Output  = PyResult < PyObject > >  + Send  + ' static )  -> Self  { 
78+     fn  new_with_traceback ( 
79+         fut :  impl  Future < Output  = PyResult < PyObject > >  + Send  + ' static , 
80+         traceback :  Option < PyObject > , 
81+     )  -> Self  { 
4082        Self  { 
4183            future :  Mutex :: new ( Box :: pin ( fut) ) , 
84+             traceback, 
4285        } 
4386    } 
4487
88+     pub ( crate )  fn  new ( 
89+         fut :  impl  Future < Output  = PyResult < PyObject > >  + Send  + ' static , 
90+     )  -> PyResult < Self >  { 
91+         Ok ( Self :: new_with_traceback ( fut,  current_traceback ( ) ?) ) 
92+     } 
93+ 
94+     fn  traceback ( & self )  -> & Option < PyObject >  { 
95+         & self . traceback 
96+     } 
97+ 
4598    pub ( crate )  fn  take ( self )  -> Pin < Box < dyn  Future < Output  = PyResult < PyObject > >  + Send  + ' static > >  { 
4699        self . future . into_inner ( ) 
47100    } 
@@ -101,17 +154,28 @@ impl PythonTaskAwaitIterator {
101154} 
102155
103156impl  PyPythonTask  { 
104-     pub   fn  new < F ,  T > ( fut :  F )  -> PyResult < Self > 
157+     fn  new_with_traceback < F ,  T > ( fut :  F ,   traceback :   Option < PyObject > )  -> PyResult < Self > 
105158    where 
106159        F :  Future < Output  = PyResult < T > >  + Send  + ' static , 
107160        T :  for < ' py >  IntoPyObject < ' py > , 
108161    { 
109-         Ok ( PythonTask :: new ( async  { 
110-             fut. await 
111-                 . and_then ( |t| Python :: with_gil ( |py| t. into_py_any ( py) ) ) 
112-         } ) 
162+         Ok ( PythonTask :: new_with_traceback ( 
163+             async  { 
164+                 fut. await 
165+                     . and_then ( |t| Python :: with_gil ( |py| t. into_py_any ( py) ) ) 
166+             } , 
167+             traceback, 
168+         ) 
113169        . into ( ) ) 
114170    } 
171+ 
172+     pub  fn  new < F ,  T > ( fut :  F )  -> PyResult < Self > 
173+     where 
174+         F :  Future < Output  = PyResult < T > >  + Send  + ' static , 
175+         T :  for < ' py >  IntoPyObject < ' py > , 
176+     { 
177+         Self :: new_with_traceback ( fut,  current_traceback ( ) ?) 
178+     } 
115179} 
116180
117181fn  to_py_error < T > ( e :  T )  -> PyErr 
@@ -131,6 +195,16 @@ impl PyPythonTask {
131195            . ok_or_else ( || PyValueError :: new_err ( "PythonTask already consumed" ) ) 
132196    } 
133197
198+     fn  traceback ( & self )  -> PyResult < Option < PyObject > >  { 
199+         if  let  Some ( task)  = & self . inner  { 
200+             Ok ( Python :: with_gil ( |py| { 
201+                 task. traceback ( ) . as_ref ( ) . map ( |t| t. clone_ref ( py) ) 
202+             } ) ) 
203+         }  else  { 
204+             Err ( PyValueError :: new_err ( "PythonTask already consumed" ) ) 
205+         } 
206+     } 
207+ 
134208    /// Prefer spawn_abortable over spawn if the future can be safely cancelled 
135209     /// when it is dropped. 
136210     /// This way any resources it is using will be freed up. This is especially 
@@ -141,30 +215,41 @@ impl PyPythonTask {
141215     /// PyShared is dropped. 
142216     pub ( crate )  fn  spawn_abortable ( & mut  self )  -> PyResult < PyShared >  { 
143217        let  ( tx,  rx)  = watch:: channel ( None ) ; 
218+         let  traceback = self . traceback ( ) ?; 
219+         let  traceback1 = self . traceback ( ) ?; 
144220        let  task = self . take_task ( ) ?; 
145221        let  handle = get_tokio_runtime ( ) . spawn ( async  move  { 
146-             send_result ( tx,  task. await ) ; 
222+             send_result ( tx,  task. await ,  traceback1 ) ; 
147223        } ) ; 
148224        Ok ( PyShared  { 
149225            rx, 
150226            handle, 
151227            abort :  true , 
228+             traceback, 
152229        } ) 
153230    } 
154231} 
155232
156233fn  send_result ( 
157234    tx :  tokio:: sync:: watch:: Sender < Option < PyResult < PyObject > > > , 
158235    result :  PyResult < PyObject > , 
236+     traceback :  Option < PyObject > , 
159237)  { 
160238    // a SendErr just means that there are no consumers of the value left. 
161239    match  tx. send ( Some ( result) )  { 
162240        Err ( tokio:: sync:: watch:: error:: SendError ( Some ( Err ( pyerr) ) ) )  => { 
163241            Python :: with_gil ( |py| { 
164-                 panic ! ( 
165-                     "PythonTask errored but is not being awaited: {}" , 
166-                     SerializablePyErr :: from( py,  & pyerr) 
167-                 ) 
242+                 let  tb = if  let  Some ( tb)  = traceback { 
243+                     format_traceback ( py,  & tb) . unwrap ( ) 
244+                 }  else  { 
245+                     "None (run with `MONARCH_HYPERACTOR_UNAWAITED_PYTOKIO_TRACEBACK=1` to see a traceback here)\n " . into ( ) 
246+                 } ; 
247+                 tracing:: error!( 
248+                     "PythonTask errored but is not being awaited; this will not crash your program, but indicates that \  
249+                      something went wrong.\n {}\n Traceback where the task was created (most recent call last):\n {}", 
250+                     SerializablePyErr :: from( py,  & pyerr) , 
251+                     tb
252+                 ) ; 
168253            } ) ; 
169254        } 
170255        _ => { } 
@@ -175,6 +260,7 @@ fn send_result(
175260impl  PyPythonTask  { 
176261    fn  block_on ( mut  slf :  PyRefMut < PyPythonTask > ,  py :  Python < ' _ > )  -> PyResult < PyObject >  { 
177262        let  task = slf. take_task ( ) ?; 
263+ 
178264        // mutable references to python objects must be dropped before calling 
179265        // signal_safe_block_on. It will release the GIL, and any other thread 
180266        // trying to access slf will throw. 
@@ -184,14 +270,17 @@ impl PyPythonTask {
184270
185271    pub ( crate )  fn  spawn ( & mut  self )  -> PyResult < PyShared >  { 
186272        let  ( tx,  rx)  = watch:: channel ( None ) ; 
273+         let  traceback = self . traceback ( ) ?; 
274+         let  traceback1 = self . traceback ( ) ?; 
187275        let  task = self . take_task ( ) ?; 
188276        let  handle = get_tokio_runtime ( ) . spawn ( async  move  { 
189-             send_result ( tx,  task. await ) ; 
277+             send_result ( tx,  task. await ,  traceback1 ) ; 
190278        } ) ; 
191279        Ok ( PyShared  { 
192280            rx, 
193281            handle, 
194282            abort :  false , 
283+             traceback, 
195284        } ) 
196285    } 
197286
@@ -278,26 +367,35 @@ impl PyPythonTask {
278367    } 
279368
280369    fn  with_timeout ( & mut  self ,  seconds :  f64 )  -> PyResult < PyPythonTask >  { 
370+         let  tb = self . traceback ( ) ?; 
281371        let  task = self . take_task ( ) ?; 
282-         PyPythonTask :: new ( async  move  { 
283-             RealClock 
284-                 . timeout ( std:: time:: Duration :: from_secs_f64 ( seconds) ,  task) 
285-                 . await 
286-                 . map_err ( |_| PyTimeoutError :: new_err ( ( ) ) ) ?
287-         } ) 
372+         PyPythonTask :: new_with_traceback ( 
373+             async  move  { 
374+                 RealClock 
375+                     . timeout ( std:: time:: Duration :: from_secs_f64 ( seconds) ,  task) 
376+                     . await 
377+                     . map_err ( |_| PyTimeoutError :: new_err ( ( ) ) ) ?
378+             } , 
379+             tb, 
380+         ) 
288381    } 
289382
290383    #[ staticmethod]  
291384    fn  spawn_blocking ( f :  PyObject )  -> PyResult < PyShared >  { 
292385        let  ( tx,  rx)  = watch:: channel ( None ) ; 
386+         let  traceback = current_traceback ( ) ?; 
387+         let  traceback1 = traceback
388+             . as_ref ( ) 
389+             . map_or_else ( || None ,  |t| Python :: with_gil ( |py| Some ( t. clone_ref ( py) ) ) ) ; 
293390        let  handle = get_tokio_runtime ( ) . spawn_blocking ( move  || { 
294391            let  result = Python :: with_gil ( |py| f. call0 ( py) ) ; 
295-             send_result ( tx,  result) ; 
392+             send_result ( tx,  result,  traceback1 ) ; 
296393        } ) ; 
297394        Ok ( PyShared  { 
298395            rx, 
299396            handle, 
300397            abort :  false , 
398+             traceback, 
301399        } ) 
302400    } 
303401
@@ -342,6 +440,7 @@ pub struct PyShared {
342440    rx :  watch:: Receiver < Option < PyResult < PyObject > > > , 
343441    handle :  JoinHandle < ( ) > , 
344442    abort :  bool , 
443+     traceback :  Option < PyObject > , 
345444} 
346445
347446impl  Drop  for  PyShared  { 
@@ -363,20 +462,27 @@ impl PyShared {
363462        // we can have multiple awaiters get triggered by the same change. 
364463        // self.rx will always be in the state where it hasn't see the change yet. 
365464        let  mut  rx = self . rx . clone ( ) ; 
366-         PyPythonTask :: new ( async  move  { 
367-             rx. changed ( ) . await . map_err ( to_py_error) ?; 
368-             let  b = rx. borrow ( ) ; 
369-             let  r = b. as_ref ( ) . unwrap ( ) ; 
370-             Python :: with_gil ( |py| match  r { 
371-                 Ok ( v)  => Ok ( v. bind ( py) . clone ( ) . unbind ( ) ) , 
372-                 Err ( err)  => Err ( err. clone_ref ( py) ) , 
373-             } ) 
374-         } ) 
465+         PyPythonTask :: new_with_traceback ( 
466+             async  move  { 
467+                 rx. changed ( ) . await . map_err ( to_py_error) ?; 
468+                 let  b = rx. borrow ( ) ; 
469+                 let  r = b. as_ref ( ) . unwrap ( ) ; 
470+                 Python :: with_gil ( |py| match  r { 
471+                     Ok ( v)  => Ok ( v. bind ( py) . clone ( ) . unbind ( ) ) , 
472+                     Err ( err)  => Err ( err. clone_ref ( py) ) , 
473+                 } ) 
474+             } , 
475+             self . traceback 
476+                 . as_ref ( ) 
477+                 . map_or_else ( || None ,  |t| Python :: with_gil ( |py| Some ( t. clone_ref ( py) ) ) ) , 
478+         ) 
375479    } 
480+ 
376481    fn  __await__ ( & mut  self ,  py :  Python < ' _ > )  -> PyResult < PythonTaskAwaitIterator >  { 
377482        let  task = self . task ( ) ?; 
378483        Ok ( PythonTaskAwaitIterator :: new ( task. into_py_any ( py) ?) ) 
379484    } 
485+ 
380486    pub  fn  block_on ( mut  slf :  PyRefMut < PyShared > ,  py :  Python < ' _ > )  -> PyResult < PyObject >  { 
381487        let  task = slf. task ( ) ?. take_task ( ) ?; 
382488        // mutable references to python objects must be dropped before calling 
0 commit comments