[Python-modules-commits] [dask.distributed] 01/07: New upstream version 1.19.2+ds.1
From: Diane Trout <diane at moszumanska.debian.org>
Date: Fri Dec 8 16:26:40 UTC 2017
This is an automated email from the git hooks/post-receive script.
diane pushed a commit to branch master
in repository dask.distributed.
commit e89c7a7924c5bf4f4239339f4d42f9e0e58936c1
Author: Diane Trout <diane at ghic.org>
Date: Thu Oct 12 23:57:58 2017 -0700
New upstream version 1.19.2+ds.1
---
.travis.yml | 3 +-
LICENSE.txt | 4 +-
continuous_integration/setup_conda_environment.cmd | 2 +-
continuous_integration/travis/install.sh | 3 +-
distributed/_concurrent_futures_thread.py | 161 +++++
distributed/_ipython_utils.py | 13 +-
distributed/asyncio.py | 11 +-
distributed/batched.py | 12 +-
distributed/bokeh/__init__.py | 4 +-
distributed/bokeh/background/server_lifecycle.py | 4 +-
distributed/bokeh/components.py | 302 ++++++++-
distributed/bokeh/core.py | 19 -
distributed/bokeh/scheduler.py | 114 ++--
distributed/bokeh/tests/test_components.py | 32 +-
distributed/bokeh/tests/test_scheduler_bokeh.py | 36 +-
distributed/bokeh/tests/test_task_stream.py | 5 +-
distributed/bokeh/tests/test_worker_bokeh.py | 11 +-
distributed/bokeh/tests/test_worker_monitor.py | 2 -
distributed/bokeh/worker.py | 92 +--
distributed/cli/dask_mpi.py | 100 +++
distributed/cli/dask_scheduler.py | 3 +-
distributed/cli/dask_ssh.py | 19 +-
distributed/cli/dask_worker.py | 106 +--
distributed/cli/dcluster.py | 1 -
distributed/cli/dscheduler.py | 1 -
distributed/cli/dworker.py | 1 -
distributed/cli/tests/test_dask_mpi.py | 70 ++
distributed/cli/tests/test_dask_scheduler.py | 28 +-
distributed/cli/tests/test_dask_worker.py | 71 +-
distributed/cli/tests/test_tls_cli.py | 4 +-
distributed/cli/utils.py | 1 -
distributed/client.py | 556 ++++++++++------
distributed/comm/addressing.py | 15 +-
distributed/comm/inproc.py | 6 +-
distributed/comm/tcp.py | 18 +-
distributed/comm/tests/test_comms.py | 45 +-
distributed/compatibility.py | 7 +-
distributed/config.py | 25 +-
distributed/config.yaml | 14 +
distributed/core.py | 56 +-
distributed/deploy/adaptive.py | 152 ++++-
distributed/deploy/local.py | 23 +-
distributed/deploy/ssh.py | 8 +-
distributed/deploy/tests/test_adaptive.py | 20 +-
distributed/deploy/tests/test_local.py | 81 ++-
distributed/deploy/tests/test_ssh.py | 9 +-
distributed/deploy/utils_test.py | 2 +-
distributed/diagnostics/eventstream.py | 1 +
distributed/diagnostics/plugin.py | 7 +-
distributed/diagnostics/progress.py | 45 +-
distributed/diagnostics/progress_stream.py | 1 +
distributed/diagnostics/progressbar.py | 62 +-
distributed/diagnostics/scheduler.py | 3 +-
distributed/diagnostics/tests/test_eventstream.py | 11 +-
distributed/diagnostics/tests/test_progress.py | 46 +-
.../diagnostics/tests/test_progress_stream.py | 13 +-
distributed/diagnostics/tests/test_progressbar.py | 12 +-
.../tests/test_scheduler_diagnostics.py | 8 +-
distributed/diagnostics/tests/test_widgets.py | 16 +-
distributed/hdfs.py | 2 +-
distributed/http/core.py | 4 +-
distributed/http/scheduler.py | 10 +-
distributed/http/tests/test_scheduler_http.py | 16 +-
distributed/http/tests/test_worker_http.py | 17 +-
distributed/http/worker.py | 19 +-
distributed/nanny.py | 98 ++-
distributed/process.py | 172 +++--
distributed/profile.py | 226 +++++++
distributed/protocol/__init__.py | 6 +-
distributed/protocol/compression.py | 11 +-
distributed/protocol/core.py | 2 +-
distributed/protocol/keras.py | 2 +-
distributed/protocol/pickle.py | 2 +-
distributed/protocol/serialize.py | 2 +
distributed/protocol/tests/test_h5py.py | 6 +-
distributed/protocol/tests/test_keras.py | 3 -
distributed/protocol/tests/test_netcdf4.py | 4 +-
distributed/protocol/tests/test_numpy.py | 80 ++-
distributed/protocol/tests/test_pandas.py | 9 +-
distributed/protocol/tests/test_pickle.py | 1 +
distributed/protocol/tests/test_protocol.py | 11 +-
distributed/protocol/tests/test_protocol_utils.py | 1 +
distributed/protocol/tests/test_serialize.py | 8 +-
distributed/protocol/tests/test_sparse.py | 5 +-
distributed/protocol/utils.py | 2 +-
distributed/publish.py | 1 +
distributed/queues.py | 15 +-
distributed/recreate_exceptions.py | 8 +-
distributed/scheduler.py | 519 ++++++++++-----
distributed/sizeof.py | 3 +-
distributed/stealing.py | 31 +-
distributed/system_monitor.py | 63 +-
distributed/tests/make_tls_certs.py | 35 +-
distributed/tests/py3_test_asyncio.py | 9 +-
distributed/tests/py3_test_client.py | 31 +-
distributed/tests/py3_test_utils_tst.py | 2 +-
distributed/tests/test_as_completed.py | 44 +-
distributed/tests/test_asyncio.py | 2 +-
distributed/tests/test_asyncprocess.py | 149 ++++-
distributed/tests/test_batched.py | 40 +-
distributed/tests/test_client.py | 508 ++++++++++----
distributed/tests/test_client_executor.py | 13 +-
distributed/tests/test_collections.py | 22 +-
distributed/tests/test_compatibility.py | 1 +
distributed/tests/test_config.py | 31 +-
distributed/tests/test_core.py | 40 +-
distributed/tests/test_counter.py | 5 +-
distributed/tests/test_hdfs.py | 22 +-
distributed/tests/test_ipython.py | 42 +-
distributed/tests/test_joblib.py | 9 +-
distributed/tests/test_metrics.py | 3 +-
distributed/tests/test_nanny.py | 83 ++-
distributed/tests/test_preload.py | 3 +-
distributed/tests/test_profile.py | 108 +++
distributed/tests/test_publish.py | 19 +-
distributed/tests/test_queues.py | 12 +-
distributed/tests/test_resources.py | 70 +-
distributed/tests/test_scheduler.py | 162 +++--
distributed/tests/test_security.py | 58 +-
distributed/tests/test_sizeof.py | 6 +-
distributed/tests/test_steal.py | 204 +++---
distributed/tests/test_stress.py | 69 +-
distributed/tests/test_submit_cli.py | 6 +-
distributed/tests/test_submit_remote_client.py | 4 +-
distributed/tests/test_threadpoolexecutor.py | 22 +
distributed/tests/test_tls_functional.py | 9 +-
distributed/tests/test_utils.py | 29 +-
distributed/tests/test_utils_comm.py | 14 +-
distributed/tests/test_utils_test.py | 14 +-
distributed/tests/test_variable.py | 15 +-
distributed/tests/test_worker.py | 301 +++++++--
distributed/tests/test_worker_client.py | 63 +-
distributed/tests/test_worker_failure.py | 72 +-
distributed/threadpoolexecutor.py | 17 +-
distributed/utils.py | 257 ++++++--
distributed/utils_comm.py | 1 +
distributed/utils_test.py | 236 +++++--
distributed/variable.py | 10 +-
distributed/versions.py | 2 +-
distributed/worker.py | 726 +++++++++++++++------
distributed/worker_client.py | 5 +-
docs/source/adaptive.rst | 45 +-
docs/source/api.rst | 15 +-
docs/source/asynchronous.rst | 19 +-
docs/source/changelog.rst | 73 +++
docs/source/client.rst | 16 +-
docs/source/conf.py | 4 +-
docs/source/develop.rst | 2 +-
docs/source/diagnosing-performance.rst | 125 ++++
docs/source/faq.rst | 2 +-
docs/source/index.rst | 1 +
docs/source/install.rst | 2 +-
docs/source/ipython.rst | 6 +-
docs/source/quickstart.rst | 2 +-
docs/source/resources.rst | 25 +
docs/source/setup.rst | 26 +
docs/source/submitting-applications.rst | 2 +-
docs/source/worker.rst | 129 ++--
requirements.txt | 4 +-
setup.cfg | 8 +-
setup.py | 1 +
161 files changed, 5842 insertions(+), 2126 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 18e51c7..8f080b1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,8 @@ install:
script:
- source continuous_integration/travis/run_tests.sh
- - flake8 distributed/*.py distributed/{bokeh,comm,deploy,protocol}/*.py # no tests yet
+ - flake8 distributed
+ - pycodestyle --select=E722 distributed # check for bare `except:` blocks
after_success:
- if [[ $COVERAGE == true ]]; then coverage report; pip install -q coveralls ; coveralls ; fi
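The new pycodestyle invocation above enables only check E722, which flags bare
`except:` clauses. A minimal sketch of what it rejects (`risky()` and
`handle()` are hypothetical placeholders, not project code):

    try:
        risky()
    except:            # E722: a bare except also traps KeyboardInterrupt
        pass           # and SystemExit, hiding real failures

    # What passes the check: name the exception you expect.
    try:
        risky()
    except ValueError:
        handle()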
diff --git a/LICENSE.txt b/LICENSE.txt
index 71ddeb2..4172242 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1,4 +1,4 @@
-Copyright (c) 2015-2016, Continuum Analytics, Inc. and contributors
+Copyright (c) 2015-2017, Anaconda, Inc. and contributors
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
@@ -11,7 +11,7 @@ Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
-Neither the name of Continuum Analytics nor the names of any contributors
+Neither the name of Anaconda nor the names of any contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
diff --git a/continuous_integration/setup_conda_environment.cmd b/continuous_integration/setup_conda_environment.cmd
index 9a2c2c3..2df5802 100644
--- a/continuous_integration/setup_conda_environment.cmd
+++ b/continuous_integration/setup_conda_environment.cmd
@@ -32,7 +32,7 @@ call deactivate
mock ^
msgpack-python ^
psutil ^
- pytest ^
+ pytest=3.1 ^
python=%PYTHON% ^
requests ^
toolz ^
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index 9e91013..cce65bb 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -45,7 +45,8 @@ conda install -q -c conda-forge \
netcdf4 \
paramiko \
psutil \
- pytest \
+ pycodestyle \
+ pytest=3.1 \
pytest-faulthandler \
pytest-timeout \
requests \
diff --git a/distributed/_concurrent_futures_thread.py b/distributed/_concurrent_futures_thread.py
new file mode 100644
index 0000000..06aa810
--- /dev/null
+++ b/distributed/_concurrent_futures_thread.py
@@ -0,0 +1,161 @@
+# This was copied from CPython 3.6
+
+# Copyright 2009 Brian Quinlan. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""Implements ThreadPoolExecutor."""
+
+__author__ = 'Brian Quinlan (brian at sweetapp.com)'
+
+import atexit
+from concurrent.futures import _base
+import itertools
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+import threading
+import weakref
+import os
+
+# Workers are created as daemon threads. This is done to allow the interpreter
+# to exit when there are still idle threads in a ThreadPoolExecutor's thread
+# pool (i.e. shutdown() was not called). However, allowing workers to die with
+# the interpreter has two undesirable properties:
+# - The workers would still be running during interpreter shutdown,
+# meaning that they would fail in unpredictable ways.
+# - The workers could be killed while evaluating a work item, which could
+# be bad if the callable being evaluated has external side-effects e.g.
+# writing to a file.
+#
+# To work around this problem, an exit handler is installed which tells the
+# workers to exit when their work queues are empty and then waits until the
+# threads finish.
+
+_threads_queues = weakref.WeakKeyDictionary()
+_shutdown = False
+
+
+def _python_exit():
+ global _shutdown
+ _shutdown = True
+ items = list(_threads_queues.items())
+ for t, q in items:
+ q.put(None)
+ for t, q in items:
+ t.join()
+
+
+atexit.register(_python_exit)
+
+
+class _WorkItem(object):
+ def __init__(self, future, fn, args, kwargs):
+ self.future = future
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ def run(self):
+ if not self.future.set_running_or_notify_cancel():
+ return
+
+ try:
+ result = self.fn(*self.args, **self.kwargs)
+ except BaseException as e:
+ self.future.set_exception(e)
+ else:
+ self.future.set_result(result)
+
+
+def _worker(executor_reference, work_queue):
+ try:
+ while True:
+ work_item = work_queue.get(block=True)
+ if work_item is not None:
+ work_item.run()
+ # Delete references to object. See issue16284
+ del work_item
+ continue
+ executor = executor_reference()
+ # Exit if:
+ # - The interpreter is shutting down OR
+ # - The executor that owns the worker has been collected OR
+ # - The executor that owns the worker has been shutdown.
+ if _shutdown or executor is None or executor._shutdown:
+ # Notice other workers
+ work_queue.put(None)
+ return
+ del executor
+ except BaseException:
+ _base.LOGGER.critical('Exception in worker', exc_info=True)
+
+
+class ThreadPoolExecutor(_base.Executor):
+
+ # Used to assign unique thread names when thread_name_prefix is not supplied.
+ _counter = itertools.count()
+
+ def __init__(self, max_workers=None, thread_name_prefix=''):
+ """Initializes a new ThreadPoolExecutor instance.
+
+ Args:
+ max_workers: The maximum number of threads that can be used to
+ execute the given calls.
+ thread_name_prefix: An optional name prefix to give our threads.
+ """
+ if max_workers is None:
+ # Use this number because ThreadPoolExecutor is often
+ # used to overlap I/O instead of CPU work.
+ max_workers = (os.cpu_count() or 1) * 5
+ if max_workers <= 0:
+ raise ValueError("max_workers must be greater than 0")
+
+ self._max_workers = max_workers
+ self._work_queue = queue.Queue()
+ self._threads = set()
+ self._shutdown = False
+ self._shutdown_lock = threading.Lock()
+ self._thread_name_prefix = (thread_name_prefix or
+ ("ThreadPoolExecutor-%d" % next(self._counter)))
+
+ def submit(self, fn, *args, **kwargs):
+ with self._shutdown_lock:
+ if self._shutdown:
+ raise RuntimeError('cannot schedule new futures after shutdown')
+
+ f = _base.Future()
+ w = _WorkItem(f, fn, args, kwargs)
+
+ self._work_queue.put(w)
+ self._adjust_thread_count()
+ return f
+ submit.__doc__ = _base.Executor.submit.__doc__
+
+ def _adjust_thread_count(self):
+ # When the executor gets lost, the weakref callback will wake up
+ # the worker threads.
+ def weakref_cb(_, q=self._work_queue):
+ q.put(None)
+ # TODO(bquinlan): Should avoid creating new threads if there are more
+ # idle threads than items in the work queue.
+ num_threads = len(self._threads)
+ if num_threads < self._max_workers:
+ thread_name = '%s_%d' % (self._thread_name_prefix or self,
+ num_threads)
+ t = threading.Thread(name=thread_name, target=_worker,
+ args=(weakref.ref(self, weakref_cb),
+ self._work_queue))
+ t.daemon = True
+ t.start()
+ self._threads.add(t)
+ _threads_queues[t] = self._work_queue
+
+ def shutdown(self, wait=True):
+ with self._shutdown_lock:
+ self._shutdown = True
+ self._work_queue.put(None)
+ if wait:
+ for t in self._threads:
+ t.join()
+ shutdown.__doc__ = _base.Executor.shutdown.__doc__
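The file above is a verbatim copy of CPython 3.6's thread-pool executor; the
queue/Queue import fallback keeps it importable on Python 2 as well. As a
rough usage sketch, assuming nothing beyond what the vendored module itself
defines:

    from distributed._concurrent_futures_thread import ThreadPoolExecutor

    def square(x):
        return x * x

    ex = ThreadPoolExecutor(max_workers=2, thread_name_prefix='demo')
    futures = [ex.submit(square, i) for i in range(4)]
    print([f.result() for f in futures])   # -> [0, 1, 4, 9]
    ex.shutdown(wait=True)                 # joins the worker threads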
diff --git a/distributed/_ipython_utils.py b/distributed/_ipython_utils.py
index 042258c..6f88087 100644
--- a/distributed/_ipython_utils.py
+++ b/distributed/_ipython_utils.py
@@ -79,7 +79,7 @@ def register_worker_magic(connection_info, magic_name='worker'):
which run the given cell in a remote kernel.
"""
ip = get_ipython()
- info = dict(connection_info) # copy
+ info = dict(connection_info) # copy
key = info.pop('key')
kc = BlockingKernelClient(**connection_info)
kc.session.key = key
@@ -92,7 +92,7 @@ def register_worker_magic(connection_info, magic_name='worker'):
cell = line
run_cell_remote(ip, kc, cell)
- remote.client = kc # preserve reference on kc, largely for mocking
+ remote.client = kc # preserve reference on kc, largely for mocking
ip.register_magic_function(remote, magic_kind='line', magic_name=magic_name)
ip.register_magic_function(remote, magic_kind='cell', magic_name=magic_name)
@@ -118,7 +118,7 @@ def remote_magic(line, cell=None):
raise NameError(info_name)
connection_info = dict(ip.user_ns[info_name])
- if not cell: # line magic, use the rest of the line
+ if not cell: # line magic, use the rest of the line
if len(split_line) == 1:
raise ValueError("I need some code to run!")
cell = split_line[1]
@@ -151,7 +151,7 @@ def register_remote_magic(magic_name='remote'):
"""
ip = get_ipython()
if ip is None:
- return # do nothing if IPython's not running
+ return # do nothing if IPython's not running
ip.register_magic_function(remote_magic, magic_kind='line', magic_name=magic_name)
ip.register_magic_function(remote_magic, magic_kind='cell', magic_name=magic_name)
@@ -217,7 +217,10 @@ def start_ipython(ip=None, ns=None, log=None):
if ip:
app.ip = ip
# disable some signal handling, logging
- noop = lambda : None
+
+ def noop():
+ return None
+
app.init_signal = noop
app.log_connection_info = noop
diff --git a/distributed/asyncio.py b/distributed/asyncio.py
index 47595c6..03e0e09 100644
--- a/distributed/asyncio.py
+++ b/distributed/asyncio.py
@@ -69,7 +69,7 @@ class AioClient(Client):
# Use the client....
- await client.shutdown()
+ await client.close()
An ``async with`` statement is a more convenient way to start and shut down
the client::
@@ -117,17 +117,17 @@ class AioClient(Client):
return self
async def __aexit__(self, type, value, traceback):
- await self.shutdown()
+ await self.close()
def __await__(self):
return to_asyncio_future(self._started).__await__()
- async def shutdown(self, fast=False):
+ async def close(self, fast=False):
if self.status == 'closed':
return
try:
- future = self._shutdown(fast=fast)
+ future = self._close(fast=fast)
await to_asyncio_future(future)
with ignoring(AttributeError):
@@ -138,7 +138,7 @@ class AioClient(Client):
BaseAsyncIOLoop.clear_current()
def __del__(self):
- # Override Client.__del__ to avoid running self.shutdown()
+ # Override Client.__del__ to avoid running self.close()
assert self.status != 'running'
gather = to_asyncio(Client._gather)
@@ -157,6 +157,7 @@ class AioClient(Client):
rebalance = to_asyncio(Client._rebalance)
replicate = to_asyncio(Client._replicate)
start_ipython_workers = to_asyncio(Client._start_ipython_workers)
+ shutdown = close
def __enter__(self):
raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
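This hunk renames the asyncio client's `shutdown()` to `close()`, keeping
`shutdown = close` as a backward-compatible alias. A brief sketch of the
resulting usage, assuming a scheduler is already listening at
tcp://127.0.0.1:8786:

    import asyncio
    from distributed.asyncio import AioClient

    async def main():
        async with AioClient('tcp://127.0.0.1:8786') as client:
            future = client.submit(pow, 2, 10)
            print(await client.gather(future))   # -> 1024
        # __aexit__ has awaited client.close() by this point

    asyncio.get_event_loop().run_until_complete(main())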
diff --git a/distributed/batched.py b/distributed/batched.py
index 6fa464e..b8f981d 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -7,7 +7,6 @@ from tornado import gen, locks
from tornado.ioloop import IOLoop
from .core import CommClosedError
-from .utils import ignoring
logger = logging.getLogger(__name__)
@@ -65,23 +64,28 @@ class BatchedSend(object):
@gen.coroutine
def _background_send(self):
while not self.please_stop:
- with ignoring(gen.TimeoutError):
+ try:
yield self.waker.wait(self.next_deadline)
self.waker.clear()
+ except gen.TimeoutError:
+ pass
if not self.buffer:
# Nothing to send
self.next_deadline = None
continue
if (self.next_deadline is not None and
- self.loop.time() < self.next_deadline):
+ self.loop.time() < self.next_deadline):
# Send interval not expired yet
continue
payload, self.buffer = self.buffer, []
self.batch_count += 1
self.next_deadline = self.loop.time() + self.interval
try:
- self.recent_message_log.append(payload)
nbytes = yield self.comm.write(payload)
+ if nbytes < 1e6:
+ self.recent_message_log.append(payload)
+ else:
+ self.recent_message_log.append('large-message')
self.byte_count += nbytes
except CommClosedError as e:
logger.info("Batched Comm Closed: %s", e)
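The second change in the hunk above caps what BatchedSend retains for
diagnostics: a payload is stored verbatim in `recent_message_log` only when
`comm.write` reports fewer than 1e6 bytes; larger payloads leave just a
'large-message' marker, so the log deque cannot pin megabytes of sent data.
The rule in isolation (a standalone restatement, not the upstream API):

    def record(log, payload, nbytes):
        # keep small payloads for inspection, summarize large ones
        if nbytes < 1e6:
            log.append(payload)
        else:
            log.append('large-message')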
diff --git a/distributed/bokeh/__init__.py b/distributed/bokeh/__init__.py
index 188fc36..12dd9ba 100644
--- a/distributed/bokeh/__init__.py
+++ b/distributed/bokeh/__init__.py
@@ -28,8 +28,8 @@ messages = {
'deque': deque(maxlen=m),
'times': deque(maxlen=m),
'index': deque(maxlen=m),
- 'rectangles':{name: deque(maxlen=m) for name in
- 'start duration key name color worker worker_thread y alpha'.split()},
+ 'rectangles': {name: deque(maxlen=m) for name in
+ 'start duration key name color worker worker_thread y alpha'.split()},
'workers': dict(),
'last_seen': [time()]}
}
diff --git a/distributed/bokeh/background/server_lifecycle.py b/distributed/bokeh/background/server_lifecycle.py
index 6f6c1a5..2f6e3b9 100644
--- a/distributed/bokeh/background/server_lifecycle.py
+++ b/distributed/bokeh/background/server_lifecycle.py
@@ -15,7 +15,7 @@ from distributed.metrics import time
from distributed.protocol.pickle import dumps
from distributed.diagnostics.eventstream import eventstream
from distributed.diagnostics.progress_stream import (progress_stream,
- task_stream_append)
+ task_stream_append)
from distributed.bokeh.worker_monitor import resource_append
import distributed.bokeh
from distributed.bokeh.utils import parse_args
@@ -56,7 +56,7 @@ def workers():
""" Get data from JSON route, store in messages deques """
try:
response = yield client.fetch(
- 'http://%(host)s:%(http-port)d/workers.json' % options)
+ 'http://%(host)s:%(http-port)d/workers.json' % options)
except HTTPError:
logger.warning("workers http route failed")
return
diff --git a/distributed/bokeh/components.py b/distributed/bokeh/components.py
index 4c1203b..3282a7e 100644
--- a/distributed/bokeh/components.py
+++ b/distributed/bokeh/components.py
@@ -3,23 +3,27 @@ from __future__ import print_function, division, absolute_import
from bisect import bisect
from operator import add
from time import time
+import weakref
from bokeh.layouts import row, column
from bokeh.models import (
- ColumnDataSource, Plot, DataRange1d, Rect, LinearAxis,
- DatetimeAxis, Grid, BasicTicker, HoverTool, BoxZoomTool, ResetTool,
+ ColumnDataSource, Plot, DataRange1d, LinearAxis,
+ DatetimeAxis, HoverTool, BoxZoomTool, ResetTool,
PanTool, WheelZoomTool, Title, Range1d, Quad, Text, value, Line,
- NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool,
- Circle
+ NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool, TapTool,
+ Circle, OpenURL,
)
-from bokeh.models.widgets import DataTable, TableColumn, NumberFormatter
+from bokeh.models.widgets import (DataTable, TableColumn, NumberFormatter,
+ Button, Select)
from bokeh.palettes import Spectral9
from bokeh.plotting import figure
from toolz import valmap
+from tornado import gen
-from distributed.config import config
-from distributed.diagnostics.progress_stream import progress_quads, nbytes_bar
-from distributed.utils import log_errors
+from ..config import config
+from ..diagnostics.progress_stream import progress_quads, nbytes_bar
+from .. import profile
+from ..utils import log_errors
if config.get('bokeh-export-tool', False):
from .export_tool import ExportTool
@@ -27,6 +31,9 @@ else:
ExportTool = None
+profile_interval = config.get('profile-interval', 10) / 1000
+
+
class DashboardComponent(object):
""" Base class for Dask.distributed UI dashboard components.
@@ -38,13 +45,13 @@ class DashboardComponent(object):
* update: a method that consumes the messages dictionary found in
distributed.bokeh.messages
"""
+
def __init__(self):
self.source = None
self.root = None
def update(self, messages):
""" Reads from bokeh.distributed.messages and updates self.source """
- pass
class TaskStream(DashboardComponent):
@@ -52,6 +59,7 @@ class TaskStream(DashboardComponent):
The start and stop time of tasks as they occur on each core of the cluster.
"""
+
def __init__(self, n_rectangles=1000, clear_interval=20000, **kwargs):
"""
kwargs are applied to the bokeh.models.plots.Plot constructor
@@ -69,25 +77,21 @@ class TaskStream(DashboardComponent):
x_range = DataRange1d(range_padding=0)
y_range = DataRange1d(range_padding=0)
- self.root = Plot(
- title=Title(text="Task Stream"), id='bk-task-stream-plot',
+ self.root = figure(
+ title="Task Stream", id='bk-task-stream-plot',
x_range=x_range, y_range=y_range, toolbar_location="above",
- min_border_right=35, **kwargs
- )
-
- self.root.add_glyph(
- self.source,
- Rect(x="start", y="y", width="duration", height=0.4, fill_color="color",
- line_color="color", line_alpha=0.6, fill_alpha="alpha", line_width=3)
- )
-
- self.root.add_layout(DatetimeAxis(axis_label="Time"), "below")
+ x_axis_type='datetime', min_border_right=35, tools='', **kwargs)
+ self.root.yaxis.axis_label = 'Worker Core'
- ticker = BasicTicker(num_minor_ticks=0)
- self.root.add_layout(LinearAxis(axis_label="Worker Core", ticker=ticker), "left")
- self.root.add_layout(Grid(dimension=1, grid_line_alpha=0.4, ticker=ticker))
+ rect = self.root.rect(source=self.source, x="start", y="y",
+ width="duration", height=0.4, fill_color="color",
+ line_color="color", line_alpha=0.6, fill_alpha="alpha",
+ line_width=3)
+ rect.nonselection_glyph = None
self.root.yaxis.major_label_text_alpha = 0
+ self.root.yaxis.minor_tick_line_alpha = 0
+ self.root.xgrid.visible = False
hover = HoverTool(
point_policy="follow_mouse",
@@ -100,8 +104,10 @@ class TaskStream(DashboardComponent):
"""
)
+ tap = TapTool(callback=OpenURL(url='/profile?key=@name'))
+
self.root.add_tools(
- hover,
+ hover, tap,
BoxZoomTool(),
ResetTool(reset_size=False),
PanTool(dimensions="width"),
@@ -219,13 +225,14 @@ class TaskProgress(DashboardComponent):
self.source.data.update(d)
if messages['tasks']['deque']:
self.root.title.text = ("Progress -- total: %(total)s, "
- "in-memory: %(in-memory)s, processing: %(processing)s, "
- "waiting: %(waiting)s, failed: %(failed)s"
- % messages['tasks']['deque'][-1])
+ "in-memory: %(in-memory)s, processing: %(processing)s, "
+ "waiting: %(waiting)s, failed: %(failed)s"
+ % messages['tasks']['deque'][-1])
class MemoryUsage(DashboardComponent):
""" The memory usage across the cluster, grouped by task type """
+
def __init__(self, **kwargs):
self.source = ColumnDataSource(data=dict(
name=[], left=[], right=[], center=[], color=[],
@@ -273,7 +280,7 @@ class MemoryUsage(DashboardComponent):
nb = nbytes_bar(msg['nbytes'])
self.source.data.update(nb)
self.root.title.text = \
- "Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
+ "Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
class ResourceProfiles(DashboardComponent):
@@ -281,10 +288,11 @@ class ResourceProfiles(DashboardComponent):
This is two plots, one for CPU and Memory and another for Network I/O
"""
+
def __init__(self, **kwargs):
self.source = ColumnDataSource(data={'time': [], 'cpu': [],
- 'memory_percent':[], 'network-send':[], 'network-recv':[]}
- )
+ 'memory_percent': [], 'network-send': [], 'network-recv': []}
+ )
x_range = DataRange1d(follow='end', follow_interval=30000, range_padding=0)
@@ -383,6 +391,7 @@ class WorkerTable(DashboardComponent):
This is two plots, a text-based table for each host and a thin horizontal
plot laying out hosts by their current memory use.
"""
+
def __init__(self, **kwargs):
names = ['processes', 'disk-read', 'cores', 'cpu', 'disk-write',
'memory', 'last-seen', 'memory_percent', 'host']
@@ -476,12 +485,13 @@ class Processing(DashboardComponent):
This shows how many tasks are actively running on each worker and how many
tasks are enqueued for each worker and how many are in the common pool
"""
+
def __init__(self, **kwargs):
data = self.processing_update({'processing': {}, 'ncores': {}})
self.source = ColumnDataSource(data)
x_range = Range1d(-1, 1)
- fig = figure(title='Processing and Pending', tools='resize',
+ fig = figure(title='Processing and Pending', tools='',
x_range=x_range, id='bk-processing-stacks-plot', **kwargs)
fig.quad(source=self.source, left=0, right='right', color=Spectral9[0],
top='top', bottom='bottom')
@@ -543,3 +553,231 @@ class Processing(DashboardComponent):
d['alpha'] = [0.7] * n
return d
+
+
+class ProfilePlot(DashboardComponent):
+ """ Time plots of the current resource usage on the cluster
+
+ This is two plots, one for CPU and Memory and another for Network I/O
+ """
+
+ def __init__(self, **kwargs):
+ state = profile.create()
+ data = profile.plot_data(state, profile_interval)
+ self.states = data.pop('states')
+ self.source = ColumnDataSource(data=data)
+
+ def cb(attr, old, new):
+ with log_errors():
+ try:
+ ind = new['1d']['indices'][0]
+ except IndexError:
+ return
+ data = profile.plot_data(self.states[ind], profile_interval)
+ del self.states[:]
+ self.states.extend(data.pop('states'))
+ self.source.data.update(data)
+ self.source.selected = old
+
+ self.source.on_change('selected', cb)
+
+ self.root = figure(tools='tap', **kwargs)
+ self.root.quad('left', 'right', 'top', 'bottom', color='color',
+ line_color='black', line_width=2, source=self.source)
+
+ hover = HoverTool(
+ point_policy="follow_mouse",
+ tooltips="""
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Name:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Filename:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Line number:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Line:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Time:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
+ </div>
+ """
+ )
+ self.root.add_tools(hover)
+
+ self.root.xaxis.visible = False
+ self.root.yaxis.visible = False
+ self.root.grid.visible = False
+
+ def update(self, state):
+ with log_errors():
+ self.state = state
+ data = profile.plot_data(self.state, profile_interval)
+ self.states = data.pop('states')
+ self.source.data.update(data)
+
+
+class ProfileTimePlot(DashboardComponent):
+ """ Time plots of the current resource usage on the cluster
+
+ This is two plots, one for CPU and Memory and another for Network I/O
+ """
+
+ def __init__(self, server, doc=None, **kwargs):
+ if doc is not None:
+ self.doc = weakref.ref(doc)
+ try:
+ self.key = doc.session_context.request.arguments.get('key', None)
+ except AttributeError:
+ self.key = None
+ if isinstance(self.key, list):
+ self.key = self.key[0]
+ if isinstance(self.key, bytes):
+ self.key = self.key.decode()
+ self.task_names = ['All', self.key]
+ else:
+ self.key = None
+ self.task_names = ['All']
+
+ self.server = server
+ self.start = None
+ self.stop = None
+ self.ts = {'count': [], 'time': []}
+ self.state = profile.create()
+ data = profile.plot_data(self.state, profile_interval)
+ self.states = data.pop('states')
+ self.source = ColumnDataSource(data=data)
+
+ def cb(attr, old, new):
+ with log_errors():
+ try:
+ ind = new['1d']['indices'][0]
+ except IndexError:
+ return
+ data = profile.plot_data(self.states[ind], profile_interval)
+ del self.states[:]
+ self.states.extend(data.pop('states'))
+ self.source.data.update(data)
+ self.source.selected = old
+
+ self.source.on_change('selected', cb)
+
+ self.profile_plot = figure(tools='tap', height=400, **kwargs)
+ self.profile_plot.quad('left', 'right', 'top', 'bottom', color='color',
+ line_color='black', source=self.source)
+
+ hover = HoverTool(
+ point_policy="follow_mouse",
+ tooltips="""
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Name:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Filename:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Line number:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Line:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
+ </div>
+ <div>
+ <span style="font-size: 14px; font-weight: bold;">Time:</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
+ </div>
+ """
+ )
+ self.profile_plot.add_tools(hover)
+
+ self.profile_plot.xaxis.visible = False
+ self.profile_plot.yaxis.visible = False
+ self.profile_plot.grid.visible = False
+
+ self.ts_source = ColumnDataSource({'time': [], 'count': []})
+ self.ts_plot = figure(title='Activity over time', height=100,
+ x_axis_type='datetime', active_drag='xbox_select',
+ tools='xpan,xwheel_zoom,xbox_select,reset',
+ **kwargs)
+ self.ts_plot.line('time', 'count', source=self.ts_source)
+ self.ts_plot.circle('time', 'count', source=self.ts_source, color=None,
+ selection_color='orange')
+ self.ts_plot.yaxis.visible = False
+ self.ts_plot.grid.visible = False
+
+ def ts_change(attr, old, new):
+ with log_errors():
+ selected = self.ts_source.selected['1d']['indices']
+ if selected:
+ start = self.ts_source.data['time'][min(selected)] / 1000
+ stop = self.ts_source.data['time'][max(selected)] / 1000
+ self.start, self.stop = min(start, stop), max(start, stop)
+ else:
+ self.start = self.stop = None
+ self.trigger_update(update_metadata=False)
+
+ self.ts_source.on_change('selected', ts_change)
+
+ self.reset_button = Button(label="Reset", button_type="success")
+ self.reset_button.on_click(lambda: self.update(self.state) )
+
+ self.update_button = Button(label="Update", button_type="success")
+ self.update_button.on_click(self.trigger_update)
+
+ self.select = Select(value=self.task_names[-1], options=self.task_names)
+
+ def select_cb(attr, old, new):
+ if new == 'All':
+ new = None
+ self.key = new
+ self.trigger_update(update_metadata=False)
+
+ self.select.on_change('value', select_cb)
+
+ self.root = column(row(self.select, self.reset_button,
+ self.update_button, sizing_mode='scale_width'),
+ self.profile_plot, self.ts_plot, **kwargs)
+
+ def update(self, state, metadata=None):
+ with log_errors():
+ self.state = state
+ data = profile.plot_data(self.state, profile_interval)
+ self.states = data.pop('states')
+ self.source.data.update(data)
+
+ if metadata is not None and metadata['counts']:
+ self.task_names = ['All'] + sorted(metadata['keys'])
+ self.select.options = self.task_names
+ if self.key:
+ ts = metadata['keys'][self.key]
+ else:
+ ts = metadata['counts']
+ times, counts = zip(*ts)
+ self.ts = {'count': counts, 'time': [t * 1000 for t in times]}
+
+ self.ts_source.data.update(self.ts)
+
+ def trigger_update(self, update_metadata=True):
+ @gen.coroutine
+ def cb():
+ with log_errors():
+ prof = self.server.get_profile(key=self.key, start=self.start, stop=self.stop)
+ if update_metadata:
+ metadata = self.server.get_profile_metadata()
+ else:
+ metadata = None
+ if isinstance(prof, gen.Future):
+ prof, metadata = yield [prof, metadata]
+ self.doc().add_next_tick_callback(lambda: self.update(prof, metadata))
+
+ self.server.loop.add_callback(cb)
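ProfileTimePlot.trigger_update above follows the usual Bokeh threading rule:
gather data on the server's IOLoop, then mutate the document only through
`add_next_tick_callback`. The pattern reduced to its skeleton (`fetch_data`
is a hypothetical coroutine standing in for `get_profile`):

    from tornado import gen

    def trigger_update(server, doc, source):
        @gen.coroutine
        def cb():
            data = yield server.fetch_data()   # runs on the IOLoop
            # Bokeh documents may only be mutated on their own thread:
            doc.add_next_tick_callback(lambda: source.data.update(data))

        server.loop.add_callback(cb)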
diff --git a/distributed/bokeh/core.py b/distributed/bokeh/core.py
index 2d0180a..102b51f 100644
... 16595 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.distributed.git