[Python-modules-commits] [ipykernel] 04/15: Import ipykernel_4.3.1.orig.tar.gz
Julien Cristau
jcristau at moszumanska.debian.org
Fri Apr 8 16:08:32 UTC 2016
This is an automated email from the git hooks/post-receive script.
jcristau pushed a commit to branch master
in repository ipykernel.
commit dad8c3435089bb8700d9cb387546811501adf414
Author: David Douard <david.douard at logilab.fr>
Date: Fri Apr 8 14:06:41 2016 +0200
Import ipykernel_4.3.1.orig.tar.gz
---
PKG-INFO | 2 +-
docs/changelog.rst | 24 +++
ipykernel/_version.py | 2 +-
ipykernel/codeutil.py | 3 +
ipykernel/connect.py | 65 ++++----
ipykernel/datapub.py | 5 +
ipykernel/inprocess/ipkernel.py | 21 ++-
ipykernel/iostream.py | 352 +++++++++++++++++++++++++---------------
ipykernel/ipkernel.py | 45 +++--
ipykernel/kernelapp.py | 51 +++++-
ipykernel/kernelbase.py | 124 +++++++-------
ipykernel/kernelspec.py | 2 +-
ipykernel/log.py | 3 +
ipykernel/pickleutil.py | 3 +
ipykernel/serialize.py | 3 +
ipykernel/tests/test_connect.py | 8 +-
ipykernel/tests/test_kernel.py | 61 +++++--
ipykernel/zmqshell.py | 30 +++-
setup.py | 1 +
19 files changed, 540 insertions(+), 265 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 65a95fa..bec1061 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: ipykernel
-Version: 4.2.2
+Version: 4.3.1
Summary: IPython Kernel for Jupyter
Home-page: http://ipython.org
Author: IPython Development Team
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 91fbaf6..03ba591 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,6 +1,30 @@
Changes in IPython kernel
=========================
+4.3
+---
+
+4.3.1
+*****
+
+- Fix Windows Python 3.5 incompatibility caused by faulthandler patch in 4.3
+
+4.3.0
+*****
+
+`4.3.0 on GitHub <https://github.com/ipython/ipykernel/milestones/4.3>`__
+
+- Publish all IO in a thread, via :class:`IOPubThread`.
+ This solves the problem of requiring :meth:`sys.stdout.flush` to be called in the notebook to produce output promptly during long-running cells.
+- Remove refrences to outdated IPython guiref in kernel banner.
+- Patch faulthandler to use ``sys.__stderr__`` instead of forwarded ``sys.stderr``,
+ which has no fileno when forwarded.
+- Deprecate some vestiges of the Big Split:
+ - :func:`ipykernel.find_connection_file` is deprecated. Use :func:`jupyter_client.find_connection_file` instead.
+ - Various pieces of code specific to IPython parallel are deprecated in ipykernel
+ and moved to ipyparallel.
+
+
4.2
---
diff --git a/ipykernel/_version.py b/ipykernel/_version.py
index b0e4d75..194c680 100644
--- a/ipykernel/_version.py
+++ b/ipykernel/_version.py
@@ -1,4 +1,4 @@
-version_info = (4, 2, 2)
+version_info = (4, 3, 1)
__version__ = '.'.join(map(str, version_info))
kernel_protocol_version_info = (5, 0)
diff --git a/ipykernel/codeutil.py b/ipykernel/codeutil.py
index 887bd90..1d88b11 100644
--- a/ipykernel/codeutil.py
+++ b/ipykernel/codeutil.py
@@ -13,6 +13,9 @@ Reference: A. Tremols, P Cogolo, "Python Cookbook," p 302-305
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
+import warnings
+warnings.warn("ipykernel.codeutil is deprecated. It has moved to ipyparallel.serialize", DeprecationWarning)
+
import sys
import types
try:
diff --git a/ipykernel/connect.py b/ipykernel/connect.py
index beadd9f..e49ad10 100644
--- a/ipykernel/connect.py
+++ b/ipykernel/connect.py
@@ -8,6 +8,7 @@ from __future__ import absolute_import
import json
import sys
from subprocess import Popen, PIPE
+import warnings
from IPython.core.profiledir import ProfileDir
from IPython.paths import get_ipython_dir
@@ -37,18 +38,9 @@ def get_connection_file(app=None):
def find_connection_file(filename='kernel-*.json', profile=None):
- """find a connection file, and return its absolute path.
-
- The current working directory and the profile's security
- directory will be searched for the file if it is not given by
- absolute path.
-
- If profile is unspecified, then the current running application's
- profile will be used, or 'default', if not run from IPython.
-
- If the argument does not match an existing file, it will be interpreted as a
- fileglob, and the matching file in the profile's security dir with
- the latest access time will be used.
+ """DEPRECATED: find a connection file, and return its absolute path.
+
+ THIS FUNCION IS DEPRECATED. Use juptyer_client.find_connection_file instead.
Parameters
----------
@@ -62,6 +54,10 @@ def find_connection_file(filename='kernel-*.json', profile=None):
-------
str : The absolute path of the connection file.
"""
+
+ import warnings
+ warnings.warn("""ipykernel.find_connection_file is deprecated, use jupyter_client.find_connection_file""",
+ DeprecationWarning, stacklevel=2)
from IPython.core.application import BaseIPythonApplication as IPApp
try:
# quick check for absolute path, before going through logic
@@ -85,6 +81,28 @@ def find_connection_file(filename='kernel-*.json', profile=None):
return jupyter_client.find_connection_file(filename, path=['.', security_dir])
+def _find_connection_file(connection_file, profile=None):
+ """Return the absolute path for a connection file
+
+ - If nothing specified, return current Kernel's connection file
+ - If profile specified, show deprecation warning about finding connection files in profiles
+ - Otherwise, call jupyter_client.find_connection_file
+ """
+ if connection_file is None:
+ # get connection file from current kernel
+ return get_connection_file()
+ else:
+ # connection file specified, allow shortnames:
+ if profile is not None:
+ warnings.warn(
+ "Finding connection file by profile is deprecated.",
+ DeprecationWarning, stacklevel=3,
+ )
+ return find_connection_file(connection_file, profile=profile)
+ else:
+ return jupyter_client.find_connection_file(connection_file)
+
+
def get_connection_info(connection_file=None, unpack=False, profile=None):
"""Return the connection information for the current Kernel.
@@ -100,22 +118,14 @@ def get_connection_info(connection_file=None, unpack=False, profile=None):
unpack : bool [default: False]
if True, return the unpacked dict, otherwise just the string contents
of the file.
- profile : str [optional]
- The name of the profile to use when searching for the connection file,
- if different from the current IPython session or 'default'.
-
+ profile : DEPRECATED
Returns
-------
The connection dictionary of the current kernel, as string or dict,
depending on `unpack`.
"""
- if connection_file is None:
- # get connection file from current kernel
- cf = get_connection_file()
- else:
- # connection file specified, allow shortnames:
- cf = find_connection_file(connection_file, profile=profile)
+ cf = _find_connection_file(connection_file, profile)
with open(cf) as f:
info = f.read()
@@ -144,10 +154,7 @@ def connect_qtconsole(connection_file=None, argv=None, profile=None):
IPython Kernel will be used, which is only allowed from inside a kernel.
argv : list [optional]
Any extra args to be passed to the console.
- profile : str [optional]
- The name of the profile to use when searching for the connection file,
- if different from the current IPython session or 'default'.
-
+ profile : DEPRECATED
Returns
-------
@@ -155,11 +162,7 @@ def connect_qtconsole(connection_file=None, argv=None, profile=None):
"""
argv = [] if argv is None else argv
- if connection_file is None:
- # get connection file from current kernel
- cf = get_connection_file()
- else:
- cf = find_connection_file(connection_file, profile=profile)
+ cf = _find_connection_file(connection_file, profile)
cmd = ';'.join([
"from IPython.qt.console import qtconsoleapp",
diff --git a/ipykernel/datapub.py b/ipykernel/datapub.py
index 31904d7..0f76571 100644
--- a/ipykernel/datapub.py
+++ b/ipykernel/datapub.py
@@ -1,6 +1,9 @@
"""Publishing native (typically pickled) objects.
"""
+import warnings
+warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)
+
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
@@ -53,5 +56,7 @@ def publish_data(data):
data : dict
The data to be published. Think of it as a namespace.
"""
+ warnings.warn("ipykernel.datapub is deprecated. It has moved to ipyparallel.datapub", DeprecationWarning)
+
from ipykernel.zmqshell import ZMQInteractiveShell
ZMQInteractiveShell.instance().data_pub.publish_data(data)
diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py
index 2958082..bc877c9 100644
--- a/ipykernel/inprocess/ipkernel.py
+++ b/ipykernel/inprocess/ipkernel.py
@@ -14,6 +14,7 @@ from ipykernel.ipkernel import IPythonKernel
from ipykernel.zmqshell import ZMQInteractiveShell
from .socket import DummySocket
+from ..iostream import OutStream, BackgroundSocket, IOPubThread
#-----------------------------------------------------------------------------
# Main kernel class
@@ -49,13 +50,23 @@ class InProcessKernel(IPythonKernel):
shell_class = Type(allow_none=True)
shell_streams = List()
control_stream = Any()
- iopub_socket = Instance(DummySocket, ())
+ _underlying_iopub_socket = Instance(DummySocket, ())
+ iopub_thread = Instance(IOPubThread)
+ def _iopub_thread_default(self):
+ thread = IOPubThread(self._underlying_iopub_socket)
+ thread.start()
+ return thread
+
+ iopub_socket = Instance(BackgroundSocket)
+ def _iopub_socket_default(self):
+ return self.iopub_thread.background_socket
+
stdin_socket = Instance(DummySocket, ())
def __init__(self, **traits):
super(InProcessKernel, self).__init__(**traits)
- self.iopub_socket.on_trait_change(self._io_dispatch, 'message_sent')
+ self._underlying_iopub_socket.on_trait_change(self._io_dispatch, 'message_sent')
self.shell.kernel = self
def execute_request(self, stream, ident, parent):
@@ -128,12 +139,10 @@ class InProcessKernel(IPythonKernel):
return InProcessInteractiveShell
def _stdout_default(self):
- from ipykernel.iostream import OutStream
- return OutStream(self.session, self.iopub_socket, u'stdout', pipe=False)
+ return OutStream(self.session, self.iopub_thread, u'stdout')
def _stderr_default(self):
- from ipykernel.iostream import OutStream
- return OutStream(self.session, self.iopub_socket, u'stderr', pipe=False)
+ return OutStream(self.session, self.iopub_thread, u'stderr')
#-----------------------------------------------------------------------------
# Interactive shell subclass
diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py
index 1cb3119..8f38623 100644
--- a/ipykernel/iostream.py
+++ b/ipykernel/iostream.py
@@ -1,21 +1,27 @@
+# coding: utf-8
"""Wrappers for forwarding stdout/stderr over zmq"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
+from __future__ import print_function
+import atexit
import os
import threading
-import time
+import sys
import uuid
+import warnings
from io import StringIO, UnsupportedOperation
import zmq
from zmq.eventloop.ioloop import IOLoop
+from zmq.eventloop.zmqstream import ZMQStream
from jupyter_client.session import extract_header
from ipython_genutils import py3compat
from ipython_genutils.py3compat import unicode_type
+
from IPython.utils.warn import warn
#-----------------------------------------------------------------------------
@@ -26,166 +32,261 @@ MASTER = 0
CHILD = 1
#-----------------------------------------------------------------------------
-# Stream classes
+# IO classes
#-----------------------------------------------------------------------------
-class OutStream(object):
- """A file like object that publishes the stream to a 0MQ PUB socket."""
-
- # The time interval between automatic flushes, in seconds.
- _subprocess_flush_limit = 256
- flush_interval = 0.05
- topic=None
-
- def __init__(self, session, pub_socket, name, pipe=True):
- self.encoding = 'UTF-8'
- self.session = session
- self.pub_socket = pub_socket
- self.name = name
- self.topic = b'stream.' + py3compat.cast_bytes(name)
- self.parent_header = {}
- self._new_buffer()
- self._buffer_lock = threading.Lock()
+class IOPubThread(object):
+ """An object for sending IOPub messages in a background thread
+
+ prevents a blocking main thread
+
+ IOPubThread(pub_socket).background_socket is a Socket-API-providing object
+ whose IO is always run in a thread.
+ """
+
+ def __init__(self, socket, pipe=False):
+ self.socket = socket
+ self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
- self._master_thread = threading.current_thread().ident
- self._pipe_pid = os.getpid()
self._pipe_flag = pipe
+ self.io_loop = IOLoop()
if pipe:
self._setup_pipe_in()
+ self.thread = threading.Thread(target=self._thread_main)
+ self.thread.daemon = True
+
+ def _thread_main(self):
+ """The inner loop that's actually run in a thread"""
+ self.io_loop.start()
+ self.io_loop.close()
def _setup_pipe_in(self):
"""setup listening pipe for subprocesses"""
- ctx = self.pub_socket.context
+ ctx = self.socket.context
# use UUID to authenticate pipe messages
self._pipe_uuid = uuid.uuid4().bytes
- self._pipe_in = ctx.socket(zmq.PULL)
- self._pipe_in.linger = 0
+ pipe_in = ctx.socket(zmq.PULL)
+ pipe_in.linger = 0
try:
- self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
+ self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
except zmq.ZMQError as e:
- warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
+ warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
"\nsubprocess output will be unavailable."
)
self._pipe_flag = False
- self._pipe_in.close()
- del self._pipe_in
+ pipe_in.close()
return
- self._pipe_poller = zmq.Poller()
- self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
- if IOLoop.initialized():
- # subprocess flush should trigger flush
- # if kernel is idle
- IOLoop.instance().add_handler(self._pipe_in,
- lambda s, event: self.flush(),
- IOLoop.READ,
- )
+ self._pipe_in = ZMQStream(pipe_in, self.io_loop)
+ self._pipe_in.on_recv(self._handle_pipe_msg)
+
+ def _handle_pipe_msg(self, msg):
+ """handle a pipe message from a subprocess"""
+ if not self._pipe_flag or not self._is_master_process():
+ return
+ if msg[0] != self._pipe_uuid:
+ print("Bad pipe message: %s", msg, file=sys.__stderr__)
+ return
+ self.send_multipart(msg[1:])
def _setup_pipe_out(self):
# must be new context after fork
ctx = zmq.Context()
- self._pipe_pid = os.getpid()
- self._pipe_out = ctx.socket(zmq.PUSH)
- self._pipe_out_lock = threading.Lock()
- self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
+ pipe_out = ctx.socket(zmq.PUSH)
+ pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
+ pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
+ return ctx, pipe_out
def _is_master_process(self):
return os.getpid() == self._master_pid
- def _is_master_thread(self):
- return threading.current_thread().ident == self._master_thread
-
- def _have_pipe_out(self):
- return os.getpid() == self._pipe_pid
-
def _check_mp_mode(self):
"""check for forks, and switch to zmq pipeline if necessary"""
if not self._pipe_flag or self._is_master_process():
return MASTER
else:
- if not self._have_pipe_out():
- self._flush_buffer()
- # setup a new out pipe
- self._setup_pipe_out()
return CHILD
-
- def set_parent(self, parent):
- self.parent_header = extract_header(parent)
-
+
+ def start(self):
+ """Start the IOPub thread"""
+ self.thread.start()
+ # make sure we don't prevent process exit
+ # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
+ atexit.register(self.stop)
+
+ def stop(self):
+ """Stop the IOPub thread"""
+ if not self.thread.is_alive():
+ return
+ self.io_loop.add_callback(self.io_loop.stop)
+ self.thread.join()
+
def close(self):
- self.pub_socket = None
+ self.socket.close()
+ self.socket = None
@property
def closed(self):
- return self.pub_socket is None
-
- def _flush_from_subprocesses(self):
- """flush possible pub data from subprocesses into my buffer"""
- if not self._pipe_flag or not self._is_master_process():
- return
- for i in range(self._subprocess_flush_limit):
- if self._pipe_poller.poll(0):
- msg = self._pipe_in.recv_multipart()
- if msg[0] != self._pipe_uuid:
- continue
- else:
- self._buffer.write(msg[1].decode(self.encoding, 'replace'))
- # this always means a flush,
- # so reset our timer
- self._start = 0
- else:
- break
+ return self.socket is None
+
+ def send_multipart(self, *args, **kwargs):
+ """send_multipart schedules actual zmq send in my thread.
+
+ If my thread isn't running (e.g. forked process), send immediately.
+ """
- def _schedule_flush(self):
- """schedule a flush in the main thread
+ if self.thread.is_alive():
+ self.io_loop.add_callback(lambda : self._really_send(*args, **kwargs))
+ else:
+ self._really_send(*args, **kwargs)
+
+ def _really_send(self, msg, *args, **kwargs):
+ """The callback that actually sends messages"""
+ mp_mode = self._check_mp_mode()
- only works with a tornado/pyzmq eventloop running
- """
- if IOLoop.initialized():
- IOLoop.instance().add_callback(self.flush)
+ if mp_mode != CHILD:
+ # we are master, do a regular send
+ self.socket.send_multipart(msg, *args, **kwargs)
+ else:
+ # we are a child, pipe to master
+ # new context/socket for every pipe-out
+ # since forks don't teardown politely, use ctx.term to ensure send has completed
+ ctx, pipe_out = self._setup_pipe_out()
+ pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
+ pipe_out.close()
+ ctx.term()
+
+
+class BackgroundSocket(object):
+ """Wrapper around IOPub thread that provides zmq send[_multipart]"""
+ io_thread = None
+
+ def __init__(self, io_thread):
+ self.io_thread = io_thread
+
+ def __getattr__(self, attr):
+ """Wrap socket attr access for backward-compatibility"""
+ if attr.startswith('__') and attr.endswith('__'):
+ # don't wrap magic methods
+ super(BackgroundSocket, self).__getattr__(attr)
+ if hasattr(self.io_thread.socket, attr):
+ warnings.warn("Accessing zmq Socket attribute %s on BackgroundSocket" % attr,
+ DeprecationWarning, stacklevel=2)
+ return getattr(self.io_thread.socket, attr)
+ super(BackgroundSocket, self).__getattr__(attr)
+
+ def __setattr__(self, attr, value):
+ if attr == 'io_thread' or (attr.startswith('__' and attr.endswith('__'))):
+ super(BackgroundSocket, self).__setattr__(attr, value)
else:
- # no async loop, at least force the timer
- self._start = 0
+ warnings.warn("Setting zmq Socket attribute %s on BackgroundSocket" % attr,
+ DeprecationWarning, stacklevel=2)
+ setattr(self.io_thread.socket, attr, value)
+
+ def send(self, msg, *args, **kwargs):
+ return self.send_multipart([msg], *args, **kwargs)
- def flush(self):
- """trigger actual zmq send"""
- if self.pub_socket is None:
- raise ValueError(u'I/O operation on closed file')
+ def send_multipart(self, *args, **kwargs):
+ """Schedule send in IO thread"""
+ return self.io_thread.send_multipart(*args, **kwargs)
- mp_mode = self._check_mp_mode()
- if mp_mode != CHILD:
- # we are master
- if not self._is_master_thread():
- # sub-threads must not trigger flush directly,
- # but at least they can schedule an async flush, or force the timer.
- self._schedule_flush()
- return
+class OutStream(object):
+ """A file like object that publishes the stream to a 0MQ PUB socket.
+
+ Output is handed off to an IO Thread
+ """
- self._flush_from_subprocesses()
- data = self._flush_buffer()
+ # The time interval between automatic flushes, in seconds.
+ flush_interval = 0.2
+ topic=None
- if data:
- content = {u'name':self.name, u'text':data}
- msg = self.session.send(self.pub_socket, u'stream', content=content,
- parent=self.parent_header, ident=self.topic)
+ def __init__(self, session, pub_thread, name, pipe=None):
+ if pipe is not None:
+ warnings.warn("pipe argument to OutStream is deprecated and ignored",
+ DeprecationWarning)
+ self.encoding = 'UTF-8'
+ self.session = session
+ if not isinstance(pub_thread, IOPubThread):
+ # Backward-compat: given socket, not thread. Wrap in a thread.
+ warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread,
+ DeprecationWarning, stacklevel=2)
+ pub_thread = IOPubThread(pub_thread)
+ pub_thread.start()
+ self.pub_thread = pub_thread
+ self.name = name
+ self.topic = b'stream.' + py3compat.cast_bytes(name)
+ self.parent_header = {}
+ self._master_pid = os.getpid()
+ self._flush_lock = threading.Lock()
+ self._flush_timeout = None
+ self._io_loop = pub_thread.io_loop
+ self._new_buffer()
- if hasattr(self.pub_socket, 'flush'):
- # socket itself has flush (presumably ZMQStream)
- self.pub_socket.flush()
- else:
- with self._pipe_out_lock:
- string = self._flush_buffer()
- tracker = self._pipe_out.send_multipart([
- self._pipe_uuid,
- string.encode(self.encoding, 'replace'),
- ], copy=False, track=True)
- try:
- tracker.wait(1)
- except:
- pass
+ def _is_master_process(self):
+ return os.getpid() == self._master_pid
+ def set_parent(self, parent):
+ self.parent_header = extract_header(parent)
+
+ def close(self):
+ self.pub_thread = None
+
+ @property
+ def closed(self):
+ return self.pub_thread is None
+
+ def _schedule_flush(self):
+ """schedule a flush in the IO thread
+
+ call this on write, to indicate that flush should be called soon.
+ """
+ with self._flush_lock:
+ if self._flush_timeout is not None:
+ return
+ # None indicates there's no flush scheduled.
+ # Use a non-None placeholder to indicate that a flush is scheduled
+ # to avoid races while we wait for _schedule_in_thread below to fire in the io thread.
+ self._flush_timeout = 'placeholder'
+
+ # add_timeout has to be handed to the io thread with add_callback
+ def _schedule_in_thread():
+ self._flush_timeout = self._io_loop.call_later(self.flush_interval, self._flush)
+ self._io_loop.add_callback(_schedule_in_thread)
+
+ def flush(self):
+ """trigger actual zmq send
+
+ send will happen in the background thread
+ """
+ if self.pub_thread.thread.is_alive():
+ self._io_loop.add_callback(self._flush)
+ # wait for flush to actually get through:
+ evt = threading.Event()
+ self._io_loop.add_callback(evt.set)
+ evt.wait()
+ else:
+ self._flush()
+
+ def _flush(self):
+ """This is where the actual send happens.
+
+ _flush should generally be called in the IO thread,
+ unless the thread has been destroyed (e.g. forked subprocess).
+ """
+ with self._flush_lock:
+ self._flush_timeout = None
+ data = self._flush_buffer()
+ if data:
+ # FIXME: this disables Session's fork-safe check,
+ # since pub_thread is itself fork-safe.
+ # There should be a better way to do this.
+ self.session.pid = os.getpid()
+ content = {u'name':self.name, u'text':data}
+ self.session.send(self.pub_thread, u'stream', content=content,
+ parent=self.parent_header, ident=self.topic)
+
def isatty(self):
return False
@@ -205,14 +306,14 @@ class OutStream(object):
raise UnsupportedOperation("IOStream has no fileno.")
def write(self, string):
- if self.pub_socket is None:
+ if self.pub_thread is None:
raise ValueError('I/O operation on closed file')
else:
# Make sure that we're handling unicode
if not isinstance(string, unicode_type):
string = string.decode(self.encoding, 'replace')
- is_child = (self._check_mp_mode() == CHILD)
+ is_child = (not self._is_master_process())
self._buffer.write(string)
if is_child:
# newlines imply flush in subprocesses
@@ -220,16 +321,11 @@ class OutStream(object):
# and this helps.
if '\n' in string:
self.flush()
- # do we want to check subprocess flushes on write?
- # self._flush_from_subprocesses()
- current_time = time.time()
- if self._start < 0:
- self._start = current_time
- elif current_time - self._start > self.flush_interval:
- self.flush()
+ else:
+ self._schedule_flush()
def writelines(self, sequence):
- if self.pub_socket is None:
+ if self.pub_thread is None:
raise ValueError('I/O operation on closed file')
else:
for string in sequence:
@@ -239,11 +335,11 @@ class OutStream(object):
"""clear the current buffer and return the current buffer data"""
data = u''
if self._buffer is not None:
- data = self._buffer.getvalue()
- self._buffer.close()
- self._new_buffer()
+ buf = self._buffer
+ self._new_buffer()
+ data = buf.getvalue()
+ buf.close()
return data
def _new_buffer(self):
self._buffer = StringIO()
- self._start = -1
diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py
index 8f7414d..72a1c10 100644
--- a/ipykernel/ipkernel.py
+++ b/ipykernel/ipkernel.py
@@ -11,7 +11,6 @@ from traitlets import Instance, Type, Any, List
from .comm import CommManager
from .kernelbase import Kernel as KernelBase
-from .serialize import serialize_object, unpack_apply_message
from .zmqshell import ZMQInteractiveShell
@@ -51,8 +50,6 @@ class IPythonKernel(KernelBase):
self.shell.displayhook.topic = self._topic('execute_result')
self.shell.display_pub.session = self.session
self.shell.display_pub.pub_socket = self.iopub_socket
- self.shell.data_pub.session = self.session
- self.shell.data_pub.pub_socket = self.iopub_socket
# TMP - hack while developing
self.shell._reply_content = None
@@ -124,6 +121,33 @@ class IPythonKernel(KernelBase):
super(IPythonKernel, self).set_parent(ident, parent)
self.shell.set_parent(parent)
+ def init_metadata(self, parent):
+ """Initialize metadata.
+
+ Run at the beginning of each execution request.
+ """
+ md = super(IPythonKernel, self).init_metadata(parent)
+ # FIXME: remove deprecated ipyparallel-specific code
+ # This is required for ipyparallel < 5.0
+ md.update({
+ 'dependencies_met' : True,
+ 'engine' : self.ident,
+ })
+ return md
+
+ def finish_metadata(self, parent, metadata, reply_content):
+ """Finish populating metadata.
+
+ Run after completing an execution request.
+ """
+ # FIXME: remove deprecated ipyparallel-specific code
+ # This is required by ipyparallel < 5.0
+ metadata['status'] = reply_content['status']
+ if reply_content['status'] == 'error' and reply_content['ename'] == 'UnmetDependency':
+ metadata['dependencies_met'] = False
+
+ return metadata
+
def _forward_input(self, allow_stdin=False):
"""Forward raw_input and getpass to the current frontend.
@@ -198,10 +222,11 @@ class IPythonKernel(KernelBase):
# runlines. We'll need to clean up this logic later.
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
- e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
- reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None
+ # FIXME: deprecate piece for ipyparallel:
+ e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
+ reply_content['engine_info'] = e_info
if 'traceback' in reply_content:
self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
@@ -289,6 +314,7 @@ class IPythonKernel(KernelBase):
return r
def do_apply(self, content, bufs, msg_id, reply_metadata):
+ from .serialize import serialize_object, unpack_apply_message
shell = self.shell
try:
working = shell.user_ns
@@ -328,18 +354,17 @@ class IPythonKernel(KernelBase):
reply_content = {}
if shell._reply_content is not None:
reply_content.update(shell._reply_content)
- e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
- reply_content['engine_info'] = e_info
# reset after use
shell._reply_content = None
+
+ # FIXME: deprecate piece for ipyparallel:
+ e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
+ reply_content['engine_info'] = e_info
self.send_response(self.iopub_socket, u'error', reply_content,
ident=self._topic('error'))
self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
result_buf = []
-
- if reply_content['ename'] == 'UnmetDependency':
- reply_metadata['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}
diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py
index 1fd958f..8785e10 100644
--- a/ipykernel/kernelapp.py
+++ b/ipykernel/kernelapp.py
@@ -33,6 +33,7 @@ from jupyter_client import write_connection_file
from jupyter_client.connect import ConnectionFileMixin
# local imports
+from .iostream import IOPubThread
from .heartbeat import Heartbeat
from .ipkernel import IPythonKernel
from .parentpoller import ParentPollerUnix, ParentPollerWindows
@@ -231,11 +232,6 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port)
- self.iopub_socket = context.socket(zmq.PUB)
- self.iopub_socket.linger = 1000
- self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
- self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
-
self.stdin_socket = context.socket(zmq.ROUTER)
self.stdin_socket.linger = 1000
self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
@@ -245,6 +241,19 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
self.control_socket.linger = 1000
self.control_port = self._bind_socket(self.control_socket, self.control_port)
self.log.debug("control ROUTER Channel on port: %i" % self.control_port)
+
+ self.init_iopub(context)
+
+ def init_iopub(self, context):
+ self.iopub_socket = context.socket(zmq.PUB)
+ self.iopub_socket.linger = 1000
+ self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
+ self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
+ self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True)
+ self.iopub_thread.start()
+ # backward-compat: wrap iopub socket API in background thread
+ self.iopub_socket = self.iopub_thread.background_socket
+
def init_heartbeat(self):
"""start the heart beating"""
@@ -298,11 +307,38 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
"""Redirect input streams and set a display hook."""
if self.outstream_class:
outstream_factory = import_item(str(self.outstream_class))
- sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
- sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
+ sys.stdout = outstream_factory(self.session, self.iopub_thread, u'stdout')
+ sys.stderr = outstream_factory(self.session, self.iopub_thread, u'stderr')
if self.displayhook_class:
displayhook_factory = import_item(str(self.displayhook_class))
sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
+
+ self.patch_io()
+
+ def patch_io(self):
+ """Patch important libraries that can't handle sys.stdout forwarding"""
+ try:
+ import faulthandler
+ except ImportError:
+ pass
+ else:
+ # Warning: this is a monkeypatch of `faulthandler.enable`, watch for possible
+ # updates to the upstream API and update accordingly (up-to-date as of Python 3.5):
+ # https://docs.python.org/3/library/faulthandler.html#faulthandler.enable
+
+ # change default file to __stderr__ from forwarded stderr
+ faulthandler_enable = faulthandler.enable
+ def enable(file=sys.__stderr__, all_threads=True, **kwargs):
+ return faulthandler_enable(file=file, all_threads=all_threads, **kwargs)
+
+ faulthandler.enable = enable
+
+ if hasattr(faulthandler, 'register'):
+ faulthandler_register = faulthandler.register
+ def register(signum, file=sys.__stderr__, all_threads=True, chain=False, **kwargs):
+ return faulthandler_register(signum, file=file, all_threads=all_threads,
+ chain=chain, **kwargs)
+ faulthandler.register = register
def init_signal(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -316,6 +352,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp,
kernel = kernel_factory(parent=self, session=self.session,
shell_streams=[shell_stream, control_stream],
+ iopub_thread=self.iopub_thread,
iopub_socket=self.iopub_socket,
stdin_socket=self.stdin_socket,
log=self.log,
diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py
index 20e56c9..e8eb71f 100644
--- a/ipykernel/kernelbase.py
+++ b/ipykernel/kernelbase.py
@@ -49,8 +49,9 @@ class Kernel(SingletonConfigurable):
profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
shell_streams = List()
control_stream = Instance(ZMQStream, allow_none=True)
- iopub_socket = Instance(zmq.Socket, allow_none=True)
- stdin_socket = Instance(zmq.Socket, allow_none=True)
+ iopub_socket = Any()
+ iopub_thread = Any()
+ stdin_socket = Any()
log = Instance(logging.Logger, allow_none=True)
# identities:
@@ -111,25 +112,29 @@ class Kernel(SingletonConfigurable):
# Track execution count here. For IPython, we override this to use the
# execution count we store in the shell.
execution_count = 0
-
+
+ msg_types = [
+ 'execute_request', 'complete_request',
+ 'inspect_request', 'history_request',
+ 'comm_info_request', 'kernel_info_request',
+ 'connect_request', 'shutdown_request',
+ 'is_complete_request',
+ # deprecated:
+ 'apply_request',
+ ]
+ # add deprecated ipyparallel control messages
+ control_msg_types = msg_types + ['clear_request', 'abort_request']
def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
# Build dict of handlers for message types
- msg_types = [ 'execute_request', 'complete_request',
- 'inspect_request', 'history_request',
- 'comm_info_request', 'kernel_info_request',
- 'connect_request', 'shutdown_request',
- 'apply_request', 'is_complete_request',
- ]
self.shell_handlers = {}
- for msg_type in msg_types:
+ for msg_type in self.msg_types:
self.shell_handlers[msg_type] = getattr(self, msg_type)
... 502 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/ipykernel.git
More information about the Python-modules-commits
mailing list