[Python-modules-commits] [dask] 01/06: New upstream version 0.16.0

Diane Trout diane at moszumanska.debian.org
Sat Dec 9 00:20:54 UTC 2017


This is an automated email from the git hooks/post-receive script.

diane pushed a commit to branch master
in repository dask.

commit fb0ff3945cc1fd9f545dd1851f427d58a184f9de
Author: Diane Trout <diane at ghic.org>
Date:   Thu Dec 7 16:57:37 2017 -0800

    New upstream version 0.16.0
---
 .travis.yml                                     |   6 +-
 continuous_integration/travis/install.sh        |  44 +-
 dask/__init__.py                                |   6 +-
 dask/array/__init__.py                          |   5 +-
 dask/array/chunk.py                             |   9 +
 dask/array/core.py                              | 171 ++++---
 dask/array/creation.py                          |   6 +-
 dask/array/ghost.py                             |   4 +-
 dask/array/optimization.py                      |  13 +-
 dask/array/percentile.py                        |   2 +-
 dask/array/random.py                            |  68 ++-
 dask/array/rechunk.py                           |   2 +-
 dask/array/reductions.py                        |   8 +
 dask/array/reshape.py                           |   2 +-
 dask/array/routines.py                          | 274 ++++++++--
 dask/array/slicing.py                           |  16 +-
 dask/array/tests/test_array_core.py             | 155 ++++--
 dask/array/tests/test_optimization.py           |  29 +-
 dask/array/tests/test_random.py                 |  49 ++
 dask/array/tests/test_reductions.py             |   2 +-
 dask/array/tests/test_routines.py               | 164 +++++-
 dask/array/ufunc.py                             |  22 +-
 dask/array/utils.py                             |   2 +-
 dask/async.py                                   |  19 -
 dask/bag/core.py                                | 191 +++----
 dask/bag/tests/test_bag.py                      |  17 +-
 dask/base.py                                    | 640 +++++++++++++-----------
 dask/bytes/core.py                              |  44 +-
 dask/bytes/local.py                             |   5 +
 dask/bytes/s3.py                                |   5 +
 dask/bytes/tests/test_local.py                  |  41 +-
 dask/bytes/tests/test_s3.py                     |  53 +-
 dask/bytes/utils.py                             |  82 ++-
 dask/context.py                                 |  76 +--
 dask/dataframe/categorical.py                   |   7 +-
 dask/dataframe/core.py                          | 177 ++++---
 dask/dataframe/groupby.py                       |  43 +-
 dask/dataframe/io/csv.py                        |  69 ++-
 dask/dataframe/io/hdf.py                        |   4 +-
 dask/dataframe/io/io.py                         |  10 +-
 dask/dataframe/io/parquet.py                    | 573 ++++++++++++---------
 dask/dataframe/io/tests/test_csv.py             |  64 ++-
 dask/dataframe/io/tests/test_parquet.py         | 583 +++++++++++----------
 dask/dataframe/methods.py                       |   4 +-
 dask/dataframe/multi.py                         |  23 +-
 dask/dataframe/optimize.py                      |  14 +-
 dask/dataframe/partitionquantiles.py            |  12 +-
 dask/dataframe/rolling.py                       |   2 +-
 dask/dataframe/shuffle.py                       |  24 +-
 dask/dataframe/tests/test_dataframe.py          | 116 +++--
 dask/dataframe/tests/test_format.py             |  72 +--
 dask/dataframe/tests/test_groupby.py            |  54 +-
 dask/dataframe/tests/test_multi.py              |  16 +-
 dask/dataframe/tests/test_optimize_dataframe.py |   6 +-
 dask/dataframe/tests/test_shuffle.py            |  79 ++-
 dask/dataframe/tests/test_utils_dataframe.py    |  36 +-
 dask/dataframe/tseries/resample.py              |   2 +-
 dask/dataframe/utils.py                         |   2 +-
 dask/delayed.py                                 |  57 ++-
 dask/diagnostics/profile.py                     |  22 +-
 dask/diagnostics/profile_visualize.py           |  31 +-
 dask/diagnostics/tests/test_profiler.py         |  35 +-
 dask/optimize.py                                |   7 +-
 dask/tests/test_base.py                         | 118 ++++-
 dask/tests/test_context.py                      |  40 +-
 dask/tests/test_delayed.py                      |  57 ++-
 dask/utils.py                                   | 115 ++---
 docs/release-procedure.md                       |   5 +-
 docs/source/array-api.rst                       |  16 +-
 docs/source/changelog.rst                       |  85 +++-
 docs/source/custom-collections.rst              | 581 +++++++++++++++++++++
 docs/source/examples/bag-word-count-hdfs.rst    |   8 +-
 docs/source/futures.rst                         |   2 +-
 docs/source/index.rst                           |   2 +
 docs/source/machine-learning.rst                | 104 +---
 setup.cfg                                       |   4 +-
 setup.py                                        |   5 +-
 77 files changed, 3767 insertions(+), 1751 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index fa7761c..b5e5e38 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,8 +49,9 @@ jobs:
 
     - env: &py36_env
       - PYTHON=3.6
-      - NUMPY=1.13.0
-      - PANDAS=0.20.2
+      - UPSTREAM_DEV=1  # Install nightly versions of NumPy and pandas
+      - NUMPY=1.13.0    # these are overridden later
+      - PANDAS=0.20.3
       - *no_coverage
       - *no_optimize
       - *imports
@@ -69,7 +70,6 @@ install:
 script:
   - source continuous_integration/travis/run_tests.sh
   - flake8 dask
-  - pycodestyle --select=E722 dask  # check for bare `except:` blocks
   - if [[ $TEST_IMPORTS == 'true' ]]; then source continuous_integration/travis/test_imports.sh; fi
 
 after_success:
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index 21024ae..d79813b 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -22,8 +22,11 @@ source activate test-environment
 # Pin matrix items
 # Please see PR ( https://github.com/dask/dask/pull/2185 ) for details.
 touch $CONDA_PREFIX/conda-meta/pinned
-echo "numpy $NUMPY" >> $CONDA_PREFIX/conda-meta/pinned
-echo "pandas $PANDAS" >> $CONDA_PREFIX/conda-meta/pinned
+if ! [[ ${UPSTREAM_DEV} ]]; then
+    echo "Pinning NumPy $NUMPY, pandas $PANDAS"
+    echo "numpy $NUMPY" >> $CONDA_PREFIX/conda-meta/pinned
+    echo "pandas $PANDAS" >> $CONDA_PREFIX/conda-meta/pinned
+fi;
 
 # Install dependencies.
 conda install -q -c conda-forge \
@@ -50,39 +53,44 @@ conda install -q -c conda-forge \
     sqlalchemy \
     toolz
 
+if [[ ${UPSTREAM_DEV} ]]; then
+    echo "Installing NumPy and Pandas dev"
+    conda uninstall -y --force numpy pandas
+    PRE_WHEELS="https://7933911d6844c6c53a7d-47bd50c35cd79bd838daf386af554a83.ssl.cf2.rackcdn.com"
+    pip install -q --pre --no-deps --upgrade --timeout=60 -f $PRE_WHEELS numpy pandas
+fi;
+
 # install pytables from defaults for now
 conda install -q pytables
 
-pip install -q git+https://github.com/dask/partd --upgrade --no-deps
-pip install -q git+https://github.com/dask/zict --upgrade --no-deps
-pip install -q git+https://github.com/dask/distributed --upgrade --no-deps
-pip install -q git+https://github.com/mrocklin/sparse --upgrade --no-deps
-pip install -q git+https://github.com/dask/s3fs --upgrade --no-deps
+pip install -q --upgrade --no-deps git+https://github.com/dask/partd
+pip install -q --upgrade --no-deps git+https://github.com/dask/zict
+pip install -q --upgrade --no-deps git+https://github.com/dask/distributed
+pip install -q --upgrade --no-deps git+https://github.com/mrocklin/sparse
+pip install -q --upgrade --no-deps git+https://github.com/dask/s3fs
 
-if [[ $PYTHONOPTIMIZE != '2' ]] && [[ $NUMPY > '1.11.0' ]] && [[ $NUMPY < '1.13.0' ]]; then
-    conda install -q -c conda-forge numba cython
-    pip install -q git+https://github.com/dask/fastparquet
+if [[ $PYTHONOPTIMIZE != '2' ]] && [[ $NUMPY > '1.11.0' ]]; then
+    conda install -q -c conda-forge fastparquet python-snappy
+    pip install -q --no-deps git+https://github.com/dask/fastparquet
 fi
 
 if [[ $PYTHON == '2.7' ]]; then
-    pip install -q backports.lzma mock
+    pip install -q --no-deps backports.lzma mock
 fi
 
-pip install -q \
+pip install -q --upgrade --no-deps \
     cachey \
     graphviz \
-    moto \
     pyarrow \
-    --upgrade --no-deps
+    pandas_datareader
 
-pip install -q \
+pip install -q --upgrade \
     cityhash \
     flake8 \
+    moto \
     mmh3 \
-    pandas_datareader \
     pytest-xdist \
-    xxhash \
-    pycodestyle
+    xxhash
 
 # Install dask
 pip install -q --no-deps -e .[complete]
diff --git a/dask/__init__.py b/dask/__init__.py
index d81af59..9b362f8 100644
--- a/dask/__init__.py
+++ b/dask/__init__.py
@@ -8,14 +8,10 @@ try:
 except ImportError:
     pass
 try:
-    from .base import visualize, compute, persist
+    from .base import visualize, compute, persist, is_dask_collection
 except ImportError:
     pass
 
-# dask.async is deprecated. For now we import it to the top namespace to be
-# compatible with prior releases. This should be removed in a future release:
-import dask.async
-
 from ._version import get_versions
 versions = get_versions()
 __version__ = versions['version']
diff --git a/dask/array/__init__.py b/dask/array/__init__.py
index 9155423..0dfc8d5 100644
--- a/dask/array/__init__.py
+++ b/dask/array/__init__.py
@@ -9,8 +9,9 @@ from .routines import (take, choose, argwhere, where, coarsen, insert,
                        bincount, digitize, histogram, cov, array, dstack,
                        vstack, hstack, compress, extract, round, count_nonzero,
                        flatnonzero, nonzero, around, isnull, notnull, isclose,
-                       corrcoef, swapaxes, tensordot, transpose, dot,
-                       apply_along_axis, apply_over_axes, result_type)
+                       allclose, corrcoef, swapaxes, tensordot, transpose, dot,
+                       apply_along_axis, apply_over_axes, result_type,
+                       atleast_1d, atleast_2d, atleast_3d)
 from .reshape import reshape
 from .ufunc import (add, subtract, multiply, divide, logaddexp, logaddexp2,
         true_divide, floor_divide, negative, power, remainder, mod, conj, exp,
diff --git a/dask/array/chunk.py b/dask/array/chunk.py
index 7bc6c72..d38fe66 100644
--- a/dask/array/chunk.py
+++ b/dask/array/chunk.py
@@ -195,3 +195,12 @@ def arange(start, stop, step, length, dtype):
 
 def astype(x, astype_dtype=None, **kwargs):
     return x.astype(astype_dtype, **kwargs)
+
+
+def view(x, dtype, order='C'):
+    if order == 'C':
+        x = np.ascontiguousarray(x)
+        return x.view(dtype)
+    else:
+        x = np.asfortranarray(x)
+        return x.T.view(dtype).T
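
The new chunk.view helper above backs the reworked Array.view in dask/array/core.py below, reinterpreting each block's bytes instead of going through two elemwise calls. A small usage sketch, assuming dask 0.16 with this patch (the array contents are arbitrary, chosen only for illustration):

    import numpy as np
    import dask.array as da

    # 8 int64 values in two chunks of 4
    x = da.from_array(np.arange(8, dtype='int64'), chunks=4)

    # Reinterpret the underlying bytes chunk-wise: twice as many int32 elements
    y = x.view('int32')
    print(y)            # dask.array<...> with chunks of 8
    print(y.compute())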
diff --git a/dask/array/core.py b/dask/array/core.py
index de1df97..ade0d53 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -31,8 +31,8 @@ import numpy as np
 
 from . import chunk
 from .slicing import slice_array, replace_ellipsis
-from ..base import Base, tokenize, normalize_token
-from ..context import _globals
+from ..base import Base, tokenize, dont_optimize, compute_as_if_collection
+from ..context import _globals, globalmethod
 from ..utils import (homogeneous_deepmap, ndeepmap, ignoring, concrete,
                      is_integer, IndexCallable, funcname, derived_from,
                      SerializableLock, ensure_dict, Dispatch)
@@ -897,7 +897,7 @@ def store(sources, targets, lock=True, regions=None, compute=True, **kwargs):
     name = 'store-' + tokenize(*keys)
     dsk = sharedict.merge((name, updates), *[src.dask for src in sources])
     if compute:
-        Array._get(dsk, keys, **kwargs)
+        compute_as_if_collection(Array, dsk, keys, **kwargs)
     else:
         from ..delayed import Delayed
         dsk.update({name: keys})
@@ -977,10 +977,6 @@ class Array(Base):
     """
     __slots__ = 'dask', '_name', '_cached_keys', '_chunks', 'dtype'
 
-    _optimize = staticmethod(optimize)
-    _default_get = staticmethod(threaded.get)
-    _finalize = staticmethod(finalize)
-
     def __new__(cls, dask, name, chunks, dtype, shape=None):
         self = super(Array, cls).__new__(cls)
         assert isinstance(dask, Mapping)
@@ -1007,6 +1003,41 @@ class Array(Base):
     def __reduce__(self):
         return (Array, (self.dask, self.name, self.chunks, self.dtype))
 
+    def __dask_graph__(self):
+        return self.dask
+
+    def __dask_keys__(self):
+        if self._cached_keys is not None:
+            return self._cached_keys
+
+        name, chunks, numblocks = self.name, self.chunks, self.numblocks
+
+        def keys(*args):
+            if not chunks:
+                return [(name,)]
+            ind = len(args)
+            if ind + 1 == len(numblocks):
+                result = [(name,) + args + (i,) for i in range(numblocks[ind])]
+            else:
+                result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
+            return result
+
+        self._cached_keys = result = keys()
+        return result
+
+    def __dask_tokenize__(self):
+        return self.name
+
+    __dask_optimize__ = globalmethod(optimize, key='array_optimize',
+                                     falsey=dont_optimize)
+    __dask_scheduler__ = staticmethod(threaded.get)
+
+    def __dask_postcompute__(self):
+        return finalize, ()
+
+    def __dask_postpersist__(self):
+        return Array, (self.name, self.chunks, self.dtype)
+
     @property
     def numblocks(self):
         return tuple(map(len, self.chunks))
@@ -1101,24 +1132,6 @@ class Array(Base):
         # Clear the key cache when the name is reset
         self._cached_keys = None
 
-    def _keys(self, *args):
-        if not args:
-            if self._cached_keys is not None:
-                return self._cached_keys
-
-        if not self.chunks:
-            return [(self.name,)]
-        ind = len(args)
-        if ind + 1 == self.ndim:
-            result = [(self.name,) + args + (i,)
-                      for i in range(self.numblocks[ind])]
-        else:
-            result = [self._keys(*(args + (i,)))
-                      for i in range(self.numblocks[ind])]
-        if not args:
-            self._cached_keys = result
-        return result
-
     __array_priority__ = 11  # higher than numpy.ndarray and numpy.matrix
 
     def __array__(self, dtype=None, **kwargs):
@@ -1737,20 +1750,16 @@ class Array(Base):
         mult = self.dtype.itemsize / dtype.itemsize
 
         if order == 'C':
-            ascontiguousarray = np.ascontiguousarray
             chunks = self.chunks[:-1] + (tuple(ensure_int(c * mult)
                                          for c in self.chunks[-1]),)
         elif order == 'F':
-            ascontiguousarray = np.asfortranarray
             chunks = ((tuple(ensure_int(c * mult) for c in self.chunks[0]), ) +
                       self.chunks[1:])
         else:
             raise ValueError("Order must be one of 'C' or 'F'")
 
-        out = elemwise(ascontiguousarray, self, dtype=self.dtype)
-        out = elemwise(np.ndarray.view, out, dtype, dtype=dtype)
-        out._chunks = chunks
-        return out
+        return self.map_blocks(chunk.view, dtype, order=order,
+                               dtype=dtype, chunks=chunks)
 
     @derived_from(np.ndarray)
     def swapaxes(self, axis1, axis2):
@@ -1783,8 +1792,9 @@ class Array(Base):
         dask.array.from_delayed
         """
         from ..delayed import Delayed
-        dsk = self._optimize(self.dask, self._keys())
-        L = ndeepmap(self.ndim, lambda k: Delayed(k, dsk), self._keys())
+        keys = self.__dask_keys__()
+        dsk = self.__dask_optimize__(self.__dask_graph__(), keys)
+        L = ndeepmap(self.ndim, lambda k: Delayed(k, dsk), keys)
         return np.array(L, dtype=object)
 
     @derived_from(np.ndarray)
@@ -1805,9 +1815,6 @@ def ensure_int(f):
     return i
 
 
-normalize_token.register(Array, lambda a: a.name)
-
-
 def normalize_chunks(chunks, shape=None):
     """ Normalize chunks to tuple of tuples
 
@@ -2397,44 +2404,80 @@ def insert_to_ooc(out, arr, lock=True, region=None):
     slices = slices_from_chunks(arr.chunks)
 
     name = 'store-%s' % arr.name
-    dsk = dict(((name,) + t[1:], (store, out, t, slc, lock, region))
-               for t, slc in zip(core.flatten(arr._keys()), slices))
+    dsk = {(name,) + t[1:]: (store, out, t, slc, lock, region)
+           for t, slc in zip(core.flatten(arr.__dask_keys__()), slices)}
 
     return dsk
 
 
-def asarray(array):
-    """Coerce argument into a dask array
+def asarray(a):
+    """Convert the input to a dask array.
+
+    Parameters
+    ----------
+    a : array-like
+        Input data, in any form that can be converted to a dask array.
+
+    Returns
+    -------
+    out : dask array
+        Dask array interpretation of a.
 
     Examples
     --------
+    >>> import dask.array as da
+    >>> import numpy as np
     >>> x = np.arange(3)
-    >>> asarray(x)
+    >>> da.asarray(x)
     dask.array<array, shape=(3,), dtype=int64, chunksize=(3,)>
+
+    >>> y = [[1, 2, 3], [4, 5, 6]]
+    >>> da.asarray(y)
+    dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3)>
     """
-    if isinstance(array, Array):
-        return array
-    if not isinstance(getattr(array, 'shape', None), Iterable):
-        array = np.asarray(array)
-    return from_array(array, chunks=array.shape, getitem=getter_inline)
+    if isinstance(a, Array):
+        return a
+    if isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
+        a = stack(a)
+    elif not isinstance(getattr(a, 'shape', None), Iterable):
+        a = np.asarray(a)
+    return from_array(a, chunks=a.shape, getitem=getter_inline)
 
 
-def asanyarray(array):
-    """Coerce argument into a dask array.
+def asanyarray(a):
+    """Convert the input to a dask array.
 
-    Subclasses of `np.ndarray` will be passed through as chunks unchanged.
+    Subclasses of ``np.ndarray`` will be passed through as chunks unchanged.
+
+    Parameters
+    ----------
+    a : array-like
+        Input data, in any form that can be converted to a dask array.
+
+    Returns
+    -------
+    out : dask array
+        Dask array interpretation of a.
 
     Examples
     --------
+    >>> import dask.array as da
+    >>> import numpy as np
     >>> x = np.arange(3)
-    >>> asanyarray(x)
+    >>> da.asanyarray(x)
     dask.array<array, shape=(3,), dtype=int64, chunksize=(3,)>
+
+    >>> y = [[1, 2, 3], [4, 5, 6]]
+    >>> da.asanyarray(y)
+    dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3)>
     """
-    if isinstance(array, Array):
-        return array
-    if not isinstance(getattr(array, 'shape', None), Iterable):
-        array = np.asanyarray(array)
-    return from_array(array, chunks=array.shape, getitem=getter_inline,
+    if isinstance(a, Array):
+        return a
+    if isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
+        a = stack(a)
+    elif not isinstance(getattr(a, 'shape', None), Iterable):
+        a = np.asanyarray(a)
+    return from_array(a, chunks=a.shape, getitem=getter_inline,
                       asarray=False)
 
 
@@ -2486,8 +2529,8 @@ def broadcast_shapes(*shapes):
         return shapes[0]
     out = []
     for sizes in zip_longest(*map(reversed, shapes), fillvalue=-1):
-        dim = max(sizes)
-        if any(i != -1 and i != 1 and i != dim and not np.isnan(i) for i in sizes):
+        dim = 0 if 0 in sizes else max(sizes)
+        if any(i not in [-1, 0, 1, dim] and not np.isnan(i) for i in sizes):
             raise ValueError("operands could not be broadcast together with "
                              "shapes {0}".format(' '.join(map(str, shapes))))
         out.append(dim)
@@ -2644,10 +2687,10 @@ def broadcast_to(x, shape):
     chunks = (tuple((s,) for s in shape[:ndim_new]) +
               tuple(bd if old > 1 else (new,)
               for bd, old, new in zip(x.chunks, x.shape, shape[ndim_new:])))
-    dsk = dict(((name,) + (0,) * ndim_new + key[1:],
-                (chunk.broadcast_to, key, shape[:ndim_new] +
-                 tuple(bd[i] for i, bd in zip(key[1:], chunks[ndim_new:]))))
-               for key in core.flatten(x._keys()))
+    dsk = {(name,) + (0,) * ndim_new + key[1:]:
+           (chunk.broadcast_to, key, shape[:ndim_new] +
+            tuple(bd[i] for i, bd in zip(key[1:], chunks[ndim_new:])))
+           for key in core.flatten(x.__dask_keys__())}
     return Array(sharedict.merge((name, dsk), x.dask), name, chunks, dtype=x.dtype)
 
 
@@ -3191,10 +3234,10 @@ def to_npy_stack(dirname, x, axis=0):
         pickle.dump(meta, f)
 
     name = 'to-npy-stack-' + str(uuid.uuid1())
-    dsk = dict(((name, i), (np.save, os.path.join(dirname, '%d.npy' % i), key))
-               for i, key in enumerate(core.flatten(xx._keys())))
+    dsk = {(name, i): (np.save, os.path.join(dirname, '%d.npy' % i), key)
+           for i, key in enumerate(core.flatten(xx.__dask_keys__()))}
 
-    Array._get(sharedict.merge(dsk, xx.dask), list(dsk))
+    compute_as_if_collection(Array, sharedict.merge(dsk, xx.dask), list(dsk))
 
 
 def from_npy_stack(dirname, mmap_mode='r'):
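
The dask/array/core.py hunks above replace the old _keys()/_optimize/_default_get/_finalize attributes with the generic collection protocol (__dask_graph__, __dask_keys__, __dask_optimize__, __dask_scheduler__, __dask_postcompute__) documented in the new docs/source/custom-collections.rst. A minimal sketch of a custom collection using that protocol, assuming dask 0.16 is installed; the MyTuple class and the toy graph are illustrative only, not part of this patch:

    import dask
    from operator import add
    from dask.base import dont_optimize
    from dask.threaded import get as threaded_get


    class MyTuple(object):
        """Toy collection: a raw task graph plus the keys whose values we want."""

        def __init__(self, dsk, keys):
            self._dsk = dsk
            self._keys = keys

        def __dask_graph__(self):
            # Expose the task graph; this also makes dask.is_dask_collection() return True.
            return self._dsk

        def __dask_keys__(self):
            return self._keys

        # No graph optimization; use the threaded scheduler by default.
        __dask_optimize__ = staticmethod(dont_optimize)
        __dask_scheduler__ = staticmethod(threaded_get)

        def __dask_postcompute__(self):
            # After the scheduler computes the keys, pack the results into a tuple.
            return tuple, ()


    t = MyTuple({'a': 1, 'b': 2, 'c': (add, 'a', 'b')}, ['a', 'b', 'c'])
    print(dask.is_dask_collection(t))  # True
    print(dask.compute(t))             # ((1, 2, 3),)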
diff --git a/dask/array/creation.py b/dask/array/creation.py
index f1a2d78..6fa2c17 100644
--- a/dask/array/creation.py
+++ b/dask/array/creation.py
@@ -427,14 +427,14 @@ def diag(v):
                         "got {0}".format(type(v)))
     if v.ndim != 1:
         if v.chunks[0] == v.chunks[1]:
-            dsk = dict(((name, i), (np.diag, row[i])) for (i, row)
-                       in enumerate(v._keys()))
+            dsk = {(name, i): (np.diag, row[i])
+                   for i, row in enumerate(v.__dask_keys__())}
             return Array(sharedict.merge(v.dask, (name, dsk)), name, (v.chunks[0],), dtype=v.dtype)
         else:
             raise NotImplementedError("Extracting diagonals from non-square "
                                       "chunked arrays")
     chunks_1d = v.chunks[0]
-    blocks = v._keys()
+    blocks = v.__dask_keys__()
     dsk = {}
     for i, m in enumerate(chunks_1d):
         for j, n in enumerate(chunks_1d):
diff --git a/dask/array/ghost.py b/dask/array/ghost.py
index 68d8405..2a8c1a1 100644
--- a/dask/array/ghost.py
+++ b/dask/array/ghost.py
@@ -100,8 +100,8 @@ def ghost_internal(x, axes):
     """
     dims = list(map(len, x.chunks))
     expand_key2 = partial(expand_key, dims=dims)
-    interior_keys = pipe(x._keys(), flatten, map(expand_key2), map(flatten),
-                         concat, list)
+    interior_keys = pipe(x.__dask_keys__(), flatten, map(expand_key2),
+                         map(flatten), concat, list)
 
     token = tokenize(x, axes)
     name = 'ghost-' + token
diff --git a/dask/array/optimization.py b/dask/array/optimization.py
index 2d34be2..bf49a32 100644
--- a/dask/array/optimization.py
+++ b/dask/array/optimization.py
@@ -5,17 +5,20 @@ from operator import getitem
 import numpy as np
 
 from .core import getter, getter_nofancy, getter_inline
-from ..context import defer_to_globals
 from ..compatibility import zip_longest
 from ..core import flatten, reverse_dict
-from ..optimize import cull, fuse, inline_functions, dont_optimize
+from ..optimize import cull, fuse, inline_functions
 from ..utils import ensure_dict
 
 
+# All get* functions the optimizations know about
 GETTERS = (getter, getter_nofancy, getter_inline, getitem)
+# These get* functions aren't ever completely removed from the graph,
+# even if the index should be a no-op by numpy semantics. Some array-likes
+# don't completely follow semantics, making indexing always necessary.
+GETNOREMOVE = (getter, getter_nofancy)
 
 
-@defer_to_globals('array_optimize', falsey=dont_optimize)
 def optimize(dsk, keys, fuse_keys=None, fast_functions=None,
              inline_functions_fast_functions=(getter_inline,), rename_fused_keys=True,
              **kwargs):
@@ -138,8 +141,8 @@ def optimize_slices(dsk):
                 a, a_index, a_lock = b, c_index, b_lock
                 a_asarray |= b_asarray
 
-            # Skip the get call if no asarray call, no lock, and nothing to do
-            if ((not a_asarray and not a_lock) and
+            # Skip the get call if not from from_array and nothing to do
+            if (get not in GETNOREMOVE and
                 ((type(a_index) is slice and not a_index.start and
                   a_index.stop is None and a_index.step is None) or
                  (type(a_index) is tuple and
diff --git a/dask/array/percentile.py b/dask/array/percentile.py
index 1f02dcd..6368231 100644
--- a/dask/array/percentile.py
+++ b/dask/array/percentile.py
@@ -42,7 +42,7 @@ def percentile(a, q, interpolation='linear'):
     token = tokenize(a, list(q), interpolation)
     name = 'percentile_chunk-' + token
     dsk = dict(((name, i), (_percentile, (key), q, interpolation))
-               for i, key in enumerate(a._keys()))
+               for i, key in enumerate(a.__dask_keys__()))
 
     name2 = 'percentile-' + token
     dsk2 = {(name2, 0): (merge_percentiles, q, [q] * len(a.chunks[0]),
diff --git a/dask/array/random.py b/dask/array/random.py
index ad5ef19..745a4e0 100644
--- a/dask/array/random.py
+++ b/dask/array/random.py
@@ -1,11 +1,12 @@
 from __future__ import absolute_import, division, print_function
 
 from itertools import product
+from numbers import Integral
 from operator import getitem
 
 import numpy as np
 
-from .core import (normalize_chunks, Array, slices_from_chunks,
+from .core import (normalize_chunks, Array, slices_from_chunks, asarray,
                    broadcast_shapes, broadcast_to)
 from .. import sharedict
 from ..base import tokenize
@@ -170,8 +171,62 @@ class RandomState(object):
     with ignoring(AttributeError):
         @doc_wraps(np.random.RandomState.choice)
         def choice(self, a, size=None, replace=True, p=None, chunks=None):
-            return self._wrap(np.random.RandomState.choice, a,
-                              size=size, replace=True, p=None, chunks=chunks)
+            dsks = []
+            # Normalize and validate `a`
+            if isinstance(a, Integral):
+                # On windows the output dtype differs if p is provided or
+                # absent, see https://github.com/numpy/numpy/issues/9867
+                dummy_p = np.array([1]) if p is not None else p
+                dtype = np.random.choice(1, size=(), p=dummy_p).dtype
+                len_a = a
+                if a < 0:
+                    raise ValueError("a must be greater than 0")
+            else:
+                a = asarray(a).rechunk(a.shape)
+                dtype = a.dtype
+                if a.ndim != 1:
+                    raise ValueError("a must be one dimensional")
+                len_a = len(a)
+                dsks.append(a.dask)
+                a = a.__dask_keys__()[0]
+
+            # Normalize and validate `p`
+            if p is not None:
+                if not isinstance(p, Array):
+                    # If p is not a dask array, first check the sum is close
+                    # to 1 before converting.
+                    p = np.asarray(p)
+                    if not np.isclose(p.sum(), 1, rtol=1e-7, atol=0):
+                        raise ValueError("probabilities do not sum to 1")
+                    p = asarray(p)
+                else:
+                    p = p.rechunk(p.shape)
+
+                if p.ndim != 1:
+                    raise ValueError("p must be one dimensional")
+                if len(p) != len_a:
+                    raise ValueError("a and p must have the same size")
+
+                dsks.append(p.dask)
+                p = p.__dask_keys__()[0]
+
+            if size is None:
+                size = ()
+            elif not isinstance(size, (tuple, list)):
+                size = (size,)
+
+            chunks = normalize_chunks(chunks, size)
+            sizes = list(product(*chunks))
+            state_data = random_state_data(len(sizes), self._numpy_state)
+
+            name = 'da.random.choice-%s' % tokenize(state_data, size, chunks,
+                                                    a, replace, p)
+            keys = product([name], *(range(len(bd)) for bd in chunks))
+            dsk = {k: (_choice, state, a, size, replace, p) for
+                   k, state, size in zip(keys, state_data, sizes)}
+
+            return Array(sharedict.merge((name, dsk), *dsks),
+                         name, chunks, dtype=dtype)
 
     # @doc_wraps(np.random.RandomState.dirichlet)
     # def dirichlet(self, alpha, size=None, chunks=None):
@@ -352,6 +407,11 @@ class RandomState(object):
                           size=size, chunks=chunks)
 
 
+def _choice(state_data, a, size, replace, p):
+    state = np.random.RandomState(state_data)
+    return state.choice(a, size=size, replace=replace, p=p)
+
+
 def _apply_random(func, state_data, size, args, kwargs):
     """Apply RandomState method with seed"""
     state = np.random.RandomState(state_data)
@@ -368,6 +428,8 @@ seed = _state.seed
 beta = _state.beta
 binomial = _state.binomial
 chisquare = _state.chisquare
+if hasattr(_state, 'choice'):
+    choice = _state.choice
 exponential = _state.exponential
 f = _state.f
 gamma = _state.gamma
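
The new da.random.choice above mirrors numpy.random.RandomState.choice chunk by chunk, building one _choice task per output block. A short usage sketch, assuming dask 0.16 with this patch applied; the sizes and probabilities below are arbitrary examples:

    import dask.array as da

    # Draw from range(5), uniformly and with explicit probabilities
    x = da.random.choice(5, size=(1000,), chunks=100)
    y = da.random.choice(5, size=(1000,), chunks=100, p=[0.1, 0.2, 0.3, 0.2, 0.2])

    print(x[:5].compute())
    print(y[:5].compute())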
diff --git a/dask/array/rechunk.py b/dask/array/rechunk.py
index 8b69009..69e344b 100644
--- a/dask/array/rechunk.py
+++ b/dask/array/rechunk.py
@@ -391,7 +391,7 @@ def find_merge_rechunk(old_chunks, new_chunks, block_size_limit):
         bse = block_size_effect[k]
         if bse == 1:
             bse = 1 + 1e-9
-        return np.log(gse) / np.log(bse)
+        return (np.log(gse) / np.log(bse)) if bse > 0 else 0
 
     sorted_candidates = sorted(merge_candidates, key=key)
 
diff --git a/dask/array/reductions.py b/dask/array/reductions.py
index 3a454b1..918042a 100644
--- a/dask/array/reductions.py
+++ b/dask/array/reductions.py
@@ -1,5 +1,7 @@
 from __future__ import absolute_import, division, print_function
 
+import warnings
+
 from functools import partial, wraps
 from itertools import product, repeat
 from math import factorial, log, ceil
@@ -440,6 +442,12 @@ def vnorm(a, ord=None, axis=None, dtype=None, keepdims=False, split_every=None,
 
     See np.linalg.norm
     """
+
+    warnings.warn(
+        "DeprecationWarning: Please use `dask.array.linalg.norm` instead.",
+        UserWarning
+    )
+
     if ord is None or ord == 'fro':
         ord = 2
     if ord == np.inf:
diff --git a/dask/array/reshape.py b/dask/array/reshape.py
index e830c6d..2a1c3f7 100644
--- a/dask/array/reshape.py
+++ b/dask/array/reshape.py
@@ -170,7 +170,7 @@ def reshape(x, shape):
     name = 'reshape-' + tokenize(x, shape)
 
     if x.npartitions == 1:
-        key = next(flatten(x._keys()))
+        key = next(flatten(x.__dask_keys__()))
         dsk = {(name,) + (0,) * len(shape): (M.reshape, key, shape)}
         chunks = tuple((d,) for d in shape)
         return Array(sharedict.merge((name, dsk), x.dask), name, chunks,
diff --git a/dask/array/routines.py b/dask/array/routines.py
index 61505b6..7495bf5 100644
--- a/dask/array/routines.py
+++ b/dask/array/routines.py
@@ -16,9 +16,11 @@ from .. import sharedict
 from ..core import flatten
 from ..base import tokenize
 from . import numpy_compat, chunk
+from .creation import arange
+from .wrap import ones
 
 from .core import (Array, map_blocks, elemwise, from_array, asarray,
-                   concatenate, stack, atop, broadcast_shapes,
+                   asanyarray, concatenate, stack, atop, broadcast_shapes,
                    is_scalar_for_elemwise, broadcast_to, tensordot_lookup)
 
 
@@ -37,24 +39,58 @@ def result_type(*args):
     return np.result_type(*args)
 
 
-def atleast_3d(x):
-    if x.ndim == 1:
-        return x[None, :, None]
-    elif x.ndim == 2:
-        return x[:, :, None]
-    elif x.ndim > 2:
-        return x
+@wraps(np.atleast_3d)
+def atleast_3d(*arys):
+    new_arys = []
+    for x in arys:
+        x = asanyarray(x)
+        if x.ndim == 0:
+            x = x[None, None, None]
+        elif x.ndim == 1:
+            x = x[None, :, None]
+        elif x.ndim == 2:
+            x = x[:, :, None]
+
+        new_arys.append(x)
+
+    if len(new_arys) == 1:
+        return new_arys[0]
     else:
-        raise NotImplementedError()
+        return new_arys
+
+
+@wraps(np.atleast_2d)
+def atleast_2d(*arys):
+    new_arys = []
+    for x in arys:
+        x = asanyarray(x)
+        if x.ndim == 0:
+            x = x[None, None]
+        elif x.ndim == 1:
+            x = x[None, :]
 
+        new_arys.append(x)
 
-def atleast_2d(x):
-    if x.ndim == 1:
-        return x[None, :]
-    elif x.ndim > 1:
-        return x
+    if len(new_arys) == 1:
+        return new_arys[0]
     else:
-        raise NotImplementedError()
+        return new_arys
+
+
+@wraps(np.atleast_1d)
+def atleast_1d(*arys):
+    new_arys = []
+    for x in arys:
+        x = asanyarray(x)
+        if x.ndim == 0:
+            x = x[None]
+
+        new_arys.append(x)
+
+    if len(new_arys) == 1:
+        return new_arys[0]
+    else:
+        return new_arys
 
 
 @wraps(np.vstack)
@@ -297,13 +333,12 @@ def bincount(x, weights=None, minlength=None):
     token = tokenize(x, weights, minlength)
     name = 'bincount-' + token
     if weights is not None:
-        dsk = dict(((name, i),
-                   (np.bincount, (x.name, i), (weights.name, i), minlength))
-                   for i, _ in enumerate(x._keys()))
+        dsk = {(name, i): (np.bincount, (x.name, i), (weights.name, i), minlength)
+               for i, _ in enumerate(x.__dask_keys__())}
         dtype = np.bincount([1], weights=[1]).dtype
     else:
-        dsk = dict(((name, i), (np.bincount, (x.name, i), None, minlength))
-                   for i, _ in enumerate(x._keys()))
+        dsk = {(name, i): (np.bincount, (x.name, i), None, minlength)
+               for i, _ in enumerate(x.__dask_keys__())}
         dtype = np.bincount([]).dtype
 
     # Sum up all of the intermediate bincounts per block
@@ -383,7 +418,7 @@ def histogram(a, bins=None, range=None, normed=False, weights=None, density=None
         bin_token = bins
     token = tokenize(a, bin_token, range, normed, weights, density)
 
-    nchunks = len(list(flatten(a._keys())))
+    nchunks = len(list(flatten(a.__dask_keys__())))
     chunks = ((1,) * nchunks, (len(bins) - 1,))
 
     name = 'histogram-sum-' + token
@@ -393,14 +428,14 @@ def histogram(a, bins=None, range=None, normed=False, weights=None, density=None
         return np.histogram(x, bins, weights=weights)[0][np.newaxis]
 
     if weights is None:
-        dsk = dict(((name, i, 0), (block_hist, k))
-                   for i, k in enumerate(flatten(a._keys())))
+        dsk = {(name, i, 0): (block_hist, k)
+               for i, k in enumerate(flatten(a.__dask_keys__()))}
         dtype = np.histogram([])[0].dtype
     else:
-        a_keys = flatten(a._keys())
-        w_keys = flatten(weights._keys())
-        dsk = dict(((name, i, 0), (block_hist, k, w))
-                   for i, (k, w) in enumerate(zip(a_keys, w_keys)))
+        a_keys = flatten(a.__dask_keys__())
+        w_keys = flatten(weights.__dask_keys__())
+        dsk = {(name, i, 0): (block_hist, k, w)
+               for i, (k, w) in enumerate(zip(a_keys, w_keys))}
         dtype = weights.dtype
 
     all_dsk = sharedict.merge(a.dask, (name, dsk))
@@ -495,12 +530,173 @@ def round(a, decimals=0):
     return a.map_blocks(np.round, decimals=decimals, dtype=a.dtype)
 
 
+def _unique_internal(ar, indices, counts, return_inverse=False):
+    """
+    Helper/wrapper function for NumPy's ``unique``.
+
+    Uses NumPy's ``unique`` to find the unique values for the array chunk.
+    Given this chunk may not represent the whole array, also take the
+    ``indices`` and ``counts`` that are in 1-to-1 correspondence to ``ar``
+    and reduce them in the same fashion as ``ar`` is reduced. Namely sum
+    any counts that correspond to the same value and take the smallest
+    index that corresponds to the same value.
+
+    To handle the inverse mapping from the unique values to the original
+    array, simply return a NumPy array created with ``arange`` with enough
+    values to correspond 1-to-1 to the unique values. While there is more
+    work needed to be done to create the full inverse mapping for the
+    original array, this provides enough information to generate the
+    inverse mapping in Dask.
+
+    Given Dask likes to have one array returned from functions like
+    ``atop``, some formatting is done to stuff all of the resulting arrays
+    into one big NumPy structured array. Dask is then able to handle this
+    object and can split it apart into the separate results on the Dask
+    side, which then can be passed back to this function in concatenated
+    chunks for further reduction or can be returned to the user to perform
+    other forms of analysis.
+
+    By handling the problem in this way, it does not matter where a chunk
+    is in a larger array or how big it is. The chunk can still be computed
+    in the same way. Also it does not matter if the chunk is the result of
+    other chunks being run through this function multiple times. The end
+    result will still be just as accurate using this strategy.
+    """
+
+    return_index = (indices is not None)
+    return_counts = (counts is not None)
+
+    u = np.unique(ar)
+
+    dt = [("values", u.dtype)]
+    if return_index:
+        dt.append(("indices", np.int64))
+    if return_inverse:
+        dt.append(("inverse", np.int64))
+    if return_counts:
+        dt.append(("counts", np.int64))
+
+    r = np.empty(u.shape, dtype=dt)
+    r["values"] = u
+    if return_inverse:
+        r["inverse"] = np.arange(len(r), dtype=np.int64)
+    if return_index or return_counts:
+        for i, v in enumerate(r["values"]):
+            m = (ar == v)
+            if return_index:
+                indices[m].min(keepdims=True, out=r["indices"][i:i + 1])
+            if return_counts:
+                counts[m].sum(keepdims=True, out=r["counts"][i:i + 1])
+
+    return r
+
+
 @wraps(np.unique)
-def unique(x):
-    name = 'unique-' + x.name
-    dsk = dict(((name, i), (np.unique, key)) for i, key in enumerate(x._keys()))
-    parts = Array._get(sharedict.merge((name, dsk), x.dask), list(dsk.keys()))
-    return np.unique(np.concatenate(parts))
+def unique(ar, return_index=False, return_inverse=False, return_counts=False):
+    ar = ar.ravel()
+
+    # Run unique on each chunk and collect results in a Dask Array of
+    # unknown size.
+
+    args = [ar, "i"]
+    out_dtype = [("values", ar.dtype)]
+    if return_index:
+        args.extend([
+            arange(ar.shape[0], dtype=np.int64, chunks=ar.chunks[0]),
+            "i"
+        ])
+        out_dtype.append(("indices", np.int64))
+    else:
+        args.extend([None, None])
+    if return_counts:
+        args.extend([
+            ones((ar.shape[0],), dtype=np.int64, chunks=ar.chunks[0]),
... 7842 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.git


