[Git][debian-gis-team/flox][master] 4 commits: New upstream version 0.6.1
Antonio Valentino (@antonio.valentino)
gitlab@salsa.debian.org
Fri Oct 21 07:34:56 BST 2022
Antonio Valentino pushed to branch master at Debian GIS Project / flox
Commits:
6988c5ac by Antonio Valentino at 2022-10-21T06:22:53+00:00
New upstream version 0.6.1
- - - - -
52d14a8b by Antonio Valentino at 2022-10-21T06:22:55+00:00
Update upstream source from tag 'upstream/0.6.1'
Update to upstream version '0.6.1'
with Debian dir b50b50a3945eb197114331ee10323c3e1f25edc0
- - - - -
08039ccd by Antonio Valentino at 2022-10-21T06:23:28+00:00
New upstream release
- - - - -
f9f97ba0 by Antonio Valentino at 2022-10-21T06:23:52+00:00
Set distribution to unstable
- - - - -
7 changed files:
- + asv_bench/benchmarks/cohorts.py
- debian/changelog
- flox/__init__.py
- flox/cache.py
- flox/core.py
- tests/__init__.py
- tests/test_core.py
Changes:
=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -0,0 +1,127 @@
+import dask
+import numpy as np
+import pandas as pd
+
+import flox
+
+
+class Cohorts:
+ """Time the core reduction function."""
+
+ def setup(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def time_find_group_cohorts(self):
+ flox.core.find_group_cohorts(self.by, self.array.chunks)
+ # The cache clear fails dependably in CI
+ # Not sure why
+ try:
+ flox.cache.cache.clear()
+ except AttributeError:
+ pass
+
+ def time_graph_construct(self):
+ flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts")
+
+ def track_num_tasks(self):
+ result = flox.groupby_reduce(
+ self.array, self.by, func="sum", axis=self.axis, method="cohorts"
+ )[0]
+ return len(result.dask.to_dict())
+
+ def track_num_tasks_optimized(self):
+ result = flox.groupby_reduce(
+ self.array, self.by, func="sum", axis=self.axis, method="cohorts"
+ )[0]
+ (opt,) = dask.optimize(result)
+ return len(opt.dask.to_dict())
+
+ def track_num_layers(self):
+ result = flox.groupby_reduce(
+ self.array, self.by, func="sum", axis=self.axis, method="cohorts"
+ )[0]
+ return len(result.dask.layers)
+
+ track_num_tasks.unit = "tasks"
+ track_num_tasks_optimized.unit = "tasks"
+ track_num_layers.unit = "layers"
+
+
+class NWMMidwest(Cohorts):
+ """2D labels, ireregular w.r.t chunk size.
+ Mimics National Weather Model, Midwest county groupby."""
+
+ def setup(self, *args, **kwargs):
+ x = np.repeat(np.arange(30), 150)
+ y = np.repeat(np.arange(30), 60)
+ self.by = x[np.newaxis, :] * y[:, np.newaxis]
+
+ self.array = dask.array.ones(self.by.shape, chunks=(350, 350))
+ self.axis = (-2, -1)
+
+
+class ERA5Dataset:
+ """ERA5"""
+
+ def __init__(self, *args, **kwargs):
+ self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+ self.axis = (-1,)
+ self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 48))
+
+ def rechunk(self):
+ self.array = flox.core.rechunk_for_cohorts(
+ self.array, -1, self.by, force_new_chunk_at=[1], chunksize=48, ignore_old_chunks=True
+ )
+
+
+class ERA5DayOfYear(ERA5Dataset, Cohorts):
+ def setup(self, *args, **kwargs):
+ super().__init__()
+ self.by = self.time.dt.dayofyear.values
+
+
+class ERA5DayOfYearRechunked(ERA5DayOfYear, Cohorts):
+ def setup(self, *args, **kwargs):
+ super().setup()
+ super().rechunk()
+
+
+class ERA5MonthHour(ERA5Dataset, Cohorts):
+ def setup(self, *args, **kwargs):
+ super().__init__()
+ by = (self.time.dt.month.values, self.time.dt.hour.values)
+ ret = flox.core._factorize_multiple(
+ by,
+ expected_groups=(pd.Index(np.arange(1, 13)), pd.Index(np.arange(1, 25))),
+ by_is_dask=False,
+ reindex=False,
+ )
+ # Add one so the rechunk code is simpler and makes sense
+ self.by = ret[0][0] + 1
+
+
+class ERA5MonthHourRechunked(ERA5MonthHour, Cohorts):
+ def setup(self, *args, **kwargs):
+ super().setup()
+ super().rechunk()
+
+
+class PerfectMonthly(Cohorts):
+ """Perfectly chunked for a "cohorts" monthly mean climatology"""
+
+ def setup(self, *args, **kwargs):
+ self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="M"))
+ self.axis = (-1,)
+ self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4))
+ self.by = self.time.dt.month.values
+
+ def rechunk(self):
+ self.array = flox.core.rechunk_for_cohorts(
+ self.array, -1, self.by, force_new_chunk_at=[1], chunksize=4, ignore_old_chunks=True
+ )
+
+
+class PerfectMonthlyRechunked(PerfectMonthly):
+ def setup(self, *args, **kwargs):
+ super().setup()
+ super().rechunk()
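The benchmark classes above follow the airspeed velocity (asv) conventions: setup() runs before each measurement, time_* methods are timed, and track_* methods report the values whose units are set at class level. A minimal sketch, not part of the upstream file, of driving one class by hand, assuming cohorts.py is importable as a plain module (asv normally discovers and runs it via `asv run`) and importing dask.array explicitly since the file itself only imports dask:

import dask.array  # the benchmarks call dask.array.*; import it explicitly here
from cohorts import NWMMidwest  # hypothetical import path; asv normally loads this file itself

bench = NWMMidwest()
bench.setup()                              # asv calls this before every measurement
bench.time_graph_construct()               # the body asv would time
print(bench.track_num_tasks(), "tasks")    # tracked metric, unit "tasks"
print(bench.track_num_layers(), "layers")  # tracked metric, unit "layers"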
=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+flox (0.6.1-1) unstable; urgency=medium
+
+ * New upstream release.
+
+ -- Antonio Valentino <antonio.valentino@tiscali.it> Fri, 21 Oct 2022 06:23:34 +0000
+
flox (0.6.0-1) unstable; urgency=medium
* New upstream release.
=====================================
flox/__init__.py
=====================================
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# flake8: noqa
"""Top-level module for flox ."""
+from . import cache
from .aggregations import Aggregation # noqa
from .core import groupby_reduce, rechunk_for_blockwise, rechunk_for_cohorts # noqa
=====================================
flox/cache.py
=====================================
@@ -8,4 +8,5 @@ try:
cache = cachey.Cache(1e6)
memoize = partial(cache.memoize, key=dask.base.tokenize)
except ImportError:
+ cache = {}
memoize = lambda x: x # type: ignore
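The one-line change above gives flox.cache.cache a value even when cachey is not installed, which is what the benchmarks' flox.cache.cache.clear() call relies on; previously the attribute simply did not exist in that case, hence the try/except AttributeError in cohorts.py. Read together with the context lines, the module follows the usual optional-dependency pattern; a sketch of the resulting file, with the import lines inferred rather than shown in the hunk:

try:
    from functools import partial

    import cachey
    import dask

    cache = cachey.Cache(1e6)                     # bounded cache keyed by dask tokens
    memoize = partial(cache.memoize, key=dask.base.tokenize)
except ImportError:
    cache = {}                                    # plain dict still supports .clear()
    memoize = lambda x: x  # type: ignore         # memoize becomes a no-op decorator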
=====================================
flox/core.py
=====================================
@@ -6,6 +6,7 @@ import math
import operator
from collections import namedtuple
from functools import partial, reduce
+from numbers import Integral
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Sequence, Union
import numpy as np
@@ -106,7 +107,7 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray:
def _get_optimal_chunks_for_groups(chunks, labels):
chunkidx = np.cumsum(chunks) - 1
# what are the groups at chunk boundaries
- labels_at_chunk_bounds = np.unique(labels[chunkidx])
+ labels_at_chunk_bounds = _unique(labels[chunkidx])
# what's the last index of all groups
last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last")
# what's the last index of groups at the chunk boundaries.
@@ -136,6 +137,12 @@ def _get_optimal_chunks_for_groups(chunks, labels):
return tuple(newchunks)
+def _unique(a):
+ """Much faster to use pandas unique and sort the results.
+ np.unique sorts before uniquifying and is slow."""
+ return np.sort(pd.unique(a))
+
+
@memoize
def find_group_cohorts(labels, chunks, merge: bool = True):
"""
@@ -180,14 +187,11 @@ def find_group_cohorts(labels, chunks, merge: bool = True):
blocks[idx] = np.full(tuple(block.shape[ax] for ax in axis), idx)
which_chunk = np.block(blocks.reshape(shape).tolist()).reshape(-1)
- # We always drop NaN; np.unique also considers every NaN to be different so
- # it's really important we get rid of them.
raveled = labels.reshape(-1)
- unique_labels = np.unique(raveled[~isnull(raveled)])
# these are chunks where a label is present
- label_chunks = {lab: tuple(np.unique(which_chunk[raveled == lab])) for lab in unique_labels}
+ label_chunks = pd.Series(which_chunk).groupby(raveled).unique()
# These invert the label_chunks mapping so we know which labels occur together.
- chunks_cohorts = tlz.groupby(label_chunks.get, label_chunks.keys())
+ chunks_cohorts = tlz.groupby(lambda x: tuple(label_chunks.get(x)), label_chunks.keys())
if merge:
# First sort by number of chunks occupied by cohort
@@ -285,7 +289,7 @@ def rechunk_for_cohorts(
divisions = []
counter = 1
for idx, lab in enumerate(labels):
- if lab in force_new_chunk_at:
+ if lab in force_new_chunk_at or idx == 0:
divisions.append(idx)
counter = 1
continue
@@ -302,6 +306,7 @@ def rechunk_for_cohorts(
divisions.append(idx)
counter = 1
continue
+
counter += 1
divisions.append(len(labels))
@@ -310,6 +315,9 @@ def rechunk_for_cohorts(
print(labels_at_breaks[:40])
newchunks = tuple(np.diff(divisions))
+ if debug:
+ print(divisions[:10], newchunks[:10])
+ print(divisions[-10:], newchunks[-10:])
assert sum(newchunks) == len(labels)
if newchunks == array.chunks[axis]:
@@ -892,7 +900,7 @@ def _grouped_combine(
# when there's only a single axis of reduction, we can just concatenate later,
# reindexing is unnecessary
# I bet we can minimize the amount of reindexing for mD reductions too, but it's complicated
- unique_groups = np.unique(tuple(flatten(deepmap(listify_groups, x_chunk))))
+ unique_groups = _unique(tuple(flatten(deepmap(listify_groups, x_chunk))))
unique_groups = unique_groups[~isnull(unique_groups)]
if len(unique_groups) == 0:
unique_groups = [np.nan]
@@ -1043,6 +1051,44 @@ def _reduce_blockwise(
return result
+def _normalize_indexes(array, flatblocks, blkshape):
+ """
+ .blocks accessor can only accept one iterable at a time,
+ but can handle multiple slices.
+ To minimize tasks and layers, we normalize to produce slices
+ along as many axes as possible, and then repeatedly apply
+ any remaining iterables in a loop.
+
+ TODO: move this upstream
+ """
+ unraveled = np.unravel_index(flatblocks, blkshape)
+
+ normalized: list[Union[int, np.ndarray, slice]] = []
+ for ax, idx in enumerate(unraveled):
+ i = _unique(idx).squeeze()
+ if i.ndim == 0:
+ normalized.append(i.item())
+ else:
+ if np.array_equal(i, np.arange(blkshape[ax])):
+ normalized.append(slice(None))
+ elif np.array_equal(i, np.arange(i[0], i[-1] + 1)):
+ normalized.append(slice(i[0], i[-1] + 1))
+ else:
+ normalized.append(list(i))
+ full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized)
+
+ # has no iterables
+ noiter = list(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized)
+ # has all iterables
+ alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")}
+
+ mesh = dict(zip(alliter.keys(), np.ix_(*alliter.values())))
+
+ full_tuple = tuple(i if ax not in mesh else mesh[ax] for ax, i in enumerate(noiter))
+
+ return full_tuple
+
+
def subset_to_blocks(
array: DaskArray, flatblocks: Sequence[int], blkshape: tuple[int] | None = None
) -> DaskArray:
@@ -1059,45 +1105,34 @@ def subset_to_blocks(
-------
dask.array
"""
+ import dask.array
+ from dask.array.slicing import normalize_index
+ from dask.base import tokenize
+ from dask.highlevelgraph import HighLevelGraph
+
if blkshape is None:
blkshape = array.blocks.shape
- unraveled = np.unravel_index(flatblocks, blkshape)
- normalized: list[Union[int, np.ndarray, slice]] = []
- for ax, idx in enumerate(unraveled):
- i = np.unique(idx).squeeze()
- if i.ndim == 0:
- normalized.append(i.item())
- else:
- if np.array_equal(i, np.arange(blkshape[ax])):
- normalized.append(slice(None))
- elif np.array_equal(i, np.arange(i[0], i[-1] + 1)):
- normalized.append(slice(i[0], i[-1] + 1))
- else:
- normalized.append(i)
- full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized)
-
- # has no iterables
- noiter = tuple(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized)
- # has all iterables
- alliter = {
- ax: i if hasattr(i, "__len__") else slice(None) for ax, i in enumerate(full_normalized)
- }
+ index = _normalize_indexes(array, flatblocks, blkshape)
- # apply everything but the iterables
- if all(i == slice(None) for i in noiter):
+ if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index):
return array
- subset = array.blocks[noiter]
+ # The rest is copied from dask.array.core.py with slight modifications
+ index = normalize_index(index, array.numblocks)
+ index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index)
- for ax, inds in alliter.items():
- if isinstance(inds, slice):
- continue
- idxr = [slice(None, None)] * array.ndim
- idxr[ax] = inds
- subset = subset.blocks[tuple(idxr)]
+ name = "blocks-" + tokenize(array, index)
+ new_keys = array._key_array[index]
+
+ squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index)
+ chunks = tuple(tuple(np.array(c)[i].tolist()) for c, i in zip(array.chunks, squeezed))
+
+ keys = itertools.product(*(range(len(c)) for c in chunks))
+ layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys}
+ graph = HighLevelGraph.from_collections(name, layer, dependencies=[array])
- return subset
+ return dask.array.Array(graph, name, chunks, meta=array)
def _extract_unknown_groups(reduced, group_chunks, dtype) -> tuple[DaskArray]:
@@ -1310,7 +1345,7 @@ def dask_groupby_agg(
# along the reduced axis
slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis))
if expected_groups is None:
- groups_in_block = tuple(np.unique(by_input[slc]) for slc in slices)
+ groups_in_block = tuple(_unique(by_input[slc]) for slc in slices)
else:
# For cohorts, we could be indexing a block with groups that
# are not in the cohort (usually for nD `by`)
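Two of the changes above are easiest to see in isolation. First, the new _unique() helper: per its docstring, np.unique sorts the whole input before deduplicating, while pd.unique hashes it and returns values in first-seen order, so only the (usually small) set of distinct labels needs sorting afterwards. A quick stand-alone illustration, not taken from the patch:

import numpy as np
import pandas as pd

labels = np.random.randint(0, 12, size=1_000_000)  # many elements, few distinct groups

via_np = np.unique(labels)               # sorts the full million-element array
via_pd = np.sort(pd.unique(labels))      # hashes once, then sorts ~12 values

assert np.array_equal(via_np, via_pd)    # same result; the pandas route is what _unique uses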
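Second, the rewritten block in find_group_cohorts: a single pandas groupby replaces the old per-label np.unique loop, and tlz.groupby then inverts that mapping so labels occupying exactly the same chunks end up in one cohort. A toy example of the two intermediate structures (toolz is assumed to be what core.py imports as tlz; the commented values are approximate reprs):

import numpy as np
import pandas as pd
import toolz as tlz  # assumed to be the `tlz` used in core.py

raveled = np.array([0, 0, 1, 1, 2, 2, 1, 1])      # group label of each element
which_chunk = np.array([0, 0, 0, 0, 1, 1, 1, 1])  # chunk id of each element

# chunks in which each label is present
label_chunks = pd.Series(which_chunk).groupby(raveled).unique()
# label 0 -> [0], label 1 -> [0, 1], label 2 -> [1]

# invert: labels keyed by the tuple of chunks they occupy ("cohorts")
chunks_cohorts = tlz.groupby(lambda lab: tuple(label_chunks.get(lab)), label_chunks.keys())
# roughly {(0,): [0], (0, 1): [1], (1,): [2]}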
=====================================
tests/__init__.py
=====================================
@@ -115,6 +115,18 @@ def assert_equal(a, b, tolerance=None):
np.testing.assert_allclose(a, b, equal_nan=True, **tolerance)
+def assert_equal_tuple(a, b):
+ """assert_equal for .blocks indexing tuples"""
+ assert len(a) == len(b)
+
+ for a_, b_ in zip(a, b):
+ assert type(a_) == type(b_)
+ if isinstance(a_, np.ndarray):
+ np.testing.assert_array_equal(a_, b_)
+ else:
+ assert a_ == b_
+
+
@pytest.fixture(scope="module", params=["flox", "numpy", "numba"])
def engine(request):
if request.param == "numba":
=====================================
tests/test_core.py
=====================================
@@ -12,14 +12,23 @@ from flox.aggregations import Aggregation
from flox.core import (
_convert_expected_groups_to_index,
_get_optimal_chunks_for_groups,
+ _normalize_indexes,
factorize_,
find_group_cohorts,
groupby_reduce,
rechunk_for_cohorts,
reindex_,
+ subset_to_blocks,
)
-from . import assert_equal, engine, has_dask, raise_if_dask_computes, requires_dask
+from . import (
+ assert_equal,
+ assert_equal_tuple,
+ engine,
+ has_dask,
+ raise_if_dask_computes,
+ requires_dask,
+)
labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0])
nan_labels = labels.astype(float) # copy
@@ -1035,3 +1044,84 @@ def test_dtype(func, dtype, engine):
labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"])
actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64)
assert actual.dtype == np.dtype("float64")
+
+
+@requires_dask
+def test_subset_blocks():
+ array = dask.array.random.random((120,), chunks=(4,))
+
+ blockid = (0, 3, 6, 9, 12, 15, 18, 21, 24, 27)
+ subset = subset_to_blocks(array, blockid)
+ assert subset.blocks.shape == (len(blockid),)
+
+
+@requires_dask
+@pytest.mark.parametrize(
+ "flatblocks, expected",
+ (
+ ((0, 1, 2, 3, 4), (slice(None),)),
+ ((1, 2, 3), (slice(1, 4),)),
+ ((1, 3), ([1, 3],)),
+ ((0, 1, 3), ([0, 1, 3],)),
+ ),
+)
+def test_normalize_block_indexing_1d(flatblocks, expected):
+ nblocks = 5
+ array = dask.array.ones((nblocks,), chunks=(1,))
+ expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected)
+ actual = _normalize_indexes(array, flatblocks, array.blocks.shape)
+ assert_equal_tuple(expected, actual)
+
+
+@requires_dask
+@pytest.mark.parametrize(
+ "flatblocks, expected",
+ (
+ ((0, 1, 2, 3, 4), (0, slice(None))),
+ ((1, 2, 3), (0, slice(1, 4))),
+ ((1, 3), (0, [1, 3])),
+ ((0, 1, 3), (0, [0, 1, 3])),
+ (tuple(range(10)), (slice(0, 2), slice(None))),
+ ((0, 1, 3, 5, 6, 8), (slice(0, 2), [0, 1, 3])),
+ ((0, 3, 4, 5, 6, 8, 24), np.ix_([0, 1, 4], [0, 1, 3, 4])),
+ ),
+)
+def test_normalize_block_indexing_2d(flatblocks, expected):
+ nblocks = 5
+ ndim = 2
+ array = dask.array.ones((nblocks,) * ndim, chunks=(1,) * ndim)
+ expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected)
+ actual = _normalize_indexes(array, flatblocks, array.blocks.shape)
+ assert_equal_tuple(expected, actual)
+
+
+@requires_dask
+def test_subset_block_passthrough():
+ # full slice pass through
+ array = dask.array.ones((5,), chunks=(1,))
+ subset = subset_to_blocks(array, np.arange(5))
+ assert subset.name == array.name
+
+ array = dask.array.ones((5, 5), chunks=1)
+ subset = subset_to_blocks(array, np.arange(25))
+ assert subset.name == array.name
+
+
+@requires_dask
+@pytest.mark.parametrize(
+ "flatblocks, expectidx",
+ [
+ (np.arange(10), (slice(2), slice(None))),
+ (np.arange(8), (slice(2), slice(None))),
+ ([0, 10], ([0, 2], slice(1))),
+ ([0, 7], (slice(2), [0, 2])),
+ ([0, 7, 9], (slice(2), [0, 2, 4])),
+ ([0, 6, 12, 14], (slice(3), [0, 1, 2, 4])),
+ ([0, 12, 14, 19], np.ix_([0, 2, 3], [0, 2, 4])),
+ ],
+)
+def test_subset_block_2d(flatblocks, expectidx):
+ array = dask.array.from_array(np.arange(25).reshape((5, 5)), chunks=1)
+ subset = subset_to_blocks(array, flatblocks)
+ assert len(subset.dask.layers) == 2
+ assert_equal(subset, array.compute()[expectidx])
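The new tests above pin down the behaviour of _normalize_indexes and subset_to_blocks; a hand-run version of one test_subset_block_2d case, with the expected values paraphrased from the parametrization (a sketch, not part of the patch):

import dask.array
import numpy as np

from flox.core import _normalize_indexes, subset_to_blocks

array = dask.array.from_array(np.arange(25).reshape((5, 5)), chunks=1)

# flat block ids 0, 7 and 9 sit in block-rows 0-1 and block-columns 0, 2, 4
idx = _normalize_indexes(array, [0, 7, 9], array.blocks.shape)
# expected: (slice(0, 2), array([0, 2, 4])) -- one slice plus one iterable

subset = subset_to_blocks(array, [0, 7, 9])
print(subset.compute())  # the 2x3 selection, equal to array.compute()[:2, [0, 2, 4]]

As test_subset_block_2d asserts, the subset keeps only two graph layers: the original array plus the single "blocks-..." layer built by subset_to_blocks.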
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/compare/e26f0d6f98911d6607bb233852456083967c7ebf...f9f97ba072d70d83799b9d4645e1af0b795d008c