[Python-modules-commits] [dask] 01/06: New upstream version 0.16.0
Diane Trout
diane@moszumanska.debian.org
Sat Dec 9 00:20:54 UTC 2017
This is an automated email from the git hooks/post-receive script.
diane pushed a commit to branch master
in repository dask.
commit fb0ff3945cc1fd9f545dd1851f427d58a184f9de
Author: Diane Trout <diane@ghic.org>
Date: Thu Dec 7 16:57:37 2017 -0800
New upstream version 0.16.0
---
.travis.yml | 6 +-
continuous_integration/travis/install.sh | 44 +-
dask/__init__.py | 6 +-
dask/array/__init__.py | 5 +-
dask/array/chunk.py | 9 +
dask/array/core.py | 171 ++++---
dask/array/creation.py | 6 +-
dask/array/ghost.py | 4 +-
dask/array/optimization.py | 13 +-
dask/array/percentile.py | 2 +-
dask/array/random.py | 68 ++-
dask/array/rechunk.py | 2 +-
dask/array/reductions.py | 8 +
dask/array/reshape.py | 2 +-
dask/array/routines.py | 274 ++++++++--
dask/array/slicing.py | 16 +-
dask/array/tests/test_array_core.py | 155 ++++--
dask/array/tests/test_optimization.py | 29 +-
dask/array/tests/test_random.py | 49 ++
dask/array/tests/test_reductions.py | 2 +-
dask/array/tests/test_routines.py | 164 +++++-
dask/array/ufunc.py | 22 +-
dask/array/utils.py | 2 +-
dask/async.py | 19 -
dask/bag/core.py | 191 +++----
dask/bag/tests/test_bag.py | 17 +-
dask/base.py | 640 +++++++++++++-----------
dask/bytes/core.py | 44 +-
dask/bytes/local.py | 5 +
dask/bytes/s3.py | 5 +
dask/bytes/tests/test_local.py | 41 +-
dask/bytes/tests/test_s3.py | 53 +-
dask/bytes/utils.py | 82 ++-
dask/context.py | 76 +--
dask/dataframe/categorical.py | 7 +-
dask/dataframe/core.py | 177 ++++---
dask/dataframe/groupby.py | 43 +-
dask/dataframe/io/csv.py | 69 ++-
dask/dataframe/io/hdf.py | 4 +-
dask/dataframe/io/io.py | 10 +-
dask/dataframe/io/parquet.py | 573 ++++++++++++---------
dask/dataframe/io/tests/test_csv.py | 64 ++-
dask/dataframe/io/tests/test_parquet.py | 583 +++++++++++----------
dask/dataframe/methods.py | 4 +-
dask/dataframe/multi.py | 23 +-
dask/dataframe/optimize.py | 14 +-
dask/dataframe/partitionquantiles.py | 12 +-
dask/dataframe/rolling.py | 2 +-
dask/dataframe/shuffle.py | 24 +-
dask/dataframe/tests/test_dataframe.py | 116 +++--
dask/dataframe/tests/test_format.py | 72 +--
dask/dataframe/tests/test_groupby.py | 54 +-
dask/dataframe/tests/test_multi.py | 16 +-
dask/dataframe/tests/test_optimize_dataframe.py | 6 +-
dask/dataframe/tests/test_shuffle.py | 79 ++-
dask/dataframe/tests/test_utils_dataframe.py | 36 +-
dask/dataframe/tseries/resample.py | 2 +-
dask/dataframe/utils.py | 2 +-
dask/delayed.py | 57 ++-
dask/diagnostics/profile.py | 22 +-
dask/diagnostics/profile_visualize.py | 31 +-
dask/diagnostics/tests/test_profiler.py | 35 +-
dask/optimize.py | 7 +-
dask/tests/test_base.py | 118 ++++-
dask/tests/test_context.py | 40 +-
dask/tests/test_delayed.py | 57 ++-
dask/utils.py | 115 ++---
docs/release-procedure.md | 5 +-
docs/source/array-api.rst | 16 +-
docs/source/changelog.rst | 85 +++-
docs/source/custom-collections.rst | 581 +++++++++++++++++++++
docs/source/examples/bag-word-count-hdfs.rst | 8 +-
docs/source/futures.rst | 2 +-
docs/source/index.rst | 2 +
docs/source/machine-learning.rst | 104 +---
setup.cfg | 4 +-
setup.py | 5 +-
77 files changed, 3767 insertions(+), 1751 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index fa7761c..b5e5e38 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,8 +49,9 @@ jobs:
- env: &py36_env
- PYTHON=3.6
- - NUMPY=1.13.0
- - PANDAS=0.20.2
+ - UPSTREAM_DEV=1 # Install nightly versions of NumPy and pandas
+ - NUMPY=1.13.0 # these are overridden later
+ - PANDAS=0.20.3
- *no_coverage
- *no_optimize
- *imports
@@ -69,7 +70,6 @@ install:
script:
- source continuous_integration/travis/run_tests.sh
- flake8 dask
- - pycodestyle --select=E722 dask # check for bare `except:` blocks
- if [[ $TEST_IMPORTS == 'true' ]]; then source continuous_integration/travis/test_imports.sh; fi
after_success:
diff --git a/continuous_integration/travis/install.sh b/continuous_integration/travis/install.sh
index 21024ae..d79813b 100644
--- a/continuous_integration/travis/install.sh
+++ b/continuous_integration/travis/install.sh
@@ -22,8 +22,11 @@ source activate test-environment
# Pin matrix items
# Please see PR ( https://github.com/dask/dask/pull/2185 ) for details.
touch $CONDA_PREFIX/conda-meta/pinned
-echo "numpy $NUMPY" >> $CONDA_PREFIX/conda-meta/pinned
-echo "pandas $PANDAS" >> $CONDA_PREFIX/conda-meta/pinned
+if ! [[ ${UPSTREAM_DEV} ]]; then
+ echo "Pinning NumPy $NUMPY, pandas $PANDAS"
+ echo "numpy $NUMPY" >> $CONDA_PREFIX/conda-meta/pinned
+ echo "pandas $PANDAS" >> $CONDA_PREFIX/conda-meta/pinned
+fi;
# Install dependencies.
conda install -q -c conda-forge \
@@ -50,39 +53,44 @@ conda install -q -c conda-forge \
sqlalchemy \
toolz
+if [[ ${UPSTREAM_DEV} ]]; then
+ echo "Installing NumPy and Pandas dev"
+ conda uninstall -y --force numpy pandas
+ PRE_WHEELS="https://7933911d6844c6c53a7d-47bd50c35cd79bd838daf386af554a83.ssl.cf2.rackcdn.com"
+ pip install -q --pre --no-deps --upgrade --timeout=60 -f $PRE_WHEELS numpy pandas
+fi;
+
# install pytables from defaults for now
conda install -q pytables
-pip install -q git+https://github.com/dask/partd --upgrade --no-deps
-pip install -q git+https://github.com/dask/zict --upgrade --no-deps
-pip install -q git+https://github.com/dask/distributed --upgrade --no-deps
-pip install -q git+https://github.com/mrocklin/sparse --upgrade --no-deps
-pip install -q git+https://github.com/dask/s3fs --upgrade --no-deps
+pip install -q --upgrade --no-deps git+https://github.com/dask/partd
+pip install -q --upgrade --no-deps git+https://github.com/dask/zict
+pip install -q --upgrade --no-deps git+https://github.com/dask/distributed
+pip install -q --upgrade --no-deps git+https://github.com/mrocklin/sparse
+pip install -q --upgrade --no-deps git+https://github.com/dask/s3fs
-if [[ $PYTHONOPTIMIZE != '2' ]] && [[ $NUMPY > '1.11.0' ]] && [[ $NUMPY < '1.13.0' ]]; then
- conda install -q -c conda-forge numba cython
- pip install -q git+https://github.com/dask/fastparquet
+if [[ $PYTHONOPTIMIZE != '2' ]] && [[ $NUMPY > '1.11.0' ]]; then
+ conda install -q -c conda-forge fastparquet python-snappy
+ pip install -q --no-deps git+https://github.com/dask/fastparquet
fi
if [[ $PYTHON == '2.7' ]]; then
- pip install -q backports.lzma mock
+ pip install -q --no-deps backports.lzma mock
fi
-pip install -q \
+pip install -q --upgrade --no-deps \
cachey \
graphviz \
- moto \
pyarrow \
- --upgrade --no-deps
+ pandas_datareader
-pip install -q \
+pip install -q --upgrade \
cityhash \
flake8 \
+ moto \
mmh3 \
- pandas_datareader \
pytest-xdist \
- xxhash \
- pycodestyle
+ xxhash
# Install dask
pip install -q --no-deps -e .[complete]
diff --git a/dask/__init__.py b/dask/__init__.py
index d81af59..9b362f8 100644
--- a/dask/__init__.py
+++ b/dask/__init__.py
@@ -8,14 +8,10 @@ try:
except ImportError:
pass
try:
- from .base import visualize, compute, persist
+ from .base import visualize, compute, persist, is_dask_collection
except ImportError:
pass
-# dask.async is deprecated. For now we import it to the top namespace to be
-# compatible with prior releases. This should be removed in a future release:
-import dask.async
-
from ._version import get_versions
versions = get_versions()
__version__ = versions['version']
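
Illustrative sketch of the newly exported is_dask_collection helper (not part
of the diff; assumes dask 0.16.0 and numpy are installed):

    import numpy as np
    import dask
    import dask.array as da

    x = da.ones((4, 4), chunks=(2, 2))

    # Dask collections (arrays, bags, dataframes, delayed) report True.
    print(dask.is_dask_collection(x))           # True

    # Plain in-memory objects report False.
    print(dask.is_dask_collection(np.ones(4)))  # False
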
diff --git a/dask/array/__init__.py b/dask/array/__init__.py
index 9155423..0dfc8d5 100644
--- a/dask/array/__init__.py
+++ b/dask/array/__init__.py
@@ -9,8 +9,9 @@ from .routines import (take, choose, argwhere, where, coarsen, insert,
bincount, digitize, histogram, cov, array, dstack,
vstack, hstack, compress, extract, round, count_nonzero,
flatnonzero, nonzero, around, isnull, notnull, isclose,
- corrcoef, swapaxes, tensordot, transpose, dot,
- apply_along_axis, apply_over_axes, result_type)
+ allclose, corrcoef, swapaxes, tensordot, transpose, dot,
+ apply_along_axis, apply_over_axes, result_type,
+ atleast_1d, atleast_2d, atleast_3d)
from .reshape import reshape
from .ufunc import (add, subtract, multiply, divide, logaddexp, logaddexp2,
true_divide, floor_divide, negative, power, remainder, mod, conj, exp,
diff --git a/dask/array/chunk.py b/dask/array/chunk.py
index 7bc6c72..d38fe66 100644
--- a/dask/array/chunk.py
+++ b/dask/array/chunk.py
@@ -195,3 +195,12 @@ def arange(start, stop, step, length, dtype):
def astype(x, astype_dtype=None, **kwargs):
return x.astype(astype_dtype, **kwargs)
+
+
+def view(x, dtype, order='C'):
+ if order == 'C':
+ x = np.ascontiguousarray(x)
+ return x.view(dtype)
+ else:
+ x = np.asfortranarray(x)
+ return x.T.view(dtype).T
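
The new chunk.view helper forces a matching memory layout before
reinterpreting the dtype; for Fortran order it views the transpose so the
reinterpretation happens along the first axis. A small sketch of the
user-facing behaviour (not part of the diff; assumes dask 0.16.0):

    import dask.array as da

    x = da.arange(4, dtype='int64', chunks=2)

    # Reinterpret each block's bytes; order='C' grows the last axis.
    y = x.view('int32')
    print(y.compute())   # e.g. [0 0 1 0 2 0 3 0] on a little-endian machine
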
diff --git a/dask/array/core.py b/dask/array/core.py
index de1df97..ade0d53 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -31,8 +31,8 @@ import numpy as np
from . import chunk
from .slicing import slice_array, replace_ellipsis
-from ..base import Base, tokenize, normalize_token
-from ..context import _globals
+from ..base import Base, tokenize, dont_optimize, compute_as_if_collection
+from ..context import _globals, globalmethod
from ..utils import (homogeneous_deepmap, ndeepmap, ignoring, concrete,
is_integer, IndexCallable, funcname, derived_from,
SerializableLock, ensure_dict, Dispatch)
@@ -897,7 +897,7 @@ def store(sources, targets, lock=True, regions=None, compute=True, **kwargs):
name = 'store-' + tokenize(*keys)
dsk = sharedict.merge((name, updates), *[src.dask for src in sources])
if compute:
- Array._get(dsk, keys, **kwargs)
+ compute_as_if_collection(Array, dsk, keys, **kwargs)
else:
from ..delayed import Delayed
dsk.update({name: keys})
@@ -977,10 +977,6 @@ class Array(Base):
"""
__slots__ = 'dask', '_name', '_cached_keys', '_chunks', 'dtype'
- _optimize = staticmethod(optimize)
- _default_get = staticmethod(threaded.get)
- _finalize = staticmethod(finalize)
-
def __new__(cls, dask, name, chunks, dtype, shape=None):
self = super(Array, cls).__new__(cls)
assert isinstance(dask, Mapping)
@@ -1007,6 +1003,41 @@ class Array(Base):
def __reduce__(self):
return (Array, (self.dask, self.name, self.chunks, self.dtype))
+ def __dask_graph__(self):
+ return self.dask
+
+ def __dask_keys__(self):
+ if self._cached_keys is not None:
+ return self._cached_keys
+
+ name, chunks, numblocks = self.name, self.chunks, self.numblocks
+
+ def keys(*args):
+ if not chunks:
+ return [(name,)]
+ ind = len(args)
+ if ind + 1 == len(numblocks):
+ result = [(name,) + args + (i,) for i in range(numblocks[ind])]
+ else:
+ result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
+ return result
+
+ self._cached_keys = result = keys()
+ return result
+
+ def __dask_tokenize__(self):
+ return self.name
+
+ __dask_optimize__ = globalmethod(optimize, key='array_optimize',
+ falsey=dont_optimize)
+ __dask_scheduler__ = staticmethod(threaded.get)
+
+ def __dask_postcompute__(self):
+ return finalize, ()
+
+ def __dask_postpersist__(self):
+ return Array, (self.name, self.chunks, self.dtype)
+
@property
def numblocks(self):
return tuple(map(len, self.chunks))
@@ -1101,24 +1132,6 @@ class Array(Base):
# Clear the key cache when the name is reset
self._cached_keys = None
- def _keys(self, *args):
- if not args:
- if self._cached_keys is not None:
- return self._cached_keys
-
- if not self.chunks:
- return [(self.name,)]
- ind = len(args)
- if ind + 1 == self.ndim:
- result = [(self.name,) + args + (i,)
- for i in range(self.numblocks[ind])]
- else:
- result = [self._keys(*(args + (i,)))
- for i in range(self.numblocks[ind])]
- if not args:
- self._cached_keys = result
- return result
-
__array_priority__ = 11 # higher than numpy.ndarray and numpy.matrix
def __array__(self, dtype=None, **kwargs):
@@ -1737,20 +1750,16 @@ class Array(Base):
mult = self.dtype.itemsize / dtype.itemsize
if order == 'C':
- ascontiguousarray = np.ascontiguousarray
chunks = self.chunks[:-1] + (tuple(ensure_int(c * mult)
for c in self.chunks[-1]),)
elif order == 'F':
- ascontiguousarray = np.asfortranarray
chunks = ((tuple(ensure_int(c * mult) for c in self.chunks[0]), ) +
self.chunks[1:])
else:
raise ValueError("Order must be one of 'C' or 'F'")
- out = elemwise(ascontiguousarray, self, dtype=self.dtype)
- out = elemwise(np.ndarray.view, out, dtype, dtype=dtype)
- out._chunks = chunks
- return out
+ return self.map_blocks(chunk.view, dtype, order=order,
+ dtype=dtype, chunks=chunks)
@derived_from(np.ndarray)
def swapaxes(self, axis1, axis2):
@@ -1783,8 +1792,9 @@ class Array(Base):
dask.array.from_delayed
"""
from ..delayed import Delayed
- dsk = self._optimize(self.dask, self._keys())
- L = ndeepmap(self.ndim, lambda k: Delayed(k, dsk), self._keys())
+ keys = self.__dask_keys__()
+ dsk = self.__dask_optimize__(self.__dask_graph__(), keys)
+ L = ndeepmap(self.ndim, lambda k: Delayed(k, dsk), keys)
return np.array(L, dtype=object)
@derived_from(np.ndarray)
@@ -1805,9 +1815,6 @@ def ensure_int(f):
return i
-normalize_token.register(Array, lambda a: a.name)
-
-
def normalize_chunks(chunks, shape=None):
""" Normalize chunks to tuple of tuples
@@ -2397,44 +2404,80 @@ def insert_to_ooc(out, arr, lock=True, region=None):
slices = slices_from_chunks(arr.chunks)
name = 'store-%s' % arr.name
- dsk = dict(((name,) + t[1:], (store, out, t, slc, lock, region))
- for t, slc in zip(core.flatten(arr._keys()), slices))
+ dsk = {(name,) + t[1:]: (store, out, t, slc, lock, region)
+ for t, slc in zip(core.flatten(arr.__dask_keys__()), slices)}
return dsk
-def asarray(array):
- """Coerce argument into a dask array
+def asarray(a):
+ """Convert the input to a dask array.
+
+ Parameters
+ ----------
+ a : array-like
+ Input data, in any form that can be converted to a dask array.
+
+ Returns
+ -------
+ out : dask array
+ Dask array interpretation of a.
Examples
--------
+ >>> import dask.array as da
+ >>> import numpy as np
>>> x = np.arange(3)
- >>> asarray(x)
+ >>> da.asarray(x)
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,)>
+
+ >>> y = [[1, 2, 3], [4, 5, 6]]
+ >>> da.asarray(y)
+ dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3)>
"""
- if isinstance(array, Array):
- return array
- if not isinstance(getattr(array, 'shape', None), Iterable):
- array = np.asarray(array)
- return from_array(array, chunks=array.shape, getitem=getter_inline)
+ if isinstance(a, Array):
+ return a
+ if isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
+ a = stack(a)
+ elif not isinstance(getattr(a, 'shape', None), Iterable):
+ a = np.asarray(a)
+ return from_array(a, chunks=a.shape, getitem=getter_inline)
-def asanyarray(array):
- """Coerce argument into a dask array.
+def asanyarray(a):
+ """Convert the input to a dask array.
- Subclasses of `np.ndarray` will be passed through as chunks unchanged.
+ Subclasses of ``np.ndarray`` will be passed through as chunks unchanged.
+
+ Parameters
+ ----------
+ a : array-like
+ Input data, in any form that can be converted to a dask array.
+
+ Returns
+ -------
+ out : dask array
+ Dask array interpretation of a.
Examples
--------
+ >>> import dask.array as da
+ >>> import numpy as np
>>> x = np.arange(3)
- >>> asanyarray(x)
+ >>> da.asanyarray(x)
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,)>
+
+ >>> y = [[1, 2, 3], [4, 5, 6]]
+ >>> da.asanyarray(y)
+ dask.array<array, shape=(2, 3), dtype=int64, chunksize=(2, 3)>
"""
- if isinstance(array, Array):
- return array
- if not isinstance(getattr(array, 'shape', None), Iterable):
- array = np.asanyarray(array)
- return from_array(array, chunks=array.shape, getitem=getter_inline,
+ if isinstance(a, Array):
+ return a
+ if isinstance(a, (list, tuple)) and any(isinstance(i, Array) for i in a):
+ a = stack(a)
+ elif not isinstance(getattr(a, 'shape', None), Iterable):
+ a = np.asanyarray(a)
+ return from_array(a, chunks=a.shape, getitem=getter_inline,
asarray=False)
@@ -2486,8 +2529,8 @@ def broadcast_shapes(*shapes):
return shapes[0]
out = []
for sizes in zip_longest(*map(reversed, shapes), fillvalue=-1):
- dim = max(sizes)
- if any(i != -1 and i != 1 and i != dim and not np.isnan(i) for i in sizes):
+ dim = 0 if 0 in sizes else max(sizes)
+ if any(i not in [-1, 0, 1, dim] and not np.isnan(i) for i in sizes):
raise ValueError("operands could not be broadcast together with "
"shapes {0}".format(' '.join(map(str, shapes))))
out.append(dim)
@@ -2644,10 +2687,10 @@ def broadcast_to(x, shape):
chunks = (tuple((s,) for s in shape[:ndim_new]) +
tuple(bd if old > 1 else (new,)
for bd, old, new in zip(x.chunks, x.shape, shape[ndim_new:])))
- dsk = dict(((name,) + (0,) * ndim_new + key[1:],
- (chunk.broadcast_to, key, shape[:ndim_new] +
- tuple(bd[i] for i, bd in zip(key[1:], chunks[ndim_new:]))))
- for key in core.flatten(x._keys()))
+ dsk = {(name,) + (0,) * ndim_new + key[1:]:
+ (chunk.broadcast_to, key, shape[:ndim_new] +
+ tuple(bd[i] for i, bd in zip(key[1:], chunks[ndim_new:])))
+ for key in core.flatten(x.__dask_keys__())}
return Array(sharedict.merge((name, dsk), x.dask), name, chunks, dtype=x.dtype)
@@ -3191,10 +3234,10 @@ def to_npy_stack(dirname, x, axis=0):
pickle.dump(meta, f)
name = 'to-npy-stack-' + str(uuid.uuid1())
- dsk = dict(((name, i), (np.save, os.path.join(dirname, '%d.npy' % i), key))
- for i, key in enumerate(core.flatten(xx._keys())))
+ dsk = {(name, i): (np.save, os.path.join(dirname, '%d.npy' % i), key)
+ for i, key in enumerate(core.flatten(xx.__dask_keys__()))}
- Array._get(sharedict.merge(dsk, xx.dask), list(dsk))
+ compute_as_if_collection(Array, sharedict.merge(dsk, xx.dask), list(dsk))
def from_npy_stack(dirname, mmap_mode='r'):
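
The Array class now implements the generic dask collection protocol
(__dask_graph__, __dask_keys__, __dask_postcompute__ and friends) in place of
the old _optimize/_default_get/_finalize attributes. A minimal sketch of
driving that interface by hand (not part of the diff; assumes dask 0.16.0;
normally one simply calls x.compute()):

    import dask.array as da

    x = da.ones((4, 4), chunks=(2, 2))

    dsk = dict(x.__dask_graph__())   # the task graph, as a plain mapping
    keys = x.__dask_keys__()         # nested list of output keys, one per block
    print(len(keys), len(keys[0]))   # 2 2  -> a 2x2 grid of blocks

    # __dask_postcompute__ returns a finalizer plus extra arguments; the
    # scheduler's results for `keys` are fed through it to build the answer.
    finalize, extra = x.__dask_postcompute__()
    results = x.__dask_scheduler__(dsk, keys)   # default threaded scheduler
    print(finalize(results, *extra).sum())      # 16.0
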
diff --git a/dask/array/creation.py b/dask/array/creation.py
index f1a2d78..6fa2c17 100644
--- a/dask/array/creation.py
+++ b/dask/array/creation.py
@@ -427,14 +427,14 @@ def diag(v):
"got {0}".format(type(v)))
if v.ndim != 1:
if v.chunks[0] == v.chunks[1]:
- dsk = dict(((name, i), (np.diag, row[i])) for (i, row)
- in enumerate(v._keys()))
+ dsk = {(name, i): (np.diag, row[i])
+ for i, row in enumerate(v.__dask_keys__())}
return Array(sharedict.merge(v.dask, (name, dsk)), name, (v.chunks[0],), dtype=v.dtype)
else:
raise NotImplementedError("Extracting diagonals from non-square "
"chunked arrays")
chunks_1d = v.chunks[0]
- blocks = v._keys()
+ blocks = v.__dask_keys__()
dsk = {}
for i, m in enumerate(chunks_1d):
for j, n in enumerate(chunks_1d):
diff --git a/dask/array/ghost.py b/dask/array/ghost.py
index 68d8405..2a8c1a1 100644
--- a/dask/array/ghost.py
+++ b/dask/array/ghost.py
@@ -100,8 +100,8 @@ def ghost_internal(x, axes):
"""
dims = list(map(len, x.chunks))
expand_key2 = partial(expand_key, dims=dims)
- interior_keys = pipe(x._keys(), flatten, map(expand_key2), map(flatten),
- concat, list)
+ interior_keys = pipe(x.__dask_keys__(), flatten, map(expand_key2),
+ map(flatten), concat, list)
token = tokenize(x, axes)
name = 'ghost-' + token
diff --git a/dask/array/optimization.py b/dask/array/optimization.py
index 2d34be2..bf49a32 100644
--- a/dask/array/optimization.py
+++ b/dask/array/optimization.py
@@ -5,17 +5,20 @@ from operator import getitem
import numpy as np
from .core import getter, getter_nofancy, getter_inline
-from ..context import defer_to_globals
from ..compatibility import zip_longest
from ..core import flatten, reverse_dict
-from ..optimize import cull, fuse, inline_functions, dont_optimize
+from ..optimize import cull, fuse, inline_functions
from ..utils import ensure_dict
+# All get* functions the optimizations know about
GETTERS = (getter, getter_nofancy, getter_inline, getitem)
+# These get* functions aren't ever completely removed from the graph,
+# even if the index should be a no-op by numpy semantics. Some array-like's
+# don't completely follow semantics, making indexing always necessary.
+GETNOREMOVE = (getter, getter_nofancy)
-@defer_to_globals('array_optimize', falsey=dont_optimize)
def optimize(dsk, keys, fuse_keys=None, fast_functions=None,
inline_functions_fast_functions=(getter_inline,), rename_fused_keys=True,
**kwargs):
@@ -138,8 +141,8 @@ def optimize_slices(dsk):
a, a_index, a_lock = b, c_index, b_lock
a_asarray |= b_asarray
- # Skip the get call if no asarray call, no lock, and nothing to do
- if ((not a_asarray and not a_lock) and
+ # Skip the get call if not from from_array and nothing to do
+ if (get not in GETNOREMOVE and
((type(a_index) is slice and not a_index.start and
a_index.stop is None and a_index.step is None) or
(type(a_index) is tuple and
diff --git a/dask/array/percentile.py b/dask/array/percentile.py
index 1f02dcd..6368231 100644
--- a/dask/array/percentile.py
+++ b/dask/array/percentile.py
@@ -42,7 +42,7 @@ def percentile(a, q, interpolation='linear'):
token = tokenize(a, list(q), interpolation)
name = 'percentile_chunk-' + token
dsk = dict(((name, i), (_percentile, (key), q, interpolation))
- for i, key in enumerate(a._keys()))
+ for i, key in enumerate(a.__dask_keys__()))
name2 = 'percentile-' + token
dsk2 = {(name2, 0): (merge_percentiles, q, [q] * len(a.chunks[0]),
diff --git a/dask/array/random.py b/dask/array/random.py
index ad5ef19..745a4e0 100644
--- a/dask/array/random.py
+++ b/dask/array/random.py
@@ -1,11 +1,12 @@
from __future__ import absolute_import, division, print_function
from itertools import product
+from numbers import Integral
from operator import getitem
import numpy as np
-from .core import (normalize_chunks, Array, slices_from_chunks,
+from .core import (normalize_chunks, Array, slices_from_chunks, asarray,
broadcast_shapes, broadcast_to)
from .. import sharedict
from ..base import tokenize
@@ -170,8 +171,62 @@ class RandomState(object):
with ignoring(AttributeError):
@doc_wraps(np.random.RandomState.choice)
def choice(self, a, size=None, replace=True, p=None, chunks=None):
- return self._wrap(np.random.RandomState.choice, a,
- size=size, replace=True, p=None, chunks=chunks)
+ dsks = []
+ # Normalize and validate `a`
+ if isinstance(a, Integral):
+ # On windows the output dtype differs if p is provided or
+ # absent, see https://github.com/numpy/numpy/issues/9867
+ dummy_p = np.array([1]) if p is not None else p
+ dtype = np.random.choice(1, size=(), p=dummy_p).dtype
+ len_a = a
+ if a < 0:
+ raise ValueError("a must be greater than 0")
+ else:
+ a = asarray(a).rechunk(a.shape)
+ dtype = a.dtype
+ if a.ndim != 1:
+ raise ValueError("a must be one dimensional")
+ len_a = len(a)
+ dsks.append(a.dask)
+ a = a.__dask_keys__()[0]
+
+ # Normalize and validate `p`
+ if p is not None:
+ if not isinstance(p, Array):
+ # If p is not a dask array, first check the sum is close
+ # to 1 before converting.
+ p = np.asarray(p)
+ if not np.isclose(p.sum(), 1, rtol=1e-7, atol=0):
+ raise ValueError("probabilities do not sum to 1")
+ p = asarray(p)
+ else:
+ p = p.rechunk(p.shape)
+
+ if p.ndim != 1:
+ raise ValueError("p must be one dimensional")
+ if len(p) != len_a:
+ raise ValueError("a and p must have the same size")
+
+ dsks.append(p.dask)
+ p = p.__dask_keys__()[0]
+
+ if size is None:
+ size = ()
+ elif not isinstance(size, (tuple, list)):
+ size = (size,)
+
+ chunks = normalize_chunks(chunks, size)
+ sizes = list(product(*chunks))
+ state_data = random_state_data(len(sizes), self._numpy_state)
+
+ name = 'da.random.choice-%s' % tokenize(state_data, size, chunks,
+ a, replace, p)
+ keys = product([name], *(range(len(bd)) for bd in chunks))
+ dsk = {k: (_choice, state, a, size, replace, p) for
+ k, state, size in zip(keys, state_data, sizes)}
+
+ return Array(sharedict.merge((name, dsk), *dsks),
+ name, chunks, dtype=dtype)
# @doc_wraps(np.random.RandomState.dirichlet)
# def dirichlet(self, alpha, size=None, chunks=None):
@@ -352,6 +407,11 @@ class RandomState(object):
size=size, chunks=chunks)
+def _choice(state_data, a, size, replace, p):
+ state = np.random.RandomState(state_data)
+ return state.choice(a, size=size, replace=replace, p=p)
+
+
def _apply_random(func, state_data, size, args, kwargs):
"""Apply RandomState method with seed"""
state = np.random.RandomState(state_data)
@@ -368,6 +428,8 @@ seed = _state.seed
beta = _state.beta
binomial = _state.binomial
chisquare = _state.chisquare
+if hasattr(_state, 'choice'):
+ choice = _state.choice
exponential = _state.exponential
f = _state.f
gamma = _state.gamma
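
da.random.choice now builds a real chunked graph (with optional dask-array
a and p) instead of deferring to _wrap. A short usage sketch (not part of the
diff; assumes dask 0.16.0):

    import numpy as np
    import dask.array as da

    # Sample 12 values from range(5) in three blocks of 4.
    x = da.random.choice(5, size=12, chunks=4)
    print(x.compute())

    # Weighted sampling: p must be one-dimensional and sum to 1.
    p = np.array([0.1, 0.2, 0.3, 0.4])
    y = da.random.choice(4, size=(6,), chunks=3, p=p)
    print(y.compute())
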
diff --git a/dask/array/rechunk.py b/dask/array/rechunk.py
index 8b69009..69e344b 100644
--- a/dask/array/rechunk.py
+++ b/dask/array/rechunk.py
@@ -391,7 +391,7 @@ def find_merge_rechunk(old_chunks, new_chunks, block_size_limit):
bse = block_size_effect[k]
if bse == 1:
bse = 1 + 1e-9
- return np.log(gse) / np.log(bse)
+ return (np.log(gse) / np.log(bse)) if bse > 0 else 0
sorted_candidates = sorted(merge_candidates, key=key)
diff --git a/dask/array/reductions.py b/dask/array/reductions.py
index 3a454b1..918042a 100644
--- a/dask/array/reductions.py
+++ b/dask/array/reductions.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import, division, print_function
+import warnings
+
from functools import partial, wraps
from itertools import product, repeat
from math import factorial, log, ceil
@@ -440,6 +442,12 @@ def vnorm(a, ord=None, axis=None, dtype=None, keepdims=False, split_every=None,
See np.linalg.norm
"""
+
+ warnings.warn(
+ "DeprecationWarning: Please use `dask.array.linalg.norm` instead.",
+ UserWarning
+ )
+
if ord is None or ord == 'fro':
ord = 2
if ord == np.inf:
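
vnorm is deprecated in favour of the function named in the warning; a usage
sketch of the suggested replacement (not part of the diff; assumes dask 0.16.0
where da.linalg.norm is available):

    import dask.array as da

    x = da.random.random((1000, 1000), chunks=(250, 250))

    # Frobenius norm by default for a 2-D input, mirroring np.linalg.norm.
    print(da.linalg.norm(x).compute())
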
diff --git a/dask/array/reshape.py b/dask/array/reshape.py
index e830c6d..2a1c3f7 100644
--- a/dask/array/reshape.py
+++ b/dask/array/reshape.py
@@ -170,7 +170,7 @@ def reshape(x, shape):
name = 'reshape-' + tokenize(x, shape)
if x.npartitions == 1:
- key = next(flatten(x._keys()))
+ key = next(flatten(x.__dask_keys__()))
dsk = {(name,) + (0,) * len(shape): (M.reshape, key, shape)}
chunks = tuple((d,) for d in shape)
return Array(sharedict.merge((name, dsk), x.dask), name, chunks,
diff --git a/dask/array/routines.py b/dask/array/routines.py
index 61505b6..7495bf5 100644
--- a/dask/array/routines.py
+++ b/dask/array/routines.py
@@ -16,9 +16,11 @@ from .. import sharedict
from ..core import flatten
from ..base import tokenize
from . import numpy_compat, chunk
+from .creation import arange
+from .wrap import ones
from .core import (Array, map_blocks, elemwise, from_array, asarray,
- concatenate, stack, atop, broadcast_shapes,
+ asanyarray, concatenate, stack, atop, broadcast_shapes,
is_scalar_for_elemwise, broadcast_to, tensordot_lookup)
@@ -37,24 +39,58 @@ def result_type(*args):
return np.result_type(*args)
-def atleast_3d(x):
- if x.ndim == 1:
- return x[None, :, None]
- elif x.ndim == 2:
- return x[:, :, None]
- elif x.ndim > 2:
- return x
+@wraps(np.atleast_3d)
+def atleast_3d(*arys):
+ new_arys = []
+ for x in arys:
+ x = asanyarray(x)
+ if x.ndim == 0:
+ x = x[None, None, None]
+ elif x.ndim == 1:
+ x = x[None, :, None]
+ elif x.ndim == 2:
+ x = x[:, :, None]
+
+ new_arys.append(x)
+
+ if len(new_arys) == 1:
+ return new_arys[0]
else:
- raise NotImplementedError()
+ return new_arys
+
+
+@wraps(np.atleast_2d)
+def atleast_2d(*arys):
+ new_arys = []
+ for x in arys:
+ x = asanyarray(x)
+ if x.ndim == 0:
+ x = x[None, None]
+ elif x.ndim == 1:
+ x = x[None, :]
+ new_arys.append(x)
-def atleast_2d(x):
- if x.ndim == 1:
- return x[None, :]
- elif x.ndim > 1:
- return x
+ if len(new_arys) == 1:
+ return new_arys[0]
else:
- raise NotImplementedError()
+ return new_arys
+
+
+@wraps(np.atleast_1d)
+def atleast_1d(*arys):
+ new_arys = []
+ for x in arys:
+ x = asanyarray(x)
+ if x.ndim == 0:
+ x = x[None]
+
+ new_arys.append(x)
+
+ if len(new_arys) == 1:
+ return new_arys[0]
+ else:
+ return new_arys
@wraps(np.vstack)
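
atleast_1d/2d/3d now accept any number of inputs and pass each through
asanyarray, matching the NumPy signatures. A quick sketch (not part of the
diff; assumes dask 0.16.0):

    import numpy as np
    import dask.array as da

    a = da.atleast_2d(5)                          # scalar -> shape (1, 1)
    b = da.atleast_2d(da.arange(3, chunks=3))     # 1-D    -> shape (1, 3)
    c, d = da.atleast_2d(np.ones(2), np.ones((2, 2)))  # several inputs -> list
    print(a.shape, b.shape, c.shape, d.shape)     # (1, 1) (1, 3) (1, 2) (2, 2)
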
@@ -297,13 +333,12 @@ def bincount(x, weights=None, minlength=None):
token = tokenize(x, weights, minlength)
name = 'bincount-' + token
if weights is not None:
- dsk = dict(((name, i),
- (np.bincount, (x.name, i), (weights.name, i), minlength))
- for i, _ in enumerate(x._keys()))
+ dsk = {(name, i): (np.bincount, (x.name, i), (weights.name, i), minlength)
+ for i, _ in enumerate(x.__dask_keys__())}
dtype = np.bincount([1], weights=[1]).dtype
else:
- dsk = dict(((name, i), (np.bincount, (x.name, i), None, minlength))
- for i, _ in enumerate(x._keys()))
+ dsk = {(name, i): (np.bincount, (x.name, i), None, minlength)
+ for i, _ in enumerate(x.__dask_keys__())}
dtype = np.bincount([]).dtype
# Sum up all of the intermediate bincounts per block
@@ -383,7 +418,7 @@ def histogram(a, bins=None, range=None, normed=False, weights=None, density=None
bin_token = bins
token = tokenize(a, bin_token, range, normed, weights, density)
- nchunks = len(list(flatten(a._keys())))
+ nchunks = len(list(flatten(a.__dask_keys__())))
chunks = ((1,) * nchunks, (len(bins) - 1,))
name = 'histogram-sum-' + token
@@ -393,14 +428,14 @@ def histogram(a, bins=None, range=None, normed=False, weights=None, density=None
return np.histogram(x, bins, weights=weights)[0][np.newaxis]
if weights is None:
- dsk = dict(((name, i, 0), (block_hist, k))
- for i, k in enumerate(flatten(a._keys())))
+ dsk = {(name, i, 0): (block_hist, k)
+ for i, k in enumerate(flatten(a.__dask_keys__()))}
dtype = np.histogram([])[0].dtype
else:
- a_keys = flatten(a._keys())
- w_keys = flatten(weights._keys())
- dsk = dict(((name, i, 0), (block_hist, k, w))
- for i, (k, w) in enumerate(zip(a_keys, w_keys)))
+ a_keys = flatten(a.__dask_keys__())
+ w_keys = flatten(weights.__dask_keys__())
+ dsk = {(name, i, 0): (block_hist, k, w)
+ for i, (k, w) in enumerate(zip(a_keys, w_keys))}
dtype = weights.dtype
all_dsk = sharedict.merge(a.dask, (name, dsk))
@@ -495,12 +530,173 @@ def round(a, decimals=0):
return a.map_blocks(np.round, decimals=decimals, dtype=a.dtype)
+def _unique_internal(ar, indices, counts, return_inverse=False):
+ """
+ Helper/wrapper function for NumPy's ``unique``.
+
+ Uses NumPy's ``unique`` to find the unique values for the array chunk.
+ Given this chunk may not represent the whole array, also take the
+ ``indices`` and ``counts`` that are in 1-to-1 correspondence to ``ar``
+ and reduce them in the same fashion as ``ar`` is reduced. Namely sum
+ any counts that correspond to the same value and take the smallest
+ index that corresponds to the same value.
+
+ To handle the inverse mapping from the unique values to the original
+ array, simply return a NumPy array created with ``arange`` with enough
+ values to correspond 1-to-1 to the unique values. While there is more
+ work needed to be done to create the full inverse mapping for the
+ original array, this provides enough information to generate the
+ inverse mapping in Dask.
+
+ Given Dask likes to have one array returned from functions like
+ ``atop``, some formatting is done to stuff all of the resulting arrays
+ into one big NumPy structured array. Dask is then able to handle this
+ object and can split it apart into the separate results on the Dask
+ side, which then can be passed back to this function in concatenated
+ chunks for further reduction or can be returned to the user to perform
+ other forms of analysis.
+
+ By handling the problem in this way, it does not matter where a chunk
+ is in a larger array or how big it is. The chunk can still be computed
+ in the same way. Also it does not matter if the chunk is the result of
+ other chunks being run through this function multiple times. The end
+ result will still be just as accurate using this strategy.
+ """
+
+ return_index = (indices is not None)
+ return_counts = (counts is not None)
+
+ u = np.unique(ar)
+
+ dt = [("values", u.dtype)]
+ if return_index:
+ dt.append(("indices", np.int64))
+ if return_inverse:
+ dt.append(("inverse", np.int64))
+ if return_counts:
+ dt.append(("counts", np.int64))
+
+ r = np.empty(u.shape, dtype=dt)
+ r["values"] = u
+ if return_inverse:
+ r["inverse"] = np.arange(len(r), dtype=np.int64)
+ if return_index or return_counts:
+ for i, v in enumerate(r["values"]):
+ m = (ar == v)
+ if return_index:
+ indices[m].min(keepdims=True, out=r["indices"][i:i + 1])
+ if return_counts:
+ counts[m].sum(keepdims=True, out=r["counts"][i:i + 1])
+
+ return r
+
+
@wraps(np.unique)
-def unique(x):
- name = 'unique-' + x.name
- dsk = dict(((name, i), (np.unique, key)) for i, key in enumerate(x._keys()))
- parts = Array._get(sharedict.merge((name, dsk), x.dask), list(dsk.keys()))
- return np.unique(np.concatenate(parts))
+def unique(ar, return_index=False, return_inverse=False, return_counts=False):
+ ar = ar.ravel()
+
+ # Run unique on each chunk and collect results in a Dask Array of
+ # unknown size.
+
+ args = [ar, "i"]
+ out_dtype = [("values", ar.dtype)]
+ if return_index:
+ args.extend([
+ arange(ar.shape[0], dtype=np.int64, chunks=ar.chunks[0]),
+ "i"
+ ])
+ out_dtype.append(("indices", np.int64))
+ else:
+ args.extend([None, None])
+ if return_counts:
+ args.extend([
+ ones((ar.shape[0],), dtype=np.int64, chunks=ar.chunks[0]),
... 7842 lines suppressed ...
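
The _unique_internal docstring above describes reducing per-chunk values,
indices, and counts through one structured array; at the user level this
surfaces as the new return_* flags on da.unique. A short usage sketch (not
part of the diff; assumes dask 0.16.0):

    import numpy as np
    import dask.array as da

    x = da.from_array(np.array([1, 2, 1, 3, 3, 3]), chunks=3)

    values, counts = da.unique(x, return_counts=True)
    print(values.compute())   # [1 2 3]
    print(counts.compute())   # [2 1 3]
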
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/dask.git