[Git][debian-gis-team/flox][master] 4 commits: New upstream version 0.9.2
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Sat Feb 10 11:48:24 GMT 2024
Antonio Valentino pushed to branch master at Debian GIS Project / flox
Commits:
54cced1c by Antonio Valentino at 2024-02-10T10:55:40+00:00
New upstream version 0.9.2
- - - - -
bee6cf4c by Antonio Valentino at 2024-02-10T10:55:41+00:00
Update upstream source from tag 'upstream/0.9.2'
Update to upstream version '0.9.2'
with Debian dir 41e546e0f9340ad2944402d47a2f6fdb357e5dd2
- - - - -
459a453f by Antonio Valentino at 2024-02-10T10:56:25+00:00
New upstream release
- - - - -
6d6a2365 by Antonio Valentino at 2024-02-10T10:59:17+00:00
Set distribution to unstable
- - - - -
16 changed files:
- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .github/workflows/testpypi-release.yaml
- .github/workflows/upstream-dev-ci.yaml
- ci/environment.yml
- ci/no-dask.yml
- debian/changelog
- flox/aggregate_flox.py
- flox/aggregate_npg.py
- flox/aggregate_numbagg.py
- flox/aggregations.py
- flox/core.py
- flox/xarray.py
- flox/xrutils.py
- tests/test_core.py
- tests/test_xarray.py
Changes:
=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -41,7 +41,7 @@ jobs:
env:
CONDA_ENV_FILE: ci/environment.yml
- PYTHON_VERSION: "3.11"
+ PYTHON_VERSION: "3.12"
steps:
- uses: actions/checkout at v4
@@ -77,7 +77,7 @@ jobs:
--ignore flox/tests \
--cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action at v3.1.4
+ uses: codecov/codecov-action at v4.0.0
with:
file: ./coverage.xml
flags: unittests
@@ -95,7 +95,7 @@ jobs:
shell: bash -l {0}
env:
CONDA_ENV_FILE: ci/environment.yml
- PYTHON_VERSION: "3.11"
+ PYTHON_VERSION: "3.12"
steps:
- uses: actions/checkout at v4
@@ -131,7 +131,7 @@ jobs:
python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
- uses: codecov/codecov-action at v3.1.4
+ uses: codecov/codecov-action at v4.0.0
with:
file: mypy_report/cobertura.xml
flags: mypy
=====================================
.github/workflows/ci.yaml
=====================================
@@ -25,7 +25,7 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest", "windows-latest"]
- python-version: ["3.9", "3.11"]
+ python-version: ["3.9", "3.12"]
steps:
- uses: actions/checkout at v4
with:
@@ -49,7 +49,7 @@ jobs:
run: |
pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action at v3.1.4
+ uses: codecov/codecov-action at v4.0.0
with:
file: ./coverage.xml
flags: unittests
@@ -66,7 +66,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- python-version: ["3.11"]
+ python-version: ["3.12"]
env: ["no-xarray", "no-dask"]
include:
- env: "no-numba"
@@ -93,7 +93,7 @@ jobs:
run: |
python -m pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action at v3.1.4
+ uses: codecov/codecov-action at v4.0.0
with:
file: ./coverage.xml
flags: unittests
=====================================
.github/workflows/testpypi-release.yaml
=====================================
@@ -24,7 +24,7 @@ jobs:
- uses: actions/setup-python at v5
name: Install Python
with:
- python-version: "3.11"
+ python-version: "3.12"
- name: Install dependencies
run: |
@@ -65,7 +65,7 @@ jobs:
- uses: actions/setup-python at v5
name: Install Python
with:
- python-version: "3.11"
+ python-version: "3.12"
- uses: actions/download-artifact at v4
with:
name: releases
=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -33,7 +33,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- python-version: ["3.11"]
+ python-version: ["3.12"]
steps:
- uses: actions/checkout at v4
with:
=====================================
ci/environment.yml
=====================================
@@ -19,7 +19,6 @@ dependencies:
- pytest-xdist
- xarray
- pre-commit
- - numbagg>=0.3
- numpy_groupies>=0.9.19
- pooch
- toolz
=====================================
ci/no-dask.yml
=====================================
@@ -19,3 +19,4 @@ dependencies:
- pooch
- toolz
- numba
+ - numbagg>=0.3
=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+flox (0.9.2-1) unstable; urgency=medium
+
+ * New upstream release.
+
+ -- Antonio Valentino <antonio.valentino at tiscali.it> Sat, 10 Feb 2024 10:59:02 +0000
+
flox (0.9.0-1) unstable; urgency=medium
* New upstream release.
=====================================
flox/aggregate_flox.py
=====================================
@@ -2,7 +2,7 @@ from functools import partial
import numpy as np
-from .xrutils import isnull
+from .xrutils import is_scalar, isnull, notnull
def _prepare_for_flox(group_idx, array):
@@ -10,6 +10,7 @@ def _prepare_for_flox(group_idx, array):
Sort the input array once to save time.
"""
assert array.shape[-1] == group_idx.shape[0]
+
issorted = (group_idx[:-1] <= group_idx[1:]).all()
if issorted:
ordered_array = array
@@ -20,7 +21,102 @@ def _prepare_for_flox(group_idx, array):
return group_idx, ordered_array
-def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dtype=None, out=None):
+def _lerp(a, b, *, t, dtype, out=None):
+ """
+ COPIED from numpy.
+
+ Compute the linear interpolation weighted by gamma on each point of
+ two same shape array.
+
+ a : array_like
+ Left bound.
+ b : array_like
+ Right bound.
+ t : array_like
+ The interpolation weight.
+ """
+ if out is None:
+ out = np.empty_like(a, dtype=dtype)
+ diff_b_a = np.subtract(b, a)
+ # asanyarray is a stop-gap until gh-13105
+ np.add(a, diff_b_a * t, out=out)
+ np.subtract(b, diff_b_a * (1 - t), out=out, where=t >= 0.5)
+ return out
+
+
+def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=None):
+ inv_idx = np.concatenate((inv_idx, [array.shape[-1]]))
+
+ array_nanmask = isnull(array)
+ actual_sizes = np.add.reduceat(~array_nanmask, inv_idx[:-1], axis=axis)
+ newshape = (1,) * (array.ndim - 1) + (inv_idx.size - 1,)
+ full_sizes = np.reshape(np.diff(inv_idx), newshape)
+ nanmask = full_sizes != actual_sizes
+
+ # The approach here is to use (complex_array.partition) because
+ # 1. The full np.lexsort((array, labels), axis=-1) is slow and unnecessary
+ # 2. Using record_array.partition(..., order=["labels", "array"]) is incredibly slow.
+ # partition will first sort by real part, then by imaginary part, so it is a two element lex-partition.
+ # So we set
+ # complex_array = group_idx + 1j * array
+ # group_idx is an integer (guaranteed), but array can have NaNs. Now,
+ # 1 + 1j*NaN = NaN + 1j * NaN
+ # so we must replace all NaNs with the maximum array value in the group so these NaNs
+ # get sorted to the end.
+ # Partly inspired by https://krstn.eu/np.nanpercentile()-there-has-to-be-a-faster-way/
+ # TODO: Don't know if this array has been copied in _prepare_for_flox. This is potentially wasteful
+ array = np.where(array_nanmask, -np.inf, array)
+ maxes = np.maximum.reduceat(array, inv_idx[:-1], axis=axis)
+ replacement = np.repeat(maxes, np.diff(inv_idx), axis=axis)
+ array[array_nanmask] = replacement[array_nanmask]
+
+ qin = q
+ q = np.atleast_1d(qin)
+ q = np.reshape(q, (len(q),) + (1,) * array.ndim)
+
+ # This is numpy's method="linear"
+ # TODO: could support all the interpolations here
+ virtual_index = q * (actual_sizes - 1) + inv_idx[:-1]
+
+ is_scalar_q = is_scalar(qin)
+ if is_scalar_q:
+ virtual_index = virtual_index.squeeze(axis=0)
+ idxshape = array.shape[:-1] + (actual_sizes.shape[-1],)
+ else:
+ idxshape = (q.shape[0],) + array.shape[:-1] + (actual_sizes.shape[-1],)
+
+ lo_ = np.floor(
+ virtual_index, casting="unsafe", out=np.empty(virtual_index.shape, dtype=np.int64)
+ )
+ hi_ = np.ceil(
+ virtual_index, casting="unsafe", out=np.empty(virtual_index.shape, dtype=np.int64)
+ )
+ kth = np.unique(np.concatenate([lo_.reshape(-1), hi_.reshape(-1)]))
+
+ # partition the complex array in-place
+ labels_broadcast = np.broadcast_to(group_idx, array.shape)
+ cmplx = labels_broadcast + 1j * array
+ cmplx.partition(kth=kth, axis=-1)
+ if is_scalar_q:
+ a_ = cmplx.imag
+ else:
+ a_ = np.broadcast_to(cmplx.imag, (q.shape[0],) + array.shape)
+
+ # get bounds, Broadcast to (num quantiles, ..., num labels)
+ loval = np.take_along_axis(a_, np.broadcast_to(lo_, idxshape), axis=axis)
+ hival = np.take_along_axis(a_, np.broadcast_to(hi_, idxshape), axis=axis)
+
+ # TODO: could support all the interpolations here
+ gamma = np.broadcast_to(virtual_index, idxshape) - lo_
+ result = _lerp(loval, hival, t=gamma, out=out, dtype=dtype)
+ if not skipna and np.any(nanmask):
+ result[..., nanmask] = np.nan
+ return result
+
+
+def _np_grouped_op(
+ group_idx, array, op, axis=-1, size=None, fill_value=None, dtype=None, out=None, **kwargs
+):
"""
most of this code is from shoyer's gist
https://gist.github.com/shoyer/f538ac78ae904c936844
@@ -38,16 +134,22 @@ def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dt
dtype = array.dtype
if out is None:
- out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+ q = kwargs.get("q", None)
+ if q is None:
+ out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+ else:
+ nq = len(np.atleast_1d(q))
+ out = np.full((nq,) + array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+ kwargs["group_idx"] = group_idx
if (len(uniques) == size) and (uniques == np.arange(size, like=array)).all():
# The previous version of this if condition
# ((uniques[1:] - uniques[:-1]) == 1).all():
# does not work when group_idx is [1, 2] for e.g.
# This happens during binning
- op.reduceat(array, inv_idx, axis=axis, dtype=dtype, out=out)
+ op(array, inv_idx, axis=axis, dtype=dtype, out=out, **kwargs)
else:
- out[..., uniques] = op.reduceat(array, inv_idx, axis=axis, dtype=dtype)
+ out[..., uniques] = op(array, inv_idx, axis=axis, dtype=dtype, **kwargs)
return out
@@ -65,14 +167,18 @@ def _nan_grouped_op(group_idx, array, func, fillna, *args, **kwargs):
return result
-sum = partial(_np_grouped_op, op=np.add)
+sum = partial(_np_grouped_op, op=np.add.reduceat)
nansum = partial(_nan_grouped_op, func=sum, fillna=0)
-prod = partial(_np_grouped_op, op=np.multiply)
+prod = partial(_np_grouped_op, op=np.multiply.reduceat)
nanprod = partial(_nan_grouped_op, func=prod, fillna=1)
-max = partial(_np_grouped_op, op=np.maximum)
+max = partial(_np_grouped_op, op=np.maximum.reduceat)
nanmax = partial(_nan_grouped_op, func=max, fillna=-np.inf)
-min = partial(_np_grouped_op, op=np.minimum)
+min = partial(_np_grouped_op, op=np.minimum.reduceat)
nanmin = partial(_nan_grouped_op, func=min, fillna=np.inf)
+quantile = partial(_np_grouped_op, op=partial(quantile_, skipna=False))
+nanquantile = partial(_np_grouped_op, op=partial(quantile_, skipna=True))
+median = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=False))
+nanmedian = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=True))
# TODO: all, any
@@ -99,7 +205,7 @@ def nansum_of_squares(group_idx, array, *, axis=-1, size=None, fill_value=None,
def nanlen(group_idx, array, *args, **kwargs):
- return sum(group_idx, (~isnull(array)).astype(int), *args, **kwargs)
+ return sum(group_idx, (notnull(array)).astype(int), *args, **kwargs)
def mean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
=====================================
flox/aggregate_npg.py
=====================================
@@ -8,6 +8,11 @@ def _get_aggregate(engine):
return npg.aggregate_numpy if engine == "numpy" else npg.aggregate_numba
+def _casting_wrapper(func, grp, dtype):
+ """Used for generic aggregates. The group is dtype=object, need to cast back to fix weird bugs"""
+ return func(grp.astype(dtype))
+
+
def sum_of_squares(
group_idx,
array,
@@ -106,7 +111,7 @@ def median(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dty
return npg.aggregate_numpy.aggregate(
group_idx,
array,
- func=np.median,
+ func=partial(_casting_wrapper, np.median, dtype=array.dtype),
axis=axis,
size=size,
fill_value=fill_value,
@@ -118,7 +123,7 @@ def nanmedian(group_idx, array, engine, *, axis=-1, size=None, fill_value=None,
return npg.aggregate_numpy.aggregate(
group_idx,
array,
- func=np.nanmedian,
+ func=partial(_casting_wrapper, np.nanmedian, dtype=array.dtype),
axis=axis,
size=size,
fill_value=fill_value,
@@ -130,7 +135,7 @@ def quantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None
return npg.aggregate_numpy.aggregate(
group_idx,
array,
- func=partial(np.quantile, q=q),
+ func=partial(_casting_wrapper, partial(np.quantile, q=q), dtype=array.dtype),
axis=axis,
size=size,
fill_value=fill_value,
@@ -142,7 +147,7 @@ def nanquantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=N
return npg.aggregate_numpy.aggregate(
group_idx,
array,
- func=partial(np.nanquantile, q=q),
+ func=partial(_casting_wrapper, partial(np.nanquantile, q=q), dtype=array.dtype),
axis=axis,
size=size,
fill_value=fill_value,
=====================================
flox/aggregate_numbagg.py
=====================================
@@ -3,6 +3,9 @@ from functools import partial
import numbagg
import numbagg.grouped
import numpy as np
+from packaging.version import Version
+
+NUMBAGG_SUPPORTS_DDOF = Version(numbagg.__version__) >= Version("0.7.0")
DEFAULT_FILL_VALUE = {
"nansum": 0,
@@ -42,6 +45,7 @@ def _numbagg_wrapper(
size=None,
fill_value=None,
dtype=None,
+ **kwargs,
):
cast_to = CAST_TO.get(func, None)
if cast_to:
@@ -56,6 +60,7 @@ def _numbagg_wrapper(
group_idx,
axis=axis,
num_labels=size,
+ **kwargs,
# The following are unsupported
# fill_value=fill_value,
# dtype=dtype,
@@ -65,30 +70,36 @@ def _numbagg_wrapper(
def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
- assert ddof != 0
-
+ kwargs = {}
+ if NUMBAGG_SUPPORTS_DDOF:
+ kwargs["ddof"] = ddof
+ elif ddof != 1:
+ raise ValueError("Need numbagg >= v0.7.0 to support ddof != 1")
return _numbagg_wrapper(
group_idx,
array,
axis=axis,
size=size,
func="nanvar",
- # ddof=0,
+ **kwargs,
# fill_value=fill_value,
# dtype=dtype,
)
def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
- assert ddof != 0
-
+ kwargs = {}
+ if NUMBAGG_SUPPORTS_DDOF:
+ kwargs["ddof"] = ddof
+ elif ddof != 1:
+ raise ValueError("Need numbagg >= v0.7.0 to support ddof != 1")
return _numbagg_wrapper(
group_idx,
array,
axis=axis,
size=size,
- func="nanstd"
- # ddof=0,
+ func="nanstd",
+ **kwargs,
# fill_value=fill_value,
# dtype=dtype,
)
=====================================
flox/aggregations.py
=====================================
@@ -1,12 +1,14 @@
from __future__ import annotations
import copy
+import logging
import warnings
-from functools import partial
+from dataclasses import dataclass
+from functools import cached_property, partial
from typing import TYPE_CHECKING, Any, Callable, Literal, TypedDict
import numpy as np
-from numpy.typing import DTypeLike
+from numpy.typing import ArrayLike, DTypeLike
from . import aggregate_flox, aggregate_npg, xrutils
from . import xrdtypes as dtypes
@@ -16,6 +18,9 @@ if TYPE_CHECKING:
OptionalFuncTuple = tuple[Callable | str | None, ...]
+logger = logging.getLogger("flox")
+
+
def _is_arg_reduction(func: str | Aggregation) -> bool:
if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]:
return True
@@ -62,22 +67,25 @@ def generic_aggregate(
try:
method = getattr(aggregate_flox, func)
except AttributeError:
+ logger.debug(f"Couldn't find {func} for engine='flox'. Falling back to numpy")
method = get_npg_aggregation(func, engine="numpy")
elif engine == "numbagg":
from . import aggregate_numbagg
try:
- if (
- # numabgg hardcodes ddof=1
- ("var" in func or "std" in func)
- and kwargs.get("ddof", 0) == 0
- ):
- method = get_npg_aggregation(func, engine="numpy")
-
+ if "var" in func or "std" in func:
+ ddof = kwargs.get("ddof", 0)
+ if aggregate_numbagg.NUMBAGG_SUPPORTS_DDOF or (ddof != 0):
+ method = getattr(aggregate_numbagg, func)
+ else:
+ logger.debug(f"numbagg too old for ddof={ddof}. Falling back to numpy")
+ method = get_npg_aggregation(func, engine="numpy")
else:
method = getattr(aggregate_numbagg, func)
+
except AttributeError:
+ logger.debug(f"Couldn't find {func} for engine='numbagg'. Falling back to numpy")
method = get_npg_aggregation(func, engine="numpy")
elif engine in ["numpy", "numba"]:
@@ -140,6 +148,24 @@ def _atleast_1d(inp, min_length: int = 1):
return inp
+def returns_empty_tuple(*args, **kwargs):
+ return ()
+
+
+ at dataclass
+class Dim:
+ values: ArrayLike
+ name: str | None
+
+ @cached_property
+ def is_scalar(self) -> bool:
+ return xrutils.is_scalar(self.values)
+
+ @cached_property
+ def size(self) -> int:
+ return 0 if self.is_scalar else len(self.values) # type: ignore[arg-type]
+
+
class Aggregation:
def __init__(
self,
@@ -155,6 +181,7 @@ class Aggregation:
dtypes=None,
final_dtype: DTypeLike | None = None,
reduction_type: Literal["reduce", "argreduce"] = "reduce",
+ new_dims_func: Callable | None = None,
):
"""
Blueprint for computing grouped aggregations.
@@ -197,6 +224,10 @@ class Aggregation:
per reduction in ``chunk`` as a tuple.
final_dtype : DType, optional
DType for output. By default, uses dtype of array being reduced.
+ new_dims_func: Callable
+ Function that receives finalize_kwargs and returns a tupleof sizes of any new dimensions
+ added by the reduction. For e.g. quantile for q=(0.5, 0.85) adds a new dimension of size 2,
+ so returns (2,)
"""
self.name = name
# preprocess before blockwise
@@ -230,6 +261,17 @@ class Aggregation:
# The following are set by _initialize_aggregation
self.finalize_kwargs: dict[Any, Any] = {}
self.min_count: int = 0
+ self.new_dims_func: Callable = (
+ returns_empty_tuple if new_dims_func is None else new_dims_func
+ )
+
+ @cached_property
+ def new_dims(self) -> tuple[Dim]:
+ return self.new_dims_func(**self.finalize_kwargs)
+
+ @cached_property
+ def num_new_vector_dims(self) -> int:
+ return len(tuple(dim for dim in self.new_dims if not dim.is_scalar))
def _normalize_dtype_fill_value(self, value, name):
value = _atleast_1d(value)
@@ -487,11 +529,27 @@ median = Aggregation(
nanmedian = Aggregation(
name="nanmedian", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
)
+
+
+def quantile_new_dims_func(q) -> tuple[Dim]:
+ return (Dim(name="quantile", values=q),)
+
+
quantile = Aggregation(
- name="quantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
+ name="quantile",
+ fill_value=dtypes.NA,
+ chunk=None,
+ combine=None,
+ final_dtype=np.float64,
+ new_dims_func=quantile_new_dims_func,
)
nanquantile = Aggregation(
- name="nanquantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
+ name="nanquantile",
+ fill_value=dtypes.NA,
+ chunk=None,
+ combine=None,
+ final_dtype=np.float64,
+ new_dims_func=quantile_new_dims_func,
)
mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None)
nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None)
@@ -600,9 +658,10 @@ def _initialize_aggregation(
# where the identity element is 0, 1
if min_count > 0:
agg.min_count = min_count
- agg.chunk += ("nanlen",)
agg.numpy += ("nanlen",)
- agg.combine += ("sum",)
+ if agg.chunk != (None,):
+ agg.chunk += ("nanlen",)
+ agg.combine += ("sum",)
agg.fill_value["intermediate"] += (0,)
agg.fill_value["numpy"] += (0,)
agg.dtype["intermediate"] += (np.intp,)
=====================================
flox/core.py
=====================================
@@ -34,9 +34,16 @@ from .aggregations import (
_atleast_1d,
_initialize_aggregation,
generic_aggregate,
+ quantile_new_dims_func,
)
from .cache import memoize
-from .xrutils import is_duck_array, is_duck_dask_array, isnull, module_available
+from .xrutils import (
+ is_duck_array,
+ is_duck_dask_array,
+ isnull,
+ module_available,
+ notnull,
+)
if module_available("numpy", minversion="2.0.0"):
from numpy.lib.array_utils import ( # type: ignore[import-not-found]
@@ -156,7 +163,7 @@ def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
if is_duck_dask_array(by):
raise ValueError("Please provide expected_groups if not grouping by a numpy array.")
flatby = by.reshape(-1)
- expected = pd.unique(flatby[~isnull(flatby)])
+ expected = pd.unique(flatby[notnull(flatby)])
return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0]
@@ -936,6 +943,7 @@ def chunk_reduce(
assert group_idx.ndim == 1
empty = np.all(props.nanmask)
+ hasnan = np.any(props.nanmask)
results: IntermediateDict = {"groups": [], "intermediates": []}
if reindex and expected_groups is not None:
@@ -990,10 +998,17 @@ def chunk_reduce(
# been "offset" and is really expected_groups in nearly all cases
seen_groups=seen_groups,
)
- if np.any(props.nanmask):
+ if hasnan:
# remove NaN group label which should be last
result = result[..., :-1]
- result = result.reshape(final_array_shape[:-1] + found_groups_shape)
+ # TODO: Figure out how to generalize this
+ if reduction in ("quantile", "nanquantile"):
+ new_dims_shape = tuple(
+ dim.size for dim in quantile_new_dims_func(**kw) if not dim.is_scalar
+ )
+ else:
+ new_dims_shape = tuple()
+ result = result.reshape(new_dims_shape + final_array_shape[:-1] + found_groups_shape)
results["intermediates"].append(result)
previous_reduction = reduction
@@ -1028,7 +1043,7 @@ def _finalize_results(
3. Mask using counts and fill with user-provided fill_value.
4. reindex to expected_groups
"""
- squeezed = _squeeze_results(results, axis)
+ squeezed = _squeeze_results(results, tuple(agg.num_new_vector_dims + ax for ax in axis))
min_count = agg.min_count
if min_count > 0:
@@ -1095,7 +1110,7 @@ def _find_unique_groups(x_chunk) -> np.ndarray:
from dask.utils import deepmap
unique_groups = _unique(np.asarray(tuple(flatten(deepmap(listify_groups, x_chunk)))))
- unique_groups = unique_groups[~isnull(unique_groups)]
+ unique_groups = unique_groups[notnull(unique_groups)]
if len(unique_groups) == 0:
unique_groups = np.array([np.nan])
@@ -1654,8 +1669,13 @@ def dask_groupby_agg(
else:
raise ValueError(f"Unknown method={method}.")
- out_inds = inds[: -len(axis)] + (inds[-1],)
- output_chunks = reduced.chunks[: -len(axis)] + group_chunks
+ # Adjust output for any new dimensions added, example for multiple quantiles
+ new_dims_shape = tuple(dim.size for dim in agg.new_dims if not dim.is_scalar)
+ new_inds = tuple(range(-len(new_dims_shape), 0))
+ out_inds = new_inds + inds[: -len(axis)] + (inds[-1],)
+ output_chunks = new_dims_shape + reduced.chunks[: -len(axis)] + group_chunks
+ new_axes = dict(zip(new_inds, new_dims_shape))
+
if method == "blockwise" and len(axis) > 1:
# The final results are available but the blocks along axes
# need to be reshaped to axis=-1
@@ -1674,6 +1694,7 @@ def dask_groupby_agg(
key=agg.name,
name=f"{name}-{token}",
concatenate=False,
+ new_axes=new_axes,
)
return (result, groups)
@@ -1959,6 +1980,10 @@ def _choose_engine(by, agg: Aggregation):
not_arg_reduce = not _is_arg_reduction(agg)
+ if agg.name in ["quantile", "nanquantile", "median", "nanmedian"]:
+ logger.info(f"_choose_engine: Choosing 'flox' since {agg.name}")
+ return "flox"
+
# numbagg only supports nan-skipping reductions
# without dtype specified
has_blockwise_nan_skipping = (agg.chunk[0] is None and "nan" in agg.name) or any(
@@ -2106,8 +2131,17 @@ def groupby_reduce(
"See https://github.com/numbagg/numbagg/issues/121."
)
- if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs):
- raise ValueError("Please pass `q` for quantile calculations.")
+ if func in ["quantile", "nanquantile"]:
+ if finalize_kwargs is None or "q" not in finalize_kwargs:
+ raise ValueError("Please pass `q` for quantile calculations.")
+ else:
+ nq = len(_atleast_1d(finalize_kwargs["q"]))
+ if nq > 1 and engine == "numpy":
+ raise ValueError(
+ "Multiple quantiles not supported with engine='numpy'."
+ "Use engine='flox' instead (it is also much faster), "
+ "or set engine=None to use the default."
+ )
bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by)
nby = len(bys)
@@ -2262,6 +2296,20 @@ def groupby_reduce(
# TODO: How else to narrow that array.chunks is there?
assert isinstance(array, DaskArray)
+ if (not any_by_dask and method is None) or method == "cohorts":
+ preferred_method, chunks_cohorts = find_group_cohorts(
+ by_,
+ [array.chunks[ax] for ax in range(-by_.ndim, 0)],
+ expected_groups=expected_,
+ # when provided with cohorts, we *always* 'merge'
+ merge=(method == "cohorts"),
+ )
+ else:
+ preferred_method = "map-reduce"
+ chunks_cohorts = {}
+
+ method = _choose_method(method, preferred_method, agg, by_, nax)
+
if agg.chunk[0] is None and method != "blockwise":
raise NotImplementedError(
f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'."
@@ -2283,19 +2331,6 @@ def groupby_reduce(
f"Received method={method!r}"
)
- if (not any_by_dask and method is None) or method == "cohorts":
- preferred_method, chunks_cohorts = find_group_cohorts(
- by_,
- [array.chunks[ax] for ax in axis_],
- expected_groups=expected_,
- # when provided with cohorts, we *always* 'merge'
- merge=(method == "cohorts"),
- )
- else:
- preferred_method = "map-reduce"
- chunks_cohorts = {}
-
- method = _choose_method(method, preferred_method, agg, by_, nax)
# TODO: clean this up
reindex = _validate_reindex(
reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
=====================================
flox/xarray.py
=====================================
@@ -9,7 +9,7 @@ import xarray as xr
from packaging.version import Version
from xarray.core.duck_array_ops import _datetime_nanmin
-from .aggregations import Aggregation, _atleast_1d
+from .aggregations import Aggregation, Dim, _atleast_1d, quantile_new_dims_func
from .core import (
_convert_expected_groups_to_index,
_get_expected_groups,
@@ -74,7 +74,7 @@ def xarray_reduce(
dim: Dims | ellipsis = None,
fill_value=None,
dtype: np.typing.DTypeLike = None,
- method: str = "map-reduce",
+ method: str | None = None,
engine: str | None = None,
keep_attrs: bool | None = True,
skipna: bool | None = None,
@@ -387,6 +387,17 @@ def xarray_reduce(
result, *groups = groupby_reduce(array, *by, func=func, **kwargs)
+ # Transpose the new quantile dimension to the end. This is ugly.
+ # but new core dimensions are expected at the end :/
+ # but groupby_reduce inserts them at the beginning
+ if func in ["quantile", "nanquantile"]:
+ (newdim,) = quantile_new_dims_func(**finalize_kwargs)
+ if not newdim.is_scalar:
+ # NOTE: _restore_dim_order will move any new dims to the end anyway.
+ # This transpose is simply makes it easy to specify output_core_dims
+ # output dim order: (*broadcast_dims, *group_dims, quantile_dim)
+ result = np.moveaxis(result, 0, -1)
+
# Output of count has an int dtype.
if requires_numeric and func != "count":
if is_npdatetime:
@@ -412,8 +423,18 @@ def xarray_reduce(
input_core_dims = [[d for d in grouper_dims if d not in dim_tuple] + list(dim_tuple)]
input_core_dims += [list(b.dims) for b in by_da]
+ newdims: tuple[Dim, ...] = (
+ quantile_new_dims_func(**finalize_kwargs) if func in ["quantile", "nanquantile"] else ()
+ )
+
output_core_dims = [d for d in input_core_dims[0] if d not in dim_tuple]
output_core_dims.extend(group_names)
+ vector_dims = [dim.name for dim in newdims if not dim.is_scalar]
+ output_core_dims.extend(vector_dims)
+
+ output_sizes = group_sizes
+ output_sizes.update({dim.name: dim.size for dim in newdims if dim.size != 0})
+
actual = xr.apply_ufunc(
wrapper,
ds_broad.drop_vars(tuple(missing_dim)).transpose(..., *grouper_dims),
@@ -424,7 +445,7 @@ def xarray_reduce(
output_core_dims=[output_core_dims],
dask="allowed",
dask_gufunc_kwargs=dict(
- output_sizes=group_sizes, output_dtypes=[dtype] if dtype is not None else None
+ output_sizes=output_sizes, output_dtypes=[dtype] if dtype is not None else None
),
keep_attrs=keep_attrs,
kwargs={
@@ -451,6 +472,9 @@ def xarray_reduce(
if all(d not in ds_broad[var].dims for d in dim_tuple):
actual[var] = ds_broad[var]
+ for newdim in newdims:
+ actual.coords[newdim.name] = newdim.values if newdim.is_scalar else np.array(newdim.values)
+
expect3: T_ExpectIndex | np.ndarray
for name, expect2, by_ in zip(group_names, expected_groups_valid_list, by_da):
# Can't remove this until xarray handles IntervalIndex:
@@ -492,7 +516,7 @@ def xarray_reduce(
else:
template = obj
- if actual[var].ndim > 1:
+ if actual[var].ndim > 1 + len(vector_dims):
no_groupby_reorder = isinstance(
obj, xr.Dataset
) # do not re-order dataarrays inside datasets
=====================================
flox/xrutils.py
=====================================
@@ -100,6 +100,20 @@ def is_scalar(value: Any, include_0d: bool = True) -> bool:
)
+def notnull(data):
+ if not is_duck_array(data):
+ data = np.asarray(data)
+
+ scalar_type = data.dtype.type
+ if issubclass(scalar_type, (np.bool_, np.integer, np.character, np.void)):
+ # these types cannot represent missing values
+ return np.ones_like(data, dtype=bool)
+ else:
+ out = isnull(data)
+ np.logical_not(out, out=out)
+ return out
+
+
def isnull(data):
if not is_duck_array(data):
data = np.asarray(data)
=====================================
tests/test_core.py
=====================================
@@ -254,14 +254,19 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
fill_value = np.nan
tolerance = {"rtol": 1e-14, "atol": 1e-16}
elif "quantile" in func:
- finalize_kwargs = [{"q": DEFAULT_QUANTILE}]
+ finalize_kwargs = [{"q": DEFAULT_QUANTILE}, {"q": [DEFAULT_QUANTILE / 2, DEFAULT_QUANTILE]}]
fill_value = None
tolerance = None
else:
fill_value = None
tolerance = None
+ # for constructing expected
+ array_func = _get_array_func(func)
+
for kwargs in finalize_kwargs:
+ if "quantile" in func and isinstance(kwargs["q"], list) and engine != "flox":
+ continue
flox_kwargs = dict(func=func, engine=engine, finalize_kwargs=kwargs, fill_value=fill_value)
with np.errstate(invalid="ignore", divide="ignore"):
with warnings.catch_warnings():
@@ -280,18 +285,19 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
array_[..., nanmask] = np.nan
expected = getattr(np, func_)(array_, axis=-1, **kwargs)
else:
- array_func = _get_array_func(func)
expected = array_func(array_[..., ~nanmask], axis=-1, **kwargs)
for _ in range(nby):
expected = np.expand_dims(expected, -1)
if func in BLOCKWISE_FUNCS:
assert chunks == -1
- flox_kwargs["method"] = "blockwise"
actual, *groups = groupby_reduce(array, *by, **flox_kwargs)
- assert actual.ndim == (array.ndim + nby - 1)
- assert expected.ndim == (array.ndim + nby - 1)
+ if "quantile" in func and isinstance(kwargs["q"], list):
+ assert actual.ndim == expected.ndim == (array.ndim + nby)
+ else:
+ assert actual.ndim == expected.ndim == (array.ndim + nby - 1)
+
expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby))
for actual_group, expect in zip(groups, expected_groups):
assert_equal(actual_group, expect)
@@ -299,6 +305,20 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
assert actual.dtype.kind == "i"
assert_equal(expected, actual, tolerance)
+ if "nan" not in func and "arg" not in func:
+ # test non-NaN skipping behaviour when NaNs are present
+ nanned = array_.copy()
+ # remove nans in by to reduce complexity
+ # We are checking for consistent behaviour with NaNs in array
+ by_ = tuple(np.nan_to_num(b, nan=np.nanmin(b)) for b in by)
+ nanned[[1, 4, 5], ...] = np.nan
+ nanned.reshape(-1)[0] = np.nan
+ actual, *_ = groupby_reduce(nanned, *by_, **flox_kwargs)
+ expected_0 = array_func(nanned, axis=-1, **kwargs)
+ for _ in range(nby):
+ expected_0 = np.expand_dims(expected_0, -1)
+ assert_equal(expected_0, actual, tolerance)
+
if not has_dask or chunks is None or func in BLOCKWISE_FUNCS:
continue
@@ -583,10 +603,25 @@ def test_nanfirst_nanlast_disallowed_dask(axis, func):
@requires_dask
+ at pytest.mark.xfail
+ at pytest.mark.parametrize("func", ["first", "last"])
+def test_first_last_allowed_dask(func):
+ # blockwise should be fine... but doesn't work now.
+ groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+
+
+ at requires_dask
+ at pytest.mark.xfail
@pytest.mark.parametrize("func", ["first", "last"])
def test_first_last_disallowed_dask(func):
- with pytest.raises(NotImplementedError):
- groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+ # blockwise is fine
+ groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+
+ # anything else is not.
+ with pytest.raises(ValueError):
+ groupby_reduce(
+ dask.array.empty((2, 3, 2), chunks=(-1, -1, 1)), np.ones((2,)), func=func, axis=-1
+ )
@requires_dask
@@ -1655,3 +1690,27 @@ def test_xarray_fill_value_behaviour():
)
# fmt: on
assert_equal(expected, actual)
+
+
+ at pytest.mark.parametrize("q", (0.5, (0.5,), (0.5, 0.67, 0.85)))
+ at pytest.mark.parametrize("func", ["nanquantile", "quantile"])
+ at pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
+ at pytest.mark.parametrize("by_ndim", [1, 2])
+def test_multiple_quantiles(q, chunk, func, by_ndim):
+ array = np.array([[1, -1, np.nan, 3, 4, 10, 5], [1, np.nan, np.nan, 3, 4, np.nan, np.nan]])
+ labels = np.array([0, 0, 0, 1, 0, 1, 1])
+ if by_ndim == 2:
+ labels = np.broadcast_to(labels, (5, *labels.shape))
+ array = np.broadcast_to(np.expand_dims(array, -2), (2, 5, array.shape[-1]))
+ axis = tuple(range(-by_ndim, 0))
+
+ if chunk:
+ array = dask.array.from_array(array, chunks=(1,) + (-1,) * by_ndim)
+
+ actual, _ = groupby_reduce(array, labels, func=func, finalize_kwargs=dict(q=q), axis=axis)
+ sorted_array = array[..., [0, 1, 2, 4, 3, 5, 6]]
+ f = partial(getattr(np, func), q=q, axis=axis, keepdims=True)
+ expected = np.concatenate((f(sorted_array[..., :4]), f(sorted_array[..., 4:])), axis=-1)
+ if by_ndim == 2:
+ expected = expected.squeeze(axis=-2)
+ assert_equal(expected, actual, tolerance=1e-14)
=====================================
tests/test_xarray.py
=====================================
@@ -421,7 +421,7 @@ def test_groupby_bins_indexed_coordinate(method):
)
},
coords={
- "time": pd.date_range("2013-01-01", "2013-02-01", freq="6H"),
+ "time": pd.date_range("2013-01-01", "2013-02-01", freq="6h"),
"lat": np.arange(75.0, 14.9, -2.5),
"lon": np.arange(200.0, 331.0, 2.5),
},
@@ -533,7 +533,7 @@ def test_dtype(add_nan, chunk, dtype, dtype_out, engine):
@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
@pytest.mark.parametrize("use_flox", [True, False])
def test_dtype_accumulation(use_flox, chunk):
- datetimes = pd.date_range("2010-01", "2015-01", freq="6H", inclusive="left")
+ datetimes = pd.date_range("2010-01", "2015-01", freq="6h", inclusive="left")
samples = 10 + np.cos(2 * np.pi * 0.001 * np.arange(len(datetimes))) * 1
samples += np.random.randn(len(datetimes))
samples = samples.astype("float32")
@@ -593,7 +593,7 @@ def test_preserve_multiindex():
def test_fill_value_xarray_behaviour():
- times = pd.date_range("2000-01-01", freq="6H", periods=10)
+ times = pd.date_range("2000-01-01", freq="6h", periods=10)
ds = xr.Dataset(
{
"bar": (
@@ -605,11 +605,11 @@ def test_fill_value_xarray_behaviour():
}
)
- pd.date_range("2000-01-01", freq="3H", periods=19)
+ pd.date_range("2000-01-01", freq="3h", periods=19)
with xr.set_options(use_flox=False):
- expected = ds.resample(time="3H").sum()
+ expected = ds.resample(time="3h").sum()
with xr.set_options(use_flox=True):
- actual = ds.resample(time="3H").sum()
+ actual = ds.resample(time="3h").sum()
xr.testing.assert_identical(expected, actual)
@@ -685,3 +685,28 @@ def test_resampling_missing_groups(chunk):
with xr.set_options(use_flox=True):
actual = da.resample(time="1D").mean()
xr.testing.assert_identical(expected, actual)
+
+
+ at pytest.mark.parametrize("q", (0.5, (0.5,), (0.5, 0.67, 0.85)))
+ at pytest.mark.parametrize("skipna", [False, True])
+ at pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
+ at pytest.mark.parametrize("by_ndim", [1, 2])
+def test_multiple_quantiles(q, chunk, by_ndim, skipna):
+ array = np.array([[1, -1, np.nan, 3, 4, 10, 5], [1, np.nan, np.nan, 3, 4, np.nan, np.nan]])
+ labels = np.array([0, 0, 0, 1, 0, 1, 1])
+ dims = ("y",)
+ if by_ndim == 2:
+ labels = np.broadcast_to(labels, (5, *labels.shape))
+ array = np.broadcast_to(np.expand_dims(array, -2), (2, 5, array.shape[-1]))
+ dims += ("y0",)
+
+ if chunk:
+ array = dask.array.from_array(array, chunks=(1,) + (-1,) * by_ndim)
+
+ da = xr.DataArray(array, dims=("x", *dims))
+ by = xr.DataArray(labels, dims=dims, name="by")
+
+ actual = xarray_reduce(da, by, func="quantile", skipna=skipna, q=q)
+ with xr.set_options(use_flox=False):
+ expected = da.groupby(by).quantile(q, skipna=skipna)
+ xr.testing.assert_allclose(expected, actual)
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/compare/c4d83b02aa4e775453ef623cc05ce6a464c2327b...6d6a236512820a0d498e8d70a861e449541e873a
--
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/compare/c4d83b02aa4e775453ef623cc05ce6a464c2327b...6d6a236512820a0d498e8d70a861e449541e873a
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-grass-devel/attachments/20240210/acede955/attachment-0001.htm>
More information about the Pkg-grass-devel
mailing list