[Git][debian-gis-team/flox][master] 4 commits: New upstream version 0.9.2

Antonio Valentino (@antonio.valentino) gitlab@salsa.debian.org
Sat Feb 10 11:48:24 GMT 2024



Antonio Valentino pushed to branch master at Debian GIS Project / flox


Commits:
54cced1c by Antonio Valentino at 2024-02-10T10:55:40+00:00
New upstream version 0.9.2
- - - - -
bee6cf4c by Antonio Valentino at 2024-02-10T10:55:41+00:00
Update upstream source from tag 'upstream/0.9.2'

Update to upstream version '0.9.2'
with Debian dir 41e546e0f9340ad2944402d47a2f6fdb357e5dd2
- - - - -
459a453f by Antonio Valentino at 2024-02-10T10:56:25+00:00
New upstream release

- - - - -
6d6a2365 by Antonio Valentino at 2024-02-10T10:59:17+00:00
Set distribution to unstable

- - - - -


16 changed files:

- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .github/workflows/testpypi-release.yaml
- .github/workflows/upstream-dev-ci.yaml
- ci/environment.yml
- ci/no-dask.yml
- debian/changelog
- flox/aggregate_flox.py
- flox/aggregate_npg.py
- flox/aggregate_numbagg.py
- flox/aggregations.py
- flox/core.py
- flox/xarray.py
- flox/xrutils.py
- tests/test_core.py
- tests/test_xarray.py


Changes:

=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -41,7 +41,7 @@ jobs:
 
     env:
       CONDA_ENV_FILE: ci/environment.yml
-      PYTHON_VERSION: "3.11"
+      PYTHON_VERSION: "3.12"
 
     steps:
       - uses: actions/checkout@v4
@@ -77,7 +77,7 @@ jobs:
           --ignore flox/tests \
           --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v3.1.4
+        uses: codecov/codecov-action@v4.0.0
         with:
           file: ./coverage.xml
           flags: unittests
@@ -95,7 +95,7 @@ jobs:
         shell: bash -l {0}
     env:
       CONDA_ENV_FILE: ci/environment.yml
-      PYTHON_VERSION: "3.11"
+      PYTHON_VERSION: "3.12"
 
     steps:
       - uses: actions/checkout@v4
@@ -131,7 +131,7 @@ jobs:
           python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
 
       - name: Upload mypy coverage to Codecov
-        uses: codecov/codecov-action@v3.1.4
+        uses: codecov/codecov-action@v4.0.0
         with:
           file: mypy_report/cobertura.xml
           flags: mypy


=====================================
.github/workflows/ci.yaml
=====================================
@@ -25,7 +25,7 @@ jobs:
       fail-fast: false
       matrix:
         os: ["ubuntu-latest", "windows-latest"]
-        python-version: ["3.9", "3.11"]
+        python-version: ["3.9", "3.12"]
     steps:
       - uses: actions/checkout@v4
         with:
@@ -49,7 +49,7 @@ jobs:
         run: |
           pytest -n auto --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v3.1.4
+        uses: codecov/codecov-action@v4.0.0
         with:
           file: ./coverage.xml
           flags: unittests
@@ -66,7 +66,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.11"]
+        python-version: ["3.12"]
         env: ["no-xarray", "no-dask"]
         include:
           - env: "no-numba"
@@ -93,7 +93,7 @@ jobs:
         run: |
           python -m pytest -n auto --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v3.1.4
+        uses: codecov/codecov-action@v4.0.0
         with:
           file: ./coverage.xml
           flags: unittests


=====================================
.github/workflows/testpypi-release.yaml
=====================================
@@ -24,7 +24,7 @@ jobs:
       - uses: actions/setup-python@v5
         name: Install Python
         with:
-          python-version: "3.11"
+          python-version: "3.12"
 
       - name: Install dependencies
         run: |
@@ -65,7 +65,7 @@ jobs:
       - uses: actions/setup-python@v5
         name: Install Python
         with:
-          python-version: "3.11"
+          python-version: "3.12"
       - uses: actions/download-artifact@v4
         with:
           name: releases


=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -33,7 +33,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.11"]
+        python-version: ["3.12"]
     steps:
       - uses: actions/checkout@v4
         with:


=====================================
ci/environment.yml
=====================================
@@ -19,7 +19,6 @@ dependencies:
   - pytest-xdist
   - xarray
   - pre-commit
-  - numbagg>=0.3
   - numpy_groupies>=0.9.19
   - pooch
   - toolz


=====================================
ci/no-dask.yml
=====================================
@@ -19,3 +19,4 @@ dependencies:
   - pooch
   - toolz
   - numba
+  - numbagg>=0.3


=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+flox (0.9.2-1) unstable; urgency=medium
+
+  * New upstream release.
+
+ -- Antonio Valentino <antonio.valentino@tiscali.it>  Sat, 10 Feb 2024 10:59:02 +0000
+
 flox (0.9.0-1) unstable; urgency=medium
 
   * New upstream release.


=====================================
flox/aggregate_flox.py
=====================================
@@ -2,7 +2,7 @@ from functools import partial
 
 import numpy as np
 
-from .xrutils import isnull
+from .xrutils import is_scalar, isnull, notnull
 
 
 def _prepare_for_flox(group_idx, array):
@@ -10,6 +10,7 @@ def _prepare_for_flox(group_idx, array):
     Sort the input array once to save time.
     """
     assert array.shape[-1] == group_idx.shape[0]
+
     issorted = (group_idx[:-1] <= group_idx[1:]).all()
     if issorted:
         ordered_array = array
@@ -20,7 +21,102 @@ def _prepare_for_flox(group_idx, array):
     return group_idx, ordered_array
 
 
-def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dtype=None, out=None):
+def _lerp(a, b, *, t, dtype, out=None):
+    """
+    COPIED from numpy.
+
+    Compute the linear interpolation weighted by gamma on each point of
+    two same shape array.
+
+    a : array_like
+        Left bound.
+    b : array_like
+        Right bound.
+    t : array_like
+        The interpolation weight.
+    """
+    if out is None:
+        out = np.empty_like(a, dtype=dtype)
+    diff_b_a = np.subtract(b, a)
+    # asanyarray is a stop-gap until gh-13105
+    np.add(a, diff_b_a * t, out=out)
+    np.subtract(b, diff_b_a * (1 - t), out=out, where=t >= 0.5)
+    return out
+
+
+def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=None):
+    inv_idx = np.concatenate((inv_idx, [array.shape[-1]]))
+
+    array_nanmask = isnull(array)
+    actual_sizes = np.add.reduceat(~array_nanmask, inv_idx[:-1], axis=axis)
+    newshape = (1,) * (array.ndim - 1) + (inv_idx.size - 1,)
+    full_sizes = np.reshape(np.diff(inv_idx), newshape)
+    nanmask = full_sizes != actual_sizes
+
+    # The approach here is to use (complex_array.partition) because
+    # 1. The full np.lexsort((array, labels), axis=-1) is slow and unnecessary
+    # 2. Using record_array.partition(..., order=["labels", "array"]) is incredibly slow.
+    # partition will first sort by real part, then by imaginary part, so it is a two element lex-partition.
+    # So we set
+    #     complex_array = group_idx + 1j * array
+    # group_idx is an integer (guaranteed), but array can have NaNs. Now,
+    #     1 + 1j*NaN = NaN + 1j * NaN
+    # so we must replace all NaNs with the maximum array value in the group so these NaNs
+    # get sorted to the end.
+    # Partly inspired by https://krstn.eu/np.nanpercentile()-there-has-to-be-a-faster-way/
+    # TODO: Don't know if this array has been copied in _prepare_for_flox. This is potentially wasteful
+    array = np.where(array_nanmask, -np.inf, array)
+    maxes = np.maximum.reduceat(array, inv_idx[:-1], axis=axis)
+    replacement = np.repeat(maxes, np.diff(inv_idx), axis=axis)
+    array[array_nanmask] = replacement[array_nanmask]
+
+    qin = q
+    q = np.atleast_1d(qin)
+    q = np.reshape(q, (len(q),) + (1,) * array.ndim)
+
+    # This is numpy's method="linear"
+    # TODO: could support all the interpolations here
+    virtual_index = q * (actual_sizes - 1) + inv_idx[:-1]
+
+    is_scalar_q = is_scalar(qin)
+    if is_scalar_q:
+        virtual_index = virtual_index.squeeze(axis=0)
+        idxshape = array.shape[:-1] + (actual_sizes.shape[-1],)
+    else:
+        idxshape = (q.shape[0],) + array.shape[:-1] + (actual_sizes.shape[-1],)
+
+    lo_ = np.floor(
+        virtual_index, casting="unsafe", out=np.empty(virtual_index.shape, dtype=np.int64)
+    )
+    hi_ = np.ceil(
+        virtual_index, casting="unsafe", out=np.empty(virtual_index.shape, dtype=np.int64)
+    )
+    kth = np.unique(np.concatenate([lo_.reshape(-1), hi_.reshape(-1)]))
+
+    # partition the complex array in-place
+    labels_broadcast = np.broadcast_to(group_idx, array.shape)
+    cmplx = labels_broadcast + 1j * array
+    cmplx.partition(kth=kth, axis=-1)
+    if is_scalar_q:
+        a_ = cmplx.imag
+    else:
+        a_ = np.broadcast_to(cmplx.imag, (q.shape[0],) + array.shape)
+
+    # get bounds, Broadcast to (num quantiles, ..., num labels)
+    loval = np.take_along_axis(a_, np.broadcast_to(lo_, idxshape), axis=axis)
+    hival = np.take_along_axis(a_, np.broadcast_to(hi_, idxshape), axis=axis)
+
+    # TODO: could support all the interpolations here
+    gamma = np.broadcast_to(virtual_index, idxshape) - lo_
+    result = _lerp(loval, hival, t=gamma, out=out, dtype=dtype)
+    if not skipna and np.any(nanmask):
+        result[..., nanmask] = np.nan
+    return result
+
+
+def _np_grouped_op(
+    group_idx, array, op, axis=-1, size=None, fill_value=None, dtype=None, out=None, **kwargs
+):
     """
     most of this code is from shoyer's gist
     https://gist.github.com/shoyer/f538ac78ae904c936844
@@ -38,16 +134,22 @@ def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dt
         dtype = array.dtype
 
     if out is None:
-        out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+        q = kwargs.get("q", None)
+        if q is None:
+            out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+        else:
+            nq = len(np.atleast_1d(q))
+            out = np.full((nq,) + array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype)
+            kwargs["group_idx"] = group_idx
 
     if (len(uniques) == size) and (uniques == np.arange(size, like=array)).all():
         # The previous version of this if condition
         #     ((uniques[1:] - uniques[:-1]) == 1).all():
         # does not work when group_idx is [1, 2] for e.g.
         # This happens during binning
-        op.reduceat(array, inv_idx, axis=axis, dtype=dtype, out=out)
+        op(array, inv_idx, axis=axis, dtype=dtype, out=out, **kwargs)
     else:
-        out[..., uniques] = op.reduceat(array, inv_idx, axis=axis, dtype=dtype)
+        out[..., uniques] = op(array, inv_idx, axis=axis, dtype=dtype, **kwargs)
 
     return out
 
@@ -65,14 +167,18 @@ def _nan_grouped_op(group_idx, array, func, fillna, *args, **kwargs):
     return result
 
 
-sum = partial(_np_grouped_op, op=np.add)
+sum = partial(_np_grouped_op, op=np.add.reduceat)
 nansum = partial(_nan_grouped_op, func=sum, fillna=0)
-prod = partial(_np_grouped_op, op=np.multiply)
+prod = partial(_np_grouped_op, op=np.multiply.reduceat)
 nanprod = partial(_nan_grouped_op, func=prod, fillna=1)
-max = partial(_np_grouped_op, op=np.maximum)
+max = partial(_np_grouped_op, op=np.maximum.reduceat)
 nanmax = partial(_nan_grouped_op, func=max, fillna=-np.inf)
-min = partial(_np_grouped_op, op=np.minimum)
+min = partial(_np_grouped_op, op=np.minimum.reduceat)
 nanmin = partial(_nan_grouped_op, func=min, fillna=np.inf)
+quantile = partial(_np_grouped_op, op=partial(quantile_, skipna=False))
+nanquantile = partial(_np_grouped_op, op=partial(quantile_, skipna=True))
+median = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=False))
+nanmedian = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=True))
 # TODO: all, any
 
 
@@ -99,7 +205,7 @@ def nansum_of_squares(group_idx, array, *, axis=-1, size=None, fill_value=None,
 
 
 def nanlen(group_idx, array, *args, **kwargs):
-    return sum(group_idx, (~isnull(array)).astype(int), *args, **kwargs)
+    return sum(group_idx, (notnull(array)).astype(int), *args, **kwargs)
 
 
 def mean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
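
For orientation, the new quantile_ path relies on NumPy ordering complex
numbers lexicographically: real part first, then imaginary part. A minimal
sketch of that trick with toy data (illustration only, not part of the diff):

    import numpy as np

    group_idx = np.array([0, 1, 0, 1, 0, 1])
    array = np.array([3.0, 9.0, 1.0, 7.0, 2.0, 8.0])

    # Real part = group label, imaginary part = value, so one sort (or
    # partition, as above) orders by group and then by value within the group.
    ordered = np.sort(group_idx + 1j * array)
    print(ordered.real.astype(int))  # [0 0 0 1 1 1]
    print(ordered.imag)              # [1. 2. 3. 7. 8. 9.]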


=====================================
flox/aggregate_npg.py
=====================================
@@ -8,6 +8,11 @@ def _get_aggregate(engine):
     return npg.aggregate_numpy if engine == "numpy" else npg.aggregate_numba
 
 
+def _casting_wrapper(func, grp, dtype):
+    """Used for generic aggregates. The group is dtype=object, need to cast back to fix weird bugs"""
+    return func(grp.astype(dtype))
+
+
 def sum_of_squares(
     group_idx,
     array,
@@ -106,7 +111,7 @@ def median(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dty
     return npg.aggregate_numpy.aggregate(
         group_idx,
         array,
-        func=np.median,
+        func=partial(_casting_wrapper, np.median, dtype=array.dtype),
         axis=axis,
         size=size,
         fill_value=fill_value,
@@ -118,7 +123,7 @@ def nanmedian(group_idx, array, engine, *, axis=-1, size=None, fill_value=None,
     return npg.aggregate_numpy.aggregate(
         group_idx,
         array,
-        func=np.nanmedian,
+        func=partial(_casting_wrapper, np.nanmedian, dtype=array.dtype),
         axis=axis,
         size=size,
         fill_value=fill_value,
@@ -130,7 +135,7 @@ def quantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None
     return npg.aggregate_numpy.aggregate(
         group_idx,
         array,
-        func=partial(np.quantile, q=q),
+        func=partial(_casting_wrapper, partial(np.quantile, q=q), dtype=array.dtype),
         axis=axis,
         size=size,
         fill_value=fill_value,
@@ -142,7 +147,7 @@ def nanquantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=N
     return npg.aggregate_numpy.aggregate(
         group_idx,
         array,
-        func=partial(np.nanquantile, q=q),
+        func=partial(_casting_wrapper, partial(np.nanquantile, q=q), dtype=array.dtype),
         axis=axis,
         size=size,
         fill_value=fill_value,
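
The median/quantile changes above wrap the reducer because numpy_groupies hands
each group to generic Python callables as a dtype=object slice; casting back to
the original dtype before reducing avoids the "weird bugs" mentioned in the
docstring. A tiny illustration of how the partial-wrapped callable behaves (toy
data, illustration only):

    import numpy as np
    from functools import partial

    def _casting_wrapper(func, grp, dtype):
        return func(grp.astype(dtype))

    grp = np.array([1.0, 5.0, 3.0], dtype=object)  # as handed over by npg
    wrapped = partial(_casting_wrapper, np.nanmedian, dtype=np.float64)
    print(wrapped(grp))  # 3.0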


=====================================
flox/aggregate_numbagg.py
=====================================
@@ -3,6 +3,9 @@ from functools import partial
 import numbagg
 import numbagg.grouped
 import numpy as np
+from packaging.version import Version
+
+NUMBAGG_SUPPORTS_DDOF = Version(numbagg.__version__) >= Version("0.7.0")
 
 DEFAULT_FILL_VALUE = {
     "nansum": 0,
@@ -42,6 +45,7 @@ def _numbagg_wrapper(
     size=None,
     fill_value=None,
     dtype=None,
+    **kwargs,
 ):
     cast_to = CAST_TO.get(func, None)
     if cast_to:
@@ -56,6 +60,7 @@ def _numbagg_wrapper(
         group_idx,
         axis=axis,
         num_labels=size,
+        **kwargs,
         # The following are unsupported
         # fill_value=fill_value,
         # dtype=dtype,
@@ -65,30 +70,36 @@ def _numbagg_wrapper(
 
 
 def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
-    assert ddof != 0
-
+    kwargs = {}
+    if NUMBAGG_SUPPORTS_DDOF:
+        kwargs["ddof"] = ddof
+    elif ddof != 1:
+        raise ValueError("Need numbagg >= v0.7.0 to support ddof != 1")
     return _numbagg_wrapper(
         group_idx,
         array,
         axis=axis,
         size=size,
         func="nanvar",
-        # ddof=0,
+        **kwargs,
         # fill_value=fill_value,
         # dtype=dtype,
     )
 
 
 def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
-    assert ddof != 0
-
+    kwargs = {}
+    if NUMBAGG_SUPPORTS_DDOF:
+        kwargs["ddof"] = ddof
+    elif ddof != 1:
+        raise ValueError("Need numbagg >= v0.7.0 to support ddof != 1")
     return _numbagg_wrapper(
         group_idx,
         array,
         axis=axis,
         size=size,
-        func="nanstd"
-        # ddof=0,
+        func="nanstd",
+        **kwargs,
         # fill_value=fill_value,
         # dtype=dtype,
     )
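
The nanvar/nanstd changes follow a version-gate pattern: detect once, at import
time, whether the installed numbagg accepts ddof, and only forward the keyword
when it does (older numbagg hardcodes ddof=1, per the removed comment). A
condensed sketch of that pattern, assuming numbagg is importable:

    import numbagg
    from packaging.version import Version

    NUMBAGG_SUPPORTS_DDOF = Version(numbagg.__version__) >= Version("0.7.0")

    def var_kwargs(ddof: int = 0) -> dict:
        if NUMBAGG_SUPPORTS_DDOF:
            return {"ddof": ddof}
        if ddof != 1:
            raise ValueError("Need numbagg >= v0.7.0 to support ddof != 1")
        return {}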


=====================================
flox/aggregations.py
=====================================
@@ -1,12 +1,14 @@
 from __future__ import annotations
 
 import copy
+import logging
 import warnings
-from functools import partial
+from dataclasses import dataclass
+from functools import cached_property, partial
 from typing import TYPE_CHECKING, Any, Callable, Literal, TypedDict
 
 import numpy as np
-from numpy.typing import DTypeLike
+from numpy.typing import ArrayLike, DTypeLike
 
 from . import aggregate_flox, aggregate_npg, xrutils
 from . import xrdtypes as dtypes
@@ -16,6 +18,9 @@ if TYPE_CHECKING:
     OptionalFuncTuple = tuple[Callable | str | None, ...]
 
 
+logger = logging.getLogger("flox")
+
+
 def _is_arg_reduction(func: str | Aggregation) -> bool:
     if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]:
         return True
@@ -62,22 +67,25 @@ def generic_aggregate(
         try:
             method = getattr(aggregate_flox, func)
         except AttributeError:
+            logger.debug(f"Couldn't find {func} for engine='flox'. Falling back to numpy")
             method = get_npg_aggregation(func, engine="numpy")
 
     elif engine == "numbagg":
         from . import aggregate_numbagg
 
         try:
-            if (
-                # numabgg hardcodes ddof=1
-                ("var" in func or "std" in func)
-                and kwargs.get("ddof", 0) == 0
-            ):
-                method = get_npg_aggregation(func, engine="numpy")
-
+            if "var" in func or "std" in func:
+                ddof = kwargs.get("ddof", 0)
+                if aggregate_numbagg.NUMBAGG_SUPPORTS_DDOF or (ddof != 0):
+                    method = getattr(aggregate_numbagg, func)
+                else:
+                    logger.debug(f"numbagg too old for ddof={ddof}. Falling back to numpy")
+                    method = get_npg_aggregation(func, engine="numpy")
             else:
                 method = getattr(aggregate_numbagg, func)
+
         except AttributeError:
+            logger.debug(f"Couldn't find {func} for engine='numbagg'. Falling back to numpy")
             method = get_npg_aggregation(func, engine="numpy")
 
     elif engine in ["numpy", "numba"]:
@@ -140,6 +148,24 @@ def _atleast_1d(inp, min_length: int = 1):
     return inp
 
 
+def returns_empty_tuple(*args, **kwargs):
+    return ()
+
+
+@dataclass
+class Dim:
+    values: ArrayLike
+    name: str | None
+
+    @cached_property
+    def is_scalar(self) -> bool:
+        return xrutils.is_scalar(self.values)
+
+    @cached_property
+    def size(self) -> int:
+        return 0 if self.is_scalar else len(self.values)  # type: ignore[arg-type]
+
+
 class Aggregation:
     def __init__(
         self,
@@ -155,6 +181,7 @@ class Aggregation:
         dtypes=None,
         final_dtype: DTypeLike | None = None,
         reduction_type: Literal["reduce", "argreduce"] = "reduce",
+        new_dims_func: Callable | None = None,
     ):
         """
         Blueprint for computing grouped aggregations.
@@ -197,6 +224,10 @@ class Aggregation:
             per reduction in ``chunk`` as a tuple.
         final_dtype : DType, optional
             DType for output. By default, uses dtype of array being reduced.
+        new_dims_func: Callable
+            Function that receives finalize_kwargs and returns a tuple of sizes of any new dimensions
+            added by the reduction. For e.g. quantile for q=(0.5, 0.85) adds a new dimension of size 2,
+            so returns (2,)
         """
         self.name = name
         # preprocess before blockwise
@@ -230,6 +261,17 @@ class Aggregation:
         # The following are set by _initialize_aggregation
         self.finalize_kwargs: dict[Any, Any] = {}
         self.min_count: int = 0
+        self.new_dims_func: Callable = (
+            returns_empty_tuple if new_dims_func is None else new_dims_func
+        )
+
+    @cached_property
+    def new_dims(self) -> tuple[Dim]:
+        return self.new_dims_func(**self.finalize_kwargs)
+
+    @cached_property
+    def num_new_vector_dims(self) -> int:
+        return len(tuple(dim for dim in self.new_dims if not dim.is_scalar))
 
     def _normalize_dtype_fill_value(self, value, name):
         value = _atleast_1d(value)
@@ -487,11 +529,27 @@ median = Aggregation(
 nanmedian = Aggregation(
     name="nanmedian", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
 )
+
+
+def quantile_new_dims_func(q) -> tuple[Dim]:
+    return (Dim(name="quantile", values=q),)
+
+
 quantile = Aggregation(
-    name="quantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
+    name="quantile",
+    fill_value=dtypes.NA,
+    chunk=None,
+    combine=None,
+    final_dtype=np.float64,
+    new_dims_func=quantile_new_dims_func,
 )
 nanquantile = Aggregation(
-    name="nanquantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64
+    name="nanquantile",
+    fill_value=dtypes.NA,
+    chunk=None,
+    combine=None,
+    final_dtype=np.float64,
+    new_dims_func=quantile_new_dims_func,
 )
 mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None)
 nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None)
@@ -600,9 +658,10 @@ def _initialize_aggregation(
     # where the identity element is 0, 1
     if min_count > 0:
         agg.min_count = min_count
-        agg.chunk += ("nanlen",)
         agg.numpy += ("nanlen",)
-        agg.combine += ("sum",)
+        if agg.chunk != (None,):
+            agg.chunk += ("nanlen",)
+            agg.combine += ("sum",)
         agg.fill_value["intermediate"] += (0,)
         agg.fill_value["numpy"] += (0,)
         agg.dtype["intermediate"] += (np.intp,)
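
To make the new Dim / new_dims_func machinery concrete: a scalar q adds no
output dimension, while a vector q adds a "quantile" dimension of size len(q).
A small sketch using the helper added above (a flox internal, so subject to
change):

    from flox.aggregations import quantile_new_dims_func

    (dim,) = quantile_new_dims_func(q=(0.5, 0.85))
    print(dim.name, dim.is_scalar, dim.size)  # quantile False 2

    (dim,) = quantile_new_dims_func(q=0.5)
    print(dim.is_scalar, dim.size)            # True 0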


=====================================
flox/core.py
=====================================
@@ -34,9 +34,16 @@ from .aggregations import (
     _atleast_1d,
     _initialize_aggregation,
     generic_aggregate,
+    quantile_new_dims_func,
 )
 from .cache import memoize
-from .xrutils import is_duck_array, is_duck_dask_array, isnull, module_available
+from .xrutils import (
+    is_duck_array,
+    is_duck_dask_array,
+    isnull,
+    module_available,
+    notnull,
+)
 
 if module_available("numpy", minversion="2.0.0"):
     from numpy.lib.array_utils import (  # type: ignore[import-not-found]
@@ -156,7 +163,7 @@ def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
     if is_duck_dask_array(by):
         raise ValueError("Please provide expected_groups if not grouping by a numpy array.")
     flatby = by.reshape(-1)
-    expected = pd.unique(flatby[~isnull(flatby)])
+    expected = pd.unique(flatby[notnull(flatby)])
     return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0]
 
 
@@ -936,6 +943,7 @@ def chunk_reduce(
     assert group_idx.ndim == 1
 
     empty = np.all(props.nanmask)
+    hasnan = np.any(props.nanmask)
 
     results: IntermediateDict = {"groups": [], "intermediates": []}
     if reindex and expected_groups is not None:
@@ -990,10 +998,17 @@ def chunk_reduce(
                     # been "offset" and is really expected_groups in nearly all cases
                     seen_groups=seen_groups,
                 )
-            if np.any(props.nanmask):
+            if hasnan:
                 # remove NaN group label which should be last
                 result = result[..., :-1]
-            result = result.reshape(final_array_shape[:-1] + found_groups_shape)
+            # TODO: Figure out how to generalize this
+            if reduction in ("quantile", "nanquantile"):
+                new_dims_shape = tuple(
+                    dim.size for dim in quantile_new_dims_func(**kw) if not dim.is_scalar
+                )
+            else:
+                new_dims_shape = tuple()
+            result = result.reshape(new_dims_shape + final_array_shape[:-1] + found_groups_shape)
         results["intermediates"].append(result)
         previous_reduction = reduction
 
@@ -1028,7 +1043,7 @@ def _finalize_results(
     3. Mask using counts and fill with user-provided fill_value.
     4. reindex to expected_groups
     """
-    squeezed = _squeeze_results(results, axis)
+    squeezed = _squeeze_results(results, tuple(agg.num_new_vector_dims + ax for ax in axis))
 
     min_count = agg.min_count
     if min_count > 0:
@@ -1095,7 +1110,7 @@ def _find_unique_groups(x_chunk) -> np.ndarray:
     from dask.utils import deepmap
 
     unique_groups = _unique(np.asarray(tuple(flatten(deepmap(listify_groups, x_chunk)))))
-    unique_groups = unique_groups[~isnull(unique_groups)]
+    unique_groups = unique_groups[notnull(unique_groups)]
 
     if len(unique_groups) == 0:
         unique_groups = np.array([np.nan])
@@ -1654,8 +1669,13 @@ def dask_groupby_agg(
     else:
         raise ValueError(f"Unknown method={method}.")
 
-    out_inds = inds[: -len(axis)] + (inds[-1],)
-    output_chunks = reduced.chunks[: -len(axis)] + group_chunks
+    # Adjust output for any new dimensions added, example for multiple quantiles
+    new_dims_shape = tuple(dim.size for dim in agg.new_dims if not dim.is_scalar)
+    new_inds = tuple(range(-len(new_dims_shape), 0))
+    out_inds = new_inds + inds[: -len(axis)] + (inds[-1],)
+    output_chunks = new_dims_shape + reduced.chunks[: -len(axis)] + group_chunks
+    new_axes = dict(zip(new_inds, new_dims_shape))
+
     if method == "blockwise" and len(axis) > 1:
         # The final results are available but the blocks along axes
         # need to be reshaped to axis=-1
@@ -1674,6 +1694,7 @@ def dask_groupby_agg(
         key=agg.name,
         name=f"{name}-{token}",
         concatenate=False,
+        new_axes=new_axes,
     )
 
     return (result, groups)
@@ -1959,6 +1980,10 @@ def _choose_engine(by, agg: Aggregation):
 
     not_arg_reduce = not _is_arg_reduction(agg)
 
+    if agg.name in ["quantile", "nanquantile", "median", "nanmedian"]:
+        logger.info(f"_choose_engine: Choosing 'flox' since {agg.name}")
+        return "flox"
+
     # numbagg only supports nan-skipping reductions
     # without dtype specified
     has_blockwise_nan_skipping = (agg.chunk[0] is None and "nan" in agg.name) or any(
@@ -2106,8 +2131,17 @@ def groupby_reduce(
             "See https://github.com/numbagg/numbagg/issues/121."
         )
 
-    if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs):
-        raise ValueError("Please pass `q` for quantile calculations.")
+    if func in ["quantile", "nanquantile"]:
+        if finalize_kwargs is None or "q" not in finalize_kwargs:
+            raise ValueError("Please pass `q` for quantile calculations.")
+        else:
+            nq = len(_atleast_1d(finalize_kwargs["q"]))
+            if nq > 1 and engine == "numpy":
+                raise ValueError(
+                    "Multiple quantiles not supported with engine='numpy'."
+                    "Use engine='flox' instead (it is also much faster), "
+                    "or set engine=None to use the default."
+                )
 
     bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by)
     nby = len(bys)
@@ -2262,6 +2296,20 @@ def groupby_reduce(
             # TODO: How else to narrow that array.chunks is there?
             assert isinstance(array, DaskArray)
 
+        if (not any_by_dask and method is None) or method == "cohorts":
+            preferred_method, chunks_cohorts = find_group_cohorts(
+                by_,
+                [array.chunks[ax] for ax in range(-by_.ndim, 0)],
+                expected_groups=expected_,
+                # when provided with cohorts, we *always* 'merge'
+                merge=(method == "cohorts"),
+            )
+        else:
+            preferred_method = "map-reduce"
+            chunks_cohorts = {}
+
+        method = _choose_method(method, preferred_method, agg, by_, nax)
+
         if agg.chunk[0] is None and method != "blockwise":
             raise NotImplementedError(
                 f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'."
@@ -2283,19 +2331,6 @@ def groupby_reduce(
                 f"Received method={method!r}"
             )
 
-        if (not any_by_dask and method is None) or method == "cohorts":
-            preferred_method, chunks_cohorts = find_group_cohorts(
-                by_,
-                [array.chunks[ax] for ax in axis_],
-                expected_groups=expected_,
-                # when provided with cohorts, we *always* 'merge'
-                merge=(method == "cohorts"),
-            )
-        else:
-            preferred_method = "map-reduce"
-            chunks_cohorts = {}
-
-        method = _choose_method(method, preferred_method, agg, by_, nax)
         # TODO: clean this up
         reindex = _validate_reindex(
             reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
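
Taken together, the core.py changes let groupby_reduce compute several
quantiles at once, with the new quantile axis inserted at the front of the
result; engine=None now resolves to "flox" for quantile/median reductions, and
engine="numpy" raises for a vector q. A minimal usage sketch with toy data
(mirroring the new tests):

    import numpy as np
    from flox import groupby_reduce

    array = np.array([1.0, -1.0, np.nan, 3.0, 4.0, 10.0, 5.0])
    labels = np.array([0, 0, 0, 1, 0, 1, 1])

    result, groups = groupby_reduce(
        array, labels, func="nanquantile", finalize_kwargs=dict(q=(0.5, 0.85))
    )
    print(groups)        # [0 1]
    print(result.shape)  # (2, 2) -> (num quantiles, num groups)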


=====================================
flox/xarray.py
=====================================
@@ -9,7 +9,7 @@ import xarray as xr
 from packaging.version import Version
 from xarray.core.duck_array_ops import _datetime_nanmin
 
-from .aggregations import Aggregation, _atleast_1d
+from .aggregations import Aggregation, Dim, _atleast_1d, quantile_new_dims_func
 from .core import (
     _convert_expected_groups_to_index,
     _get_expected_groups,
@@ -74,7 +74,7 @@ def xarray_reduce(
     dim: Dims | ellipsis = None,
     fill_value=None,
     dtype: np.typing.DTypeLike = None,
-    method: str = "map-reduce",
+    method: str | None = None,
     engine: str | None = None,
     keep_attrs: bool | None = True,
     skipna: bool | None = None,
@@ -387,6 +387,17 @@ def xarray_reduce(
 
         result, *groups = groupby_reduce(array, *by, func=func, **kwargs)
 
+        # Transpose the new quantile dimension to the end. This is ugly.
+        # but new core dimensions are expected at the end :/
+        # but groupby_reduce inserts them at the beginning
+        if func in ["quantile", "nanquantile"]:
+            (newdim,) = quantile_new_dims_func(**finalize_kwargs)
+            if not newdim.is_scalar:
+                # NOTE: _restore_dim_order will move any new dims to the end anyway.
+                # This transpose simply makes it easy to specify output_core_dims
+                # output dim order: (*broadcast_dims, *group_dims, quantile_dim)
+                result = np.moveaxis(result, 0, -1)
+
         # Output of count has an int dtype.
         if requires_numeric and func != "count":
             if is_npdatetime:
@@ -412,8 +423,18 @@ def xarray_reduce(
     input_core_dims = [[d for d in grouper_dims if d not in dim_tuple] + list(dim_tuple)]
     input_core_dims += [list(b.dims) for b in by_da]
 
+    newdims: tuple[Dim, ...] = (
+        quantile_new_dims_func(**finalize_kwargs) if func in ["quantile", "nanquantile"] else ()
+    )
+
     output_core_dims = [d for d in input_core_dims[0] if d not in dim_tuple]
     output_core_dims.extend(group_names)
+    vector_dims = [dim.name for dim in newdims if not dim.is_scalar]
+    output_core_dims.extend(vector_dims)
+
+    output_sizes = group_sizes
+    output_sizes.update({dim.name: dim.size for dim in newdims if dim.size != 0})
+
     actual = xr.apply_ufunc(
         wrapper,
         ds_broad.drop_vars(tuple(missing_dim)).transpose(..., *grouper_dims),
@@ -424,7 +445,7 @@ def xarray_reduce(
         output_core_dims=[output_core_dims],
         dask="allowed",
         dask_gufunc_kwargs=dict(
-            output_sizes=group_sizes, output_dtypes=[dtype] if dtype is not None else None
+            output_sizes=output_sizes, output_dtypes=[dtype] if dtype is not None else None
         ),
         keep_attrs=keep_attrs,
         kwargs={
@@ -451,6 +472,9 @@ def xarray_reduce(
         if all(d not in ds_broad[var].dims for d in dim_tuple):
             actual[var] = ds_broad[var]
 
+    for newdim in newdims:
+        actual.coords[newdim.name] = newdim.values if newdim.is_scalar else np.array(newdim.values)
+
     expect3: T_ExpectIndex | np.ndarray
     for name, expect2, by_ in zip(group_names, expected_groups_valid_list, by_da):
         # Can't remove this until xarray handles IntervalIndex:
@@ -492,7 +516,7 @@ def xarray_reduce(
             else:
                 template = obj
 
-            if actual[var].ndim > 1:
+            if actual[var].ndim > 1 + len(vector_dims):
                 no_groupby_reorder = isinstance(
                     obj, xr.Dataset
                 )  # do not re-order dataarrays inside datasets
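
On the xarray side, a vector q now shows up as a new "quantile" dimension on
the output (placed last, with the q values attached as a coordinate), matching
xarray's groupby(...).quantile(). A short usage sketch with toy data:

    import xarray as xr
    from flox.xarray import xarray_reduce

    da = xr.DataArray([1.0, -1.0, 3.0, 4.0, 10.0, 5.0], dims="x")
    by = xr.DataArray([0, 0, 1, 0, 1, 1], dims="x", name="label")

    out = xarray_reduce(da, by, func="quantile", q=(0.5, 0.85))
    print(out.dims)  # ('label', 'quantile')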


=====================================
flox/xrutils.py
=====================================
@@ -100,6 +100,20 @@ def is_scalar(value: Any, include_0d: bool = True) -> bool:
     )
 
 
+def notnull(data):
+    if not is_duck_array(data):
+        data = np.asarray(data)
+
+    scalar_type = data.dtype.type
+    if issubclass(scalar_type, (np.bool_, np.integer, np.character, np.void)):
+        # these types cannot represent missing values
+        return np.ones_like(data, dtype=bool)
+    else:
+        out = isnull(data)
+        np.logical_not(out, out=out)
+        return out
+
+
 def isnull(data):
     if not is_duck_array(data):
         data = np.asarray(data)
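
notnull mirrors the existing isnull but short-circuits for dtypes that cannot
represent missing values (bool, integer, character, void), returning an
all-True mask without doing any comparisons. For example, with the helper added
above:

    import numpy as np
    from flox.xrutils import notnull

    print(notnull(np.array([1, 2, 3])))           # [ True  True  True]
    print(notnull(np.array([1.0, np.nan, 3.0])))  # [ True False  True]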


=====================================
tests/test_core.py
=====================================
@@ -254,14 +254,19 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
         fill_value = np.nan
         tolerance = {"rtol": 1e-14, "atol": 1e-16}
     elif "quantile" in func:
-        finalize_kwargs = [{"q": DEFAULT_QUANTILE}]
+        finalize_kwargs = [{"q": DEFAULT_QUANTILE}, {"q": [DEFAULT_QUANTILE / 2, DEFAULT_QUANTILE]}]
         fill_value = None
         tolerance = None
     else:
         fill_value = None
         tolerance = None
 
+    # for constructing expected
+    array_func = _get_array_func(func)
+
     for kwargs in finalize_kwargs:
+        if "quantile" in func and isinstance(kwargs["q"], list) and engine != "flox":
+            continue
         flox_kwargs = dict(func=func, engine=engine, finalize_kwargs=kwargs, fill_value=fill_value)
         with np.errstate(invalid="ignore", divide="ignore"):
             with warnings.catch_warnings():
@@ -280,18 +285,19 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
                     array_[..., nanmask] = np.nan
                     expected = getattr(np, func_)(array_, axis=-1, **kwargs)
                 else:
-                    array_func = _get_array_func(func)
                     expected = array_func(array_[..., ~nanmask], axis=-1, **kwargs)
         for _ in range(nby):
             expected = np.expand_dims(expected, -1)
 
         if func in BLOCKWISE_FUNCS:
             assert chunks == -1
-            flox_kwargs["method"] = "blockwise"
 
         actual, *groups = groupby_reduce(array, *by, **flox_kwargs)
-        assert actual.ndim == (array.ndim + nby - 1)
-        assert expected.ndim == (array.ndim + nby - 1)
+        if "quantile" in func and isinstance(kwargs["q"], list):
+            assert actual.ndim == expected.ndim == (array.ndim + nby)
+        else:
+            assert actual.ndim == expected.ndim == (array.ndim + nby - 1)
+
         expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby))
         for actual_group, expect in zip(groups, expected_groups):
             assert_equal(actual_group, expect)
@@ -299,6 +305,20 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
             assert actual.dtype.kind == "i"
         assert_equal(expected, actual, tolerance)
 
+        if "nan" not in func and "arg" not in func:
+            # test non-NaN skipping behaviour when NaNs are present
+            nanned = array_.copy()
+            # remove nans in by to reduce complexity
+            # We are checking for consistent behaviour with NaNs in array
+            by_ = tuple(np.nan_to_num(b, nan=np.nanmin(b)) for b in by)
+            nanned[[1, 4, 5], ...] = np.nan
+            nanned.reshape(-1)[0] = np.nan
+            actual, *_ = groupby_reduce(nanned, *by_, **flox_kwargs)
+            expected_0 = array_func(nanned, axis=-1, **kwargs)
+            for _ in range(nby):
+                expected_0 = np.expand_dims(expected_0, -1)
+            assert_equal(expected_0, actual, tolerance)
+
         if not has_dask or chunks is None or func in BLOCKWISE_FUNCS:
             continue
 
@@ -583,10 +603,25 @@ def test_nanfirst_nanlast_disallowed_dask(axis, func):
 
 
 @requires_dask
+@pytest.mark.xfail
+@pytest.mark.parametrize("func", ["first", "last"])
+def test_first_last_allowed_dask(func):
+    # blockwise should be fine... but doesn't work now.
+    groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+
+
+@requires_dask
+@pytest.mark.xfail
 @pytest.mark.parametrize("func", ["first", "last"])
 def test_first_last_disallowed_dask(func):
-    with pytest.raises(NotImplementedError):
-        groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+    # blockwise is fine
+    groupby_reduce(dask.array.empty((2, 3, 2)), np.ones((2, 3, 2)), func=func, axis=-1)
+
+    # anything else is not.
+    with pytest.raises(ValueError):
+        groupby_reduce(
+            dask.array.empty((2, 3, 2), chunks=(-1, -1, 1)), np.ones((2,)), func=func, axis=-1
+        )
 
 
 @requires_dask
@@ -1655,3 +1690,27 @@ def test_xarray_fill_value_behaviour():
     )
     # fmt: on
     assert_equal(expected, actual)
+
+
+@pytest.mark.parametrize("q", (0.5, (0.5,), (0.5, 0.67, 0.85)))
+@pytest.mark.parametrize("func", ["nanquantile", "quantile"])
+@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
+@pytest.mark.parametrize("by_ndim", [1, 2])
+def test_multiple_quantiles(q, chunk, func, by_ndim):
+    array = np.array([[1, -1, np.nan, 3, 4, 10, 5], [1, np.nan, np.nan, 3, 4, np.nan, np.nan]])
+    labels = np.array([0, 0, 0, 1, 0, 1, 1])
+    if by_ndim == 2:
+        labels = np.broadcast_to(labels, (5, *labels.shape))
+        array = np.broadcast_to(np.expand_dims(array, -2), (2, 5, array.shape[-1]))
+    axis = tuple(range(-by_ndim, 0))
+
+    if chunk:
+        array = dask.array.from_array(array, chunks=(1,) + (-1,) * by_ndim)
+
+    actual, _ = groupby_reduce(array, labels, func=func, finalize_kwargs=dict(q=q), axis=axis)
+    sorted_array = array[..., [0, 1, 2, 4, 3, 5, 6]]
+    f = partial(getattr(np, func), q=q, axis=axis, keepdims=True)
+    expected = np.concatenate((f(sorted_array[..., :4]), f(sorted_array[..., 4:])), axis=-1)
+    if by_ndim == 2:
+        expected = expected.squeeze(axis=-2)
+    assert_equal(expected, actual, tolerance=1e-14)


=====================================
tests/test_xarray.py
=====================================
@@ -421,7 +421,7 @@ def test_groupby_bins_indexed_coordinate(method):
             )
         },
         coords={
-            "time": pd.date_range("2013-01-01", "2013-02-01", freq="6H"),
+            "time": pd.date_range("2013-01-01", "2013-02-01", freq="6h"),
             "lat": np.arange(75.0, 14.9, -2.5),
             "lon": np.arange(200.0, 331.0, 2.5),
         },
@@ -533,7 +533,7 @@ def test_dtype(add_nan, chunk, dtype, dtype_out, engine):
 @pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
 @pytest.mark.parametrize("use_flox", [True, False])
 def test_dtype_accumulation(use_flox, chunk):
-    datetimes = pd.date_range("2010-01", "2015-01", freq="6H", inclusive="left")
+    datetimes = pd.date_range("2010-01", "2015-01", freq="6h", inclusive="left")
     samples = 10 + np.cos(2 * np.pi * 0.001 * np.arange(len(datetimes))) * 1
     samples += np.random.randn(len(datetimes))
     samples = samples.astype("float32")
@@ -593,7 +593,7 @@ def test_preserve_multiindex():
 
 
 def test_fill_value_xarray_behaviour():
-    times = pd.date_range("2000-01-01", freq="6H", periods=10)
+    times = pd.date_range("2000-01-01", freq="6h", periods=10)
     ds = xr.Dataset(
         {
             "bar": (
@@ -605,11 +605,11 @@ def test_fill_value_xarray_behaviour():
         }
     )
 
-    pd.date_range("2000-01-01", freq="3H", periods=19)
+    pd.date_range("2000-01-01", freq="3h", periods=19)
     with xr.set_options(use_flox=False):
-        expected = ds.resample(time="3H").sum()
+        expected = ds.resample(time="3h").sum()
     with xr.set_options(use_flox=True):
-        actual = ds.resample(time="3H").sum()
+        actual = ds.resample(time="3h").sum()
     xr.testing.assert_identical(expected, actual)
 
 
@@ -685,3 +685,28 @@ def test_resampling_missing_groups(chunk):
     with xr.set_options(use_flox=True):
         actual = da.resample(time="1D").mean()
     xr.testing.assert_identical(expected, actual)
+
+
+@pytest.mark.parametrize("q", (0.5, (0.5,), (0.5, 0.67, 0.85)))
+@pytest.mark.parametrize("skipna", [False, True])
+@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False])
+@pytest.mark.parametrize("by_ndim", [1, 2])
+def test_multiple_quantiles(q, chunk, by_ndim, skipna):
+    array = np.array([[1, -1, np.nan, 3, 4, 10, 5], [1, np.nan, np.nan, 3, 4, np.nan, np.nan]])
+    labels = np.array([0, 0, 0, 1, 0, 1, 1])
+    dims = ("y",)
+    if by_ndim == 2:
+        labels = np.broadcast_to(labels, (5, *labels.shape))
+        array = np.broadcast_to(np.expand_dims(array, -2), (2, 5, array.shape[-1]))
+        dims += ("y0",)
+
+    if chunk:
+        array = dask.array.from_array(array, chunks=(1,) + (-1,) * by_ndim)
+
+    da = xr.DataArray(array, dims=("x", *dims))
+    by = xr.DataArray(labels, dims=dims, name="by")
+
+    actual = xarray_reduce(da, by, func="quantile", skipna=skipna, q=q)
+    with xr.set_options(use_flox=False):
+        expected = da.groupby(by).quantile(q, skipna=skipna)
+    xr.testing.assert_allclose(expected, actual)



View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/compare/c4d83b02aa4e775453ef623cc05ce6a464c2327b...6d6a236512820a0d498e8d70a861e449541e873a
