[Git][debian-gis-team/flox][upstream] New upstream version 0.10.4
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Sun Aug 10 16:00:07 BST 2025
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
62f08092 by Antonio Valentino at 2025-08-08T06:01:43+00:00
New upstream version 0.10.4
- - - - -
20 changed files:
- .github/workflows/benchmarks.yml
- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .github/workflows/upstream-dev-ci.yaml
- ci/env-numpy1.yml
- ci/environment.yml
- ci/no-dask.yml
- ci/no-numba.yml
- ci/no-xarray.yml
- docs/source/arrays.md
- flox/aggregate_flox.py
- + flox/aggregate_sparse.py
- flox/aggregations.py
- flox/core.py
- flox/lib.py
- flox/xrutils.py
- tests/__init__.py
- tests/strategies.py
- tests/test_core.py
- tests/test_properties.py
Changes:
=====================================
.github/workflows/benchmarks.yml
=====================================
@@ -10,7 +10,7 @@ jobs:
# if: ${{ contains( github.event.pull_request.labels.*.name, 'run-benchmark') && github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} # Run if the PR has been labelled correctly.
if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} # Always run.
name: Linux
- runs-on: ubuntu-20.04
+ runs-on: ubuntu-latest
env:
ASV_DIR: "./asv_bench"
=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -77,7 +77,7 @@ jobs:
--ignore flox/tests \
--cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v5.4.0
+ uses: codecov/codecov-action@v5.4.2
with:
file: ./coverage.xml
flags: unittests
@@ -132,7 +132,7 @@ jobs:
python -m mypy --install-types --non-interactive --cache-dir=.mypy_cache/ --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
- uses: codecov/codecov-action@v5.4.0
+ uses: codecov/codecov-action@v5.4.2
with:
file: mypy_report/cobertura.xml
flags: mypy
=====================================
.github/workflows/ci.yaml
=====================================
@@ -76,7 +76,7 @@ jobs:
python -c "import xarray; xarray.show_versions()"
pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v5.4.0
+ uses: codecov/codecov-action@v5.4.2
with:
file: ./coverage.xml
flags: unittests
=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -78,7 +78,8 @@ jobs:
git+https://github.com/Unidata/cftime
python -m pip install \
git+https://github.com/dask/dask \
- git+https://github.com/ml31415/numpy-groupies
+ git+https://github.com/ml31415/numpy-groupies \
+ git+https://github.com/pydata/sparse
- name: Install flox
run: |
=====================================
ci/env-numpy1.yml
=====================================
@@ -11,6 +11,7 @@ dependencies:
- pandas
- numpy<2
- scipy
+ - sparse
- lxml # for mypy coverage report
- matplotlib
- pip
=====================================
ci/environment.yml
=====================================
@@ -11,6 +11,7 @@ dependencies:
- pandas
- numpy>=1.22
- scipy
+ - sparse
- lxml # for mypy coverage report
- matplotlib
- pip
=====================================
ci/no-dask.yml
=====================================
@@ -8,6 +8,7 @@ dependencies:
- cftime
- numpy>=1.22
- scipy
+ - sparse
- pip
- pytest
- pytest-cov
=====================================
ci/no-numba.yml
=====================================
@@ -11,6 +11,7 @@ dependencies:
- pandas
- numpy>=1.22
- scipy
+ - sparse
- lxml # for mypy coverage report
- matplotlib
- pip
=====================================
ci/no-xarray.yml
=====================================
@@ -7,6 +7,7 @@ dependencies:
- pandas
- numpy>=1.22
- scipy
+ - sparse
- pip
- pytest
- pytest-cov
=====================================
docs/source/arrays.md
=====================================
@@ -1,5 +1,13 @@
# Duck Array Support
+## Sparse Arrays
+
+`sparse.COO` arrays from the `pydata/sparse` project are supported using algorithms that work on the underlying dense data.
+See `aggregate_sparse.py` for details.
+At the moment the following reductions are supported: `sum`, `nansum`, `min`, `nanmin`, `max`, `nanmax`, `count`.
+
+## Other array types
+
Aggregating over other array types will work if the array types supports the following methods, [ufunc.reduceat](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.reduceat.html) or [ufunc.at](https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html)
| Reduction | `method="numpy"` | `method="flox"` |
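(Illustrative aside, not part of the commit: a minimal sketch of how the sparse support documented above might be exercised, assuming flox and pydata/sparse are importable. The printed values are what this data should produce; the sparse return type matches what the new test_properties assertions expect.)

    import numpy as np
    import sparse
    import flox

    # Mostly-zero data; sparse.COO stores only the non-fill entries.
    dense = np.array([0.0, 2.0, 0.0, 3.0, 0.0, 1.0])
    array = sparse.COO.from_numpy(dense)          # fill_value defaults to 0
    by = np.array([0, 0, 1, 1, 2, 2])

    # "sum" is one of the reductions listed above.
    result, groups = flox.groupby_reduce(array, by, func="sum")
    print(groups)             # [0 1 2]
    print(result.todense())   # [2. 3. 1.]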
=====================================
flox/aggregate_flox.py
=====================================
@@ -146,7 +146,7 @@ def _np_grouped_op(
# assumes input is sorted, which I do in core._prepare_for_flox
aux = group_idx
- flag = np.concatenate((np.array([True], like=array), aux[1:] != aux[:-1]))
+ flag = np.concatenate((np.asarray([True], like=aux), aux[1:] != aux[:-1]))
uniques = aux[flag]
(inv_idx,) = flag.nonzero()
@@ -165,7 +165,7 @@ def _np_grouped_op(
out = np.full((nq,) + array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
kwargs["group_idx"] = group_idx
- if (len(uniques) == size) and (uniques == np.arange(size, like=array)).all():
+ if (len(uniques) == size) and (uniques == np.arange(size, like=aux)).all():
# The previous version of this if condition
# ((uniques[1:] - uniques[:-1]) == 1).all():
# does not work when group_idx is [1, 2] for e.g.
@@ -257,7 +257,7 @@ def ffill(group_idx, array, *, axis, **kwargs):
ndim = array.ndim
assert axis == (ndim - 1), (axis, ndim - 1)
- flag = np.concatenate((np.array([True], like=array), group_idx[1:] != group_idx[:-1]))
+ flag = np.concatenate((np.asarray([True], like=group_idx), group_idx[1:] != group_idx[:-1]))
(group_starts,) = flag.nonzero()
# https://stackoverflow.com/questions/41190852/most-efficient-way-to-forward-fill-nan-values-in-numpy-array
=====================================
flox/aggregate_sparse.py
=====================================
@@ -0,0 +1,201 @@
+# Unlike the other aggregate_* submodules, this one simply defines a wrapper function
+# because we run the groupby on the underlying dense data.
+
+from functools import partial
+
+import numpy as np
+import sparse
+
+from flox.core import _factorize_multiple, _is_sparse_supported_reduction, factorize_
+from flox.xrdtypes import INF, NINF, _get_fill_value
+from flox.xrutils import notnull
+
+
+def nanadd(a, b):
+ """
+ Annoyingly, there is no numpy ufunc for nan-skipping elementwise addition
+ unlike np.fmin, np.fmax :(
+
+ From https://stackoverflow.com/a/50642947/1707127
+ """
+ return np.where(np.isnan(a + b), np.where(np.isnan(a), b, a), a + b)
+
+
+BINARY_OPS = {
+ "sum": np.add,
+ "nansum": nanadd,
+ "max": np.maximum,
+ "nanmax": np.fmax,
+ "min": np.minimum,
+ "nanmin": np.fmin,
+}
+HYPER_OPS = {"sum": np.multiply, "nansum": np.multiply}
+IDENTITY = {
+ "sum": 0,
+ "nansum": 0,
+ "prod": 1,
+ "nanprod": 1,
+ "max": NINF,
+ "nanmax": NINF,
+ "min": INF,
+ "nanmin": INF,
+}
+
+
+def _sparse_agg(
+ group_idx: np.ndarray,
+ array: sparse.COO,
+ func: str,
+ engine: str,
+ axis: int = -1,
+ size: int | None = None,
+ fill_value=None,
+ dtype=None,
+ **kwargs,
+):
+ """Wrapper function, that unwraps the underlying dense arrays, executes the groupby,
+ and constructs the output sparse array."""
+ from flox.aggregations import generic_aggregate
+
+ if not isinstance(array, sparse.COO):
+ raise ValueError("Sparse aggregations only supported for sparse.COO arrays")
+
+ if not _is_sparse_supported_reduction(func):
+ raise ValueError(f"{func} is unsupported for sparse arrays.")
+
+ group_idx_subset = group_idx[array.coords[axis, :]]
+ if array.ndim > 1:
+ new_by = tuple(array.coords[:axis, :]) + (group_idx_subset,)
+ else:
+ new_by = (group_idx_subset,)
+ codes, groups, shape = _factorize_multiple(
+ new_by, expected_groups=(None,) * len(new_by), any_by_dask=False
+ )
+ # factorize again so we can construct a sparse result
+ sparse_codes, sparse_groups, sparse_shape, _, sparse_size, _ = factorize_(codes, axes=(0,))
+
+ dense_result = generic_aggregate(
+ sparse_codes,
+ array.data,
+ func=func,
+ engine=engine,
+ dtype=dtype,
+ size=sparse_size,
+ fill_value=fill_value,
+ )
+ dense_counts = generic_aggregate(
+ sparse_codes,
+ array.data,
+ # This counts is used to handle fill_value, so we need a count
+ # of populated data, regardless of NaN value
+ func="len",
+ engine=engine,
+ dtype=int,
+ size=sparse_size,
+ fill_value=0,
+ )
+ assert len(sparse_groups) == 1
+ result_coords = np.stack(tuple(g[i] for g, i in zip(groups, np.unravel_index(*sparse_groups, shape))))
+
+ full_shape = array.shape[:-1] + (size,)
+ count = sparse.COO(coords=result_coords, data=dense_counts, shape=full_shape, fill_value=0)
+
+ assert axis in (-1, array.ndim - 1)
+ grouped_count = generic_aggregate(
+ group_idx, group_idx, engine=engine, func="len", dtype=np.int64, size=size, fill_value=0
+ )
+ total_count = sparse.COO.from_numpy(
+ np.expand_dims(grouped_count, tuple(range(array.ndim - 1))), fill_value=0
+ )
+
+ assert func in BINARY_OPS
+ binop = BINARY_OPS[func]
+ ident = _get_fill_value(array.dtype, IDENTITY[func])
+ diff_count = total_count - count
+ if (hyper_op := HYPER_OPS.get(func, None)) is not None:
+ fill = hyper_op(diff_count, array.fill_value) if (diff_count > 0).any() else ident
+ else:
+ if "max" in func or "min" in func:
+ # Note that fill_value for total_count, and count is 0.
+ # So the fill_value for the `fill` result is the False branch i.e. `ident`
+ fill = np.where(diff_count > 0, array.fill_value, ident)
+ else:
+ raise NotImplementedError
+
+ result = sparse.COO(coords=result_coords, data=dense_result, shape=full_shape, fill_value=ident)
+ with_fill = binop(result, fill)
+ return with_fill
+
+
+def nanlen(
+ group_idx: np.ndarray,
+ array: sparse.COO,
+ engine: str,
+ axis: int = -1,
+ size: int | None = None,
+ fill_value=None,
+ dtype=None,
+ **kwargs,
+):
+ new_array = sparse.COO(
+ coords=array.coords,
+ data=notnull(array.data),
+ shape=array.shape,
+ fill_value=notnull(array.fill_value),
+ )
+ return _sparse_agg(
+ group_idx, new_array, func="sum", engine=engine, axis=axis, size=size, fill_value=0, dtype=dtype
+ )
+
+
+def mean(
+ group_idx: np.ndarray,
+ array: sparse.COO,
+ engine: str,
+ axis: int = -1,
+ size: int | None = None,
+ fill_value=None,
+ dtype=None,
+ **kwargs,
+):
+ sums = sum(
+ group_idx, array, func="sum", engine=engine, axis=axis, size=size, fill_value=fill_value, dtype=dtype
+ )
+ counts = nanlen(
+ group_idx, array, func="sum", engine=engine, axis=axis, size=size, fill_value=0, dtype=dtype
+ )
+ return sums / counts
+
+
+def nanmean(
+ group_idx: np.ndarray,
+ array: sparse.COO,
+ engine: str,
+ axis: int = -1,
+ size: int | None = None,
+ fill_value=None,
+ dtype=None,
+ **kwargs,
+):
+ sums = sum(
+ group_idx,
+ array,
+ func="nansum",
+ engine=engine,
+ axis=axis,
+ size=size,
+ fill_value=fill_value,
+ dtype=dtype,
+ )
+ counts = nanlen(
+ group_idx, array, func="sum", engine=engine, axis=axis, size=size, fill_value=0, dtype=dtype
+ )
+ return sums / counts
+
+
+sum = partial(_sparse_agg, func="sum")
+nansum = partial(_sparse_agg, func="nansum")
+max = partial(_sparse_agg, func="max")
+nanmax = partial(_sparse_agg, func="nanmax")
+min = partial(_sparse_agg, func="min")
+nanmin = partial(_sparse_agg, func="nanmin")
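(Hand-worked aside, not part of the commit, on the fill-value bookkeeping in `_sparse_agg` above: entries equal to the sparse fill_value are not stored, so their contribution to each group is reconstructed from `total_count - count` and folded back in via the HYPER_OPS/BINARY_OPS entries. Plain numpy is used here instead of the private helpers.)

    import numpy as np

    # group_idx:  [0, 0, 0, 1, 1]
    # dense view: [5, 2, 2, 2, 7]  with sparse fill_value = 2 (only 5 and 7 are stored)
    fill_value = 2
    stored_sums      = np.array([5, 7])  # reduction over stored (dense) data per group
    stored_per_group = np.array([1, 1])  # "count": stored entries per group
    total_per_group  = np.array([3, 2])  # "grouped_count": all entries per group

    # For "sum", HYPER_OPS is np.multiply: the missing contribution is
    # (total - stored) * fill_value, then BINARY_OPS["sum"] (np.add) folds it in.
    fill = np.multiply(total_per_group - stored_per_group, fill_value)
    result = np.add(stored_sums, fill)
    print(result)  # [9 9], matching the sums over the dense view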
=====================================
flox/aggregations.py
=====================================
@@ -14,6 +14,7 @@ from numpy.typing import ArrayLike, DTypeLike
from . import aggregate_flox, aggregate_npg, xrutils
from . import xrdtypes as dtypes
+from .lib import sparse_array_type
if TYPE_CHECKING:
FuncTuple = tuple[Callable | str, ...]
@@ -72,7 +73,14 @@ def generic_aggregate(
if func in ["nanfirst", "nanlast"] and array.dtype.kind in "US":
func = func[3:]
- if engine == "flox":
+ if is_sparse := isinstance(array, sparse_array_type):
+ # this is not an infinite loop because aggregate_sparse will call
+ # generic_aggregate with dense data
+ from flox import aggregate_sparse
+
+ method = partial(getattr(aggregate_sparse, func), engine=engine)
+
+ elif engine == "flox":
try:
method = getattr(aggregate_flox, func)
except AttributeError:
@@ -105,7 +113,9 @@ def generic_aggregate(
f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead."
)
- group_idx = np.asarray(group_idx, like=array)
+ # UGLY! but this avoids auto-densification errors
+ if not is_sparse:
+ group_idx = np.asarray(group_idx, like=array)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
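(Rough sketch, not part of the commit, of the dispatch added above: a sparse.COO input is routed to flox.aggregate_sparse, whose wrapper re-enters generic_aggregate on the dense `.data`, so the recursion stops after one level. The exact output construction follows the new wrapper.)

    import numpy as np
    import sparse
    from flox.aggregations import generic_aggregate

    group_idx = np.array([0, 0, 1, 1])
    array = sparse.COO.from_numpy(np.array([1.0, 0.0, 0.0, 3.0]))

    # The first call sees a sparse.COO and picks aggregate_sparse.sum; that wrapper
    # calls generic_aggregate again on array.data, which is a plain ndarray.
    out = generic_aggregate(group_idx, array, func="sum", engine="numpy", size=2, fill_value=0)
    print(type(out))          # sparse.COO
    print(out.todense())      # [1. 3.]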
=====================================
flox/core.py
=====================================
@@ -48,7 +48,7 @@ from .aggregations import (
quantile_new_dims_func,
)
from .cache import memoize
-from .lib import ArrayLayer
+from .lib import ArrayLayer, dask_array_type, sparse_array_type
from .xrutils import (
_contains_cftime_datetimes,
_to_pytimedelta,
@@ -178,11 +178,11 @@ class ReindexStrategy:
self.blockwise = True if self.blockwise is None else self.blockwise
def get_dask_meta(self, other, *, fill_value, dtype) -> Any:
- import dask
-
if self.array_type is ReindexArrayType.AUTO:
- other_type = type(other._meta) if isinstance(other, dask.array.Array) else type(other)
- return other_type([], dtype=dtype)
+ other = other._meta if isinstance(other, dask_array_type) else other
+ if isinstance(other, sparse_array_type):
+ return type(other).from_numpy(np.array([], dtype=dtype))
+ return type(other)([], dtype=dtype)
elif self.array_type is ReindexArrayType.NUMPY:
return np.ndarray([], dtype=dtype)
elif self.array_type is ReindexArrayType.SPARSE_COO:
@@ -257,6 +257,12 @@ def _is_bool_supported_reduction(func: T_Agg) -> bool:
def _is_sparse_supported_reduction(func: T_Agg) -> bool:
+ if isinstance(func, Aggregation):
+ func = func.name
+ return not _is_arg_reduction(func) and any(f in func for f in ["len", "sum", "max", "min", "mean"])
+
+
+def _is_reindex_sparse_supported_reduction(func: T_Agg) -> bool:
if isinstance(func, Aggregation):
func = func.name
return HAS_SPARSE and all(f not in func for f in ["first", "last", "prod", "var", "std"])
@@ -855,10 +861,12 @@ def reindex_(
new_dtype = array.dtype
if array_type is ReindexArrayType.AUTO:
- # TODO: generalize here
- # Right now, we effectively assume NEP-18 I think
- # assert isinstance(array, np.ndarray)
- array_type = ReindexArrayType.NUMPY
+ if isinstance(array, sparse_array_type):
+ array_type = ReindexArrayType.SPARSE_COO
+ else:
+ # TODO: generalize here
+ # Right now, we effectively assume NEP-18 I think
+ array_type = ReindexArrayType.NUMPY
if array_type is ReindexArrayType.NUMPY:
reindexed = reindex_numpy(array, from_, to, fill_value, new_dtype, axis)
@@ -1244,7 +1252,7 @@ def chunk_reduce(
previous_reduction: T_Func = ""
for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes):
if empty:
- result = np.full(shape=final_array_shape, fill_value=fv)
+ result = np.full(shape=final_array_shape, fill_value=fv, like=array)
elif is_nanlen(reduction) and is_nanlen(previous_reduction):
result = results["intermediates"][-1]
else:
@@ -1338,6 +1346,15 @@ def _finalize_results(
if fill_value is xrdtypes.NA:
new_dtype, fill_value = xrdtypes.maybe_promote(finalized[agg.name].dtype)
finalized[agg.name] = finalized[agg.name].astype(new_dtype)
+
+ if isinstance(count_mask, sparse_array_type):
+ # This is guaranteed because for `counts`, we use `fill_value=0`
+ # and we are in the counts < (min_count > 0) branch.
+ # We need to flip the fill_value so that the `np.where` call preserves
+ # the fill_value of finalized[agg.name], this is needed when dask concatenates results.
+ assert count_mask.fill_value is np.True_, "Logic bug. I expected fill_value to be True"
+ count_mask = type(count_mask).from_numpy(count_mask.todense(), fill_value=False)
+
finalized[agg.name] = np.where(count_mask, fill_value, finalized[agg.name])
# Final reindexing has to be here to be lazy
@@ -1373,7 +1390,9 @@ def _aggregate(
def _expand_dims(results: IntermediateDict) -> IntermediateDict:
- results["intermediates"] = tuple(np.expand_dims(array, DUMMY_AXIS) for array in results["intermediates"])
+ results["intermediates"] = tuple(
+ np.expand_dims(array, axis=DUMMY_AXIS) for array in results["intermediates"]
+ )
return results
@@ -2861,7 +2880,7 @@ def groupby_reduce(
if reindex.array_type is ReindexArrayType.SPARSE_COO:
if not HAS_SPARSE:
raise ImportError("Package 'sparse' must be installed to reindex to a sparse.COO array.")
- if not _is_sparse_supported_reduction(func):
+ if not _is_reindex_sparse_supported_reduction(func):
raise NotImplementedError(
f"Aggregation {func=!r} is not supported when reindexing to a sparse array. "
"Please raise an issue"
=====================================
flox/lib.py
=====================================
@@ -2,6 +2,20 @@ from dataclasses import dataclass
from .types import DaskArray, Graph
+try:
+ import dask.array as da
+
+ dask_array_type = da.Array
+except ImportError:
+ dask_array_type = () # type: ignore[assignment, misc]
+
+try:
+ import sparse
+
+ sparse_array_type = sparse.COO
+except ImportError:
+ sparse_array_type = ()
+
@dataclass
class ArrayLayer:
=====================================
flox/xrutils.py
=====================================
@@ -155,8 +155,11 @@ def notnull(data):
return np.broadcast_to(np.array(True), data.shape)
else:
out = isnull(data)
- np.logical_not(out, out=out)
- return out
+ if np.isscalar(out):
+ return ~out
+ else:
+ np.logical_not(out, out=out)
+ return out
def isnull(data: Any):
=====================================
tests/__init__.py
=====================================
@@ -1,28 +1,16 @@
import importlib
from contextlib import nullcontext
+from typing import Any
import numpy as np
import packaging.version
import pandas as pd
import pytest
-pd_types = (pd.Index,)
-
-try:
- import dask
- import dask.array as da
-
- dask_array_type = da.Array
-except ImportError:
- dask_array_type = () # type: ignore[assignment, misc]
-
-try:
- import sparse
-
- sparse_array_type = sparse.COO
-except ImportError:
- sparse_array_type = ()
+from flox.lib import dask_array_type, sparse_array_type
+from flox.xrutils import is_duck_dask_array
+pd_types = (pd.Index,)
try:
import xarray as xr
@@ -72,6 +60,8 @@ class CountingScheduler:
self.max_computes = max_computes
def __call__(self, dsk, keys, **kwargs):
+ import dask
+
self.total_computes += 1
if self.total_computes > self.max_computes:
raise RuntimeError(f"Too many computes. Total: {self.total_computes} > max: {self.max_computes}.")
@@ -82,6 +72,8 @@ def raise_if_dask_computes(max_computes=0):
# return a dummy context manager so that this can be used for non-dask objects
if not has_dask:
return nullcontext()
+ import dask
+
scheduler = CountingScheduler(max_computes)
return dask.config.set(scheduler=scheduler)
@@ -266,3 +258,24 @@ def dask_assert_eq(
f"(meta: {type(b_meta)}, computed: {type(b_computed)})"
)
assert type(b_meta) is type(b_computed), msg
+
+
+def to_numpy(data) -> np.ndarray[Any, np.dtype[Any]]:
+ try:
+ return data.to_numpy()
+ except AttributeError:
+ pass
+
+ # TODO first attempt to call .to_numpy() once some libraries implement it
+ if is_duck_dask_array(data):
+ data = data.compute()
+ # if isinstance(data, array_type("cupy")):
+ # data = data.get()
+ # # pint has to be imported dynamically as pint imports xarray
+ # if isinstance(data, array_type("pint")):
+ # data = data.magnitude
+ if isinstance(data, sparse_array_type):
+ data = data.todense()
+ data = np.asarray(data)
+
+ return data
=====================================
tests/strategies.py
=====================================
@@ -8,6 +8,7 @@ import dask
import hypothesis.extra.numpy as npst
import hypothesis.strategies as st
import numpy as np
+import sparse
from . import ALL_FUNCS, SCIPY_STATS_FUNCS
@@ -65,6 +66,20 @@ def cftime_arrays(
return cftime.num2date(values, units=unit, calendar=cal)
+def insert_nans(draw: st.DrawFn, array: np.ndarray) -> np.ndarray:
+ if array.dtype.kind in "cf":
+ nan_idx = draw(
+ st.lists(
+ st.integers(min_value=0, max_value=array.shape[-1] - 1),
+ max_size=array.shape[-1] - 1,
+ unique=True,
+ )
+ )
+ if nan_idx:
+ array[..., nan_idx] = np.nan
+ return array
+
+
numeric_dtypes = (
npst.integer_dtypes(endianness="=")
| npst.unsigned_integer_dtypes(endianness="=")
@@ -96,20 +111,18 @@ NON_NUMPY_FUNCS = [
SKIPPED_FUNCS = ["var", "std", "nanvar", "nanstd"]
func_st = st.sampled_from([f for f in ALL_FUNCS if f not in NON_NUMPY_FUNCS and f not in SKIPPED_FUNCS])
-numeric_arrays = npst.arrays(
- elements={"allow_subnormal": False}, shape=npst.array_shapes(), dtype=numeric_dtypes
-)
-numeric_like_arrays = npst.arrays(
- elements={"allow_subnormal": False}, shape=npst.array_shapes(), dtype=numeric_like_dtypes
-)
-all_arrays = (
- npst.arrays(
- elements={"allow_subnormal": False},
- shape=npst.array_shapes(),
- dtype=numeric_like_dtypes,
- )
- | cftime_arrays()
-)
+
+
+@st.composite
+def numpy_arrays(draw: st.DrawFn, *, dtype) -> np.ndarray:
+ array = draw(npst.arrays(elements={"allow_subnormal": False}, shape=npst.array_shapes(), dtype=dtype))
+ array = insert_nans(draw, array)
+ return array
+
+
+numeric_arrays = numpy_arrays(dtype=numeric_dtypes)
+numeric_like_arrays = numpy_arrays(dtype=numeric_like_dtypes)
+all_arrays = numeric_like_arrays | cftime_arrays()
def by_arrays(
@@ -153,16 +166,26 @@ def chunked_arrays(
) -> dask.array.Array:
array = draw(arrays)
chunks = draw(chunks(shape=array.shape))
+ return from_array(array, chunks=chunks)
- if array.dtype.kind in "cf":
- nan_idx = draw(
- st.lists(
- st.integers(min_value=0, max_value=array.shape[-1] - 1),
- max_size=array.shape[-1] - 1,
- unique=True,
- )
- )
- if nan_idx:
- array[..., nan_idx] = np.nan
- return from_array(array, chunks=chunks)
+@st.composite
+def sparse_arrays(
+ draw: st.DrawFn,
+ *,
+ elements={"allow_subnormal": False},
+ shapes=npst.array_shapes(),
+ dtypes=npst.boolean_dtypes() | numeric_dtypes,
+ sparse_class=sparse.COO,
+) -> sparse.COO:
+ dtype = draw(dtypes)
+ fill_value = draw(npst.from_dtype(dtype=dtype, **elements))
+ # assume(dtype.kind not in "mM") # sparse doesn't support .view
+ array = draw(npst.arrays(elements=elements, shape=shapes, dtype=st.just(dtype), fill=st.just(fill_value)))
+ if draw(st.booleans()) and dtype.kind == "f":
+ array = insert_nans(draw, array)
+ if draw(st.booleans()) and dtype.kind == "f":
+ # need to increase probability of NaN fill_value
+ fill_value = np.nan
+ sparse_array = sparse_class.from_numpy(array, fill_value=fill_value)
+ return sparse_array
=====================================
tests/test_core.py
=====================================
@@ -49,6 +49,7 @@ from . import (
requires_cubed,
requires_dask,
requires_sparse,
+ to_numpy,
)
logger = logging.getLogger("flox")
@@ -221,6 +222,7 @@ def gen_array_by(size, func):
return array, by
+@pytest.mark.parametrize("to_sparse", [pytest.param(True, marks=requires_sparse), False])
@pytest.mark.parametrize(
"chunks",
[
@@ -230,15 +232,21 @@ def gen_array_by(size, func):
pytest.param(4, marks=requires_dask),
],
)
-@pytest.mark.parametrize("size", ((1, 12), (12,), (12, 9)))
+@pytest.mark.parametrize("size", [(1, 12), (12,), (12, 9)])
@pytest.mark.parametrize("nby", [1, 2, 3])
@pytest.mark.parametrize("add_nan_by", [True, False])
@pytest.mark.parametrize("func", ALL_FUNCS)
-def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
+def test_groupby_reduce_all(to_sparse, nby, size, chunks, func, add_nan_by, engine):
if ("arg" in func and engine in ["flox", "numbagg"]) or (func in BLOCKWISE_FUNCS and chunks != -1):
pytest.skip()
array, by = gen_array_by(size, func)
+ if to_sparse:
+ import sparse
+
+ array = sparse.COO.from_numpy(array)
+ if not _is_sparse_supported_reduction(func):
+ pytest.skip()
if chunks:
array = dask.array.from_array(array, chunks=chunks)
by = (by,) * nby
@@ -279,7 +287,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
warnings.filterwarnings("ignore", r"Mean of empty slice")
# computing silences a bunch of dask warnings
- array_ = array.compute() if chunks is not None else array
+ array_ = to_numpy(array)
if "arg" in func and add_nan_by:
# NaNs are in by, but we can't call np.argmax([..., NaN, .. ])
# That would return index of the NaN
@@ -291,7 +299,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
else:
expected = array_func(array_[..., ~nanmask], axis=-1, **kwargs)
for _ in range(nby):
- expected = np.expand_dims(expected, -1)
+ expected = np.expand_dims(expected, axis=-1)
if func in BLOCKWISE_FUNCS:
assert chunks == -1
@@ -1966,7 +1974,7 @@ def test_nanlen_string(dtype, engine) -> None:
assert_equal(expected, actual)
-def test_cumusm() -> None:
+def test_cumsum() -> None:
array = np.array([1, 1, 1], dtype=np.uint64)
by = np.array([0] * array.shape[-1])
expected = np.nancumsum(array, axis=-1)
@@ -2131,10 +2139,11 @@ def test_reindex_sparse(size):
assert mocked_reindex_func.call_count > 1
+@requires_dask
def test_sparse_errors():
call = partial(
groupby_reduce,
- [1, 2, 3],
+ dask.array.from_array([1, 2, 3], chunks=(1,)),
[0, 1, 1],
reindex=REINDEX_SPARSE_STRAT,
fill_value=0,
@@ -2148,3 +2157,34 @@ def test_sparse_errors():
else:
with pytest.raises(ValueError):
call(func="first")
+
+
+@requires_sparse
+@pytest.mark.parametrize(
+ "chunks",
+ [
+ None,
+ pytest.param(-1, marks=requires_dask),
+ pytest.param(3, marks=requires_dask),
+ pytest.param(4, marks=requires_dask),
+ ],
+)
+@pytest.mark.parametrize("shape", [(1, 12), (12,), (12, 9)])
+@pytest.mark.parametrize("fill_value", [np.nan, 0])
+@pytest.mark.parametrize("func", ["sum", "mean", "min", "max", "nansum", "nanmean", "nanmin", "nanmax"])
+def test_sparse_nan_fill_value_reductions(chunks, fill_value, shape, func):
+ import sparse
+
+ numpy_array, by = gen_array_by(shape, func)
+ array = sparse.COO.from_numpy(numpy_array, fill_value=fill_value)
+ if chunks:
+ array = dask.array.from_array(array, chunks=chunks)
+
+ npfunc = _get_array_func(func)
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", r"Mean of empty slice")
+ warnings.filterwarnings("ignore", r"All-NaN slice encountered")
+ # warnings.filterwarnings("ignore", r"encountered in divide")
+ expected = np.expand_dims(npfunc(numpy_array, axis=-1), axis=-1)
+ actual, *_ = groupby_reduce(array, by, func=func, axis=-1)
+ assert_equal(actual, expected)
=====================================
tests/test_properties.py
=====================================
@@ -7,6 +7,7 @@ import pytest
pytest.importorskip("hypothesis")
pytest.importorskip("dask")
+pytest.importorskip("sparse")
pytest.importorskip("cftime")
import dask
@@ -16,11 +17,27 @@ import numpy as np
from hypothesis import assume, given, note, settings
import flox
-from flox.core import groupby_reduce, groupby_scan
-from flox.xrutils import _contains_cftime_datetimes, _to_pytimedelta, datetime_to_numeric, isnull, notnull
+from flox.core import _is_sparse_supported_reduction, groupby_reduce, groupby_scan
+from flox.lib import sparse_array_type
+from flox.xrutils import (
+ _contains_cftime_datetimes,
+ _to_pytimedelta,
+ datetime_to_numeric,
+ is_duck_dask_array,
+ isnull,
+ notnull,
+)
-from . import BLOCKWISE_FUNCS, assert_equal
-from .strategies import all_arrays, by_arrays, chunked_arrays, func_st, numeric_dtypes, numeric_like_arrays
+from . import BLOCKWISE_FUNCS, assert_equal, to_numpy
+from .strategies import (
+ all_arrays,
+ by_arrays,
+ chunked_arrays,
+ func_st,
+ numeric_dtypes,
+ numeric_like_arrays,
+ sparse_arrays,
+)
from .strategies import chunks as chunks_strategy
dask.config.set(scheduler="sync")
@@ -147,20 +164,27 @@ def test_groupby_reduce(data, array, func: str) -> None:
assert_equal(expected, actual, tolerance)
+@settings(deadline=None)
@given(
data=st.data(),
- array=chunked_arrays(arrays=numeric_like_arrays),
+ array=chunked_arrays(arrays=numeric_like_arrays | sparse_arrays()) | sparse_arrays(),
func=func_st,
)
-def test_groupby_reduce_numpy_vs_dask(data, array, func: str) -> None:
- numpy_array = array.compute()
+def test_groupby_reduce_numpy_vs_other(data, array, func: str) -> None:
+ if (
+ isinstance(array, sparse_array_type)
+ or (is_duck_dask_array(array) and isinstance(array._meta, sparse_array_type))
+ and not _is_sparse_supported_reduction(func)
+ ):
+ assume(False)
+ numpy_array = to_numpy(array)
# overflow behaviour differs between bincount and sum (for example)
assume(not_overflowing_array(numpy_array))
# TODO: fix var for complex numbers upstream
assume(not (("quantile" in func or "var" in func or "std" in func) and array.dtype.kind == "c"))
# # arg* with nans in array are weird
assume("arg" not in func and not np.any(isnull(numpy_array.ravel())))
- if func in ["nanmedian", "nanquantile", "median", "quantile"]:
+ if hasattr(array, "rechunk") and func in ["nanmedian", "nanquantile", "median", "quantile"]:
array = array.rechunk({-1: -1})
axis = -1
@@ -175,12 +199,12 @@ def test_groupby_reduce_numpy_vs_dask(data, array, func: str) -> None:
**flox_kwargs,
finalize_kwargs=kwargs,
)
- result_dask, *_ = groupby_reduce(array, by, **kwargs)
+ result_other, *_ = groupby_reduce(array, by, **kwargs)
result_numpy, *_ = groupby_reduce(numpy_array, by, **kwargs)
- assert_equal(result_numpy, result_dask)
+ assert isinstance(result_other, type(array))
+ assert_equal(result_numpy, result_other)
-@settings(report_multiple_bugs=False)
@given(
data=st.data(),
array=chunked_arrays(arrays=numeric_like_arrays),
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/62f08092a586642b2fe27134a180700ea512e8f0