[Python-modules-commits] [dask.distributed] 01/05: New upstream version 1.20.2+ds.1
Diane Trout
diane at moszumanska.debian.org
Sun Dec 10 05:34:42 UTC 2017
This is an automated email from the git hooks/post-receive script.
diane pushed a commit to branch master
in repository dask.distributed.
commit 1dfce3ce940167a205914ba0457cf992d2b03738
Author: Diane Trout <diane at ghic.org>
Date: Fri Dec 8 08:27:23 2017 -0800
New upstream version 1.20.2+ds.1
---
.travis.yml | 46 +-
conftest.py | 17 +
continuous_integration/Dockerfile | 27 -
continuous_integration/README.md | 37 --
continuous_integration/docker-files/core-site.xml | 7 -
continuous_integration/docker-files/hdfs-site.xml | 51 --
continuous_integration/docker-files/start.sh | 22 -
continuous_integration/run-hdfs.sh | 33 -
continuous_integration/setup_conda_environment.cmd | 2 +-
continuous_integration/travis/install.sh | 15 +-
continuous_integration/travis/run_tests.sh | 12 +-
distributed/__init__.py | 4 +-
distributed/asyncio.py | 65 +-
distributed/batched.py | 6 +-
distributed/bokeh/background/__init__.py | 0
distributed/bokeh/background/main.py | 0
distributed/bokeh/background/server_lifecycle.py | 139 -----
distributed/bokeh/components.py | 214 +------
distributed/bokeh/core.py | 23 +-
distributed/bokeh/memory-usage.py | 13 -
distributed/bokeh/processing-stacks.py | 13 -
distributed/bokeh/resource-profiles.py | 14 -
distributed/bokeh/scheduler.py | 60 +-
distributed/bokeh/scheduler_html.py | 144 +++++
distributed/bokeh/task-progress.py | 13 -
distributed/bokeh/task-stream.py | 14 -
distributed/bokeh/task_stream.py | 5 +-
distributed/bokeh/template.html | 1 +
distributed/bokeh/templates/call-stack.html | 15 +
distributed/bokeh/templates/json-index.html | 11 +
distributed/bokeh/templates/logs.html | 8 +
distributed/bokeh/templates/main.html | 13 +
distributed/bokeh/templates/task.html | 127 ++++
distributed/bokeh/templates/worker-table.html | 30 +
distributed/bokeh/templates/worker.html | 52 ++
distributed/bokeh/templates/workers.html | 9 +
distributed/bokeh/tests/test_components.py | 25 +-
distributed/bokeh/tests/test_scheduler_bokeh.py | 2 +-
.../bokeh/tests/test_scheduler_bokeh_html.py | 41 ++
distributed/bokeh/tests/test_worker_bokeh.py | 1 +
distributed/bokeh/tests/test_worker_monitor.py | 52 --
distributed/bokeh/worker-table.py | 14 -
distributed/bokeh/worker.py | 4 +
distributed/bokeh/worker_monitor.py | 101 ---
distributed/cli/dask_mpi.py | 23 +-
distributed/cli/dask_scheduler.py | 34 +-
distributed/cli/dask_worker.py | 50 +-
distributed/cli/dcluster.py | 11 -
distributed/cli/dscheduler.py | 11 -
distributed/cli/dworker.py | 11 -
distributed/cli/tests/test_dask_mpi.py | 26 +-
distributed/cli/tests/test_dask_scheduler.py | 10 +-
distributed/cli/tests/test_dask_worker.py | 4 +-
distributed/cli/utils.py | 44 +-
distributed/client.py | 677 ++++++++++++++-------
distributed/comm/inproc.py | 1 +
distributed/comm/tcp.py | 6 +-
distributed/comm/tests/test_comms.py | 164 +++--
distributed/config.py | 6 +-
distributed/config.yaml | 5 +
distributed/core.py | 48 +-
distributed/counter.py | 11 +-
distributed/deploy/adaptive.py | 30 +-
distributed/deploy/local.py | 84 ++-
distributed/deploy/tests/test_adaptive.py | 37 +-
distributed/deploy/tests/test_local.py | 23 +-
distributed/diagnostics/progressbar.py | 5 +-
distributed/diagnostics/scheduler.py | 71 ---
distributed/diagnostics/tests/test_progress.py | 3 +-
.../diagnostics/tests/test_progress_stream.py | 6 +-
distributed/diagnostics/tests/test_progressbar.py | 4 +-
.../tests/test_scheduler_diagnostics.py | 59 --
distributed/diskutils.py | 219 +++++++
distributed/hdfs.py | 80 ---
distributed/http/__init__.py | 4 -
distributed/http/core.py | 92 ---
distributed/http/scheduler.py | 116 ----
distributed/http/tests/test_scheduler_http.py | 189 ------
distributed/http/tests/test_worker_http.py | 106 ----
distributed/http/worker.py | 70 ---
distributed/lock.py | 146 +++++
distributed/locket.py | 198 ++++++
distributed/nanny.py | 30 +-
distributed/process.py | 33 +-
distributed/proctitle.py | 44 ++
distributed/profile.py | 12 +-
distributed/protocol/core.py | 5 +-
distributed/protocol/serialize.py | 9 +-
distributed/protocol/tests/test_pickle.py | 28 +-
distributed/publish.py | 32 +
distributed/pytest_resourceleaks.py | 438 +++++++++++++
distributed/queues.py | 21 +-
distributed/scheduler.py | 347 +++++++----
distributed/security.py | 5 +-
distributed/stealing.py | 227 +++++--
distributed/system_monitor.py | 12 +-
distributed/tests/py3_test_asyncio.py | 44 +-
distributed/tests/py3_test_client.py | 14 +-
distributed/tests/test_asyncprocess.py | 11 +-
distributed/tests/test_batched.py | 20 +-
distributed/tests/test_client.py | 427 +++++++++++--
distributed/tests/test_client_executor.py | 2 +-
distributed/tests/test_diskutils.py | 257 ++++++++
distributed/tests/test_hdfs.py | 436 -------------
distributed/tests/test_ipython.py | 35 +-
distributed/tests/test_locks.py | 95 +++
distributed/tests/test_nanny.py | 25 +-
distributed/tests/test_profile.py | 9 +-
distributed/tests/test_publish.py | 50 ++
distributed/tests/test_queues.py | 14 +
distributed/tests/test_resources.py | 22 +-
distributed/tests/test_scheduler.py | 183 ++++--
distributed/tests/test_security.py | 6 +-
distributed/tests/test_steal.py | 95 +--
distributed/tests/test_stress.py | 30 +-
distributed/tests/test_system_monitor.py | 16 +-
distributed/tests/test_threadpoolexecutor.py | 80 ++-
distributed/tests/test_tls_functional.py | 5 +-
distributed/tests/test_utils.py | 277 +++++++--
distributed/tests/test_utils_test.py | 17 +-
distributed/tests/test_variable.py | 46 +-
distributed/tests/test_worker.py | 122 ++--
distributed/threadpoolexecutor.py | 51 +-
distributed/utils.py | 385 +++++++++++-
distributed/utils_comm.py | 3 -
distributed/utils_test.py | 448 ++++++++------
distributed/variable.py | 10 +-
distributed/worker.py | 337 +++++-----
docs/Makefile | 2 +-
docs/source/api.rst | 8 +
docs/source/changelog.rst | 91 +++
docs/source/locality.rst | 2 +-
docs/source/protocol.rst | 5 +-
docs/source/publish.rst | 18 +
docs/source/related-work.rst | 2 +-
docs/source/resources.rst | 4 +-
docs/source/scheduling-policies.rst | 2 +
docs/source/scheduling-state.rst | 414 +++++++++----
docs/source/setup.rst | 9 +
docs/source/work-stealing.rst | 20 +-
requirements.txt | 15 +-
setup.cfg | 4 +-
setup.py | 7 +-
143 files changed, 5846 insertions(+), 3758 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 8f080b1..d6f1b62 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,51 +1,39 @@
-language: generic
-sudo: false
+language: python
+# sudo shouldn't be required, but currently tests fail when run in a container
+# on travis instead of a vm. See https://github.com/dask/distributed/pull/1563.
+sudo: required
dist: trusty
-services:
- - docker
env:
matrix:
- - PYTHON=2.7 PACKAGES="python-blosc futures faulthandler"
- - PYTHON=3.5 COVERAGE=true PACKAGES=python-blosc CRICK=true
- - PYTHON=3.6
- - HDFS=true PYTHON=2.7
- - HDFS=true PYTHON=3.5 PACKAGES=python-blosc
+ - PYTHON=2.7 TESTS=true PACKAGES="python-blosc futures faulthandler"
+ - PYTHON=3.5.4 TESTS=true COVERAGE=true PACKAGES=python-blosc CRICK=true
+ - PYTHON=3.6 TESTS=true
matrix:
fast_finish: true
include:
+ - os: linux
+ # Using Travis-CI's python makes job faster by not downloading miniconda
+ python: 3.6
+ env: LINT=true
- os: osx
env: PYTHON=3.6 RUNSLOW=false
+ # Tornado dev builds disabled, as Bokeh and IPython choke on it
+ #- env: PYTHON=3.6 TORNADO=dev
+ #- env: PYTHON=2.7 TORNADO=dev RUNSLOW=false PACKAGES="futures faulthandler"
# Together with fast_finish, allow build to be marked successful before the OS X job finishes
allow_failures:
- os: osx
# This needs to be the exact same line as above
env: PYTHON=3.6 RUNSLOW=false
-addons:
- hosts:
- # Need to make the container's hostname resolvable, since the HDFS
- # client will contact it back.
- - hdfs-container
-
-before_install:
- - |
- if [[ $HDFS == true ]]; then
- pushd continuous_integration
- docker build -t distributed-hdfs-test .
- # Run HDFS
- ./run-hdfs.sh || exit 1
- popd
- fi;
-
install:
- - source continuous_integration/travis/install.sh
+ - if [[ $TESTS == true ]]; then source continuous_integration/travis/install.sh ; fi
script:
- - source continuous_integration/travis/run_tests.sh
- - flake8 distributed
- - pycodestyle --select=E722 distributed # check for bare `except:` blocks
+ - if [[ $TESTS == true ]]; then source continuous_integration/travis/run_tests.sh ; fi
+ - if [[ $LINT == true ]]; then pip install flake8 ; flake8 distributed ; fi
after_success:
- if [[ $COVERAGE == true ]]; then coverage report; pip install -q coveralls ; coveralls ; fi
diff --git a/conftest.py b/conftest.py
index b5972ff..cba68bd 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,5 +1,22 @@
# https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
+import os
import pytest
+
+
+# Uncomment to enable more logging and checks
+# (https://docs.python.org/3/library/asyncio-dev.html)
+# Note this makes things slower and might consume much memory.
+#os.environ["PYTHONASYNCIODEBUG"] = "1"
+
+try:
+ import faulthandler
+except ImportError:
+ pass
+else:
+ faulthandler.enable()
+
+
def pytest_addoption(parser):
parser.addoption("--runslow", action="store_true", help="run slow tests")
+pytest_plugins = ['distributed.pytest_resourceleaks']
diff --git a/continuous_integration/Dockerfile b/continuous_integration/Dockerfile
deleted file mode 100644
index 53b548a..0000000
--- a/continuous_integration/Dockerfile
+++ /dev/null
@@ -1,27 +0,0 @@
-FROM sequenceiq/hadoop-docker:2.7.1
-
-ENV HADOOP_PREFIX=/usr/local/hadoop
-ENV HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop
-
-ADD docker-files/start.sh /tmp/start.sh
-ADD docker-files/*.xml $HADOOP_CONF_DIR/
-
-VOLUME /host
-
-EXPOSE 8020
-EXPOSE 50070
-
-CMD ["bash", "/tmp/start.sh"]
-
-# Build:
-#
-# $ docker build -t distributed-hdfs-test .
-
-# Configure host to access container HDFS:
-#
-# $ sudo bash -c 'echo "127.0.0.1 hdfs-container" >> /etc/hosts'
-
-# Execution:
-#
-# $ rm -f hdfs-initialized
-# $ docker run -d -h hdfs-container -v$(pwd):/host -p8020:8020 -p 50070:50070 distributed-hdfs-test
diff --git a/continuous_integration/README.md b/continuous_integration/README.md
deleted file mode 100644
index 2b73b5b..0000000
--- a/continuous_integration/README.md
+++ /dev/null
@@ -1,37 +0,0 @@
-# Continuous Integration
-
-## Local test
-
-Requirements:
-- `docker`
-
-Build the container:
-```bash
-docker build -t distributed-hdfs .
-```
-
-Start the container and wait for it to be ready:
-
-```bash
-docker run -it -p 8020:8020 -p 50070:50070 -v $(pwd):/distributed distributed-hdfs
-```
-
-Now the port `8020` and `50070` are in the host are pointing to the container and the source code (as a shared volume) is available in the container under `/distributed`
-
-Run the following from the root of this project directory to start a bash
-session in the running container:
-
-```bash
-# Get the container ID
-export CONTAINER_ID=$(docker ps -l -q)
-
-# Start the bash session
-docker exec -it $CONTAINER_ID bash
-```
-
-Now that we are in the container we can install the library and run the test:
-
-```bash
-python setup.py install
-py.test distributed -s -vv
-```
diff --git a/continuous_integration/docker-files/core-site.xml b/continuous_integration/docker-files/core-site.xml
deleted file mode 100644
index 14a30e6..0000000
--- a/continuous_integration/docker-files/core-site.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-<configuration>
- <property>
- <name>fs.defaultFS</name>
- <value>hdfs://0.0.0.0:8020</value>
- </property>
-</configuration>
-
diff --git a/continuous_integration/docker-files/hdfs-site.xml b/continuous_integration/docker-files/hdfs-site.xml
deleted file mode 100644
index 161c2ca..0000000
--- a/continuous_integration/docker-files/hdfs-site.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<configuration>
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
-
-<!-- <property>
- <name>dfs.namenode.rpc-address</name>
- <value>0.0.0.0:8020</value>
- <description>
- RPC address that handles all clients requests. In the case of HA/Federation where multiple namenodes exist,
- the name service id is added to the name e.g. dfs.namenode.rpc-address.ns1
- dfs.namenode.rpc-address.EXAMPLENAMESERVICE
- The value of this property will take the form of nn-host1:rpc-port.
- </description>
- </property>-->
-
-<!-- <property>
- <name>dfs.namenode.rpc-bind-host</name>
- <value>0.0.0.0</value>
- <description>
- The actual address the RPC server will bind to. If this optional address is
- set, it overrides only the hostname portion of dfs.namenode.rpc-address.
- It can also be specified per name node or name service for HA/Federation.
- This is useful for making the name node listen on all interfaces by
- setting it to 0.0.0.0.
- </description>
- </property>-->
-
- <property>
- <name>dfs.namenode.safemode.extension</name>
- <value>10</value>
- <description>
- Determines extension of safe mode in milliseconds
- after the threshold level is reached.
- </description>
- </property>
-
- <property>
- <name>dfs.permissions</name>
- <value>false</value>
- <description>
- If "true", enable permission checking in HDFS.
- If "false", permission checking is turned off,
- but all other behavior is unchanged.
- Switching from one parameter value to the other does not change the mode,
- owner or group of files or directories.
- </description>
- </property>
-
-</configuration>
diff --git a/continuous_integration/docker-files/start.sh b/continuous_integration/docker-files/start.sh
deleted file mode 100755
index 04140fd..0000000
--- a/continuous_integration/docker-files/start.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!bin/bash
-
-set -e
-
-export HADOOP_PREFIX=/usr/local/hadoop
-$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
-
-service sshd start
-
-rm -f /tmp/*.pid
-$HADOOP_PREFIX/sbin/start-dfs.sh
-
-echo "--"
-echo "-- HDFS started!"
-echo "--"
-
-# Wait for nodes to be fully initialized
-sleep 5
-touch /host/hdfs-initialized
-
-# Stay alive
-sleep infinity
diff --git a/continuous_integration/run-hdfs.sh b/continuous_integration/run-hdfs.sh
deleted file mode 100755
index 90e371c..0000000
--- a/continuous_integration/run-hdfs.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-HOSTDIR=$(pwd)
-INIT_MARKER=$HOSTDIR/hdfs-initialized
-
-# Remove initialization marker
-rm -f $INIT_MARKER
-
-CONTAINER_ID=$(docker run -d -h hdfs-container -v$HOSTDIR:/host -p8020:8020 -p 50070:50070 distributed-hdfs-test)
-
-if [ $? -ne 0 ]; then
- echo "Failed starting HDFS container"
- exit 1
-fi
-echo "Started HDFS container: $CONTAINER_ID"
-
-# CONTAINER_ID=$1
-CHECK_RUNNING="docker top $CONTAINER_ID"
-
-# Wait for initialization
-while [[ $($CHECK_RUNNING) ]] && [[ ! -f $INIT_MARKER ]]
-do
- sleep 1
-done
-
-# Error out if the container failed starting
-if [[ ! $($CHECK_RUNNING) ]]; then
- echo "HDFS startup failed! Logs follow"
- echo "-------------------------------------------------"
- docker logs $CONTAINER_ID
- echo "-------------------------------------------------"
- exit 1
-fi
diff --git a/continuous_integration/setup_conda_environment.cmd b/continuous_integration/setup_conda_environment.cmd
index 2df5802..44b5ad8 100644
--- a/continuous_integration/setup_conda_environment.cmd
+++ b/continuous_integration/setup_conda_environment.cmd
@@ -48,7 +48,7 @@ call activate %CONDA_ENV%
%PIP_INSTALL% git+https://github.com/joblib/joblib.git --upgrade
%PIP_INSTALL% git+https://github.com/dask/zict --upgrade
-%PIP_INSTALL% pytest-timeout pytest-faulthandler sortedcollections
+%PIP_INSTALL% pytest-repeat pytest-timeout pytest-faulthandler sortedcollections
@rem Display final environment (for reproducing)
%CONDA% list
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index cce65bb..425710d 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -45,21 +45,17 @@ conda install -q -c conda-forge \
netcdf4 \
paramiko \
psutil \
- pycodestyle \
pytest=3.1 \
pytest-faulthandler \
pytest-timeout \
+ python=$PYTHON \
requests \
tblib \
toolz \
tornado=4.5 \
$PACKAGES
-if [[ $HDFS == true ]]; then
- conda install -q libxml2 krb5 boost
- conda install -q -c conda-forge libhdfs3 libgsasl libntlm
- pip install -q git+https://github.com/dask/hdfs3 --upgrade
-fi;
+pip install -q pytest-repeat
pip install -q git+https://github.com/dask/dask.git --upgrade
pip install -q git+https://github.com/joblib/joblib.git --upgrade
@@ -76,6 +72,13 @@ fi;
# Install distributed
pip install --no-deps -e .
+# Update Tornado to desired version
+if [[ $TORNADO == "dev" ]]; then
+ pip install -U https://github.com/tornadoweb/tornado/archive/master.zip
+elif [[ ! -z $TORNADO ]]; then
+ pip install -U tornado==$TORNADO
+fi
+
# For debugging
echo -e "--\n--Conda Environment\n--"
conda list
diff --git a/continuous_integration/travis/run_tests.sh b/continuous_integration/travis/run_tests.sh
index 8bacb6a..dbc0b21 100644
--- a/continuous_integration/travis/run_tests.sh
+++ b/continuous_integration/travis/run_tests.sh
@@ -18,17 +18,7 @@ echo "-- Hard limits"
echo "--"
ulimit -a -H
-if [[ $HDFS == true ]]; then
- py.test distributed/tests/test_hdfs.py $PYTEST_OPTIONS
- if [ $? -ne 0 ]; then
- # Diagnose test error
- echo "--"
- echo "-- HDFS namenode log follows"
- echo "--"
- docker exec -it $(docker ps -q) bash -c "tail -n50 /usr/local/hadoop/logs/hadoop-root-namenode-hdfs-container.log"
- (exit 1)
- fi
-elif [[ $COVERAGE == true ]]; then
+if [[ $COVERAGE == true ]]; then
coverage run $(which py.test) distributed -m "not avoid_travis" $PYTEST_OPTIONS;
else
py.test -m "not avoid_travis" distributed $PYTEST_OPTIONS;
diff --git a/distributed/__init__.py b/distributed/__init__.py
index 52cb8ab..db3f049 100644
--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -7,12 +7,14 @@ from .diagnostics import progress
from .client import (Client, Executor, CompatibleExecutor,
wait, as_completed, default_client, fire_and_forget,
Future)
+from .lock import Lock
from .nanny import Nanny
from .queues import Queue
from .scheduler import Scheduler
+from .threadpoolexecutor import rejoin
from .utils import sync
from .variable import Variable
-from .worker import Worker, get_worker, get_client, secede
+from .worker import Worker, get_worker, get_client, secede, Reschedule
from .worker_client import local_client, worker_client
from ._version import get_versions
diff --git a/distributed/asyncio.py b/distributed/asyncio.py
index 03e0e09..3d680a5 100644
--- a/distributed/asyncio.py
+++ b/distributed/asyncio.py
@@ -12,6 +12,7 @@ from tornado.platform.asyncio import to_asyncio_future
from . import client
from .client import Client, Future
+from .variable import Variable
from .utils import ignoring
@@ -25,17 +26,6 @@ def to_asyncio(fn, **default_kwargs):
return convert
-class AioFuture(Future):
- """Provides awaitable syntax for a distributed Future"""
-
- def __await__(self):
- return self.result().__await__()
-
- result = to_asyncio(Future._result)
- exception = to_asyncio(Future._exception)
- traceback = to_asyncio(Future._traceback)
-
-
class AioClient(Client):
""" Connect to and drive computation on a distributed Dask cluster
@@ -102,65 +92,28 @@ class AioClient(Client):
distributed.client.Client: Blocking Client
distributed.scheduler.Scheduler: Internal scheduler
"""
- _Future = AioFuture
-
def __init__(self, *args, **kwargs):
- if kwargs.get('set_as_default'):
- raise Exception("AioClient instance can't be set as default")
-
loop = asyncio.get_event_loop()
ioloop = BaseAsyncIOLoop(loop)
- super().__init__(*args, loop=ioloop, set_as_default=False, asynchronous=True, **kwargs)
+ super().__init__(*args, loop=ioloop, asynchronous=True, **kwargs)
+
+ def __enter__(self):
+ raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
async def __aenter__(self):
await to_asyncio_future(self._started)
return self
async def __aexit__(self, type, value, traceback):
- await self.close()
+ await to_asyncio_future(self._close())
def __await__(self):
return to_asyncio_future(self._started).__await__()
- async def close(self, fast=False):
- if self.status == 'closed':
- return
-
- try:
- future = self._close(fast=fast)
- await to_asyncio_future(future)
-
- with ignoring(AttributeError):
- future = self.cluster._close()
- await to_asyncio_future(future)
- self.cluster.status = 'closed'
- finally:
- BaseAsyncIOLoop.clear_current()
-
- def __del__(self):
- # Override Client.__del__ to avoid running self.close()
- assert self.status != 'running'
-
- gather = to_asyncio(Client._gather)
- scatter = to_asyncio(Client._scatter)
- cancel = to_asyncio(Client._cancel)
- publish_dataset = to_asyncio(Client._publish_dataset)
- get_dataset = to_asyncio(Client._get_dataset)
- run_on_scheduler = to_asyncio(Client._run_on_scheduler)
- run = to_asyncio(Client._run)
- run_coroutine = to_asyncio(Client._run_coroutine)
get = to_asyncio(Client.get, sync=False)
- upload_environment = to_asyncio(Client._upload_environment)
- restart = to_asyncio(Client._restart)
- upload_file = to_asyncio(Client._upload_file)
- upload_large_file = to_asyncio(Client._upload_large_file)
- rebalance = to_asyncio(Client._rebalance)
- replicate = to_asyncio(Client._replicate)
- start_ipython_workers = to_asyncio(Client._start_ipython_workers)
- shutdown = close
-
- def __enter__(self):
- raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
+ sync = to_asyncio(Client.sync)
+ close = to_asyncio(Client.close)
+ shutdown = to_asyncio(Client.shutdown)
class as_completed(client.as_completed):
diff --git a/distributed/batched.py b/distributed/batched.py
index b8f981d..152e8d4 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -56,11 +56,9 @@ class BatchedSend(object):
self.comm = comm
self.loop.add_callback(self._background_send)
- def __str__(self):
+ def __repr__(self):
return '<BatchedSend: %d in buffer>' % len(self.buffer)
- __repr__ = __str__
-
@gen.coroutine
def _background_send(self):
while not self.please_stop:
@@ -93,6 +91,8 @@ class BatchedSend(object):
except Exception:
logger.exception("Error in batched write")
break
+ finally:
+ payload = None # lose ref
self.stopped.set()
diff --git a/distributed/bokeh/background/__init__.py b/distributed/bokeh/background/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/distributed/bokeh/background/main.py b/distributed/bokeh/background/main.py
deleted file mode 100644
index e69de29..0000000
diff --git a/distributed/bokeh/background/server_lifecycle.py b/distributed/bokeh/background/server_lifecycle.py
deleted file mode 100644
index 2f6e3b9..0000000
--- a/distributed/bokeh/background/server_lifecycle.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-from __future__ import print_function, division, absolute_import
-
-import json
-import logging
-import sys
-
-from tornado import gen
-from tornado.httpclient import AsyncHTTPClient, HTTPError
-from tornado.ioloop import IOLoop
-
-from distributed.compatibility import ConnectionRefusedError
-from distributed.core import connect, CommClosedError
-from distributed.metrics import time
-from distributed.protocol.pickle import dumps
-from distributed.diagnostics.eventstream import eventstream
-from distributed.diagnostics.progress_stream import (progress_stream,
- task_stream_append)
-from distributed.bokeh.worker_monitor import resource_append
-import distributed.bokeh
-from distributed.bokeh.utils import parse_args
-from distributed.utils import log_errors
-
-
-logger = logging.getLogger(__name__)
-
-client = AsyncHTTPClient()
-
-messages = distributed.bokeh.messages # monkey-patching
-
-options = parse_args(sys.argv[1:])
-
-
-@gen.coroutine
-def http_get(route):
- """ Get data from JSON route, store in messages deques """
- try:
- url = 'http://%(host)s:%(http-port)d/' % options + route + '.json'
- response = yield client.fetch(url)
- except ConnectionRefusedError as e:
- logger.info("Can not connect to %s", url, exc_info=True)
- return
- except HTTPError:
- logger.warning("http route %s failed", route)
- return
- msg = json.loads(response.body.decode())
- messages[route]['deque'].append(msg)
- messages[route]['times'].append(time())
-
-
-last_index = [0]
-
-
-@gen.coroutine
-def workers():
- """ Get data from JSON route, store in messages deques """
- try:
- response = yield client.fetch(
- 'http://%(host)s:%(http-port)d/workers.json' % options)
- except HTTPError:
- logger.warning("workers http route failed")
- return
- msg = json.loads(response.body.decode())
- if msg:
- messages['workers']['deque'].append(msg)
- messages['workers']['times'].append(time())
- resource_append(messages['workers']['plot-data'], msg)
- index = messages['workers']['index']
- index.append(last_index[0] + 1)
- last_index[0] += 1
-
-
-@gen.coroutine
-def progress():
- with log_errors():
- addr = options['scheduler-address']
- comm = yield progress_stream(addr, 0.050)
- while True:
- try:
- msg = yield comm.read()
- except CommClosedError:
- break
- else:
- messages['progress'] = msg
-
-
-@gen.coroutine
-def processing():
- with log_errors():
- from distributed.diagnostics.scheduler import processing
- addr = options['scheduler-address']
- comm = yield connect(addr)
- yield comm.write({'op': 'feed',
- 'function': dumps(processing),
- 'interval': 0.200})
- while True:
- try:
- msg = yield comm.read()
- except CommClosedError:
- break
- else:
- messages['processing'] = msg
-
-
-@gen.coroutine
-def task_events(interval, deque, times, index, rectangles, workers, last_seen):
- i = 0
- try:
- addr = options['scheduler-address']
- comm = yield eventstream(addr, 0.100)
- while True:
- msgs = yield comm.read()
- if not msgs:
- continue
-
- last_seen[0] = time()
- for msg in msgs:
- if 'startstops' in msg:
- deque.append(msg)
- count = task_stream_append(rectangles, msg, workers)
- for _ in range(count):
- index.append(i)
- i += 1
-
- except CommClosedError:
- pass # don't log CommClosedErrors
- except Exception as e:
- logger.exception(e)
-
-
-def on_server_loaded(server_context):
- server_context.add_periodic_callback(workers, 500)
-
- server_context.add_periodic_callback(lambda: http_get('tasks'), 100)
- IOLoop.current().add_callback(processing)
-
- IOLoop.current().add_callback(progress)
-
- IOLoop.current().add_callback(task_events, **messages['task-events'])
diff --git a/distributed/bokeh/components.py b/distributed/bokeh/components.py
index 3282a7e..a42c83d 100644
--- a/distributed/bokeh/components.py
+++ b/distributed/bokeh/components.py
@@ -6,18 +6,11 @@ from time import time
import weakref
from bokeh.layouts import row, column
-from bokeh.models import (
- ColumnDataSource, Plot, DataRange1d, LinearAxis,
- DatetimeAxis, HoverTool, BoxZoomTool, ResetTool,
- PanTool, WheelZoomTool, Title, Range1d, Quad, Text, value, Line,
- NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool, TapTool,
- Circle, OpenURL,
-)
-from bokeh.models.widgets import (DataTable, TableColumn, NumberFormatter,
- Button, Select)
+from bokeh.models import ( ColumnDataSource, Plot, DataRange1d, LinearAxis,
+ HoverTool, BoxZoomTool, ResetTool, PanTool, WheelZoomTool, Range1d,
+ Quad, Text, value, TapTool, OpenURL, Button, Select)
from bokeh.palettes import Spectral9
from bokeh.plotting import figure
-from toolz import valmap
from tornado import gen
from ..config import config
@@ -70,7 +63,7 @@ class TaskStream(DashboardComponent):
self.source = ColumnDataSource(data=dict(
start=[time() - clear_interval], duration=[0.1], key=['start'],
- name=['start'], color=['white'],
+ name=['start'], color=['white'], duration_text=['100 ms'],
worker=['foo'], y=[0], worker_thread=[1], alpha=[0.0])
)
@@ -98,8 +91,7 @@ class TaskStream(DashboardComponent):
tooltips="""
<div>
<span style="font-size: 12px; font-weight: bold;">@name:</span>
- <span style="font-size: 10px; font-family: Monaco, monospace;">@duration</span>
- <span style="font-size: 10px;">ms</span>
+ <span style="font-size: 10px; font-family: Monaco, monospace;">@duration_text</span>
</div>
"""
)
@@ -283,202 +275,6 @@ class MemoryUsage(DashboardComponent):
"Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
-class ResourceProfiles(DashboardComponent):
- """ Time plots of the current resource usage on the cluster
-
- This is two plots, one for CPU and Memory and another for Network I/O
- """
-
- def __init__(self, **kwargs):
- self.source = ColumnDataSource(data={'time': [], 'cpu': [],
- 'memory_percent': [], 'network-send': [], 'network-recv': []}
- )
-
- x_range = DataRange1d(follow='end', follow_interval=30000, range_padding=0)
-
- resource_plot = Plot(
- x_range=x_range, y_range=Range1d(start=0, end=1),
- toolbar_location=None, min_border_bottom=10, **kwargs
- )
-
- line_opts = dict(line_width=2, line_alpha=0.8)
- g1 = resource_plot.add_glyph(
- self.source,
- Line(x='time', y='memory_percent', line_color="#33a02c", **line_opts)
- )
- g2 = resource_plot.add_glyph(
- self.source,
- Line(x='time', y='cpu', line_color="#1f78b4", **line_opts)
- )
-
- resource_plot.add_layout(
- LinearAxis(formatter=NumeralTickFormatter(format="0 %")),
- 'left'
- )
-
- legend_opts = dict(
- location='top_left', orientation='horizontal', padding=5, margin=5,
- label_height=5)
-
- resource_plot.add_layout(
- Legend(items=[('Memory', [g1]), ('CPU', [g2])], **legend_opts)
- )
-
- network_plot = Plot(
- x_range=x_range, y_range=DataRange1d(start=0),
- toolbar_location=None, **kwargs
- )
- g1 = network_plot.add_glyph(
- self.source,
- Line(x='time', y='network-send', line_color="#a6cee3", **line_opts)
- )
- g2 = network_plot.add_glyph(
- self.source,
- Line(x='time', y='network-recv', line_color="#b2df8a", **line_opts)
- )
-
- network_plot.add_layout(DatetimeAxis(axis_label="Time"), "below")
- network_plot.add_layout(LinearAxis(axis_label="MB/s"), 'left')
- network_plot.add_layout(
- Legend(items=[('Network Send', [g1]), ('Network Recv', [g2])], **legend_opts)
- )
-
- tools = [
- PanTool(dimensions='width'), WheelZoomTool(dimensions='width'),
- BoxZoomTool(), ResetTool()
- ]
-
- if 'sizing_mode' in kwargs:
- sizing_mode = {'sizing_mode': kwargs['sizing_mode']}
- else:
- sizing_mode = {}
-
- combo_toolbar = ToolbarBox(
- tools=tools, logo=None, toolbar_location='right', **sizing_mode
- )
-
- self.root = row(
- column(resource_plot, network_plot, **sizing_mode),
- column(combo_toolbar, **sizing_mode),
- id='bk-resource-profiles-plot',
- **sizing_mode
- )
-
- # Required for update callback
- self.resource_index = [0]
-
- def update(self, messages):
- with log_errors():
- index = messages['workers']['index']
- data = messages['workers']['plot-data']
-
- if not index or index[-1] == self.resource_index[0]:
- return
-
- if self.resource_index == [0]:
- data = valmap(list, data)
-
- ind = bisect(index, self.resource_index[0])
- indexes = list(range(ind, len(index)))
- data = {k: [v[i] for i in indexes] for k, v in data.items()}
- self.resource_index[0] = index[-1]
- self.source.stream(data, 1000)
-
-
-class WorkerTable(DashboardComponent):
- """ Status of the current workers
-
- This is two plots, a text-based table for each host and a thin horizontal
- plot laying out hosts by their current memory use.
- """
... 13941 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.distributed.git
More information about the Python-modules-commits
mailing list