@@ -501,10 +501,83 @@ def start(self):
501501 self .poller .start ()
502502 self .kernel .start ()
503503 self .io_loop = ioloop .IOLoop .current ()
504- try :
505- self .io_loop .start ()
506- except KeyboardInterrupt :
507- pass
504+ keep_running = True
505+ while keep_running :
506+ try :
507+ self .io_loop .start ()
508+ except KeyboardInterrupt :
509+ pass
510+ if not getattr (self .io_loop , '_fork_requested' , False ):
511+ keep_running = False
512+ else :
513+ pid = os .fork ()
514+ self .io_loop ._fork_requested = False # reset for parent AND child
515+ if pid == 0 :
516+ import asyncio
517+ self .log .debug ('Child kernel with pid ' , os .getpid ())
518+
519+ # try to aggresively close all sockets/ioloops etc
520+ for socket in [self .control_socket , self .iopub_socket , self .shell_socket , self .stdin_socket ]:
521+ socket .close ()
522+ self .io_loop .close (all_fds = True )
523+ self .io_loop .clear_current ()
524+ ioloop .IOLoop .clear_current ()
525+
526+ # check if the event loop is really being replaced
527+ loop = asyncio .get_event_loop ()
528+ self .log .debug ('asyncioloop: %r %r' , loop , id (loop ))
529+
530+ # install a new policy, sources:
531+ # https://bugs.python.org/issue21998
532+ # it will give each pid a new io loop
533+ _default_policy = asyncio .get_event_loop_policy ()
534+ _pid_loop = {}
535+ class MultiprocessingPolicy (asyncio .AbstractEventLoopPolicy ):
536+ def get_event_loop (self ):
537+ pid = os .getpid ()
538+ loop = _pid_loop .get (pid )
539+ if loop is None :
540+ loop = self .new_event_loop ()
541+ _pid_loop [pid ] = loop
542+ return loop
543+
544+ def set_event_loop (self , loop ):
545+ pid = os .getpid ()
546+ _pid_loop [pid ] = loop
547+
548+ def new_event_loop (self ):
549+ return _default_policy .new_event_loop ()
550+
551+ asyncio .set_event_loop_policy (MultiprocessingPolicy ())
552+ # del _pid_loop[os.getpid()]
553+ asyncio .new_event_loop ()
554+ loop = asyncio .get_event_loop ()
555+ self .log .debug ('asyncioloop: %r %r' , loop , id (loop ))
556+
557+ import tornado .platform .asyncio as tasio
558+ # explicitly create a new io look that will also be the current
559+ self .io_loop = tasio .AsyncIOMainLoop (make_current = True )
560+ assert self .io_loop == ioloop .IOLoop .current ()
561+
562+ # reset ports, so they will actually get updated
563+ self .hb_port = 0
564+ self .shell_port = 0
565+ self .iopub_port = 0
566+ self .stdin_port = 0
567+ self .control_port = 0
568+ # TODO: we might want to pass the same arguments, except the file
569+ # NOTE: we actually start a new kernel, but once this works
570+ # we can actually think about reusing the kernel object
571+ self .initialize (argv = ['-f' , 'conn_fork.json' , '--debug' ])
572+ self .start ()
573+ pass
574+ else :
575+ self .log .debug ('Parent kernel will resume' )
576+ # keep a reference, since the will set this to None
577+ post_fork_callback = self .io_loop ._post_fork_callback
578+ self .io_loop .add_callback (lambda : post_fork_callback (pid ))
579+ self .io_loop ._post_fork_callback = None
580+
508581
509582launch_new_instance = IPKernelApp .launch_instance
510583
0 commit comments