[Python-modules-commits] [ipykernel] 04/15: Import ipykernel_4.3.1.orig.tar.gz

Julien Cristau jcristau at moszumanska.debian.org
Fri Apr 8 16:08:32 UTC 2016


This is an automated email from the git hooks/post-receive script.

jcristau pushed a commit to branch master
in repository ipykernel.

commit dad8c3435089bb8700d9cb387546811501adf414
Author: David Douard <david.douard at logilab.fr>
Date:   Fri Apr 8 14:06:41 2016 +0200

    Import ipykernel_4.3.1.orig.tar.gz
---
 PKG-INFO                        |   2 +-
 docs/changelog.rst              |  24 +++
 ipykernel/_version.py           |   2 +-
 ipykernel/codeutil.py           |   3 +
 ipykernel/connect.py            |  65 ++++----
 ipykernel/datapub.py            |   5 +
 ipykernel/inprocess/ipkernel.py |  21 ++-
 ipykernel/iostream.py           | 352 +++++++++++++++++++++++++---------------
 ipykernel/ipkernel.py           |  45 +++--
 ipykernel/kernelapp.py          |  51 +++++-
 ipykernel/kernelbase.py         | 124 +++++++-------
 ipykernel/kernelspec.py         |   2 +-
 ipykernel/log.py                |   3 +
 ipykernel/pickleutil.py         |   3 +
 ipykernel/serialize.py          |   3 +
 ipykernel/tests/test_connect.py |   8 +-
 ipykernel/tests/test_kernel.py  |  61 +++++--
 ipykernel/zmqshell.py           |  30 +++-
 setup.py                        |   1 +
 19 files changed, 540 insertions(+), 265 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 65a95fa..bec1061 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: ipykernel
-Version: 4.2.2
+Version: 4.3.1
 Summary: IPython Kernel for Jupyter
 Home-page: http://ipython.org
 Author: IPython Development Team
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 91fbaf6..03ba591 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,6 +1,30 @@
 Changes in IPython kernel
 =========================
 
+4.3
+---
+
+4.3.1
+*****
+
+- Fix Windows Python 3.5 incompatibility caused by faulthandler patch in 4.3
+
+4.3.0
+*****
+
+`4.3.0 on GitHub <https://github.com/ipython/ipykernel/milestones/4.3>`__
+
+- Publish all IO in a thread, via :class:`IOPubThread`.
+  This solves the problem of requiring :meth:`sys.stdout.flush` to be called in the notebook to produce output promptly during long-running cells.
+- Remove references to the outdated IPython guiref in the kernel banner.
+- Patch faulthandler to use ``sys.__stderr__`` instead of forwarded ``sys.stderr``,
+  which has no fileno when forwarded.
+- Deprecate some vestiges of the Big Split:
+
+  - :func:`ipykernel.find_connection_file` is deprecated. Use :func:`jupyter_client.find_connection_file` instead.
+  - Various pieces of code specific to IPython parallel are deprecated in ipykernel
+    and moved to ipyparallel.
+
+
 4.2
 ---
 
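For context (not part of this diff): the migration the 4.3.0 deprecations ask
for is a one-line import change. A minimal sketch, assuming at least one
kernel-*.json file exists in the Jupyter runtime/security directory:

    # deprecated spelling -- emits a DeprecationWarning when called:
    import ipykernel
    cf = ipykernel.find_connection_file('kernel-*.json')

    # preferred replacement, same result:
    import jupyter_client
    cf = jupyter_client.find_connection_file('kernel-*.json')
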
diff --git a/ipykernel/_version.py b/ipykernel/_version.py
index b0e4d75..194c680 100644
--- a/ipykernel/_version.py
+++ b/ipykernel/_version.py
@@ -1,4 +1,4 @@
-version_info = (4, 2, 2)
+version_info = (4, 3, 1)
 __version__ = '.'.join(map(str, version_info))
 
 kernel_protocol_version_info = (5, 0)
diff --git a/ipykernel/codeutil.py b/ipykernel/codeutil.py
index 887bd90..1d88b11 100644
--- a/ipykernel/codeutil.py
+++ b/ipykernel/codeutil.py
@@ -13,6 +13,9 @@ Reference: A. Tremols, P Cogolo, "Python Cookbook," p 302-305
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
 
+import warnings
+warnings.warn("ipykernel.codeutil is deprecated. It has moved to ipyparallel.serialize", DeprecationWarning)
+
 import sys
 import types
 try:
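
The change above is an import-time deprecation: the module stays importable
for backward compatibility but warns as soon as it is first imported (Python
caches modules, so the warning fires once per process). A generic sketch of
the same pattern; the module and package names here are hypothetical:

    # legacy_module.py -- kept only so old imports keep working
    import warnings
    warnings.warn(
        "legacy_module is deprecated. It has moved to new_package.legacy_module",
        DeprecationWarning,
    )

    # then re-export the real implementation, e.g.:
    # from new_package.legacy_module import *
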
diff --git a/ipykernel/connect.py b/ipykernel/connect.py
index beadd9f..e49ad10 100644
--- a/ipykernel/connect.py
+++ b/ipykernel/connect.py
@@ -8,6 +8,7 @@ from __future__ import absolute_import
 import json
 import sys
 from subprocess import Popen, PIPE
+import warnings
 
 from IPython.core.profiledir import ProfileDir
 from IPython.paths import get_ipython_dir
@@ -37,18 +38,9 @@ def get_connection_file(app=None):
 
 
 def find_connection_file(filename='kernel-*.json', profile=None):
-    """find a connection file, and return its absolute path.
-
-    The current working directory and the profile's security
-    directory will be searched for the file if it is not given by
-    absolute path.
-
-    If profile is unspecified, then the current running application's
-    profile will be used, or 'default', if not run from IPython.
-
-    If the argument does not match an existing file, it will be interpreted as a
-    fileglob, and the matching file in the profile's security dir with
-    the latest access time will be used.
+    """DEPRECATED: find a connection file, and return its absolute path.
+    
+    THIS FUNCTION IS DEPRECATED. Use jupyter_client.find_connection_file instead.
 
     Parameters
     ----------
@@ -62,6 +54,10 @@ def find_connection_file(filename='kernel-*.json', profile=None):
     -------
     str : The absolute path of the connection file.
     """
+    
+    import warnings
+    warnings.warn("""ipykernel.find_connection_file is deprecated, use jupyter_client.find_connection_file""",
+        DeprecationWarning, stacklevel=2)
     from IPython.core.application import BaseIPythonApplication as IPApp
     try:
         # quick check for absolute path, before going through logic
@@ -85,6 +81,28 @@ def find_connection_file(filename='kernel-*.json', profile=None):
     return jupyter_client.find_connection_file(filename, path=['.', security_dir])
 
 
+def _find_connection_file(connection_file, profile=None):
+    """Return the absolute path for a connection file
+    
+    - If nothing specified, return current Kernel's connection file
+    - If profile specified, show deprecation warning about finding connection files in profiles
+    - Otherwise, call jupyter_client.find_connection_file
+    """
+    if connection_file is None:
+        # get connection file from current kernel
+        return get_connection_file()
+    else:
+        # connection file specified, allow shortnames:
+        if profile is not None:
+            warnings.warn(
+                "Finding connection file by profile is deprecated.",
+                DeprecationWarning, stacklevel=3,
+            )
+            return find_connection_file(connection_file, profile=profile)
+        else:
+            return jupyter_client.find_connection_file(connection_file)
+
+
 def get_connection_info(connection_file=None, unpack=False, profile=None):
     """Return the connection information for the current Kernel.
 
@@ -100,22 +118,14 @@ def get_connection_info(connection_file=None, unpack=False, profile=None):
     unpack : bool [default: False]
         if True, return the unpacked dict, otherwise just the string contents
         of the file.
-    profile : str [optional]
-        The name of the profile to use when searching for the connection file,
-        if different from the current IPython session or 'default'.
-
+    profile : DEPRECATED
 
     Returns
     -------
     The connection dictionary of the current kernel, as string or dict,
     depending on `unpack`.
     """
-    if connection_file is None:
-        # get connection file from current kernel
-        cf = get_connection_file()
-    else:
-        # connection file specified, allow shortnames:
-        cf = find_connection_file(connection_file, profile=profile)
+    cf = _find_connection_file(connection_file, profile)
 
     with open(cf) as f:
         info = f.read()
@@ -144,10 +154,7 @@ def connect_qtconsole(connection_file=None, argv=None, profile=None):
         IPython Kernel will be used, which is only allowed from inside a kernel.
     argv : list [optional]
         Any extra args to be passed to the console.
-    profile : str [optional]
-        The name of the profile to use when searching for the connection file,
-        if different from the current IPython session or 'default'.
-
+    profile : DEPRECATED
 
     Returns
     -------
@@ -155,11 +162,7 @@ def connect_qtconsole(connection_file=None, argv=None, profile=None):
     """
     argv = [] if argv is None else argv
 
-    if connection_file is None:
-        # get connection file from current kernel
-        cf = get_connection_file()
-    else:
-        cf = find_connection_file(connection_file, profile=profile)
+    cf = _find_connection_file(connection_file, profile)
 
     cmd = ';'.join([
         "from IPython.qt.console import qtconsoleapp",
diff --git a/ipykernel/datapub.py b/ipykernel/datapub.py
index 31904d7..0f76571 100644
--- a/ipykernel/datapub.py
+++ b/ipykernel/datapub.py
@@ -1,6 +1,9 @@
 """Publishing native (typically pickled) objects.
 """
 
+import warnings
+warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)
+
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
 
@@ -53,5 +56,7 @@ def publish_data(data):
     data : dict
         The data to be published. Think of it as a namespace.
     """
+    warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)
+    
     from ipykernel.zmqshell import ZMQInteractiveShell
     ZMQInteractiveShell.instance().data_pub.publish_data(data)
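
A migration sketch for datapub (not part of the commit): publish_data keeps
working from inside a kernel but now warns; the maintained home is
ipyparallel (>= 5.0 assumed here), and both calls require a running kernel:

    # deprecated location -- warns at import and again at call time:
    from ipykernel.datapub import publish_data
    publish_data({'stage': 'loading', 'progress': 0.5})

    # maintained location:
    from ipyparallel.datapub import publish_data
    publish_data({'stage': 'loading', 'progress': 0.5})
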
diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py
index 2958082..bc877c9 100644
--- a/ipykernel/inprocess/ipkernel.py
+++ b/ipykernel/inprocess/ipkernel.py
@@ -14,6 +14,7 @@ from ipykernel.ipkernel import IPythonKernel
 from ipykernel.zmqshell import ZMQInteractiveShell
 
 from .socket import DummySocket
+from ..iostream import OutStream, BackgroundSocket, IOPubThread
 
 #-----------------------------------------------------------------------------
 # Main kernel class
@@ -49,13 +50,23 @@ class InProcessKernel(IPythonKernel):
     shell_class = Type(allow_none=True)
     shell_streams = List()
     control_stream = Any()
-    iopub_socket = Instance(DummySocket, ())
+    _underlying_iopub_socket = Instance(DummySocket, ())
+    iopub_thread = Instance(IOPubThread)
+    def _iopub_thread_default(self):
+        thread = IOPubThread(self._underlying_iopub_socket)
+        thread.start()
+        return thread
+    
+    iopub_socket = Instance(BackgroundSocket)
+    def _iopub_socket_default(self):
+        return self.iopub_thread.background_socket
+    
     stdin_socket = Instance(DummySocket, ())
 
     def __init__(self, **traits):
         super(InProcessKernel, self).__init__(**traits)
 
-        self.iopub_socket.on_trait_change(self._io_dispatch, 'message_sent')
+        self._underlying_iopub_socket.on_trait_change(self._io_dispatch, 'message_sent')
         self.shell.kernel = self
 
     def execute_request(self, stream, ident, parent):
@@ -128,12 +139,10 @@ class InProcessKernel(IPythonKernel):
         return InProcessInteractiveShell
 
     def _stdout_default(self):
-        from ipykernel.iostream import OutStream
-        return OutStream(self.session, self.iopub_socket, u'stdout', pipe=False)
+        return OutStream(self.session, self.iopub_thread, u'stdout')
 
     def _stderr_default(self):
-        from ipykernel.iostream import OutStream
-        return OutStream(self.session, self.iopub_socket, u'stderr', pipe=False)
+        return OutStream(self.session, self.iopub_thread, u'stderr')
 
 #-----------------------------------------------------------------------------
 # Interactive shell subclass
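
The _iopub_thread_default/_iopub_socket_default methods above use traitlets'
dynamic-default convention: a method named _<name>_default is called lazily
the first time the trait is read. A self-contained sketch of that convention
(class names are illustrative), using the underscore spelling current for the
traitlets 4.x era of this code:

    from traitlets import Instance
    from traitlets.config import Configurable

    class Worker(object):
        def __init__(self):
            self.started = True

    class App(Configurable):
        worker = Instance(Worker)

        def _worker_default(self):
            # invoked once, on first access to self.worker
            return Worker()

    app = App()
    print(app.worker.started)   # -> True: the default was built lazily
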
diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py
index 1cb3119..8f38623 100644
--- a/ipykernel/iostream.py
+++ b/ipykernel/iostream.py
@@ -1,21 +1,27 @@
+# coding: utf-8
 """Wrappers for forwarding stdout/stderr over zmq"""
 
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
 
+from __future__ import print_function
+import atexit
 import os
 import threading
-import time
+import sys
 import uuid
+import warnings
 from io import StringIO, UnsupportedOperation
 
 import zmq
 from zmq.eventloop.ioloop import IOLoop
+from zmq.eventloop.zmqstream import ZMQStream
 
 from jupyter_client.session import extract_header
 
 from ipython_genutils import py3compat
 from ipython_genutils.py3compat import unicode_type
+
 from IPython.utils.warn import warn
 
 #-----------------------------------------------------------------------------
@@ -26,166 +32,261 @@ MASTER = 0
 CHILD = 1
 
 #-----------------------------------------------------------------------------
-# Stream classes
+# IO classes
 #-----------------------------------------------------------------------------
 
-class OutStream(object):
-    """A file like object that publishes the stream to a 0MQ PUB socket."""
-
-    # The time interval between automatic flushes, in seconds.
-    _subprocess_flush_limit = 256
-    flush_interval = 0.05
-    topic=None
-
-    def __init__(self, session, pub_socket, name, pipe=True):
-        self.encoding = 'UTF-8'
-        self.session = session
-        self.pub_socket = pub_socket
-        self.name = name
-        self.topic = b'stream.' + py3compat.cast_bytes(name)
-        self.parent_header = {}
-        self._new_buffer()
-        self._buffer_lock = threading.Lock()
+class IOPubThread(object):
+    """An object for sending IOPub messages in a background thread
+    
+    This keeps IO from blocking the main thread.
+    
+    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
+    whose IO is always run in a thread.
+    """
+
+    def __init__(self, socket, pipe=False):
+        self.socket = socket
+        self.background_socket = BackgroundSocket(self)
         self._master_pid = os.getpid()
-        self._master_thread = threading.current_thread().ident
-        self._pipe_pid = os.getpid()
         self._pipe_flag = pipe
+        self.io_loop = IOLoop()
         if pipe:
             self._setup_pipe_in()
+        self.thread = threading.Thread(target=self._thread_main)
+        self.thread.daemon = True
+
+    def _thread_main(self):
+        """The inner loop that's actually run in a thread"""
+        self.io_loop.start()
+        self.io_loop.close()
 
     def _setup_pipe_in(self):
         """setup listening pipe for subprocesses"""
-        ctx = self.pub_socket.context
+        ctx = self.socket.context
 
         # use UUID to authenticate pipe messages
         self._pipe_uuid = uuid.uuid4().bytes
 
-        self._pipe_in = ctx.socket(zmq.PULL)
-        self._pipe_in.linger = 0
+        pipe_in = ctx.socket(zmq.PULL)
+        pipe_in.linger = 0
         try:
-            self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
+            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
         except zmq.ZMQError as e:
-            warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
+            warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
                 "\nsubprocess output will be unavailable."
             )
             self._pipe_flag = False
-            self._pipe_in.close()
-            del self._pipe_in
+            pipe_in.close()
             return
-        self._pipe_poller = zmq.Poller()
-        self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
-        if IOLoop.initialized():
-            # subprocess flush should trigger flush
-            # if kernel is idle
-            IOLoop.instance().add_handler(self._pipe_in,
-                lambda s, event: self.flush(),
-                IOLoop.READ,
-            )
+        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
+        self._pipe_in.on_recv(self._handle_pipe_msg)
+    
+    def _handle_pipe_msg(self, msg):
+        """handle a pipe message from a subprocess"""
+        if not self._pipe_flag or not self._is_master_process():
+            return
+        if msg[0] != self._pipe_uuid:
+            print("Bad pipe message: %s", msg, file=sys.__stderr__)
+            return
+        self.send_multipart(msg[1:])
 
     def _setup_pipe_out(self):
         # must be new context after fork
         ctx = zmq.Context()
-        self._pipe_pid = os.getpid()
-        self._pipe_out = ctx.socket(zmq.PUSH)
-        self._pipe_out_lock = threading.Lock()
-        self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
+        pipe_out = ctx.socket(zmq.PUSH)
+        pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
+        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
+        return ctx, pipe_out
 
     def _is_master_process(self):
         return os.getpid() == self._master_pid
 
-    def _is_master_thread(self):
-        return threading.current_thread().ident == self._master_thread
-
-    def _have_pipe_out(self):
-        return os.getpid() == self._pipe_pid
-
     def _check_mp_mode(self):
         """check for forks, and switch to zmq pipeline if necessary"""
         if not self._pipe_flag or self._is_master_process():
             return MASTER
         else:
-            if not self._have_pipe_out():
-                self._flush_buffer()
-                # setup a new out pipe
-                self._setup_pipe_out()
             return CHILD
-
-    def set_parent(self, parent):
-        self.parent_header = extract_header(parent)
-
+    
+    def start(self):
+        """Start the IOPub thread"""
+        self.thread.start()
+        # make sure we don't prevent process exit
+        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
+        atexit.register(self.stop)
+    
+    def stop(self):
+        """Stop the IOPub thread"""
+        if not self.thread.is_alive():
+            return
+        self.io_loop.add_callback(self.io_loop.stop)
+        self.thread.join()
+    
     def close(self):
-        self.pub_socket = None
+        self.socket.close()
+        self.socket = None
 
     @property
     def closed(self):
-        return self.pub_socket is None
-
-    def _flush_from_subprocesses(self):
-        """flush possible pub data from subprocesses into my buffer"""
-        if not self._pipe_flag or not self._is_master_process():
-            return
-        for i in range(self._subprocess_flush_limit):
-            if self._pipe_poller.poll(0):
-                msg = self._pipe_in.recv_multipart()
-                if msg[0] != self._pipe_uuid:
-                    continue
-                else:
-                    self._buffer.write(msg[1].decode(self.encoding, 'replace'))
-                    # this always means a flush,
-                    # so reset our timer
-                    self._start = 0
-            else:
-                break
+        return self.socket is None
+    
+    def send_multipart(self, *args, **kwargs):
+        """send_multipart schedules actual zmq send in my thread.
+        
+        If my thread isn't running (e.g. forked process), send immediately.
+        """
 
-    def _schedule_flush(self):
-        """schedule a flush in the main thread
+        if self.thread.is_alive():
+            self.io_loop.add_callback(lambda : self._really_send(*args, **kwargs))
+        else:
+            self._really_send(*args, **kwargs)
+    
+    def _really_send(self, msg, *args, **kwargs):
+        """The callback that actually sends messages"""
+        mp_mode = self._check_mp_mode()
 
-        only works with a tornado/pyzmq eventloop running
-        """
-        if IOLoop.initialized():
-            IOLoop.instance().add_callback(self.flush)
+        if mp_mode != CHILD:
+            # we are master, do a regular send
+            self.socket.send_multipart(msg, *args, **kwargs)
+        else:
+            # we are a child, pipe to master
+            # new context/socket for every pipe-out
+            # since forks don't teardown politely, use ctx.term to ensure send has completed
+            ctx, pipe_out = self._setup_pipe_out()
+            pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
+            pipe_out.close()
+            ctx.term()
+
+
+class BackgroundSocket(object):
+    """Wrapper around IOPub thread that provides zmq send[_multipart]"""
+    io_thread = None
+    
+    def __init__(self, io_thread):
+        self.io_thread = io_thread
+    
+    def __getattr__(self, attr):
+        """Wrap socket attr access for backward-compatibility"""
+        if attr.startswith('__') and attr.endswith('__'):
+            # don't wrap magic methods
+            raise AttributeError(attr)
+        if hasattr(self.io_thread.socket, attr):
+            warnings.warn("Accessing zmq Socket attribute %s on BackgroundSocket" % attr,
+                DeprecationWarning, stacklevel=2)
+            return getattr(self.io_thread.socket, attr)
+        raise AttributeError(attr)
+    
+    def __setattr__(self, attr, value):
+        if attr == 'io_thread' or (attr.startswith('__') and attr.endswith('__')):
+            super(BackgroundSocket, self).__setattr__(attr, value)
         else:
-            # no async loop, at least force the timer
-            self._start = 0
+            warnings.warn("Setting zmq Socket attribute %s on BackgroundSocket" % attr,
+                DeprecationWarning, stacklevel=2)
+            setattr(self.io_thread.socket, attr, value)
+    
+    def send(self, msg, *args, **kwargs):
+        return self.send_multipart([msg], *args, **kwargs)
 
-    def flush(self):
-        """trigger actual zmq send"""
-        if self.pub_socket is None:
-            raise ValueError(u'I/O operation on closed file')
+    def send_multipart(self, *args, **kwargs):
+        """Schedule send in IO thread"""
+        return self.io_thread.send_multipart(*args, **kwargs)
 
-        mp_mode = self._check_mp_mode()
 
-        if mp_mode != CHILD:
-            # we are master
-            if not self._is_master_thread():
-                # sub-threads must not trigger flush directly,
-                # but at least they can schedule an async flush, or force the timer.
-                self._schedule_flush()
-                return
+class OutStream(object):
+    """A file like object that publishes the stream to a 0MQ PUB socket.
+    
+    Output is handed off to an IO Thread
+    """
 
-            self._flush_from_subprocesses()
-            data = self._flush_buffer()
+    # The time interval between automatic flushes, in seconds.
+    flush_interval = 0.2
+    topic=None
 
-            if data:
-                content = {u'name':self.name, u'text':data}
-                msg = self.session.send(self.pub_socket, u'stream', content=content,
-                                       parent=self.parent_header, ident=self.topic)
+    def __init__(self, session, pub_thread, name, pipe=None):
+        if pipe is not None:
+            warnings.warn("pipe argument to OutStream is deprecated and ignored",
+                DeprecationWarning)
+        self.encoding = 'UTF-8'
+        self.session = session
+        if not isinstance(pub_thread, IOPubThread):
+            # Backward-compat: given socket, not thread. Wrap in a thread.
+            warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread,
+                DeprecationWarning, stacklevel=2)
+            pub_thread = IOPubThread(pub_thread)
+            pub_thread.start()
+        self.pub_thread = pub_thread
+        self.name = name
+        self.topic = b'stream.' + py3compat.cast_bytes(name)
+        self.parent_header = {}
+        self._master_pid = os.getpid()
+        self._flush_lock = threading.Lock()
+        self._flush_timeout = None
+        self._io_loop = pub_thread.io_loop
+        self._new_buffer()
 
-                if hasattr(self.pub_socket, 'flush'):
-                    # socket itself has flush (presumably ZMQStream)
-                    self.pub_socket.flush()
-        else:
-            with self._pipe_out_lock:
-                string = self._flush_buffer()
-                tracker = self._pipe_out.send_multipart([
-                    self._pipe_uuid,
-                    string.encode(self.encoding, 'replace'),
-                ], copy=False, track=True)
-                try:
-                    tracker.wait(1)
-                except:
-                    pass
+    def _is_master_process(self):
+        return os.getpid() == self._master_pid
 
+    def set_parent(self, parent):
+        self.parent_header = extract_header(parent)
+
+    def close(self):
+        self.pub_thread = None
+
+    @property
+    def closed(self):
+        return self.pub_thread is None
+
+    def _schedule_flush(self):
+        """schedule a flush in the IO thread
+        
+        call this on write, to indicate that flush should be called soon.
+        """
+        with self._flush_lock:
+            if self._flush_timeout is not None:
+                return
+            # None indicates there's no flush scheduled.
+            # Use a non-None placeholder to indicate that a flush is scheduled
+            # to avoid races while we wait for _schedule_in_thread below to fire in the io thread.
+            self._flush_timeout = 'placeholder'
+        
+        # add_timeout has to be handed to the io thread with add_callback
+        def _schedule_in_thread():
+            self._flush_timeout = self._io_loop.call_later(self.flush_interval, self._flush)
+        self._io_loop.add_callback(_schedule_in_thread)
+
+    def flush(self):
+        """trigger actual zmq send
+        
+        send will happen in the background thread
+        """
+        if self.pub_thread.thread.is_alive():
+            self._io_loop.add_callback(self._flush)
+            # wait for flush to actually get through:
+            evt = threading.Event()
+            self._io_loop.add_callback(evt.set)
+            evt.wait()
+        else:
+            self._flush()
+    
+    def _flush(self):
+        """This is where the actual send happens.
+        
+        _flush should generally be called in the IO thread,
+        unless the thread has been destroyed (e.g. forked subprocess).
+        """
+        with self._flush_lock:
+            self._flush_timeout = None
+            data = self._flush_buffer()
+        if data:
+            # FIXME: this disables Session's fork-safe check,
+            # since pub_thread is itself fork-safe.
+            # There should be a better way to do this.
+            self.session.pid = os.getpid()
+            content = {u'name':self.name, u'text':data}
+            self.session.send(self.pub_thread, u'stream', content=content,
+                parent=self.parent_header, ident=self.topic)
+    
     def isatty(self):
         return False
 
@@ -205,14 +306,14 @@ class OutStream(object):
         raise UnsupportedOperation("IOStream has no fileno.")
 
     def write(self, string):
-        if self.pub_socket is None:
+        if self.pub_thread is None:
             raise ValueError('I/O operation on closed file')
         else:
             # Make sure that we're handling unicode
             if not isinstance(string, unicode_type):
                 string = string.decode(self.encoding, 'replace')
 
-            is_child = (self._check_mp_mode() == CHILD)
+            is_child = (not self._is_master_process())
             self._buffer.write(string)
             if is_child:
                 # newlines imply flush in subprocesses
@@ -220,16 +321,11 @@ class OutStream(object):
                 # and this helps.
                 if '\n' in string:
                     self.flush()
-            # do we want to check subprocess flushes on write?
-            # self._flush_from_subprocesses()
-            current_time = time.time()
-            if self._start < 0:
-                self._start = current_time
-            elif current_time - self._start > self.flush_interval:
-                self.flush()
+            else:
+                self._schedule_flush()
 
     def writelines(self, sequence):
-        if self.pub_socket is None:
+        if self.pub_thread is None:
             raise ValueError('I/O operation on closed file')
         else:
             for string in sequence:
@@ -239,11 +335,11 @@ class OutStream(object):
         """clear the current buffer and return the current buffer data"""
         data = u''
         if self._buffer is not None:
-            data = self._buffer.getvalue()
-            self._buffer.close()
-        self._new_buffer()
+            buf = self._buffer
+            self._new_buffer()
+            data = buf.getvalue()
+            buf.close()
         return data
 
     def _new_buffer(self):
         self._buffer = StringIO()
-        self._start = -1
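
The core technique of the new IOPubThread, reduced to a standalone sketch
(no zmq sockets, not part of the commit): all real IO happens on one
background loop thread, and other threads only ever hand work over with
add_callback, the one thread-safe IOLoop entry point:

    import threading
    from zmq.eventloop.ioloop import IOLoop

    class BackgroundSender(object):
        def __init__(self, sink):
            self.sink = sink                 # stand-in for the PUB socket
            self.io_loop = IOLoop()
            self.thread = threading.Thread(target=self._main)
            self.thread.daemon = True

        def _main(self):
            self.io_loop.start()
            self.io_loop.close()

        def start(self):
            self.thread.start()

        def stop(self):
            # mirror IOPubThread.stop: schedule the stop on the loop itself
            self.io_loop.add_callback(self.io_loop.stop)
            self.thread.join()

        def send(self, msg):
            # thread-safe: only schedules; the append runs on the loop thread
            self.io_loop.add_callback(lambda: self.sink.append(msg))

    out = []
    sender = BackgroundSender(out)
    sender.start()
    sender.send('hello')
    sender.stop()
    print(out)   # -> ['hello']
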
diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py
index 8f7414d..72a1c10 100644
--- a/ipykernel/ipkernel.py
+++ b/ipykernel/ipkernel.py
@@ -11,7 +11,6 @@ from traitlets import Instance, Type, Any, List
 
 from .comm import CommManager
 from .kernelbase import Kernel as KernelBase
-from .serialize import serialize_object, unpack_apply_message
 from .zmqshell import ZMQInteractiveShell
 
 
@@ -51,8 +50,6 @@ class IPythonKernel(KernelBase):
         self.shell.displayhook.topic = self._topic('execute_result')
         self.shell.display_pub.session = self.session
         self.shell.display_pub.pub_socket = self.iopub_socket
-        self.shell.data_pub.session = self.session
-        self.shell.data_pub.pub_socket = self.iopub_socket
 
         # TMP - hack while developing
         self.shell._reply_content = None
@@ -124,6 +121,33 @@ class IPythonKernel(KernelBase):
         super(IPythonKernel, self).set_parent(ident, parent)
         self.shell.set_parent(parent)
 
+    def init_metadata(self, parent):
+        """Initialize metadata.
+        
+        Run at the beginning of each execution request.
+        """
+        md = super(IPythonKernel, self).init_metadata(parent)
+        # FIXME: remove deprecated ipyparallel-specific code
+        # This is required for ipyparallel < 5.0
+        md.update({
+            'dependencies_met' : True,
+            'engine' : self.ident,
+        })
+        return md
+    
+    def finish_metadata(self, parent, metadata, reply_content):
+        """Finish populating metadata.
+        
+        Run after completing an execution request.
+        """
+        # FIXME: remove deprecated ipyparallel-specific code
+        # This is required by ipyparallel < 5.0
+        metadata['status'] = reply_content['status']
+        if reply_content['status'] == 'error' and reply_content['ename'] == 'UnmetDependency':
+            metadata['dependencies_met'] = False
+
+        return metadata
+
     def _forward_input(self, allow_stdin=False):
         """Forward raw_input and getpass to the current frontend.
 
@@ -198,10 +222,11 @@ class IPythonKernel(KernelBase):
         # runlines.  We'll need to clean up this logic later.
         if shell._reply_content is not None:
             reply_content.update(shell._reply_content)
-            e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
-            reply_content['engine_info'] = e_info
             # reset after use
             shell._reply_content = None
+            # FIXME: deprecate piece for ipyparallel:
+            e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
+            reply_content['engine_info'] = e_info
 
         if 'traceback' in reply_content:
             self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
@@ -289,6 +314,7 @@ class IPythonKernel(KernelBase):
         return r
 
     def do_apply(self, content, bufs, msg_id, reply_metadata):
+        from .serialize import serialize_object, unpack_apply_message
         shell = self.shell
         try:
             working = shell.user_ns
@@ -328,18 +354,17 @@ class IPythonKernel(KernelBase):
             reply_content = {}
             if shell._reply_content is not None:
                 reply_content.update(shell._reply_content)
-                e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
-                reply_content['engine_info'] = e_info
                 # reset after use
                 shell._reply_content = None
+                
+                # FIXME: deprecate piece for ipyparallel:
+                e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
+                reply_content['engine_info'] = e_info
 
             self.send_response(self.iopub_socket, u'error', reply_content,
                                 ident=self._topic('error'))
             self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
             result_buf = []
-
-            if reply_content['ename'] == 'UnmetDependency':
-                reply_metadata['dependencies_met'] = False
         else:
             reply_content = {'status' : 'ok'}
 
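The new init_metadata/finish_metadata pair gives subclasses a supported hook
around each execute request. A hypothetical subclass sketch (TimingKernel and
the 'started'/'duration' keys are illustrative, not part of the commit):

    import time
    from ipykernel.ipkernel import IPythonKernel

    class TimingKernel(IPythonKernel):
        """Attach wall-clock timing to each execute_reply's metadata."""

        def init_metadata(self, parent):
            md = super(TimingKernel, self).init_metadata(parent)
            md['started'] = time.time()
            return md

        def finish_metadata(self, parent, metadata, reply_content):
            metadata = super(TimingKernel, self).finish_metadata(
                parent, metadata, reply_content)
            metadata['duration'] = time.time() - metadata.pop('started')
            return metadata
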
diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py
index 1fd958f..8785e10 100644
--- a/ipykernel/kernelapp.py
+++ b/ipykernel/kernelapp.py
@@ -33,6 +33,7 @@ from jupyter_client import write_connection_file
 from jupyter_client.connect import ConnectionFileMixin
 
 # local imports
+from .iostream import IOPubThread
 from .heartbeat import Heartbeat
 from .ipkernel import IPythonKernel
 from .parentpoller import ParentPollerUnix, ParentPollerWindows
@@ -231,11 +232,6 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
         self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
         self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port)
 
-        self.iopub_socket = context.socket(zmq.PUB)
-        self.iopub_socket.linger = 1000
-        self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
-        self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
-
         self.stdin_socket = context.socket(zmq.ROUTER)
         self.stdin_socket.linger = 1000
         self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
@@ -245,6 +241,19 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
         self.control_socket.linger = 1000
         self.control_port = self._bind_socket(self.control_socket, self.control_port)
         self.log.debug("control ROUTER Channel on port: %i" % self.control_port)
+        
+        self.init_iopub(context)
+
+    def init_iopub(self, context):
+        self.iopub_socket = context.socket(zmq.PUB)
+        self.iopub_socket.linger = 1000
+        self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
+        self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
+        self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
+        self.iopub_thread.start()
+        # backward-compat: wrap iopub socket API in background thread
+        self.iopub_socket = self.iopub_thread.background_socket
+        
 
     def init_heartbeat(self):
         """start the heart beating"""
@@ -298,11 +307,38 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
         """Redirect input streams and set a display hook."""
         if self.outstream_class:
             outstream_factory = import_item(str(self.outstream_class))
-            sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
-            sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
+            sys.stdout = outstream_factory(self.session, self.iopub_thread, u'stdout')
+            sys.stderr = outstream_factory(self.session, self.iopub_thread, u'stderr')
         if self.displayhook_class:
             displayhook_factory = import_item(str(self.displayhook_class))
             sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
+        
+        self.patch_io()
+    
+    def patch_io(self):
+        """Patch important libraries that can't handle sys.stdout forwarding"""
+        try:
+            import faulthandler
+        except ImportError:
+            pass
+        else:
+            # Warning: this is a monkeypatch of `faulthandler.enable`, watch for possible
+            # updates to the upstream API and update accordingly (up-to-date as of Python 3.5):
+            # https://docs.python.org/3/library/faulthandler.html#faulthandler.enable
+
+            # change default file to __stderr__ from forwarded stderr
+            faulthandler_enable = faulthandler.enable
+            def enable(file=sys.__stderr__, all_threads=True, **kwargs):
+                return faulthandler_enable(file=file, all_threads=all_threads, **kwargs)
+
+            faulthandler.enable = enable
+
+            if hasattr(faulthandler, 'register'):
+                faulthandler_register = faulthandler.register
+                def register(signum, file=sys.__stderr__, all_threads=True, chain=False, **kwargs):
+                    return faulthandler_register(signum, file=file, all_threads=all_threads,
+                                                 chain=chain, **kwargs)
+                faulthandler.register = register
 
     def init_signal(self):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -316,6 +352,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
 
         kernel = kernel_factory(parent=self, session=self.session,
                                 shell_streams=[shell_stream, control_stream],
+                                iopub_thread=self.iopub_thread,
                                 iopub_socket=self.iopub_socket,
                                 stdin_socket=self.stdin_socket,
                                 log=self.log,
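
Why patch_io is needed, as a standalone sketch (not part of the commit,
assuming Python 3 or the faulthandler backport): faulthandler.enable() needs
a real file descriptor, and the kernel's forwarded sys.stderr is an OutStream
whose fileno() raises (see the iostream diff above), so the wrapper rebinds
the default to sys.__stderr__:

    from __future__ import print_function
    import sys, faulthandler

    class NoFilenoStream(object):
        """Stand-in for ipykernel's OutStream: no underlying fd."""
        def write(self, s): pass
        def fileno(self):
            raise OSError("IOStream has no fileno.")

    sys.stderr = NoFilenoStream()
    try:
        faulthandler.enable()          # default file=sys.stderr -> fails
    except Exception as e:
        print("unpatched: %r" % e, file=sys.__stderr__)

    faulthandler.enable(file=sys.__stderr__)   # what the patched default does
    faulthandler.disable()
    sys.stderr = sys.__stderr__
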
diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py
index 20e56c9..e8eb71f 100644
--- a/ipykernel/kernelbase.py
+++ b/ipykernel/kernelbase.py
@@ -49,8 +49,9 @@ class Kernel(SingletonConfigurable):
     profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
     shell_streams = List()
     control_stream = Instance(ZMQStream, allow_none=True)
-    iopub_socket = Instance(zmq.Socket, allow_none=True)
-    stdin_socket = Instance(zmq.Socket, allow_none=True)
+    iopub_socket = Any()
+    iopub_thread = Any()
+    stdin_socket = Any()
     log = Instance(logging.Logger, allow_none=True)
 
     # identities:
@@ -111,25 +112,29 @@ class Kernel(SingletonConfigurable):
     # Track execution count here. For IPython, we override this to use the
     # execution count we store in the shell.
     execution_count = 0
-
+    
+    msg_types = [
+        'execute_request', 'complete_request',
+        'inspect_request', 'history_request',
+        'comm_info_request', 'kernel_info_request',
+        'connect_request', 'shutdown_request',
+        'is_complete_request',
+        # deprecated:
+        'apply_request',
+    ]
+    # add deprecated ipyparallel control messages
+    control_msg_types = msg_types + ['clear_request', 'abort_request']
 
     def __init__(self, **kwargs):
         super(Kernel, self).__init__(**kwargs)
 
         # Build dict of handlers for message types
-        msg_types = [ 'execute_request', 'complete_request',
-                      'inspect_request', 'history_request',
-                      'comm_info_request', 'kernel_info_request',
-                      'connect_request', 'shutdown_request',
-                      'apply_request', 'is_complete_request',
-                    ]
         self.shell_handlers = {}
-        for msg_type in msg_types:
+        for msg_type in self.msg_types:
             self.shell_handlers[msg_type] = getattr(self, msg_type)
 
... 502 lines suppressed ...
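
Promoting msg_types from a local list in __init__ to a class attribute means
subclasses can extend the handler table declaratively. A hypothetical sketch
(EchoKernel and echo_request are illustrative, not part of the commit):

    from ipykernel.kernelbase import Kernel

    class EchoKernel(Kernel):
        # the class-level list is now the single source of handler names
        msg_types = Kernel.msg_types + ['echo_request']

        def echo_request(self, stream, ident, parent):
            # Kernel.__init__ wires this up via getattr(self, msg_type)
            self.session.send(stream, 'echo_reply',
                              content=parent['content'],
                              parent=parent, ident=ident)
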

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/ipykernel.git
