[Git][debian-gis-team/flox][master] 6 commits: New upstream version 0.8.5
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Sat Dec 2 23:25:34 GMT 2023
Antonio Valentino pushed to branch master at Debian GIS Project / flox
Commits:
6557f96e by Antonio Valentino at 2023-12-02T21:44:25+00:00
New upstream version 0.8.5
- - - - -
feda024e by Antonio Valentino at 2023-12-02T21:44:26+00:00
Update upstream source from tag 'upstream/0.8.5'
Update to upstream version '0.8.5'
with Debian dir 9d1523b04c6e52d1c5f78d41253e3d03c5e951c4
- - - - -
1da7c136 by Antonio Valentino at 2023-12-02T21:45:06+00:00
New upstream release
- - - - -
112afada by Antonio Valentino at 2023-12-02T21:51:37+00:00
Add dependency on scipy
- - - - -
d6262c4e by Antonio Valentino at 2023-12-02T23:16:22+00:00
Skip tests requiring scipy v1.11.x
- - - - -
4c03516e by Antonio Valentino at 2023-12-02T23:20:23+00:00
Re-enable test_xarray_resample, test_multi_index_groupby_sum and test_dtype_accumulation.
- - - - -
20 changed files:
- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- asv_bench/benchmarks/cohorts.py
- asv_bench/benchmarks/reduce.py
- ci/benchmark.yml
- ci/docs.yml
- ci/environment.yml
- ci/minimal-requirements.yml
- ci/no-dask.yml
- ci/no-numba.yml
- ci/no-xarray.yml
- debian/changelog
- debian/control
- debian/rules
- flox/core.py
- flox/visualize.py
- flox/xarray.py
- pyproject.toml
- tests/test_core.py
- tests/test_xarray.py
Changes:
=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -59,7 +59,7 @@ jobs:
environment-name: flox-tests
init-shell: bash
cache-environment: true
- cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
+ cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: |
python=${{ env.PYTHON_VERSION }}
@@ -112,7 +112,7 @@ jobs:
environment-name: flox-tests
init-shell: bash
cache-environment: true
- cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
+ cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: |
python=${{ env.PYTHON_VERSION }}
- name: Install flox
=====================================
.github/workflows/ci.yaml
=====================================
@@ -119,8 +119,9 @@ jobs:
environment-name: xarray-tests
init-shell: bash
cache-environment: true
- create-args: |
- python=3.10
+ create-args: >-
+ python=3.11
+ pint>=0.22
- name: Install xarray
run: |
python -m pip install --no-deps .
@@ -144,4 +145,7 @@ jobs:
id: status
run: |
set -euo pipefail
- python -m pytest -n auto xarray/tests/test_groupby.py
+ python -m pytest -n auto \
+ xarray/tests/test_groupby.py \
+ xarray/tests/test_units.py::TestDataArray::test_computation_objects \
+ xarray/tests/test_units.py::TestDataset::test_computation_objects
=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -1,10 +1,8 @@
import dask
import numpy as np
import pandas as pd
-import xarray as xr
import flox
-from flox.xarray import xarray_reduce
class Cohorts:
@@ -47,6 +45,10 @@ class Cohorts:
track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy
track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy
track_num_layers.unit = "layers" # type: ignore[attr-defined] # Lazy
+ for f in [track_num_tasks, track_num_tasks_optimized, track_num_layers]:
+ f.repeat = 1 # type: ignore[attr-defined] # Lazy
+ f.rounds = 1 # type: ignore[attr-defined] # Lazy
+ f.number = 1 # type: ignore[attr-defined] # Lazy
class NWMMidwest(Cohorts):
@@ -85,7 +87,7 @@ class ERA5DayOfYear(ERA5Dataset, Cohorts):
class ERA5DayOfYearRechunked(ERA5DayOfYear, Cohorts):
def setup(self, *args, **kwargs):
super().setup()
- super().rechunk()
+ self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 24))
class ERA5MonthHour(ERA5Dataset, Cohorts):
@@ -129,11 +131,10 @@ class PerfectMonthlyRechunked(PerfectMonthly):
super().rechunk()
-def time_cohorts_era5_single():
- TIME = 900 # 92044 in Google ARCO ERA5
- da = xr.DataArray(
- dask.array.ones((TIME, 721, 1440), chunks=(1, -1, -1)),
- dims=("time", "lat", "lon"),
- coords=dict(time=pd.date_range("1959-01-01", freq="6H", periods=TIME)),
- )
- xarray_reduce(da, da.time.dt.day, method="cohorts", func="any")
+class ERA5Google(Cohorts):
+ def setup(self, *args, **kwargs):
+ TIME = 900 # 92044 in Google ARCO ERA5
+ self.time = pd.Series(pd.date_range("1959-01-01", freq="6H", periods=TIME))
+ self.axis = (2,)
+ self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, 1))
+ self.by = self.time.dt.day.values
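Note on the attribute loop added to the Cohorts class above: asv reads per-benchmark settings such as unit, repeat, rounds and number directly from the benchmark callables. A minimal, self-contained sketch of the same pattern (not part of the upstream change; the class name and the graph-size metric are illustrative only):

import dask.array


class TrackGraphSize:
    def setup(self, *args, **kwargs):
        # a small chunked array so the task graph stays tiny
        self.array = dask.array.ones((100, 100), chunks=(10, 10))

    def track_num_tasks(self):
        # report the number of tasks in the unoptimized dask graph
        return len(self.array.sum().__dask_graph__())


# attach asv benchmark attributes to the callable, as done for the
# track_* benchmarks in the diff above
for f in [TrackGraphSize.track_num_tasks]:
    f.unit = "tasks"
    f.repeat = 1
    f.rounds = 1
    f.number = 1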
=====================================
asv_bench/benchmarks/reduce.py
=====================================
@@ -59,17 +59,17 @@ class ChunkReduce:
expected_groups=expected_groups[expected_name],
)
- @skip_for_params(numbagg_skip)
- @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
- def peakmem_reduce(self, func, expected_name, engine):
- flox.groupby_reduce(
- self.array,
- self.labels,
- func=func,
- engine=engine,
- axis=self.axis,
- expected_groups=expected_groups[expected_name],
- )
+ # @skip_for_params(numbagg_skip)
+ # @parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
+ # def peakmem_reduce(self, func, expected_name, engine):
+ # flox.groupby_reduce(
+ # self.array,
+ # self.labels,
+ # func=func,
+ # engine=engine,
+ # axis=self.axis,
+ # expected_groups=expected_groups[expected_name],
+ # )
class ChunkReduce1D(ChunkReduce):
=====================================
ci/benchmark.yml
=====================================
@@ -13,3 +13,4 @@ dependencies:
- numpy_groupies>=0.9.19
- numbagg>=0.3
- wheel
+ - scipy
=====================================
ci/docs.yml
=====================================
@@ -6,6 +6,7 @@ dependencies:
- pip
- xarray
- numpy>=1.22
+ - scipy
- numpydoc
- numpy_groupies>=0.9.19
- toolz
=====================================
ci/environment.yml
=====================================
@@ -9,6 +9,7 @@ dependencies:
- netcdf4
- pandas
- numpy>=1.22
+ - scipy
- lxml # for mypy coverage report
- matplotlib
- pip
@@ -24,4 +25,3 @@ dependencies:
- toolz
- numba
- numbagg>=0.3
- - scipy
=====================================
ci/minimal-requirements.yml
=====================================
@@ -10,6 +10,7 @@ dependencies:
- pytest-pretty
- pytest-xdist
- numpy==1.22
+ - scipy
- numpy_groupies==0.9.19
- pandas
- pooch
=====================================
ci/no-dask.yml
=====================================
@@ -6,6 +6,7 @@ dependencies:
- netcdf4
- pandas
- numpy>=1.22
+ - scipy
- pip
- pytest
- pytest-cov
=====================================
ci/no-numba.yml
=====================================
@@ -9,6 +9,7 @@ dependencies:
- netcdf4
- pandas
- numpy>=1.22
+ - scipy
- lxml # for mypy coverage report
- matplotlib
- pip
@@ -21,4 +22,3 @@ dependencies:
- numpy_groupies>=0.9.19
- pooch
- toolz
- - scipy
=====================================
ci/no-xarray.yml
=====================================
@@ -6,6 +6,7 @@ dependencies:
- netcdf4
- pandas
- numpy>=1.22
+ - scipy
- pip
- pytest
- pytest-cov
=====================================
debian/changelog
=====================================
@@ -1,3 +1,15 @@
+flox (0.8.5-1) UNRELEASED; urgency=medium
+
+ * New upstream release.
+ * debian/control:
+ - Add dependency on scipy.
+ * debian/rules:
+ - Skip tests requiring scipy v1.11.x.
+ - Re-enable test_xarray_resample, test_multi_index_groupby_sum
+ and test_dtype_accumulation.
+
+ -- Antonio Valentino <antonio.valentino at tiscali.it> Sat, 02 Dec 2023 21:44:30 +0000
+
flox (0.8.2-1) unstable; urgency=medium
* New upstream release.
=====================================
debian/control
=====================================
@@ -19,6 +19,7 @@ Build-Depends: debhelper-compat (= 13),
python3-packaging,
python3-pandas,
python3-pytest <!nocheck>,
+ python3-scipy,
python3-setuptools,
python3-setuptools-scm,
python3-toolz,
=====================================
debian/rules
=====================================
@@ -2,12 +2,11 @@
export PYBUILD_NAME=flox
export PYBUILD_TEST_ARGS=\
--k "not test_xarray_resample \
-and not test_xarray_reduce_single_grouper \
+-k "not test_xarray_reduce_single_grouper \
and not test_func_is_aggregation \
and not test_groupby_bins_indexed_coordinate \
-and not test_multi_index_groupby_sum \
-and not test_dtype_accumulation" \
+and not test_groupby_reduce_all \
+and not test_verify_complex_cohorts" \
$(CURDIR)/tests
export PYBUILD_AFTER_INSTALL=rm -rf \
{destdir}/{install_dir}/benchmarks
=====================================
flox/core.py
=====================================
@@ -1,6 +1,5 @@
from __future__ import annotations
-import copy
import itertools
import math
import operator
@@ -9,6 +8,7 @@ import warnings
from collections import namedtuple
from collections.abc import Sequence
from functools import partial, reduce
+from itertools import product
from numbers import Integral
from typing import (
TYPE_CHECKING,
@@ -23,6 +23,7 @@ import numpy as np
import numpy_groupies as npg
import pandas as pd
import toolz as tlz
+from scipy.sparse import csc_array
from . import xrdtypes
from .aggregate_flox import _prepare_for_flox
@@ -52,7 +53,7 @@ if TYPE_CHECKING:
T_DuckArray = Union[np.ndarray, DaskArray] # Any ?
T_By = T_DuckArray
T_Bys = tuple[T_By, ...]
- T_ExpectIndex = Union[pd.Index]
+ T_ExpectIndex = pd.Index
T_ExpectIndexTuple = tuple[T_ExpectIndex, ...]
T_ExpectIndexOpt = Union[T_ExpectIndex, None]
T_ExpectIndexOptTuple = tuple[T_ExpectIndexOpt, ...]
@@ -203,6 +204,16 @@ def _unique(a: np.ndarray) -> np.ndarray:
return np.sort(pd.unique(a.reshape(-1)))
+def slices_from_chunks(chunks):
+ """slightly modified from dask.array.core.slices_from_chunks to be lazy"""
+ cumdims = [tlz.accumulate(operator.add, bds, 0) for bds in chunks]
+ slices = (
+ (slice(s, s + dim) for s, dim in zip(starts, shapes))
+ for starts, shapes in zip(cumdims, chunks)
+ )
+ return product(*slices)
+
+
@memoize
def find_group_cohorts(labels, chunks, merge: bool = True) -> dict:
"""
@@ -215,9 +226,10 @@ def find_group_cohorts(labels, chunks, merge: bool = True) -> dict:
Parameters
----------
labels : np.ndarray
- mD Array of group labels
+ mD Array of integer group codes, factorized so that -1
+ represents NaNs.
chunks : tuple
- nD array that is being reduced
+ chunks of the array being reduced
merge : bool, optional
Attempt to merge cohorts when one cohort's chunks are a subset
of another cohort's chunks.
@@ -227,33 +239,59 @@ def find_group_cohorts(labels, chunks, merge: bool = True) -> dict:
cohorts: dict_values
Iterable of cohorts
"""
- import dask
-
# To do this, we must have values in memory so casting to numpy should be safe
labels = np.asarray(labels)
- # Build an array with the shape of labels, but where every element is the "chunk number"
- # 1. First subset the array appropriately
- axis = range(-labels.ndim, 0)
- # Easier to create a dask array and use the .blocks property
- array = dask.array.empty(tuple(sum(c) for c in chunks), chunks=chunks)
- labels = np.broadcast_to(labels, array.shape[-labels.ndim :])
-
- # Iterate over each block and create a new block of same shape with "chunk number"
- shape = tuple(array.blocks.shape[ax] for ax in axis)
- # Use a numpy object array to enable assignment in the loop
- # TODO: is it possible to just use a nested list?
- # That is what we need for `np.block`
- blocks = np.empty(shape, dtype=object)
- array_chunks = tuple(np.array(c) for c in array.chunks)
- for idx, blockindex in enumerate(np.ndindex(array.numblocks)):
- chunkshape = tuple(c[i] for c, i in zip(array_chunks, blockindex))
- blocks[blockindex] = np.full(chunkshape, idx)
- which_chunk = np.block(blocks.tolist()).reshape(-1)
-
- raveled = labels.reshape(-1)
- # these are chunks where a label is present
- label_chunks = pd.Series(which_chunk).groupby(raveled).unique()
+ shape = tuple(sum(c) for c in chunks)
+ nchunks = math.prod(len(c) for c in chunks)
+
+ # assumes that `labels` are factorized
+ nlabels = labels.max() + 1
+
+ labels = np.broadcast_to(labels, shape[-labels.ndim :])
+
+ rows = []
+ cols = []
+ # Add one to handle the -1 sentinel value
+ label_is_present = np.zeros((nlabels + 1,), dtype=bool)
+ ilabels = np.arange(nlabels)
+ for idx, region in enumerate(slices_from_chunks(chunks)):
+ # This is a quite fast way to find unique integers, when we know how many there are
+ # inspired by a similar idea in numpy_groupies for first, last
+ # instead of explicitly finding uniques, repeatedly write True to the same location
+ subset = labels[region]
+ # The reshape is not strictly necessary but is about 100ms faster on a test problem.
+ label_is_present[subset.reshape(-1)] = True
+ # skip the -1 sentinel by slicing
+ uniques = ilabels[label_is_present[:-1]]
+ rows.append([idx] * len(uniques))
+ cols.append(uniques)
+ label_is_present[:] = False
+ rows_array = np.concatenate(rows)
+ cols_array = np.concatenate(cols)
+ data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape)
+ bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels))
+ label_chunks = {
+ lab: bitmask.indices[slice(bitmask.indptr[lab], bitmask.indptr[lab + 1])]
+ for lab in range(nlabels)
+ }
+
+ ## numpy bitmask approach, faster than finding uniques, but lots of memory
+ # bitmask = np.zeros((nchunks, nlabels), dtype=bool)
+ # for idx, region in enumerate(slices_from_chunks(chunks)):
+ # bitmask[idx, labels[region]] = True
+ # bitmask = bitmask[:, :-1]
+ # chunk = np.arange(nchunks) # [:, np.newaxis] * bitmask
+ # label_chunks = {lab: chunk[bitmask[:, lab]] for lab in range(nlabels - 1)}
+
+ ## Pandas GroupBy approach, quite slow!
+ # which_chunk = np.empty(shape, dtype=np.int64)
+ # for idx, region in enumerate(slices_from_chunks(chunks)):
+ # which_chunk[region] = idx
+ # which_chunk = which_chunk.reshape(-1)
+ # raveled = labels.reshape(-1)
+ # # these are chunks where a label is present
+ # label_chunks = pd.Series(which_chunk).groupby(raveled).unique()
# These invert the label_chunks mapping so we know which labels occur together.
def invert(x) -> tuple[np.ndarray, ...]:
@@ -264,40 +302,46 @@ def find_group_cohorts(labels, chunks, merge: bool = True) -> dict:
# If our dataset has chunksize one along the axis,
# then no merging is possible.
- single_chunks = all((ac == 1).all() for ac in array_chunks)
-
- if merge and not single_chunks:
+ single_chunks = all(all(a == 1 for a in ac) for ac in chunks)
+ one_group_per_chunk = (bitmask.sum(axis=1) == 1).all()
+ if not one_group_per_chunk and not single_chunks and merge:
# First sort by number of chunks occupied by cohort
sorted_chunks_cohorts = dict(
sorted(chunks_cohorts.items(), key=lambda kv: len(kv[0]), reverse=True)
)
- items = tuple(sorted_chunks_cohorts.items())
+ # precompute needed metrics for the quadratic loop below.
+ items = tuple((k, len(k), set(k), v) for k, v in sorted_chunks_cohorts.items() if k)
merged_cohorts = {}
- merged_keys = []
+ merged_keys: set[tuple] = set()
# Now we iterate starting with the longest number of chunks,
# and then merge in cohorts that are present in a subset of those chunks
# I think this is suboptimal and must fail at some point.
# But it might work for most cases. There must be a better way...
- for idx, (k1, v1) in enumerate(items):
+ for idx, (k1, len_k1, set_k1, v1) in enumerate(items):
if k1 in merged_keys:
continue
- merged_cohorts[k1] = copy.deepcopy(v1)
- for k2, v2 in items[idx + 1 :]:
- if k2 in merged_keys:
- continue
- if set(k2).issubset(set(k1)):
- merged_cohorts[k1].extend(v2)
- merged_keys.append(k2)
-
- # make sure each cohort is sorted after merging
- sorted_merged_cohorts = {k: sorted(v) for k, v in merged_cohorts.items()}
+ new_key = set_k1
+ new_value = v1
+ # iterate in reverse since we expect small cohorts
+ # to be most likely merged in to larger ones
+ for k2, len_k2, set_k2, v2 in reversed(items[idx + 1 :]):
+ if k2 not in merged_keys:
+ if (len(set_k2 & new_key) / len_k2) > 0.75:
+ new_key |= set_k2
+ new_value += v2
+ merged_keys.update((k2,))
+ sorted_ = sorted(new_value)
+ merged_cohorts[tuple(sorted(new_key))] = sorted_
+ if idx == 0 and (len(sorted_) == nlabels) and (np.array(sorted_) == ilabels).all():
+ break
+
# sort by first label in cohort
# This will help when sort=True (default)
# and we have to resort the dask array
- return dict(sorted(sorted_merged_cohorts.items(), key=lambda kv: kv[1][0]))
+ return dict(sorted(merged_cohorts.items(), key=lambda kv: kv[1][0]))
else:
return chunks_cohorts
@@ -1373,7 +1417,6 @@ def dask_groupby_agg(
inds = tuple(range(array.ndim))
name = f"groupby_{agg.name}"
- token = dask.base.tokenize(array, by, agg, expected_groups, axis)
if expected_groups is None and reindex:
expected_groups = _get_expected_groups(by, sort=sort)
@@ -1394,6 +1437,9 @@ def dask_groupby_agg(
by = dask.array.from_array(by, chunks=chunks)
_, (array, by) = dask.array.unify_chunks(array, inds, by, inds[-by.ndim :])
+ # tokenize here since by has already been hashed if its numpy
+ token = dask.base.tokenize(array, by, agg, expected_groups, axis)
+
# preprocess the array:
# - for argreductions, this zips the index together with the array block
# - not necessary for blockwise with argreductions
@@ -1510,7 +1556,7 @@ def dask_groupby_agg(
index = pd.Index(cohort)
subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :])
reindexed = dask.array.map_blocks(
- reindex_intermediates, subset, agg=agg, unique_groups=index, meta=subset._meta
+ reindex_intermediates, subset, agg, index, meta=subset._meta
)
# now that we have reindexed, we can set reindex=True explicitly
reduced_.append(
@@ -1856,7 +1902,7 @@ def groupby_reduce(
engine: T_EngineOpt = None,
reindex: bool | None = None,
finalize_kwargs: dict[Any, Any] | None = None,
-) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet
+) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]:
"""
GroupBy reductions using tree reductions for dask.array
@@ -2184,4 +2230,4 @@ def groupby_reduce(
if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)):
result = result.astype(bool)
- return (result, *groups) # type: ignore[return-value] # Unpack not in mypy yet
+ return (result, *groups)
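For readers following the find_group_cohorts rewrite above: a minimal standalone sketch, with hypothetical labels and chunks, of how the chunk-by-label sparse bitmask (scipy.sparse.csc_array) yields the set of chunks each label occupies. For brevity it uses np.unique per chunk rather than the reusable boolean scratch array in the actual implementation:

import math
import operator
from itertools import product

import numpy as np
import toolz as tlz
from scipy.sparse import csc_array


def slices_from_chunks(chunks):
    # lazy per-chunk nd slices, same idea as the helper added above
    cumdims = [tlz.accumulate(operator.add, bds, 0) for bds in chunks]
    slices = (
        (slice(s, s + dim) for s, dim in zip(starts, shapes))
        for starts, shapes in zip(cumdims, chunks)
    )
    return product(*slices)


labels = np.array([0, 1, 2, 0, 1, 2, 3])  # factorized group codes
chunks = ((3, 4),)                        # chunk sizes along the grouped axis
nchunks = math.prod(len(c) for c in chunks)
nlabels = labels.max() + 1

rows, cols = [], []
for idx, region in enumerate(slices_from_chunks(chunks)):
    uniques = np.unique(labels[region])       # labels present in this chunk
    rows.append(np.full(len(uniques), idx))   # chunk index, repeated
    cols.append(uniques)

rows_array = np.concatenate(rows)
cols_array = np.concatenate(cols)
data = np.ones(len(rows_array), dtype=bool)
bitmask = csc_array((data, (rows_array, cols_array)), shape=(nchunks, nlabels))

# CSC layout: indptr indexes columns (labels), indices hold the row (chunk) ids,
# so each column slice lists the chunks containing that label
label_chunks = {
    lab: bitmask.indices[bitmask.indptr[lab] : bitmask.indptr[lab + 1]]
    for lab in range(nlabels)
}
print(label_chunks)  # roughly {0: [0, 1], 1: [0, 1], 2: [0, 1], 3: [1]}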
=====================================
flox/visualize.py
=====================================
@@ -121,44 +121,44 @@ def get_colormap(N):
ncolors = len(cmap.colors)
q = N // ncolors
r = N % ncolors
- cmap = mpl.colors.ListedColormap(np.concatenate([cmap.colors] * q + [cmap.colors[:r]]))
- cmap.set_under(color="w")
+ cmap = mpl.colors.ListedColormap(np.concatenate([cmap.colors] * q + [cmap.colors[: r + 1]]))
+ cmap.set_under(color="k")
return cmap
-def factorize_cohorts(by, cohorts):
- factorized = np.full(by.shape, -1)
+def factorize_cohorts(chunks, cohorts):
+ chunk_grid = tuple(len(c) for c in chunks)
+ nchunks = np.prod(chunk_grid)
+ factorized = np.full((nchunks,), -1, dtype=np.int64)
for idx, cohort in enumerate(cohorts):
- factorized[np.isin(by, cohort)] = idx
- return factorized
+ factorized[list(cohort)] = idx
+ return factorized.reshape(chunk_grid)
-def visualize_cohorts_2d(by, array):
+def visualize_cohorts_2d(by, chunks):
assert by.ndim == 2
print("finding cohorts...")
- before_merged = find_group_cohorts(
- by, [array.chunks[ax] for ax in range(-by.ndim, 0)], merge=False
- ).values()
- merged = find_group_cohorts(
- by, [array.chunks[ax] for ax in range(-by.ndim, 0)], merge=True
- ).values()
+ chunks = [chunks[ax] for ax in range(-by.ndim, 0)]
+ before_merged = find_group_cohorts(by, chunks, merge=False)
+ merged = find_group_cohorts(by, chunks, merge=True)
print("finished cohorts...")
- xticks = np.cumsum(array.chunks[-1])
- yticks = np.cumsum(array.chunks[-2])
+ xticks = np.cumsum(chunks[-1])
+ yticks = np.cumsum(chunks[-2])
- f, ax = plt.subplots(2, 2, constrained_layout=True, sharex=True, sharey=True)
+ f, ax = plt.subplots(1, 3, constrained_layout=True, sharex=False, sharey=False)
ax = ax.ravel()
- ax[1].set_visible(False)
- ax = ax[[0, 2, 3]]
+ # ax[1].set_visible(False)
+ # ax = ax[[0, 2, 3]]
ngroups = len(_unique(by))
- h0 = ax[0].imshow(by, cmap=get_colormap(ngroups))
- h1 = _visualize_cohorts(by, before_merged, ax=ax[1])
- h2 = _visualize_cohorts(by, merged, ax=ax[2])
+ h0 = ax[0].imshow(by, vmin=0, cmap=get_colormap(ngroups))
+ h1 = _visualize_cohorts(chunks, before_merged, ax=ax[1])
+ h2 = _visualize_cohorts(chunks, merged, ax=ax[2])
for axx in ax:
axx.grid(True, which="both")
+ for axx in ax[:1]:
axx.set_xticks(xticks)
axx.set_yticks(yticks)
for h, axx in zip([h0, h1, h2], ax):
@@ -167,14 +167,15 @@ def visualize_cohorts_2d(by, array):
ax[0].set_title(f"by: {ngroups} groups")
ax[1].set_title(f"{len(before_merged)} cohorts")
ax[2].set_title(f"{len(merged)} merged cohorts")
- f.set_size_inches((6, 6))
+ f.set_size_inches((12, 6))
-def _visualize_cohorts(by, cohorts, ax=None):
+def _visualize_cohorts(chunks, cohorts, ax=None):
if ax is None:
_, ax = plt.subplots(1, 1)
- ax.imshow(factorize_cohorts(by, cohorts), vmin=0, cmap=get_colormap(len(cohorts)))
+ data = factorize_cohorts(chunks, cohorts)
+ return ax.imshow(data, vmin=0, cmap=get_colormap(len(cohorts)))
def visualize_groups_2d(labels, y0=0, **kwargs):
=====================================
flox/xarray.py
=====================================
@@ -28,10 +28,12 @@ if TYPE_CHECKING:
Dims = Union[str, Iterable[Hashable], None]
-def _restore_dim_order(result, obj, by):
+def _restore_dim_order(result, obj, by, no_groupby_reorder=False):
def lookup_order(dimension):
if dimension == by.name and by.ndim == 1:
(dimension,) = by.dims
+ if no_groupby_reorder:
+ return -1e6 # some arbitrarily low value
if dimension in obj.dims:
axis = obj.get_axis_num(dimension)
else:
@@ -491,7 +493,12 @@ def xarray_reduce(
template = obj
if actual[var].ndim > 1:
- actual[var] = _restore_dim_order(actual[var], template, by_da[0])
+ no_groupby_reorder = isinstance(
+ obj, xr.Dataset
+ ) # do not re-order dataarrays inside datasets
+ actual[var] = _restore_dim_order(
+ actual[var], template, by_da[0], no_groupby_reorder=no_groupby_reorder
+ )
if missing_dim:
for k, v in missing_dim.items():
=====================================
pyproject.toml
=====================================
@@ -22,6 +22,7 @@ dependencies = [
"numpy>=1.22",
"numpy_groupies>=0.9.19",
"toolz",
+ "scipy",
]
dynamic=["version"]
@@ -41,6 +42,7 @@ requires = [
"pandas",
"numpy>=1.22",
"numpy_groupies>=0.9.19",
+ "scipy",
"toolz",
"setuptools>=61.0.0",
"setuptools_scm[toml]>=7.0",
@@ -101,6 +103,7 @@ known-third-party = [
"pkg_resources",
"pytest",
"setuptools",
+ "scipy",
"xarray"
]
=====================================
tests/test_core.py
=====================================
@@ -179,7 +179,7 @@ def test_groupby_reduce(
elif func == "count":
expected_result = np.array(expected, dtype=np.intp)
- (result, groups) = groupby_reduce(
+ (result, *groups) = groupby_reduce(
array,
by,
func=func,
@@ -187,6 +187,7 @@ def test_groupby_reduce(
fill_value=123,
engine=engine,
)
+ (groups_array,) = groups
# we use pd.Index(expected_groups).to_numpy() which is always int64
# for the values in this test
if expected_groups is None:
@@ -196,7 +197,7 @@ def test_groupby_reduce(
else:
g_dtype = np.int64
- assert_equal(groups, np.array([0, 1, 2], g_dtype))
+ assert_equal(groups_array, np.array([0, 1, 2], g_dtype))
assert_equal(expected_result, result)
@@ -795,11 +796,14 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None:
labels = dask.array.from_array(labels, chunks=chunks)
with raise_if_dask_computes():
- actual, groups = groupby_reduce(
+ actual, *groups = groupby_reduce(
array, labels, func="count", fill_value=0, engine=engine, method=method, **kwargs
)
+ (groups_array,) = groups
expected = np.array([3, 1, 0], dtype=np.intp)
- for left, right in zip(groups, pd.IntervalIndex.from_arrays([1, 2, 4], [2, 4, 5]).to_numpy()):
+ for left, right in zip(
+ groups_array, pd.IntervalIndex.from_arrays([1, 2, 4], [2, 4, 5]).to_numpy()
+ ):
assert left == right
assert_equal(actual, expected)
@@ -830,12 +834,12 @@ def test_rechunk_for_blockwise(inchunks, expected):
@pytest.mark.parametrize(
"expected, labels, chunks, merge",
[
- [[[1, 2, 3, 4]], [1, 2, 3, 1, 2, 3, 4], (3, 4), True],
- [[[1, 2, 3], [4]], [1, 2, 3, 1, 2, 3, 4], (3, 4), False],
- [[[1], [2], [3], [4]], [1, 2, 3, 1, 2, 3, 4], (2, 2, 2, 1), False],
- [[[1], [2], [3], [4]], [1, 2, 3, 1, 2, 3, 4], (2, 2, 2, 1), True],
- [[[1, 2, 3], [4]], [1, 2, 3, 1, 2, 3, 4], (3, 3, 1), True],
- [[[1, 2, 3], [4]], [1, 2, 3, 1, 2, 3, 4], (3, 3, 1), False],
+ [[[0, 1, 2, 3]], [0, 1, 2, 0, 1, 2, 3], (3, 4), True],
+ [[[0, 1, 2], [3]], [0, 1, 2, 0, 1, 2, 3], (3, 4), False],
+ [[[0], [1], [2], [3]], [0, 1, 2, 0, 1, 2, 3], (2, 2, 2, 1), False],
+ [[[0], [1], [2], [3]], [0, 1, 2, 0, 1, 2, 3], (2, 2, 2, 1), True],
+ [[[0, 1, 2], [3]], [0, 1, 2, 0, 1, 2, 3], (3, 3, 1), True],
+ [[[0, 1, 2], [3]], [0, 1, 2, 0, 1, 2, 3], (3, 3, 1), False],
[
[[0], [1, 2, 3, 4], [5]],
np.repeat(np.arange(6), [4, 4, 12, 2, 3, 4]),
@@ -844,11 +848,26 @@ def test_rechunk_for_blockwise(inchunks, expected):
],
],
)
-def test_find_group_cohorts(expected, labels, chunks, merge):
+def test_find_group_cohorts(expected, labels, chunks: tuple[int], merge: bool) -> None:
actual = list(find_group_cohorts(labels, (chunks,), merge).values())
assert actual == expected, (actual, expected)
+@pytest.mark.parametrize("chunksize", [12, 13, 14, 24, 36, 48, 72, 71])
+def test_verify_complex_cohorts(chunksize: int) -> None:
+ time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+ chunks = (chunksize,) * (len(time) // chunksize)
+ by = np.array(time.dt.dayofyear.values)
+
+ if len(by) != sum(chunks):
+ chunks += (len(by) - sum(chunks),)
+ chunk_cohorts = find_group_cohorts(by - 1, (chunks,))
+ chunks_ = np.sort(np.concatenate(tuple(chunk_cohorts.keys())))
+ groups = np.sort(np.concatenate(tuple(chunk_cohorts.values())))
+ assert_equal(np.unique(chunks_), np.arange(len(chunks), dtype=int))
+ assert_equal(groups, np.arange(366, dtype=int))
+
+
@requires_dask
@pytest.mark.parametrize(
"chunk_at,expected",
@@ -1034,13 +1053,13 @@ def test_bool_reductions(func, engine):
def test_map_reduce_blockwise_mixed() -> None:
t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
data = t.dt.dayofyear
- actual, _ = groupby_reduce(
+ actual, *_ = groupby_reduce(
dask.array.from_array(data.values, chunks=365),
t.dt.month,
func="mean",
method="map-reduce",
)
- expected, _ = groupby_reduce(data, t.dt.month, func="mean")
+ expected, *_ = groupby_reduce(data, t.dt.month, func="mean")
assert_equal(expected, actual)
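The test updates above switch to star-unpacking because groupby_reduce returns the reduced array followed by one group-label array per grouper. A tiny illustration with hypothetical inputs and a single grouper (expected output noted in the comment, not verified here):

import numpy as np
import flox

array = np.array([1.0, 2.0, 3.0, 4.0])
by = np.array([0, 0, 1, 1])

# result first, then one groups array per `by` argument
result, *groups = flox.groupby_reduce(array, by, func="sum")
(groups_array,) = groups  # single grouper -> exactly one groups array
print(result, groups_array)  # expected: [3. 7.] [0 1]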
=====================================
tests/test_xarray.py
=====================================
@@ -576,9 +576,56 @@ def test_fill_value_xarray_behaviour():
}
)
- expected_time = pd.date_range("2000-01-01", freq="3H", periods=19)
- expected = ds.reindex(time=expected_time)
- expected = ds.resample(time="3H").sum()
+ pd.date_range("2000-01-01", freq="3H", periods=19)
+ with xr.set_options(use_flox=False):
+ expected = ds.resample(time="3H").sum()
with xr.set_options(use_flox=True):
actual = ds.resample(time="3H").sum()
xr.testing.assert_identical(expected, actual)
+
+
+def test_fill_value_xarray_binning():
+ array = np.linspace(0, 10, 5 * 10, dtype=int).reshape(5, 10)
+
+ x = np.array([0, 0, 1, 2, 2])
+ y = np.arange(array.shape[1]) * 3
+ u = np.linspace(0, 1, 5)
+
+ data_array = xr.DataArray(data=array, coords={"x": x, "y": y, "u": ("x", u)}, dims=("x", "y"))
+ with xr.set_options(use_flox=False):
+ expected = data_array.groupby_bins("y", bins=4).mean()
+ with xr.set_options(use_flox=True):
+ actual = data_array.groupby_bins("y", bins=4).mean()
+
+ xr.testing.assert_identical(expected, actual)
+
+
+def test_groupby_2d_dataset():
+ d = {
+ "coords": {
+ "bit_index": {"dims": ("bit_index",), "attrs": {"name": "bit_index"}, "data": [0, 1]},
+ "index": {"dims": ("index",), "data": [0, 6, 8, 10, 14]},
+ "clifford": {"dims": ("index",), "attrs": {}, "data": [1, 1, 4, 10, 4]},
+ },
+ "dims": {"bit_index": 2, "index": 5},
+ "data_vars": {
+ "counts": {
+ "dims": ("bit_index", "index"),
+ "attrs": {
+ "name": "counts",
+ },
+ "data": [[18, 30, 45, 70, 38], [382, 370, 355, 330, 362]],
+ }
+ },
+ }
+
+ ds = xr.Dataset.from_dict(d)
+
+ with xr.set_options(use_flox=False):
+ expected = ds.groupby("clifford").mean()
+ with xr.set_options(use_flox=True):
+ actual = ds.groupby("clifford").mean()
+ assert (
+ expected.counts.dims == actual.counts.dims
+ ) # https://github.com/pydata/xarray/issues/8292
+ xr.testing.assert_identical(expected, actual)
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/compare/1306583c9cf398e40bee6215c6dc398b10660571...4c03516ee7bb7d48114c6e857f5782e3c778d84a