@@ -254,6 +254,9 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
254254 else :
255255 self .upstreams = []
256256
257+ # Lazily loaded exception handler to avoid recursion
258+ self ._on_exception = None
259+
257260 self ._set_asynchronous (asynchronous )
258261 self ._set_loop (loop )
259262 if ensure_io_loop and not self .loop :
@@ -445,14 +448,18 @@ def _emit(self, x, metadata=None):
445448
446449 result = []
447450 for downstream in list (self .downstreams ):
448- r = downstream .update (x , who = self , metadata = metadata )
449-
450- if type (r ) is list :
451- result .extend (r )
452- else :
453- result .append (r )
451+ try :
452+ r = downstream .update (x , who = self , metadata = metadata )
454453
455- self ._release_refs (metadata )
454+ if type (r ) is list :
455+ result .extend (r )
456+ else :
457+ result .append (r )
458+ except Exception as exc :
459+ # Push this exception to the on_exception handler on the downstream that raised
460+ downstream .on_exception ().update ((x , exc ) , who = self , metadata = metadata )
461+ finally :
462+ self ._release_refs (metadata )
456463
457464 return [element for element in result if element is not None ]
458465
@@ -671,6 +678,30 @@ def _release_refs(self, metadata, n=1):
671678 if 'ref' in m :
672679 m ['ref' ].release (n )
673680
681+ def on_exception (self ):
682+ """Returns the exception handler associated with this stream
683+ """
684+ self ._on_exception = self ._on_exception or _on_exception ()
685+ return self ._on_exception
686+
687+
688+ class InvalidDataError (Exception ):
689+ pass
690+
691+ class _on_exception (Stream ):
692+
693+ def __init__ (self , * args , ** kwargs ):
694+ self .silent = False
695+ Stream .__init__ (self , * args , ** kwargs )
696+
697+ def update (self , x , who = None , metadata = None ):
698+ cause , exc = x
699+
700+ if self .silent or len (self .downstreams ) > 0 :
701+ self ._emit (x , metadata = metadata )
702+ else :
703+ logger .exception (exc )
704+ raise InvalidDataError (cause ) from exc
674705
675706@Stream .register_api ()
676707class map (Stream ):
@@ -706,13 +737,8 @@ def __init__(self, upstream, func, *args, **kwargs):
706737 Stream .__init__ (self , upstream , stream_name = stream_name )
707738
708739 def update (self , x , who = None , metadata = None ):
709- try :
710- result = self .func (x , * self .args , ** self .kwargs )
711- except Exception as e :
712- logger .exception (e )
713- raise
714- else :
715- return self ._emit (result , metadata = metadata )
740+ result = self .func (x , * self .args , ** self .kwargs )
741+ self ._emit (result , metadata = metadata )
716742
717743
718744@Stream .register_api ()
@@ -890,11 +916,7 @@ def update(self, x, who=None, metadata=None):
890916 else :
891917 return self ._emit (x , metadata = metadata )
892918 else :
893- try :
894- result = self .func (self .state , x , ** self .kwargs )
895- except Exception as e :
896- logger .exception (e )
897- raise
919+ result = self .func (self .state , x , ** self .kwargs )
898920 if self .returns_state :
899921 state , result = result
900922 else :
0 commit comments