[Python-modules-commits] [dask.distributed] 01/05: New upstream version 1.20.2+ds.1

Diane Trout diane at moszumanska.debian.org
Sun Dec 10 05:34:42 UTC 2017


This is an automated email from the git hooks/post-receive script.

diane pushed a commit to branch master
in repository dask.distributed.

commit 1dfce3ce940167a205914ba0457cf992d2b03738
Author: Diane Trout <diane at ghic.org>
Date:   Fri Dec 8 08:27:23 2017 -0800

    New upstream version 1.20.2+ds.1
---
 .travis.yml                                        |  46 +-
 conftest.py                                        |  17 +
 continuous_integration/Dockerfile                  |  27 -
 continuous_integration/README.md                   |  37 --
 continuous_integration/docker-files/core-site.xml  |   7 -
 continuous_integration/docker-files/hdfs-site.xml  |  51 --
 continuous_integration/docker-files/start.sh       |  22 -
 continuous_integration/run-hdfs.sh                 |  33 -
 continuous_integration/setup_conda_environment.cmd |   2 +-
 continuous_integration/travis/install.sh           |  15 +-
 continuous_integration/travis/run_tests.sh         |  12 +-
 distributed/__init__.py                            |   4 +-
 distributed/asyncio.py                             |  65 +-
 distributed/batched.py                             |   6 +-
 distributed/bokeh/background/__init__.py           |   0
 distributed/bokeh/background/main.py               |   0
 distributed/bokeh/background/server_lifecycle.py   | 139 -----
 distributed/bokeh/components.py                    | 214 +------
 distributed/bokeh/core.py                          |  23 +-
 distributed/bokeh/memory-usage.py                  |  13 -
 distributed/bokeh/processing-stacks.py             |  13 -
 distributed/bokeh/resource-profiles.py             |  14 -
 distributed/bokeh/scheduler.py                     |  60 +-
 distributed/bokeh/scheduler_html.py                | 144 +++++
 distributed/bokeh/task-progress.py                 |  13 -
 distributed/bokeh/task-stream.py                   |  14 -
 distributed/bokeh/task_stream.py                   |   5 +-
 distributed/bokeh/template.html                    |   1 +
 distributed/bokeh/templates/call-stack.html        |  15 +
 distributed/bokeh/templates/json-index.html        |  11 +
 distributed/bokeh/templates/logs.html              |   8 +
 distributed/bokeh/templates/main.html              |  13 +
 distributed/bokeh/templates/task.html              | 127 ++++
 distributed/bokeh/templates/worker-table.html      |  30 +
 distributed/bokeh/templates/worker.html            |  52 ++
 distributed/bokeh/templates/workers.html           |   9 +
 distributed/bokeh/tests/test_components.py         |  25 +-
 distributed/bokeh/tests/test_scheduler_bokeh.py    |   2 +-
 .../bokeh/tests/test_scheduler_bokeh_html.py       |  41 ++
 distributed/bokeh/tests/test_worker_bokeh.py       |   1 +
 distributed/bokeh/tests/test_worker_monitor.py     |  52 --
 distributed/bokeh/worker-table.py                  |  14 -
 distributed/bokeh/worker.py                        |   4 +
 distributed/bokeh/worker_monitor.py                | 101 ---
 distributed/cli/dask_mpi.py                        |  23 +-
 distributed/cli/dask_scheduler.py                  |  34 +-
 distributed/cli/dask_worker.py                     |  50 +-
 distributed/cli/dcluster.py                        |  11 -
 distributed/cli/dscheduler.py                      |  11 -
 distributed/cli/dworker.py                         |  11 -
 distributed/cli/tests/test_dask_mpi.py             |  26 +-
 distributed/cli/tests/test_dask_scheduler.py       |  10 +-
 distributed/cli/tests/test_dask_worker.py          |   4 +-
 distributed/cli/utils.py                           |  44 +-
 distributed/client.py                              | 677 ++++++++++++++-------
 distributed/comm/inproc.py                         |   1 +
 distributed/comm/tcp.py                            |   6 +-
 distributed/comm/tests/test_comms.py               | 164 +++--
 distributed/config.py                              |   6 +-
 distributed/config.yaml                            |   5 +
 distributed/core.py                                |  48 +-
 distributed/counter.py                             |  11 +-
 distributed/deploy/adaptive.py                     |  30 +-
 distributed/deploy/local.py                        |  84 ++-
 distributed/deploy/tests/test_adaptive.py          |  37 +-
 distributed/deploy/tests/test_local.py             |  23 +-
 distributed/diagnostics/progressbar.py             |   5 +-
 distributed/diagnostics/scheduler.py               |  71 ---
 distributed/diagnostics/tests/test_progress.py     |   3 +-
 .../diagnostics/tests/test_progress_stream.py      |   6 +-
 distributed/diagnostics/tests/test_progressbar.py  |   4 +-
 .../tests/test_scheduler_diagnostics.py            |  59 --
 distributed/diskutils.py                           | 219 +++++++
 distributed/hdfs.py                                |  80 ---
 distributed/http/__init__.py                       |   4 -
 distributed/http/core.py                           |  92 ---
 distributed/http/scheduler.py                      | 116 ----
 distributed/http/tests/test_scheduler_http.py      | 189 ------
 distributed/http/tests/test_worker_http.py         | 106 ----
 distributed/http/worker.py                         |  70 ---
 distributed/lock.py                                | 146 +++++
 distributed/locket.py                              | 198 ++++++
 distributed/nanny.py                               |  30 +-
 distributed/process.py                             |  33 +-
 distributed/proctitle.py                           |  44 ++
 distributed/profile.py                             |  12 +-
 distributed/protocol/core.py                       |   5 +-
 distributed/protocol/serialize.py                  |   9 +-
 distributed/protocol/tests/test_pickle.py          |  28 +-
 distributed/publish.py                             |  32 +
 distributed/pytest_resourceleaks.py                | 438 +++++++++++++
 distributed/queues.py                              |  21 +-
 distributed/scheduler.py                           | 347 +++++++----
 distributed/security.py                            |   5 +-
 distributed/stealing.py                            | 227 +++++--
 distributed/system_monitor.py                      |  12 +-
 distributed/tests/py3_test_asyncio.py              |  44 +-
 distributed/tests/py3_test_client.py               |  14 +-
 distributed/tests/test_asyncprocess.py             |  11 +-
 distributed/tests/test_batched.py                  |  20 +-
 distributed/tests/test_client.py                   | 427 +++++++++++--
 distributed/tests/test_client_executor.py          |   2 +-
 distributed/tests/test_diskutils.py                | 257 ++++++++
 distributed/tests/test_hdfs.py                     | 436 -------------
 distributed/tests/test_ipython.py                  |  35 +-
 distributed/tests/test_locks.py                    |  95 +++
 distributed/tests/test_nanny.py                    |  25 +-
 distributed/tests/test_profile.py                  |   9 +-
 distributed/tests/test_publish.py                  |  50 ++
 distributed/tests/test_queues.py                   |  14 +
 distributed/tests/test_resources.py                |  22 +-
 distributed/tests/test_scheduler.py                | 183 ++++--
 distributed/tests/test_security.py                 |   6 +-
 distributed/tests/test_steal.py                    |  95 +--
 distributed/tests/test_stress.py                   |  30 +-
 distributed/tests/test_system_monitor.py           |  16 +-
 distributed/tests/test_threadpoolexecutor.py       |  80 ++-
 distributed/tests/test_tls_functional.py           |   5 +-
 distributed/tests/test_utils.py                    | 277 +++++++--
 distributed/tests/test_utils_test.py               |  17 +-
 distributed/tests/test_variable.py                 |  46 +-
 distributed/tests/test_worker.py                   | 122 ++--
 distributed/threadpoolexecutor.py                  |  51 +-
 distributed/utils.py                               | 385 +++++++++++-
 distributed/utils_comm.py                          |   3 -
 distributed/utils_test.py                          | 448 ++++++++------
 distributed/variable.py                            |  10 +-
 distributed/worker.py                              | 337 +++++-----
 docs/Makefile                                      |   2 +-
 docs/source/api.rst                                |   8 +
 docs/source/changelog.rst                          |  91 +++
 docs/source/locality.rst                           |   2 +-
 docs/source/protocol.rst                           |   5 +-
 docs/source/publish.rst                            |  18 +
 docs/source/related-work.rst                       |   2 +-
 docs/source/resources.rst                          |   4 +-
 docs/source/scheduling-policies.rst                |   2 +
 docs/source/scheduling-state.rst                   | 414 +++++++++----
 docs/source/setup.rst                              |   9 +
 docs/source/work-stealing.rst                      |  20 +-
 requirements.txt                                   |  15 +-
 setup.cfg                                          |   4 +-
 setup.py                                           |   7 +-
 143 files changed, 5846 insertions(+), 3758 deletions(-)
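
Among the additions listed above, distributed/lock.py and distributed/tests/test_locks.py introduce a scheduler-backed Lock, which the distributed/__init__.py hunk below re-exports at the package root. A minimal usage sketch follows; the Client() setup, the lock name, and the reliance on a default client are illustrative assumptions, not taken from this diff:

    from distributed import Client, Lock

    client = Client()               # assumes a reachable scheduler (or a local cluster)
    lock = Lock('shared-resource')  # same name => mutual exclusion across tasks/clients

    # Context-manager form: acquires before the block runs, releases on exit.
    with lock:
        pass  # critical section

    # Explicit form.
    lock.acquire()
    try:
        pass  # critical section
    finally:
        lock.release()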

diff --git a/.travis.yml b/.travis.yml
index 8f080b1..d6f1b62 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,51 +1,39 @@
-language: generic
-sudo: false
+language: python
+# sudo shouldn't be required, but currently tests fail when run in a container
+# on travis instead of a vm. See https://github.com/dask/distributed/pull/1563.
+sudo: required
 dist: trusty
-services:
-  - docker
 
 env:
   matrix:
-    - PYTHON=2.7 PACKAGES="python-blosc futures faulthandler"
-    - PYTHON=3.5 COVERAGE=true PACKAGES=python-blosc CRICK=true
-    - PYTHON=3.6
-    - HDFS=true PYTHON=2.7
-    - HDFS=true PYTHON=3.5 PACKAGES=python-blosc
+    - PYTHON=2.7 TESTS=true PACKAGES="python-blosc futures faulthandler"
+    - PYTHON=3.5.4 TESTS=true COVERAGE=true PACKAGES=python-blosc CRICK=true
+    - PYTHON=3.6 TESTS=true
 
 matrix:
   fast_finish: true
   include:
+  - os: linux
+    # Using Travis-CI's python makes job faster by not downloading miniconda
+    python: 3.6
+    env: LINT=true
   - os: osx
     env: PYTHON=3.6 RUNSLOW=false
+  # Tornado dev builds disabled, as Bokeh and IPython choke on it
+  #- env: PYTHON=3.6 TORNADO=dev
+  #- env: PYTHON=2.7 TORNADO=dev RUNSLOW=false PACKAGES="futures faulthandler"
   # Together with fast_finish, allow build to be marked successful before the OS X job finishes
   allow_failures:
   - os: osx
     # This needs to be the exact same line as above
     env: PYTHON=3.6 RUNSLOW=false
 
-addons:
-  hosts:
-    # Need to make the container's hostname resolvable, since the HDFS
-    # client will contact it back.
-    - hdfs-container
-
-before_install:
-  - |
-    if [[ $HDFS == true ]]; then
-        pushd continuous_integration
-        docker build -t distributed-hdfs-test .
-        # Run HDFS
-        ./run-hdfs.sh || exit 1
-        popd
-    fi;
-
 install:
-  - source continuous_integration/travis/install.sh
+  - if [[ $TESTS == true ]]; then source continuous_integration/travis/install.sh ; fi
 
 script:
-  - source continuous_integration/travis/run_tests.sh
-  - flake8 distributed
-  - pycodestyle --select=E722 distributed  # check for bare `except:` blocks
+  - if [[ $TESTS == true ]]; then source continuous_integration/travis/run_tests.sh ; fi
+  - if [[ $LINT == true ]]; then pip install flake8 ; flake8 distributed ; fi
 
 after_success:
   - if [[ $COVERAGE == true ]]; then coverage report; pip install -q coveralls ; coveralls ; fi
diff --git a/conftest.py b/conftest.py
index b5972ff..cba68bd 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,5 +1,22 @@
 # https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option
+import os
 import pytest
+
+
+# Uncomment to enable more logging and checks
+# (https://docs.python.org/3/library/asyncio-dev.html)
+# Note this makes things slower and might consume much memory.
+#os.environ["PYTHONASYNCIODEBUG"] = "1"
+
+try:
+    import faulthandler
+except ImportError:
+    pass
+else:
+    faulthandler.enable()
+
+
 def pytest_addoption(parser):
     parser.addoption("--runslow", action="store_true", help="run slow tests")
 
+pytest_plugins = ['distributed.pytest_resourceleaks']
diff --git a/continuous_integration/Dockerfile b/continuous_integration/Dockerfile
deleted file mode 100644
index 53b548a..0000000
--- a/continuous_integration/Dockerfile
+++ /dev/null
@@ -1,27 +0,0 @@
-FROM sequenceiq/hadoop-docker:2.7.1
-
-ENV HADOOP_PREFIX=/usr/local/hadoop
-ENV HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop
-
-ADD docker-files/start.sh /tmp/start.sh
-ADD docker-files/*.xml $HADOOP_CONF_DIR/
-
-VOLUME /host
-
-EXPOSE 8020
-EXPOSE 50070
-
-CMD ["bash", "/tmp/start.sh"]
-
-# Build:
-#
-# $ docker build -t distributed-hdfs-test .
-
-# Configure host to access container HDFS:
-#
-# $ sudo bash -c 'echo "127.0.0.1 hdfs-container" >> /etc/hosts'
-
-# Execution:
-#
-# $ rm -f hdfs-initialized
-# $ docker run -d -h hdfs-container -v$(pwd):/host -p8020:8020 -p 50070:50070 distributed-hdfs-test
diff --git a/continuous_integration/README.md b/continuous_integration/README.md
deleted file mode 100644
index 2b73b5b..0000000
--- a/continuous_integration/README.md
+++ /dev/null
@@ -1,37 +0,0 @@
-# Continuous Integration
-
-## Local test
-
-Requirements:
--  `docker`
-
-Build the container:
-```bash
-docker build -t distributed-hdfs .
-```
-
-Start the container and wait for it to be ready:
-
-```bash
-docker run -it -p 8020:8020 -p 50070:50070 -v $(pwd):/distributed distributed-hdfs
-```
-
-Now the port `8020` and `50070` are in the host are pointing to the container and the source code (as a shared volume) is available in the container under `/distributed`
-
-Run the following from the root of this project directory to start a bash
-session in the running container:
-
-```bash
-# Get the container ID
-export CONTAINER_ID=$(docker ps -l -q)
-
-# Start the bash session
-docker exec -it $CONTAINER_ID bash
-```
-
-Now that we are in the container we can install the library and run the test:
-
-```bash
-python setup.py install
-py.test distributed -s -vv
-```
diff --git a/continuous_integration/docker-files/core-site.xml b/continuous_integration/docker-files/core-site.xml
deleted file mode 100644
index 14a30e6..0000000
--- a/continuous_integration/docker-files/core-site.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-<configuration>
-    <property>
-        <name>fs.defaultFS</name>
-        <value>hdfs://0.0.0.0:8020</value>
-    </property>
-</configuration>
-
diff --git a/continuous_integration/docker-files/hdfs-site.xml b/continuous_integration/docker-files/hdfs-site.xml
deleted file mode 100644
index 161c2ca..0000000
--- a/continuous_integration/docker-files/hdfs-site.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<configuration>
-    <property>
-        <name>dfs.replication</name>
-        <value>1</value>
-    </property>
-
-<!--    <property>
-        <name>dfs.namenode.rpc-address</name>
-        <value>0.0.0.0:8020</value>
-        <description>
-            RPC address that handles all clients requests. In the case of HA/Federation where multiple namenodes exist,
-            the name service id is added to the name e.g. dfs.namenode.rpc-address.ns1
-            dfs.namenode.rpc-address.EXAMPLENAMESERVICE
-            The value of this property will take the form of nn-host1:rpc-port.
-        </description>
-    </property>-->
-
-<!--    <property>
-        <name>dfs.namenode.rpc-bind-host</name>
-        <value>0.0.0.0</value>
-        <description>
-            The actual address the RPC server will bind to. If this optional address is
-            set, it overrides only the hostname portion of dfs.namenode.rpc-address.
-            It can also be specified per name node or name service for HA/Federation.
-            This is useful for making the name node listen on all interfaces by
-            setting it to 0.0.0.0.
-        </description>
-    </property>-->
-
-    <property>
-        <name>dfs.namenode.safemode.extension</name>
-        <value>10</value>
-        <description>
-            Determines extension of safe mode in milliseconds
-            after the threshold level is reached.
-        </description>
-    </property>
-
-    <property>
-        <name>dfs.permissions</name>
-        <value>false</value>
-        <description>
-            If "true", enable permission checking in HDFS.
-            If "false", permission checking is turned off,
-            but all other behavior is unchanged.
-            Switching from one parameter value to the other does not change the mode,
-            owner or group of files or directories.
-        </description>
-    </property>
-
-</configuration>
diff --git a/continuous_integration/docker-files/start.sh b/continuous_integration/docker-files/start.sh
deleted file mode 100755
index 04140fd..0000000
--- a/continuous_integration/docker-files/start.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!bin/bash
-
-set -e
-
-export HADOOP_PREFIX=/usr/local/hadoop
-$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
-
-service sshd start
-
-rm -f /tmp/*.pid
-$HADOOP_PREFIX/sbin/start-dfs.sh
-
-echo "--"
-echo "-- HDFS started!"
-echo "--"
-
-# Wait for nodes to be fully initialized
-sleep 5
-touch /host/hdfs-initialized
-
-# Stay alive
-sleep infinity
diff --git a/continuous_integration/run-hdfs.sh b/continuous_integration/run-hdfs.sh
deleted file mode 100755
index 90e371c..0000000
--- a/continuous_integration/run-hdfs.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-HOSTDIR=$(pwd)
-INIT_MARKER=$HOSTDIR/hdfs-initialized
-
-# Remove initialization marker
-rm -f $INIT_MARKER
-
-CONTAINER_ID=$(docker run -d -h hdfs-container -v$HOSTDIR:/host -p8020:8020 -p 50070:50070 distributed-hdfs-test)
-
-if [ $? -ne 0 ]; then
-    echo "Failed starting HDFS container"
-    exit 1
-fi
-echo "Started HDFS container: $CONTAINER_ID"
-
-# CONTAINER_ID=$1
-CHECK_RUNNING="docker top $CONTAINER_ID"
-
-# Wait for initialization
-while [[ $($CHECK_RUNNING) ]] && [[ ! -f $INIT_MARKER ]]
-do
-    sleep 1
-done
-
-# Error out if the container failed starting
-if [[ ! $($CHECK_RUNNING) ]]; then
-    echo "HDFS startup failed! Logs follow"
-    echo "-------------------------------------------------"
-    docker logs $CONTAINER_ID
-    echo "-------------------------------------------------"
-    exit 1
-fi
diff --git a/continuous_integration/setup_conda_environment.cmd b/continuous_integration/setup_conda_environment.cmd
index 2df5802..44b5ad8 100644
--- a/continuous_integration/setup_conda_environment.cmd
+++ b/continuous_integration/setup_conda_environment.cmd
@@ -48,7 +48,7 @@ call activate %CONDA_ENV%
 %PIP_INSTALL% git+https://github.com/joblib/joblib.git --upgrade
 %PIP_INSTALL% git+https://github.com/dask/zict --upgrade
 
-%PIP_INSTALL% pytest-timeout pytest-faulthandler sortedcollections
+%PIP_INSTALL% pytest-repeat pytest-timeout pytest-faulthandler sortedcollections
 
 @rem Display final environment (for reproducing)
 %CONDA% list
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index cce65bb..425710d 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -45,21 +45,17 @@ conda install -q -c conda-forge \
     netcdf4 \
     paramiko \
     psutil \
-    pycodestyle \
     pytest=3.1 \
     pytest-faulthandler \
     pytest-timeout \
+    python=$PYTHON \
     requests \
     tblib \
     toolz \
     tornado=4.5 \
     $PACKAGES
 
-if [[ $HDFS == true ]]; then
-    conda install -q libxml2 krb5 boost
-    conda install -q -c conda-forge libhdfs3 libgsasl libntlm
-    pip install -q git+https://github.com/dask/hdfs3 --upgrade
-fi;
+pip install -q pytest-repeat
 
 pip install -q git+https://github.com/dask/dask.git --upgrade
 pip install -q git+https://github.com/joblib/joblib.git --upgrade
@@ -76,6 +72,13 @@ fi;
 # Install distributed
 pip install --no-deps -e .
 
+# Update Tornado to desired version
+if [[ $TORNADO == "dev" ]]; then
+    pip install -U https://github.com/tornadoweb/tornado/archive/master.zip
+elif [[ ! -z $TORNADO ]]; then
+    pip install -U tornado==$TORNADO
+fi
+
 # For debugging
 echo -e "--\n--Conda Environment\n--"
 conda list
diff --git a/continuous_integration/travis/run_tests.sh b/continuous_integration/travis/run_tests.sh
index 8bacb6a..dbc0b21 100644
--- a/continuous_integration/travis/run_tests.sh
+++ b/continuous_integration/travis/run_tests.sh
@@ -18,17 +18,7 @@ echo "-- Hard limits"
 echo "--"
 ulimit -a -H
 
-if [[ $HDFS == true ]]; then
-    py.test distributed/tests/test_hdfs.py $PYTEST_OPTIONS
-    if [ $? -ne 0 ]; then
-        # Diagnose test error
-        echo "--"
-        echo "-- HDFS namenode log follows"
-        echo "--"
-        docker exec -it $(docker ps -q) bash -c "tail -n50 /usr/local/hadoop/logs/hadoop-root-namenode-hdfs-container.log"
-        (exit 1)
-    fi
-elif [[ $COVERAGE == true ]]; then
+if [[ $COVERAGE == true ]]; then
     coverage run $(which py.test) distributed -m "not avoid_travis" $PYTEST_OPTIONS;
 else
     py.test -m "not avoid_travis" distributed $PYTEST_OPTIONS;
diff --git a/distributed/__init__.py b/distributed/__init__.py
index 52cb8ab..db3f049 100644
--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -7,12 +7,14 @@ from .diagnostics import progress
 from .client import (Client, Executor, CompatibleExecutor,
                      wait, as_completed, default_client, fire_and_forget,
                      Future)
+from .lock import Lock
 from .nanny import Nanny
 from .queues import Queue
 from .scheduler import Scheduler
+from .threadpoolexecutor import rejoin
 from .utils import sync
 from .variable import Variable
-from .worker import Worker, get_worker, get_client, secede
+from .worker import Worker, get_worker, get_client, secede, Reschedule
 from .worker_client import local_client, worker_client
 
 from ._version import get_versions
diff --git a/distributed/asyncio.py b/distributed/asyncio.py
index 03e0e09..3d680a5 100644
--- a/distributed/asyncio.py
+++ b/distributed/asyncio.py
@@ -12,6 +12,7 @@ from tornado.platform.asyncio import to_asyncio_future
 
 from . import client
 from .client import Client, Future
+from .variable import Variable
 from .utils import ignoring
 
 
@@ -25,17 +26,6 @@ def to_asyncio(fn, **default_kwargs):
     return convert
 
 
-class AioFuture(Future):
-    """Provides awaitable syntax for a distributed Future"""
-
-    def __await__(self):
-        return self.result().__await__()
-
-    result = to_asyncio(Future._result)
-    exception = to_asyncio(Future._exception)
-    traceback = to_asyncio(Future._traceback)
-
-
 class AioClient(Client):
     """ Connect to and drive computation on a distributed Dask cluster
 
@@ -102,65 +92,28 @@ class AioClient(Client):
     distributed.client.Client: Blocking Client
     distributed.scheduler.Scheduler: Internal scheduler
     """
-    _Future = AioFuture
-
     def __init__(self, *args, **kwargs):
-        if kwargs.get('set_as_default'):
-            raise Exception("AioClient instance can't be set as default")
-
         loop = asyncio.get_event_loop()
         ioloop = BaseAsyncIOLoop(loop)
-        super().__init__(*args, loop=ioloop, set_as_default=False, asynchronous=True, **kwargs)
+        super().__init__(*args, loop=ioloop, asynchronous=True, **kwargs)
+
+    def __enter__(self):
+        raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
 
     async def __aenter__(self):
         await to_asyncio_future(self._started)
         return self
 
     async def __aexit__(self, type, value, traceback):
-        await self.close()
+        await to_asyncio_future(self._close())
 
     def __await__(self):
         return to_asyncio_future(self._started).__await__()
 
-    async def close(self, fast=False):
-        if self.status == 'closed':
-            return
-
-        try:
-            future = self._close(fast=fast)
-            await to_asyncio_future(future)
-
-            with ignoring(AttributeError):
-                future = self.cluster._close()
-                await to_asyncio_future(future)
-                self.cluster.status = 'closed'
-        finally:
-            BaseAsyncIOLoop.clear_current()
-
-    def __del__(self):
-        # Override Client.__del__ to avoid running self.close()
-        assert self.status != 'running'
-
-    gather = to_asyncio(Client._gather)
-    scatter = to_asyncio(Client._scatter)
-    cancel = to_asyncio(Client._cancel)
-    publish_dataset = to_asyncio(Client._publish_dataset)
-    get_dataset = to_asyncio(Client._get_dataset)
-    run_on_scheduler = to_asyncio(Client._run_on_scheduler)
-    run = to_asyncio(Client._run)
-    run_coroutine = to_asyncio(Client._run_coroutine)
     get = to_asyncio(Client.get, sync=False)
-    upload_environment = to_asyncio(Client._upload_environment)
-    restart = to_asyncio(Client._restart)
-    upload_file = to_asyncio(Client._upload_file)
-    upload_large_file = to_asyncio(Client._upload_large_file)
-    rebalance = to_asyncio(Client._rebalance)
-    replicate = to_asyncio(Client._replicate)
-    start_ipython_workers = to_asyncio(Client._start_ipython_workers)
-    shutdown = close
-
-    def __enter__(self):
-        raise RuntimeError("Use AioClient in an 'async with' block, not 'with'")
+    sync = to_asyncio(Client.sync)
+    close = to_asyncio(Client.close)
+    shutdown = to_asyncio(Client.shutdown)
 
 
 class as_completed(client.as_completed):
diff --git a/distributed/batched.py b/distributed/batched.py
index b8f981d..152e8d4 100644
--- a/distributed/batched.py
+++ b/distributed/batched.py
@@ -56,11 +56,9 @@ class BatchedSend(object):
         self.comm = comm
         self.loop.add_callback(self._background_send)
 
-    def __str__(self):
+    def __repr__(self):
         return '<BatchedSend: %d in buffer>' % len(self.buffer)
 
-    __repr__ = __str__
-
     @gen.coroutine
     def _background_send(self):
         while not self.please_stop:
@@ -93,6 +91,8 @@ class BatchedSend(object):
             except Exception:
                 logger.exception("Error in batched write")
                 break
+            finally:
+                payload = None  # lose ref
 
         self.stopped.set()
 
diff --git a/distributed/bokeh/background/__init__.py b/distributed/bokeh/background/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/distributed/bokeh/background/main.py b/distributed/bokeh/background/main.py
deleted file mode 100644
index e69de29..0000000
diff --git a/distributed/bokeh/background/server_lifecycle.py b/distributed/bokeh/background/server_lifecycle.py
deleted file mode 100644
index 2f6e3b9..0000000
--- a/distributed/bokeh/background/server_lifecycle.py
+++ /dev/null
@@ -1,139 +0,0 @@
-#!/usr/bin/env python
-from __future__ import print_function, division, absolute_import
-
-import json
-import logging
-import sys
-
-from tornado import gen
-from tornado.httpclient import AsyncHTTPClient, HTTPError
-from tornado.ioloop import IOLoop
-
-from distributed.compatibility import ConnectionRefusedError
-from distributed.core import connect, CommClosedError
-from distributed.metrics import time
-from distributed.protocol.pickle import dumps
-from distributed.diagnostics.eventstream import eventstream
-from distributed.diagnostics.progress_stream import (progress_stream,
-                                                     task_stream_append)
-from distributed.bokeh.worker_monitor import resource_append
-import distributed.bokeh
-from distributed.bokeh.utils import parse_args
-from distributed.utils import log_errors
-
-
-logger = logging.getLogger(__name__)
-
-client = AsyncHTTPClient()
-
-messages = distributed.bokeh.messages  # monkey-patching
-
-options = parse_args(sys.argv[1:])
-
-
-@gen.coroutine
-def http_get(route):
-    """ Get data from JSON route, store in messages deques """
-    try:
-        url = 'http://%(host)s:%(http-port)d/' % options + route + '.json'
-        response = yield client.fetch(url)
-    except ConnectionRefusedError as e:
-        logger.info("Can not connect to %s", url, exc_info=True)
-        return
-    except HTTPError:
-        logger.warning("http route %s failed", route)
-        return
-    msg = json.loads(response.body.decode())
-    messages[route]['deque'].append(msg)
-    messages[route]['times'].append(time())
-
-
-last_index = [0]
-
-
-@gen.coroutine
-def workers():
-    """ Get data from JSON route, store in messages deques """
-    try:
-        response = yield client.fetch(
-            'http://%(host)s:%(http-port)d/workers.json' % options)
-    except HTTPError:
-        logger.warning("workers http route failed")
-        return
-    msg = json.loads(response.body.decode())
-    if msg:
-        messages['workers']['deque'].append(msg)
-        messages['workers']['times'].append(time())
-        resource_append(messages['workers']['plot-data'], msg)
-        index = messages['workers']['index']
-        index.append(last_index[0] + 1)
-        last_index[0] += 1
-
-
-@gen.coroutine
-def progress():
-    with log_errors():
-        addr = options['scheduler-address']
-        comm = yield progress_stream(addr, 0.050)
-        while True:
-            try:
-                msg = yield comm.read()
-            except CommClosedError:
-                break
-            else:
-                messages['progress'] = msg
-
-
-@gen.coroutine
-def processing():
-    with log_errors():
-        from distributed.diagnostics.scheduler import processing
-        addr = options['scheduler-address']
-        comm = yield connect(addr)
-        yield comm.write({'op': 'feed',
-                          'function': dumps(processing),
-                          'interval': 0.200})
-        while True:
-            try:
-                msg = yield comm.read()
-            except CommClosedError:
-                break
-            else:
-                messages['processing'] = msg
-
-
-@gen.coroutine
-def task_events(interval, deque, times, index, rectangles, workers, last_seen):
-    i = 0
-    try:
-        addr = options['scheduler-address']
-        comm = yield eventstream(addr, 0.100)
-        while True:
-            msgs = yield comm.read()
-            if not msgs:
-                continue
-
-            last_seen[0] = time()
-            for msg in msgs:
-                if 'startstops' in msg:
-                    deque.append(msg)
-                    count = task_stream_append(rectangles, msg, workers)
-                    for _ in range(count):
-                        index.append(i)
-                        i += 1
-
-    except CommClosedError:
-        pass  # don't log CommClosedErrors
-    except Exception as e:
-        logger.exception(e)
-
-
-def on_server_loaded(server_context):
-    server_context.add_periodic_callback(workers, 500)
-
-    server_context.add_periodic_callback(lambda: http_get('tasks'), 100)
-    IOLoop.current().add_callback(processing)
-
-    IOLoop.current().add_callback(progress)
-
-    IOLoop.current().add_callback(task_events, **messages['task-events'])
diff --git a/distributed/bokeh/components.py b/distributed/bokeh/components.py
index 3282a7e..a42c83d 100644
--- a/distributed/bokeh/components.py
+++ b/distributed/bokeh/components.py
@@ -6,18 +6,11 @@ from time import time
 import weakref
 
 from bokeh.layouts import row, column
-from bokeh.models import (
-    ColumnDataSource, Plot, DataRange1d, LinearAxis,
-    DatetimeAxis, HoverTool, BoxZoomTool, ResetTool,
-    PanTool, WheelZoomTool, Title, Range1d, Quad, Text, value, Line,
-    NumeralTickFormatter, ToolbarBox, Legend, BoxSelectTool, TapTool,
-    Circle, OpenURL,
-)
-from bokeh.models.widgets import (DataTable, TableColumn, NumberFormatter,
-        Button, Select)
+from bokeh.models import ( ColumnDataSource, Plot, DataRange1d, LinearAxis,
+        HoverTool, BoxZoomTool, ResetTool, PanTool, WheelZoomTool, Range1d,
+        Quad, Text, value, TapTool, OpenURL, Button, Select)
 from bokeh.palettes import Spectral9
 from bokeh.plotting import figure
-from toolz import valmap
 from tornado import gen
 
 from ..config import config
@@ -70,7 +63,7 @@ class TaskStream(DashboardComponent):
 
         self.source = ColumnDataSource(data=dict(
             start=[time() - clear_interval], duration=[0.1], key=['start'],
-            name=['start'], color=['white'],
+            name=['start'], color=['white'], duration_text=['100 ms'],
             worker=['foo'], y=[0], worker_thread=[1], alpha=[0.0])
         )
 
@@ -98,8 +91,7 @@ class TaskStream(DashboardComponent):
             tooltips="""
                 <div>
                     <span style="font-size: 12px; font-weight: bold;">@name:</span> 
-                    <span style="font-size: 10px; font-family: Monaco, monospace;">@duration</span>
-                    <span style="font-size: 10px;">ms</span> 
+                    <span style="font-size: 10px; font-family: Monaco, monospace;">@duration_text</span>
                 </div>
                 """
         )
@@ -283,202 +275,6 @@ class MemoryUsage(DashboardComponent):
                 "Memory Use: %0.2f MB" % (sum(msg['nbytes'].values()) / 1e6)
 
 
-class ResourceProfiles(DashboardComponent):
-    """ Time plots of the current resource usage on the cluster
-
-    This is two plots, one for CPU and Memory and another for Network I/O
-    """
-
-    def __init__(self, **kwargs):
-        self.source = ColumnDataSource(data={'time': [], 'cpu': [],
-                                             'memory_percent': [], 'network-send': [], 'network-recv': []}
-                                       )
-
-        x_range = DataRange1d(follow='end', follow_interval=30000, range_padding=0)
-
-        resource_plot = Plot(
-            x_range=x_range, y_range=Range1d(start=0, end=1),
-            toolbar_location=None, min_border_bottom=10, **kwargs
-        )
-
-        line_opts = dict(line_width=2, line_alpha=0.8)
-        g1 = resource_plot.add_glyph(
-            self.source,
-            Line(x='time', y='memory_percent', line_color="#33a02c", **line_opts)
-        )
-        g2 = resource_plot.add_glyph(
-            self.source,
-            Line(x='time', y='cpu', line_color="#1f78b4", **line_opts)
-        )
-
-        resource_plot.add_layout(
-            LinearAxis(formatter=NumeralTickFormatter(format="0 %")),
-            'left'
-        )
-
-        legend_opts = dict(
-            location='top_left', orientation='horizontal', padding=5, margin=5,
-            label_height=5)
-
-        resource_plot.add_layout(
-            Legend(items=[('Memory', [g1]), ('CPU', [g2])], **legend_opts)
-        )
-
-        network_plot = Plot(
-            x_range=x_range, y_range=DataRange1d(start=0),
-            toolbar_location=None, **kwargs
-        )
-        g1 = network_plot.add_glyph(
-            self.source,
-            Line(x='time', y='network-send', line_color="#a6cee3", **line_opts)
-        )
-        g2 = network_plot.add_glyph(
-            self.source,
-            Line(x='time', y='network-recv', line_color="#b2df8a", **line_opts)
-        )
-
-        network_plot.add_layout(DatetimeAxis(axis_label="Time"), "below")
-        network_plot.add_layout(LinearAxis(axis_label="MB/s"), 'left')
-        network_plot.add_layout(
-            Legend(items=[('Network Send', [g1]), ('Network Recv', [g2])], **legend_opts)
-        )
-
-        tools = [
-            PanTool(dimensions='width'), WheelZoomTool(dimensions='width'),
-            BoxZoomTool(), ResetTool()
-        ]
-
-        if 'sizing_mode' in kwargs:
-            sizing_mode = {'sizing_mode': kwargs['sizing_mode']}
-        else:
-            sizing_mode = {}
-
-        combo_toolbar = ToolbarBox(
-            tools=tools, logo=None, toolbar_location='right', **sizing_mode
-        )
-
-        self.root = row(
-            column(resource_plot, network_plot, **sizing_mode),
-            column(combo_toolbar, **sizing_mode),
-            id='bk-resource-profiles-plot',
-            **sizing_mode
-        )
-
-        # Required for update callback
-        self.resource_index = [0]
-
-    def update(self, messages):
-        with log_errors():
-            index = messages['workers']['index']
-            data = messages['workers']['plot-data']
-
-            if not index or index[-1] == self.resource_index[0]:
-                return
-
-            if self.resource_index == [0]:
-                data = valmap(list, data)
-
-            ind = bisect(index, self.resource_index[0])
-            indexes = list(range(ind, len(index)))
-            data = {k: [v[i] for i in indexes] for k, v in data.items()}
-            self.resource_index[0] = index[-1]
-            self.source.stream(data, 1000)
-
-
-class WorkerTable(DashboardComponent):
-    """ Status of the current workers
-
-    This is two plots, a text-based table for each host and a thin horizontal
-    plot laying out hosts by their current memory use.
-    """
... 13941 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.distributed.git


