[Python-modules-commits] [dask.distributed] 01/07: New upstream version 1.19.2+ds.1

Diane Trout diane at moszumanska.debian.org
Fri Dec 8 16:26:40 UTC 2017


This is an automated email from the git hooks/post-receive script.

diane pushed a commit to branch master
in repository dask.distributed.

commit e89c7a7924c5bf4f4239339f4d42f9e0e58936c1
Author: Diane Trout <diane at ghic.org>
Date:   Thu Oct 12 23:57:58 2017 -0700

    New upstream version 1.19.2+ds.1
---
 .travis.yml                                        |   3 +-
 LICENSE.txt                                        |   4 +-
 continuous_integration/setup_conda_environment.cmd |   2 +-
 continuous_integration/travis/install.sh           |   3 +-
 distributed/_concurrent_futures_thread.py          | 161 +++++
 distributed/_ipython_utils.py                      |  13 +-
 distributed/asyncio.py                             |  11 +-
 distributed/batched.py                             |  12 +-
 distributed/bokeh/__init__.py                      |   4 +-
 distributed/bokeh/background/server_lifecycle.py   |   4 +-
 distributed/bokeh/components.py                    | 302 ++++++++-
 distributed/bokeh/core.py                          |  19 -
 distributed/bokeh/scheduler.py                     | 114 ++--
 distributed/bokeh/tests/test_components.py         |  32 +-
 distributed/bokeh/tests/test_scheduler_bokeh.py    |  36 +-
 distributed/bokeh/tests/test_task_stream.py        |   5 +-
 distributed/bokeh/tests/test_worker_bokeh.py       |  11 +-
 distributed/bokeh/tests/test_worker_monitor.py     |   2 -
 distributed/bokeh/worker.py                        |  92 +--
 distributed/cli/dask_mpi.py                        | 100 +++
 distributed/cli/dask_scheduler.py                  |   3 +-
 distributed/cli/dask_ssh.py                        |  19 +-
 distributed/cli/dask_worker.py                     | 106 +--
 distributed/cli/dcluster.py                        |   1 -
 distributed/cli/dscheduler.py                      |   1 -
 distributed/cli/dworker.py                         |   1 -
 distributed/cli/tests/test_dask_mpi.py             |  70 ++
 distributed/cli/tests/test_dask_scheduler.py       |  28 +-
 distributed/cli/tests/test_dask_worker.py          |  71 +-
 distributed/cli/tests/test_tls_cli.py              |   4 +-
 distributed/cli/utils.py                           |   1 -
 distributed/client.py                              | 556 ++++++++++------
 distributed/comm/addressing.py                     |  15 +-
 distributed/comm/inproc.py                         |   6 +-
 distributed/comm/tcp.py                            |  18 +-
 distributed/comm/tests/test_comms.py               |  45 +-
 distributed/compatibility.py                       |   7 +-
 distributed/config.py                              |  25 +-
 distributed/config.yaml                            |  14 +
 distributed/core.py                                |  56 +-
 distributed/deploy/adaptive.py                     | 152 ++++-
 distributed/deploy/local.py                        |  23 +-
 distributed/deploy/ssh.py                          |   8 +-
 distributed/deploy/tests/test_adaptive.py          |  20 +-
 distributed/deploy/tests/test_local.py             |  81 ++-
 distributed/deploy/tests/test_ssh.py               |   9 +-
 distributed/deploy/utils_test.py                   |   2 +-
 distributed/diagnostics/eventstream.py             |   1 +
 distributed/diagnostics/plugin.py                  |   7 +-
 distributed/diagnostics/progress.py                |  45 +-
 distributed/diagnostics/progress_stream.py         |   1 +
 distributed/diagnostics/progressbar.py             |  62 +-
 distributed/diagnostics/scheduler.py               |   3 +-
 distributed/diagnostics/tests/test_eventstream.py  |  11 +-
 distributed/diagnostics/tests/test_progress.py     |  46 +-
 .../diagnostics/tests/test_progress_stream.py      |  13 +-
 distributed/diagnostics/tests/test_progressbar.py  |  12 +-
 .../tests/test_scheduler_diagnostics.py            |   8 +-
 distributed/diagnostics/tests/test_widgets.py      |  16 +-
 distributed/hdfs.py                                |   2 +-
 distributed/http/core.py                           |   4 +-
 distributed/http/scheduler.py                      |  10 +-
 distributed/http/tests/test_scheduler_http.py      |  16 +-
 distributed/http/tests/test_worker_http.py         |  17 +-
 distributed/http/worker.py                         |  19 +-
 distributed/nanny.py                               |  98 ++-
 distributed/process.py                             | 172 +++--
 distributed/profile.py                             | 226 +++++++
 distributed/protocol/__init__.py                   |   6 +-
 distributed/protocol/compression.py                |  11 +-
 distributed/protocol/core.py                       |   2 +-
 distributed/protocol/keras.py                      |   2 +-
 distributed/protocol/pickle.py                     |   2 +-
 distributed/protocol/serialize.py                  |   2 +
 distributed/protocol/tests/test_h5py.py            |   6 +-
 distributed/protocol/tests/test_keras.py           |   3 -
 distributed/protocol/tests/test_netcdf4.py         |   4 +-
 distributed/protocol/tests/test_numpy.py           |  80 ++-
 distributed/protocol/tests/test_pandas.py          |   9 +-
 distributed/protocol/tests/test_pickle.py          |   1 +
 distributed/protocol/tests/test_protocol.py        |  11 +-
 distributed/protocol/tests/test_protocol_utils.py  |   1 +
 distributed/protocol/tests/test_serialize.py       |   8 +-
 distributed/protocol/tests/test_sparse.py          |   5 +-
 distributed/protocol/utils.py                      |   2 +-
 distributed/publish.py                             |   1 +
 distributed/queues.py                              |  15 +-
 distributed/recreate_exceptions.py                 |   8 +-
 distributed/scheduler.py                           | 519 ++++++++++-----
 distributed/sizeof.py                              |   3 +-
 distributed/stealing.py                            |  31 +-
 distributed/system_monitor.py                      |  63 +-
 distributed/tests/make_tls_certs.py                |  35 +-
 distributed/tests/py3_test_asyncio.py              |   9 +-
 distributed/tests/py3_test_client.py               |  31 +-
 distributed/tests/py3_test_utils_tst.py            |   2 +-
 distributed/tests/test_as_completed.py             |  44 +-
 distributed/tests/test_asyncio.py                  |   2 +-
 distributed/tests/test_asyncprocess.py             | 149 ++++-
 distributed/tests/test_batched.py                  |  40 +-
 distributed/tests/test_client.py                   | 508 ++++++++++----
 distributed/tests/test_client_executor.py          |  13 +-
 distributed/tests/test_collections.py              |  22 +-
 distributed/tests/test_compatibility.py            |   1 +
 distributed/tests/test_config.py                   |  31 +-
 distributed/tests/test_core.py                     |  40 +-
 distributed/tests/test_counter.py                  |   5 +-
 distributed/tests/test_hdfs.py                     |  22 +-
 distributed/tests/test_ipython.py                  |  42 +-
 distributed/tests/test_joblib.py                   |   9 +-
 distributed/tests/test_metrics.py                  |   3 +-
 distributed/tests/test_nanny.py                    |  83 ++-
 distributed/tests/test_preload.py                  |   3 +-
 distributed/tests/test_profile.py                  | 108 +++
 distributed/tests/test_publish.py                  |  19 +-
 distributed/tests/test_queues.py                   |  12 +-
 distributed/tests/test_resources.py                |  70 +-
 distributed/tests/test_scheduler.py                | 162 +++--
 distributed/tests/test_security.py                 |  58 +-
 distributed/tests/test_sizeof.py                   |   6 +-
 distributed/tests/test_steal.py                    | 204 +++---
 distributed/tests/test_stress.py                   |  69 +-
 distributed/tests/test_submit_cli.py               |   6 +-
 distributed/tests/test_submit_remote_client.py     |   4 +-
 distributed/tests/test_threadpoolexecutor.py       |  22 +
 distributed/tests/test_tls_functional.py           |   9 +-
 distributed/tests/test_utils.py                    |  29 +-
 distributed/tests/test_utils_comm.py               |  14 +-
 distributed/tests/test_utils_test.py               |  14 +-
 distributed/tests/test_variable.py                 |  15 +-
 distributed/tests/test_worker.py                   | 301 +++++++--
 distributed/tests/test_worker_client.py            |  63 +-
 distributed/tests/test_worker_failure.py           |  72 +-
 distributed/threadpoolexecutor.py                  |  17 +-
 distributed/utils.py                               | 257 ++++++--
 distributed/utils_comm.py                          |   1 +
 distributed/utils_test.py                          | 236 +++++--
 distributed/variable.py                            |  10 +-
 distributed/versions.py                            |   2 +-
 distributed/worker.py                              | 726 +++++++++++++++------
 distributed/worker_client.py                       |   5 +-
 docs/source/adaptive.rst                           |  45 +-
 docs/source/api.rst                                |  15 +-
 docs/source/asynchronous.rst                       |  19 +-
 docs/source/changelog.rst                          |  73 +++
 docs/source/client.rst                             |  16 +-
 docs/source/conf.py                                |   4 +-
 docs/source/develop.rst                            |   2 +-
 docs/source/diagnosing-performance.rst             | 125 ++++
 docs/source/faq.rst                                |   2 +-
 docs/source/index.rst                              |   1 +
 docs/source/install.rst                            |   2 +-
 docs/source/ipython.rst                            |   6 +-
 docs/source/quickstart.rst                         |   2 +-
 docs/source/resources.rst                          |  25 +
 docs/source/setup.rst                              |  26 +
 docs/source/submitting-applications.rst            |   2 +-
 docs/source/worker.rst                             | 129 ++--
 requirements.txt                                   |   4 +-
 setup.cfg                                          |   8 +-
 setup.py                                           |   1 +
 161 files changed, 5842 insertions(+), 2126 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 18e51c7..8f080b1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,8 @@ install:
 
 script:
   - source continuous_integration/travis/run_tests.sh
-  - flake8 distributed/*.py distributed/{bokeh,comm,deploy,protocol}/*.py # no tests yet
+  - flake8 distributed
+  - pycodestyle --select=E722 distributed  # check for bare `except:` blocks
 
 after_success:
   - if [[ $COVERAGE == true ]]; then coverage report; pip install -q coveralls ; coveralls ; fi
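
For context on the new CI step: pycodestyle's E722 flags bare `except:` clauses, which also catch KeyboardInterrupt and SystemExit and can hide real failures. A minimal sketch of what the check rejects and the preferred form:

    user_input = "not-a-number"

    # Flagged by E722: a bare except also swallows KeyboardInterrupt
    # and SystemExit, masking genuine problems.
    try:
        value = int(user_input)
    except:  # noqa: E722 -- the pattern the new CI step rejects
        value = 0

    # Preferred: name the exception you actually expect.
    try:
        value = int(user_input)
    except ValueError:
        value = 0

    print(value)  # 0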
diff --git a/LICENSE.txt b/LICENSE.txt
index 71ddeb2..4172242 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,4 +1,4 @@
-Copyright (c) 2015-2016, Continuum Analytics, Inc. and contributors
+Copyright (c) 2015-2017, Anaconda, Inc. and contributors
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification,
@@ -11,7 +11,7 @@ Redistributions in binary form must reproduce the above copyright notice,
 this list of conditions and the following disclaimer in the documentation
 and/or other materials provided with the distribution.
 
-Neither the name of Continuum Analytics nor the names of any contributors
+Neither the name of Anaconda nor the names of any contributors
 may be used to endorse or promote products derived from this software
 without specific prior written permission.
 
diff --git a/continuous_integration/setup_conda_environment.cmd b/continuous_integration/setup_conda_environment.cmd
index 9a2c2c3..2df5802 100644
--- a/continuous_integration/setup_conda_environment.cmd
+++ b/continuous_integration/setup_conda_environment.cmd
@@ -32,7 +32,7 @@ call deactivate
     mock ^
     msgpack-python ^
     psutil ^
-    pytest ^
+    pytest=3.1 ^
     python=%PYTHON% ^
     requests ^
     toolz ^
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index 9e91013..cce65bb 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -45,7 +45,8 @@ conda install -q -c conda-forge \
     netcdf4 \
     paramiko \
     psutil \
-    pytest \
+    pycodestyle \
+    pytest=3.1 \
     pytest-faulthandler \
     pytest-timeout \
     requests \
diff --git a/distributed/_concurrent_futures_thread.py b/distributed/_concurrent_futures_thread.py
new file mode 100644
index 0000000..06aa810
--- /dev/null
+++ b/distributed/_concurrent_futures_thread.py
@@ -0,0 +1,161 @@
+# This was copied from CPython 3.6
+
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian at sweetapp.com)'
+
+import atexit
+from concurrent.futures import _base
+import itertools
+try:
+    import queue
+except ImportError:
+    import Queue as queue
+import threading
+import weakref
+import os
+
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+#   - The workers would still be running during interpreter shutdown,
+#     meaning that they would fail in unpredictable ways.
+#   - The workers could be killed while evaluating a work item, which could
+#     be bad if the callable being evaluated has external side-effects e.g.
+#     writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
+_threads_queues = weakref.WeakKeyDictionary()
+_shutdown = False
+
+
+def _python_exit():
+    global _shutdown
+    _shutdown = True
+    items = list(_threads_queues.items())
+    for t, q in items:
+        q.put(None)
+    for t, q in items:
+        t.join()
+
+
+atexit.register(_python_exit)
+
+
+class _WorkItem(object):
+    def __init__(self, future, fn, args, kwargs):
+        self.future = future
+        self.fn = fn
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        if not self.future.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException as e:
+            self.future.set_exception(e)
+        else:
+            self.future.set_result(result)
+
+
+def _worker(executor_reference, work_queue):
+    try:
+        while True:
+            work_item = work_queue.get(block=True)
+            if work_item is not None:
+                work_item.run()
+                # Delete references to object. See issue16284
+                del work_item
+                continue
+            executor = executor_reference()
+            # Exit if:
+            #   - The interpreter is shutting down OR
+            #   - The executor that owns the worker has been collected OR
+            #   - The executor that owns the worker has been shutdown.
+            if _shutdown or executor is None or executor._shutdown:
+                # Notice other workers
+                work_queue.put(None)
+                return
+            del executor
+    except BaseException:
+        _base.LOGGER.critical('Exception in worker', exc_info=True)
+
+
+class ThreadPoolExecutor(_base.Executor):
+
+    # Used to assign unique thread names when thread_name_prefix is not supplied.
+    _counter = itertools.count()
+
+    def __init__(self, max_workers=None, thread_name_prefix=''):
+        """Initializes a new ThreadPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of threads that can be used to
+                execute the given calls.
+            thread_name_prefix: An optional name prefix to give our threads.
+        """
+        if max_workers is None:
+            # Use this number because ThreadPoolExecutor is often
+            # used to overlap I/O instead of CPU work.
+            max_workers = (os.cpu_count() or 1) * 5
+        if max_workers <= 0:
+            raise ValueError("max_workers must be greater than 0")
+
+        self._max_workers = max_workers
+        self._work_queue = queue.Queue()
+        self._threads = set()
+        self._shutdown = False
+        self._shutdown_lock = threading.Lock()
+        self._thread_name_prefix = (thread_name_prefix or
+                                    ("ThreadPoolExecutor-%d" % next(self._counter)))
+
+    def submit(self, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            f = _base.Future()
+            w = _WorkItem(f, fn, args, kwargs)
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
+            return f
+    submit.__doc__ = _base.Executor.submit.__doc__
+
+    def _adjust_thread_count(self):
+        # When the executor gets lost, the weakref callback will wake up
+        # the worker threads.
+        def weakref_cb(_, q=self._work_queue):
+            q.put(None)
+        # TODO(bquinlan): Should avoid creating new threads if there are more
+        # idle threads than items in the work queue.
+        num_threads = len(self._threads)
+        if num_threads < self._max_workers:
+            thread_name = '%s_%d' % (self._thread_name_prefix or self,
+                                     num_threads)
+            t = threading.Thread(name=thread_name, target=_worker,
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue))
+            t.daemon = True
+            t.start()
+            self._threads.add(t)
+            _threads_queues[t] = self._work_queue
+
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
+            self._shutdown = True
+            self._work_queue.put(None)
+        if wait:
+            for t in self._threads:
+                t.join()
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
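
A usage sketch for the vendored executor above (hedged: the import path assumes the module lands where this diff places it; the interface mirrors concurrent.futures.ThreadPoolExecutor):

    from distributed._concurrent_futures_thread import ThreadPoolExecutor

    def square(x):
        return x * x

    # thread_name_prefix is the Python 3.6 feature this backport carries.
    executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix='demo')
    futures = [executor.submit(square, i) for i in range(8)]
    print([f.result() for f in futures])  # [0, 1, 4, 9, 16, 25, 36, 49]
    executor.shutdown(wait=True)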
diff --git a/distributed/_ipython_utils.py b/distributed/_ipython_utils.py
index 042258c..6f88087 100644
--- a/distributed/_ipython_utils.py
+++ b/distributed/_ipython_utils.py
@@ -79,7 +79,7 @@ def register_worker_magic(connection_info, magic_name='worker'):
     which run the given cell in a remote kernel.
     """
     ip = get_ipython()
-    info = dict(connection_info) # copy
+    info = dict(connection_info)  # copy
     key = info.pop('key')
     kc = BlockingKernelClient(**connection_info)
     kc.session.key = key
@@ -92,7 +92,7 @@ def register_worker_magic(connection_info, magic_name='worker'):
             cell = line
         run_cell_remote(ip, kc, cell)
 
-    remote.client = kc # preserve reference on kc, largely for mocking
+    remote.client = kc  # preserve reference on kc, largely for mocking
     ip.register_magic_function(remote, magic_kind='line', magic_name=magic_name)
     ip.register_magic_function(remote, magic_kind='cell', magic_name=magic_name)
 
@@ -118,7 +118,7 @@ def remote_magic(line, cell=None):
         raise NameError(info_name)
     connection_info = dict(ip.user_ns[info_name])
 
-    if not cell: # line magic, use the rest of the line
+    if not cell:  # line magic, use the rest of the line
         if len(split_line) == 1:
             raise ValueError("I need some code to run!")
         cell = split_line[1]
@@ -151,7 +151,7 @@ def register_remote_magic(magic_name='remote'):
     """
     ip = get_ipython()
     if ip is None:
-        return # do nothing if IPython's not running
+        return  # do nothing if IPython's not running
     ip.register_magic_function(remote_magic, magic_kind='line', magic_name=magic_name)
     ip.register_magic_function(remote_magic, magic_kind='cell', magic_name=magic_name)
 
@@ -217,7 +217,10 @@ def start_ipython(ip=None, ns=None, log=None):
     if ip:
         app.ip = ip
     # disable some signal handling, logging
-    noop = lambda : None
+
+    def noop():
+        return None
+
     app.init_signal = noop
     app.log_connection_info = noop
 
diff --git a/distributed/asyncio.py b/distributed/asyncio.py
index 47595c6..03e0e09 100644
--- a/distributed/asyncio.py
+++ b/distributed/asyncio.py
@@ -69,7 +69,7 @@ class AioClient(Client):
 
             # Use the client....
 
-            await client.shutdown()
+            await client.close()
 
     An ``async with`` statement is a more convenient way to start and shut down
     the client::
@@ -117,17 +117,17 @@ class AioClient(Client):
         return self
 
     async def __aexit__(self, type, value, traceback):
-        await self.shutdown()
+        await self.close()
 
     def __await__(self):
         return to_asyncio_future(self._started).__await__()
 
-    async def shutdown(self, fast=False):
+    async def close(self, fast=False):
         if self.status == 'closed':
             return
 
         try:
-            future = self._shutdown(fast=fast)
+            future = self._close(fast=fast)
             await to_asyncio_future(future)
 
             with ignoring(AttributeError):
@@ -138,7 +138,7 @@ class AioClient(Client):
             BaseAsyncIOLoop.clear_current()
 
     def __del__(self):
-        # Override Client.__del__ to avoid running self.shutdown()
+        # Override Client.__del__ to avoid running self.close()
         assert self.status != 'running'
 
     gather = to_asyncio(Client._gather)
@@ -157,6 +157,7 @@ class AioClient(Client):
     rebalance = to_asyncio(Client._rebalance)
     replicate = to_asyncio(Client._replicate)
     start_ipython_workers = to_asyncio(Client._start_ipython_workers)
+    shutdown = close
 
     def __enter__(self):
         raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
diff --git a/distributed/batched.py b/distributed/batched.py
index 6fa464e..b8f981d 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -7,7 +7,6 @@ from tornado import gen, locks
 from tornado.ioloop import IOLoop
 
 from .core import CommClosedError
-from .utils import ignoring
 
 
 logger = logging.getLogger(__name__)
@@ -65,23 +64,28 @@ class BatchedSend(object):
     @gen.coroutine
     def _background_send(self):
         while not self.please_stop:
-            with ignoring(gen.TimeoutError):
+            try:
                 yield self.waker.wait(self.next_deadline)
                 self.waker.clear()
+            except gen.TimeoutError:
+                pass
             if not self.buffer:
                 # Nothing to send
                 self.next_deadline = None
                 continue
             if (self.next_deadline is not None and
-                self.loop.time() < self.next_deadline):
+                    self.loop.time() < self.next_deadline):
                 # Send interval not expired yet
                 continue
             payload, self.buffer = self.buffer, []
             self.batch_count += 1
             self.next_deadline = self.loop.time() + self.interval
             try:
-                self.recent_message_log.append(payload)
                 nbytes = yield self.comm.write(payload)
+                if nbytes < 1e6:
+                    self.recent_message_log.append(payload)
+                else:
+                    self.recent_message_log.append('large-message')
                 self.byte_count += nbytes
             except CommClosedError as e:
                 logger.info("Batched Comm Closed: %s", e)
diff --git a/distributed/bokeh/__init__.py b/distributed/bokeh/__init__.py
index 188fc36..12dd9ba 100644
--- a/distributed/bokeh/__init__.py
+++ b/distributed/bokeh/__init__.py
@@ -28,8 +28,8 @@ messages = {
                     'deque': deque(maxlen=m),
                     'times': deque(maxlen=m),
                     'index': deque(maxlen=m),
-                    'rectangles':{name: deque(maxlen=m) for name in
-                                 'start duration key name color worker worker_thread y alpha'.split()},
+                    'rectangles': {name: deque(maxlen=m) for name in
+                                   'start duration key name color worker worker_thread y alpha'.split()},
                     'workers': dict(),
                     'last_seen': [time()]}
 }
diff --git a/distributed/bokeh/background/server_lifecycle.py b/distributed/bokeh/background/server_lifecycle.py
index 6f6c1a5..2f6e3b9 100644
--- a/distributed/bokeh/background/server_lifecycle.py
+++ b/distributed/bokeh/background/server_lifecycle.py
@@ -15,7 +15,7 @@ from distributed.metrics import time
 from distributed.protocol.pickle import dumps
 from distributed.diagnostics.eventstream import eventstream
 from distributed.diagnostics.progress_stream import (progress_stream,
-        task_stream_append)
+                                                     task_stream_append)
 from distributed.bokeh.worker_monitor import resource_append
 import distributed.bokeh
 from distributed.bokeh.utils import parse_args
@@ -56,7 +56,7 @@ def workers():
     """ Get data from JSON route, store in messages deques """
     try:
         response = yield client.fetch(
-                'http://%(host)s:%(http-port)d/workers.json' % options)
+            'http://%(host)s:%(http-port)d/workers.json' % options)
     except HTTPError:
         logger.warning("workers http route failed")
         return
diff --git a/distributed/bokeh/components.py b/distributed/bokeh/components.py
index 4c1203b..3282a7e 100644
--- a/distributed/bokeh/components.py
+++ b/distributed/bokeh/components.py
@@ -3,23 +3,27 @@ from __future__ import print_function, division, absolute_import
 from bisect import bisect
 from operator import add
 from time import time
+import weakref
 
 from bokeh.layouts import row, column
 from bokeh.models import (
-    ColumnDataSource, Plot, DataRange1d, Rect, LinearAxis,
-    DatetimeAxis, Grid, BasicTicker, HoverTool, BoxZoomTool, ResetTool,
+    ColumnDataSource, Plot, DataRange1d, LinearAxis,
+    DatetimeAxis, HoverTool, BoxZoomTool, ResetTool,
     PanTool, WheelZoomTool, Title, Range1d, Quad, Text, value, Line,
-    NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool,
-    Circle
+    NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool, TapTool,
+    Circle, OpenURL,
 )
-from bokeh.models.widgets import DataTable, TableColumn, NumberFormatter
+from bokeh.models.widgets import (DataTable, TableColumn, NumberFormatter,
+        Button, Select)
 from bokeh.palettes import Spectral9
 from bokeh.plotting import figure
 from toolz import valmap
+from tornado import gen
 
-from distributed.config import config
-from distributed.diagnostics.progress_stream import progress_quads, nbytes_bar
-from distributed.utils import log_errors
+from ..config import config
+from ..diagnostics.progress_stream import progress_quads, nbytes_bar
+from .. import profile
+from ..utils import log_errors
 
 if config.get('bokeh-export-tool', False):
     from .export_tool import ExportTool
@@ -27,6 +31,9 @@ else:
     ExportTool = None
 
 
+profile_interval = config.get('profile-interval', 10) / 1000
+
+
 class DashboardComponent(object):
     """ Base class for Dask.distributed UI dashboard components.
 
@@ -38,13 +45,13 @@ class DashboardComponent(object):
     *  update: a method that consumes the messages dictionary found in
                distributed.bokeh.messages
     """
+
     def __init__(self):
         self.source = None
         self.root = None
 
     def update(self, messages):
         """ Reads from bokeh.distributed.messages and updates self.source """
-        pass
 
 
 class TaskStream(DashboardComponent):
@@ -52,6 +59,7 @@ class TaskStream(DashboardComponent):
 
     The start and stop time of tasks as they occur on each core of the cluster.
     """
+
     def __init__(self, n_rectangles=1000, clear_interval=20000, **kwargs):
         """
         kwargs are applied to the bokeh.models.plots.Plot constructor
@@ -69,25 +77,21 @@ class TaskStream(DashboardComponent):
         x_range = DataRange1d(range_padding=0)
         y_range = DataRange1d(range_padding=0)
 
-        self.root = Plot(
-            title=Title(text="Task Stream"), id='bk-task-stream-plot',
+        self.root = figure(
+            title="Task Stream", id='bk-task-stream-plot',
             x_range=x_range, y_range=y_range, toolbar_location="above",
-            min_border_right=35, **kwargs
-        )
-
-        self.root.add_glyph(
-            self.source,
-            Rect(x="start", y="y", width="duration", height=0.4, fill_color="color",
-                 line_color="color", line_alpha=0.6, fill_alpha="alpha", line_width=3)
-        )
-
-        self.root.add_layout(DatetimeAxis(axis_label="Time"), "below")
+            x_axis_type='datetime', min_border_right=35, tools='', **kwargs)
+        self.root.yaxis.axis_label = 'Worker Core'
 
-        ticker = BasicTicker(num_minor_ticks=0)
-        self.root.add_layout(LinearAxis(axis_label="Worker Core", ticker=ticker), "left")
-        self.root.add_layout(Grid(dimension=1, grid_line_alpha=0.4, ticker=ticker))
+        rect = self.root.rect(source=self.source, x="start", y="y",
+            width="duration", height=0.4, fill_color="color",
+            line_color="color", line_alpha=0.6, fill_alpha="alpha",
+            line_width=3)
+        rect.nonselection_glyph = None
 
         self.root.yaxis.major_label_text_alpha = 0
+        self.root.yaxis.minor_tick_line_alpha = 0
+        self.root.xgrid.visible = False
 
         hover = HoverTool(
             point_policy="follow_mouse",
@@ -100,8 +104,10 @@ class TaskStream(DashboardComponent):
                 """
         )
 
+        tap = TapTool(callback=OpenURL(url='/profile?key=@name'))
+
         self.root.add_tools(
-            hover,
+            hover, tap,
             BoxZoomTool(),
             ResetTool(reset_size=False),
             PanTool(dimensions="width"),
@@ -219,13 +225,14 @@ class TaskProgress(DashboardComponent):
             self.source.data.update(d)
             if messages['tasks']['deque']:
                 self.root.title.text = ("Progress -- total: %(total)s, "
-                    "in-memory: %(in-memory)s, processing: %(processing)s, "
-                    "waiting: %(waiting)s, failed: %(failed)s"
-                    % messages['tasks']['deque'][-1])
+                                        "in-memory: %(in-memory)s, processing: %(processing)s, "
+                                        "waiting: %(waiting)s, failed: %(failed)s"
+                                        % messages['tasks']['deque'][-1])
 
 
 class MemoryUsage(DashboardComponent):
     """ The memory usage across the cluster, grouped by task type """
+
     def __init__(self, **kwargs):
         self.source = ColumnDataSource(data=dict(
             name=[], left=[], right=[], center=[], color=[],
@@ -273,7 +280,7 @@ class MemoryUsage(DashboardComponent):
             nb = nbytes_bar(msg['nbytes'])
             self.source.data.update(nb)
             self.root.title.text = \
-                    "Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
+                "Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
 
 
 class ResourceProfiles(DashboardComponent):
@@ -281,10 +288,11 @@ class ResourceProfiles(DashboardComponent):
 
     This is two plots, one for CPU and Memory and another for Network I/O
     """
+
     def __init__(self, **kwargs):
         self.source = ColumnDataSource(data={'time': [], 'cpu': [],
-            'memory_percent':[], 'network-send':[], 'network-recv':[]}
-        )
+                                             'memory_percent': [], 'network-send': [], 'network-recv': []}
+                                       )
 
         x_range = DataRange1d(follow='end', follow_interval=30000, range_padding=0)
 
@@ -383,6 +391,7 @@ class WorkerTable(DashboardComponent):
     This is two plots, a text-based table for each host and a thin horizontal
     plot laying out hosts by their current memory use.
     """
+
     def __init__(self, **kwargs):
         names = ['processes', 'disk-read', 'cores', 'cpu', 'disk-write',
                  'memory', 'last-seen', 'memory_percent', 'host']
@@ -476,12 +485,13 @@ class Processing(DashboardComponent):
     This shows how many tasks are actively running on each worker and how many
     tasks are enqueued for each worker and how many are in the common pool
     """
+
     def __init__(self, **kwargs):
         data = self.processing_update({'processing': {}, 'ncores': {}})
         self.source = ColumnDataSource(data)
 
         x_range = Range1d(-1, 1)
-        fig = figure(title='Processing and Pending', tools='resize',
+        fig = figure(title='Processing and Pending', tools='',
                      x_range=x_range, id='bk-processing-stacks-plot', **kwargs)
         fig.quad(source=self.source, left=0, right='right', color=Spectral9[0],
                  top='top', bottom='bottom')
@@ -543,3 +553,231 @@ class Processing(DashboardComponent):
             d['alpha'] = [0.7] * n
 
             return d
+
+
+class ProfilePlot(DashboardComponent):
+    """ Time plots of the current resource usage on the cluster
+
+    This is two plots, one for CPU and Memory and another for Network I/O
+    """
+
+    def __init__(self, **kwargs):
+        state = profile.create()
+        data = profile.plot_data(state, profile_interval)
+        self.states = data.pop('states')
+        self.source = ColumnDataSource(data=data)
+
+        def cb(attr, old, new):
+            with log_errors():
+                try:
+                    ind = new['1d']['indices'][0]
+                except IndexError:
+                    return
+                data = profile.plot_data(self.states[ind], profile_interval)
+                del self.states[:]
+                self.states.extend(data.pop('states'))
+                self.source.data.update(data)
+                self.source.selected = old
+
+        self.source.on_change('selected', cb)
+
+        self.root = figure(tools='tap', **kwargs)
+        self.root.quad('left', 'right', 'top', 'bottom', color='color',
+                      line_color='black', line_width=2, source=self.source)
+
+        hover = HoverTool(
+            point_policy="follow_mouse",
+            tooltips="""
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Name:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Filename:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Line number:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Line:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Time:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
+                </div>
+                """
+        )
+        self.root.add_tools(hover)
+
+        self.root.xaxis.visible = False
+        self.root.yaxis.visible = False
+        self.root.grid.visible = False
+
+    def update(self, state):
+        with log_errors():
+            self.state = state
+            data = profile.plot_data(self.state, profile_interval)
+            self.states = data.pop('states')
+            self.source.data.update(data)
+
+
+class ProfileTimePlot(DashboardComponent):
+    """ Time plots of the current resource usage on the cluster
+
+    This is two plots, one for CPU and Memory and another for Network I/O
+    """
+
+    def __init__(self, server, doc=None, **kwargs):
+        if doc is not None:
+            self.doc = weakref.ref(doc)
+            try:
+                self.key = doc.session_context.request.arguments.get('key', None)
+            except AttributeError:
+                self.key = None
+            if isinstance(self.key, list):
+                self.key = self.key[0]
+            if isinstance(self.key, bytes):
+                self.key = self.key.decode()
+            self.task_names = ['All', self.key]
+        else:
+            self.key = None
+            self.task_names = ['All']
+
+        self.server = server
+        self.start = None
+        self.stop = None
+        self.ts = {'count': [], 'time': []}
+        self.state = profile.create()
+        data = profile.plot_data(self.state, profile_interval)
+        self.states = data.pop('states')
+        self.source = ColumnDataSource(data=data)
+
+        def cb(attr, old, new):
+            with log_errors():
+                try:
+                    ind = new['1d']['indices'][0]
+                except IndexError:
+                    return
+                data = profile.plot_data(self.states[ind], profile_interval)
+                del self.states[:]
+                self.states.extend(data.pop('states'))
+                self.source.data.update(data)
+                self.source.selected = old
+
+        self.source.on_change('selected', cb)
+
+        self.profile_plot = figure(tools='tap', height=400, **kwargs)
+        self.profile_plot.quad('left', 'right', 'top', 'bottom', color='color',
+                               line_color='black', source=self.source)
+
+        hover = HoverTool(
+            point_policy="follow_mouse",
+            tooltips="""
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Name:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Filename:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Line number:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Line:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
+                </div>
+                <div>
+                    <span style="font-size: 14px; font-weight: bold;">Time:</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
+                </div>
+                """
+        )
+        self.profile_plot.add_tools(hover)
+
+        self.profile_plot.xaxis.visible = False
+        self.profile_plot.yaxis.visible = False
+        self.profile_plot.grid.visible = False
+
+        self.ts_source = ColumnDataSource({'time': [], 'count': []})
+        self.ts_plot = figure(title='Activity over time', height=100,
+                              x_axis_type='datetime', active_drag='xbox_select',
+                              tools='xpan,xwheel_zoom,xbox_select,reset',
+                              **kwargs)
+        self.ts_plot.line('time', 'count', source=self.ts_source)
+        self.ts_plot.circle('time', 'count', source=self.ts_source, color=None,
+                            selection_color='orange')
+        self.ts_plot.yaxis.visible = False
+        self.ts_plot.grid.visible = False
+
+        def ts_change(attr, old, new):
+            with log_errors():
+                selected = self.ts_source.selected['1d']['indices']
+                if selected:
+                    start = self.ts_source.data['time'][min(selected)] / 1000
+                    stop = self.ts_source.data['time'][max(selected)] / 1000
+                    self.start, self.stop = min(start, stop), max(start, stop)
+                else:
+                    self.start = self.stop = None
+                self.trigger_update(update_metadata=False)
+
+        self.ts_source.on_change('selected', ts_change)
+
+        self.reset_button = Button(label="Reset", button_type="success")
+        self.reset_button.on_click(lambda: self.update(self.state))
+
+        self.update_button = Button(label="Update", button_type="success")
+        self.update_button.on_click(self.trigger_update)
+
+        self.select = Select(value=self.task_names[-1], options=self.task_names)
+
+        def select_cb(attr, old, new):
+            if new == 'All':
+                new = None
+            self.key = new
+            self.trigger_update(update_metadata=False)
+
+        self.select.on_change('value', select_cb)
+
+        self.root = column(row(self.select, self.reset_button,
+                               self.update_button, sizing_mode='scale_width'),
+                           self.profile_plot, self.ts_plot, **kwargs)
+
+    def update(self, state, metadata=None):
+        with log_errors():
+            self.state = state
+            data = profile.plot_data(self.state, profile_interval)
+            self.states = data.pop('states')
+            self.source.data.update(data)
+
+            if metadata is not None and metadata['counts']:
+                self.task_names = ['All'] + sorted(metadata['keys'])
+                self.select.options = self.task_names
+                if self.key:
+                    ts = metadata['keys'][self.key]
+                else:
+                    ts = metadata['counts']
+                times, counts = zip(*ts)
+                self.ts = {'count': counts, 'time': [t * 1000 for t in times]}
+
+                self.ts_source.data.update(self.ts)
+
+    def trigger_update(self, update_metadata=True):
+        @gen.coroutine
+        def cb():
+            with log_errors():
+                prof = self.server.get_profile(key=self.key, start=self.start, stop=self.stop)
+                if update_metadata:
+                    metadata = self.server.get_profile_metadata()
+                else:
+                    metadata = None
+                if isinstance(prof, gen.Future):
+                    prof, metadata = yield [prof, metadata]
+                self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
+
+        self.server.loop.add_callback(cb)
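
For readers unfamiliar with the Bokeh idioms used throughout components.py, a minimal self-contained sketch of the quad-plus-hover pattern (the data and names are invented stand-ins for profile.plot_data() output; tooltips use Bokeh's tuple form instead of the raw HTML above):

    from bokeh.io import curdoc
    from bokeh.models import ColumnDataSource, HoverTool
    from bokeh.plotting import figure

    # Invented example data: two nested profile frames.
    source = ColumnDataSource(data=dict(
        left=[0.0, 0.0], right=[1.0, 0.6], top=[1, 2], bottom=[0, 1],
        name=['main', 'inner'], color=['#aec7e8', '#ffbb78'],
    ))

    fig = figure(tools='tap', height=300)
    fig.quad(left='left', right='right', top='top', bottom='bottom',
             color='color', line_color='black', source=source)
    fig.add_tools(HoverTool(tooltips=[('name', '@name')]))
    fig.xaxis.visible = False
    fig.yaxis.visible = False
    fig.grid.visible = False

    # Serve with `bokeh serve sketch.py`; curdoc() attaches the root.
    curdoc().add_root(fig)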
diff --git a/distributed/bokeh/core.py b/distributed/bokeh/core.py
index 2d0180a..102b51f 100644
... 16595 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.distributed.git


