[Git][debian-gis-team/flox][upstream] New upstream version 0.9.10

Antonio Valentino (@antonio.valentino) gitlab@salsa.debian.org
Thu Aug 15 08:09:40 BST 2024



Antonio Valentino pushed to branch upstream at Debian GIS Project / flox


Commits:
9fbeaa3c by Antonio Valentino at 2024-08-15T06:20:43+00:00
New upstream version 0.9.10
- - - - -


12 changed files:

- ci/benchmark.yml
- ci/docs.yml
- flox/__init__.py
- flox/aggregate_flox.py
- flox/aggregations.py
- flox/core.py
- flox/xrdtypes.py
- readthedocs.yml
- tests/conftest.py
- + tests/test_asv.py
- tests/test_core.py
- tests/test_properties.py


Changes:

=====================================
ci/benchmark.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
   - build
   - cachey
   - dask-core
-  - numpy>=1.22
+  - numpy<2
   - mamba
   - pip
   - python=3.10


=====================================
ci/docs.yml
=====================================
@@ -16,7 +16,7 @@ dependencies:
   - myst-parser
   - myst-nb
   - sphinx
-  - furo
+  - furo>=2024.08
   - ipykernel
   - jupyter
   - sphinx-codeautolink


=====================================
flox/__init__.py
=====================================
@@ -2,7 +2,7 @@
 # flake8: noqa
 """Top-level module for flox ."""
 from . import cache
-from .aggregations import Aggregation  # noqa
+from .aggregations import Aggregation, Scan  # noqa
 from .core import groupby_reduce, groupby_scan, rechunk_for_blockwise, rechunk_for_cohorts  # noqa
 
 


=====================================
flox/aggregate_flox.py
=====================================
@@ -2,6 +2,7 @@ from functools import partial
 
 import numpy as np
 
+from . import xrdtypes as dtypes
 from .xrutils import is_scalar, isnull, notnull
 
 
@@ -98,7 +99,7 @@ def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=Non
     # partition the complex array in-place
     labels_broadcast = np.broadcast_to(group_idx, array.shape)
     with np.errstate(invalid="ignore"):
-        cmplx = labels_broadcast + 1j * array
+        cmplx = labels_broadcast + 1j * (array.view(int) if array.dtype.kind in "Mm" else array)
     cmplx.partition(kth=kth, axis=-1)
     if is_scalar_q:
         a_ = cmplx.imag
@@ -158,6 +159,8 @@ def _np_grouped_op(
 
 
 def _nan_grouped_op(group_idx, array, func, fillna, *args, **kwargs):
+    if fillna in [dtypes.INF, dtypes.NINF]:
+        fillna = dtypes._get_fill_value(kwargs.get("dtype", None) or array.dtype, fillna)
     result = func(group_idx, np.where(isnull(array), fillna, array), *args, **kwargs)
     # np.nanmax([np.nan, np.nan]) = np.nan
     # To recover this behaviour, we need to search for the fillna value
@@ -175,9 +178,9 @@ nansum = partial(_nan_grouped_op, func=sum, fillna=0)
 prod = partial(_np_grouped_op, op=np.multiply.reduceat)
 nanprod = partial(_nan_grouped_op, func=prod, fillna=1)
 max = partial(_np_grouped_op, op=np.maximum.reduceat)
-nanmax = partial(_nan_grouped_op, func=max, fillna=-np.inf)
+nanmax = partial(_nan_grouped_op, func=max, fillna=dtypes.NINF)
 min = partial(_np_grouped_op, op=np.minimum.reduceat)
-nanmin = partial(_nan_grouped_op, func=min, fillna=np.inf)
+nanmin = partial(_nan_grouped_op, func=min, fillna=dtypes.INF)
 quantile = partial(_np_grouped_op, op=partial(quantile_, skipna=False))
 nanquantile = partial(_np_grouped_op, op=partial(quantile_, skipna=True))
 median = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=False))
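
The quantile_ change above leans on flox's complex-packing trick: group labels
go in the real part and data in the imaginary part, so numpy's complex ordering
(real part first, then imaginary) lets a single in-place partition place
per-group order statistics without mixing groups. Datetime64/timedelta64 data
is viewed as integers first because complex arithmetic is not defined for those
dtypes. A minimal standalone sketch of the trick, on toy data rather than the
actual flox code path:

    import numpy as np

    group_idx = np.array([0, 0, 0, 1, 1, 1])  # sorted group labels
    array = np.array([3.0, 1.0, 2.0, 9.0, 7.0, 8.0])

    # Pack labels into the real part, values into the imaginary part.
    # Complex comparison orders by real part first, so partitioning
    # never moves an element across a group boundary.
    cmplx = group_idx + 1j * array
    cmplx.partition(kth=(0, 3))  # kth = first index of each group
    print(cmplx.imag[[0, 3]])    # [1. 7.], the per-group minima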


=====================================
flox/aggregations.py
=====================================
@@ -115,60 +115,6 @@ def generic_aggregate(
     return result
 
 
-def _normalize_dtype(dtype: DTypeLike, array_dtype: np.dtype, fill_value=None) -> np.dtype:
-    if dtype is None:
-        dtype = array_dtype
-    if dtype is np.floating:
-        # mean, std, var always result in floating
-        # but we preserve the array's dtype if it is floating
-        if array_dtype.kind in "fcmM":
-            dtype = array_dtype
-        else:
-            dtype = np.dtype("float64")
-    elif not isinstance(dtype, np.dtype):
-        dtype = np.dtype(dtype)
-    if fill_value not in [None, dtypes.INF, dtypes.NINF, dtypes.NA]:
-        dtype = np.result_type(dtype, fill_value)
-    return dtype
-
-
-def _maybe_promote_int(dtype) -> np.dtype:
-    # https://numpy.org/doc/stable/reference/generated/numpy.prod.html
-    # The dtype of a is used by default unless a has an integer dtype of less precision
-    # than the default platform integer.
-    if not isinstance(dtype, np.dtype):
-        dtype = np.dtype(dtype)
-    if dtype.kind == "i":
-        dtype = np.result_type(dtype, np.intp)
-    elif dtype.kind == "u":
-        dtype = np.result_type(dtype, np.uintp)
-    return dtype
-
-
-def _get_fill_value(dtype, fill_value):
-    """Returns dtype appropriate infinity. Returns +Inf equivalent for None."""
-    if fill_value in [None, dtypes.NA] and dtype.kind in "US":
-        return ""
-    if fill_value == dtypes.INF or fill_value is None:
-        return dtypes.get_pos_infinity(dtype, max_for_int=True)
-    if fill_value == dtypes.NINF:
-        return dtypes.get_neg_infinity(dtype, min_for_int=True)
-    if fill_value == dtypes.NA:
-        if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
-            return np.nan
-        # This is madness, but npg checks that fill_value is compatible
-        # with array dtype even if the fill_value is never used.
-        elif (
-            np.issubdtype(dtype, np.integer)
-            or np.issubdtype(dtype, np.timedelta64)
-            or np.issubdtype(dtype, np.datetime64)
-        ):
-            return dtypes.get_neg_infinity(dtype, min_for_int=True)
-        else:
-            return None
-    return fill_value
-
-
 def _atleast_1d(inp, min_length: int = 1):
     if xrutils.is_scalar(inp):
         inp = (inp,) * min_length
@@ -210,6 +156,7 @@ class Aggregation:
         final_dtype: DTypeLike | None = None,
         reduction_type: Literal["reduce", "argreduce"] = "reduce",
         new_dims_func: Callable | None = None,
+        preserves_dtype: bool = False,
     ):
         """
         Blueprint for computing grouped aggregations.
@@ -256,6 +203,8 @@ class Aggregation:
             Function that receives finalize_kwargs and returns a tuple of sizes of any new dimensions
             added by the reduction. For example, quantile with q=(0.5, 0.85) adds a new dimension of
             size 2, so it returns (2,).
+        preserves_dtype: bool,
+            Whether a function preserves the dtype on return, e.g. min, max, first, last, mode.
         """
         self.name = name
         # preprocess before blockwise
@@ -292,6 +241,7 @@ class Aggregation:
         self.new_dims_func: Callable = (
             returns_empty_tuple if new_dims_func is None else new_dims_func
         )
+        self.preserves_dtype = preserves_dtype
 
     @cached_property
     def new_dims(self) -> tuple[Dim]:
@@ -434,10 +384,14 @@ nanstd = Aggregation(
 )
 
 
-min_ = Aggregation("min", chunk="min", combine="min", fill_value=dtypes.INF)
-nanmin = Aggregation("nanmin", chunk="nanmin", combine="nanmin", fill_value=np.nan)
-max_ = Aggregation("max", chunk="max", combine="max", fill_value=dtypes.NINF)
-nanmax = Aggregation("nanmax", chunk="nanmax", combine="nanmax", fill_value=np.nan)
+min_ = Aggregation("min", chunk="min", combine="min", fill_value=dtypes.INF, preserves_dtype=True)
+nanmin = Aggregation(
+    "nanmin", chunk="nanmin", combine="nanmin", fill_value=dtypes.NA, preserves_dtype=True
+)
+max_ = Aggregation("max", chunk="max", combine="max", fill_value=dtypes.NINF, preserves_dtype=True)
+nanmax = Aggregation(
+    "nanmax", chunk="nanmax", combine="nanmax", fill_value=dtypes.NA, preserves_dtype=True
+)
 
 
 def argreduce_preprocess(array, axis):
@@ -525,10 +479,14 @@ nanargmin = Aggregation(
     final_dtype=np.intp,
 )
 
-first = Aggregation("first", chunk=None, combine=None, fill_value=None)
-last = Aggregation("last", chunk=None, combine=None, fill_value=None)
-nanfirst = Aggregation("nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=dtypes.NA)
-nanlast = Aggregation("nanlast", chunk="nanlast", combine="nanlast", fill_value=dtypes.NA)
+first = Aggregation("first", chunk=None, combine=None, fill_value=None, preserves_dtype=True)
+last = Aggregation("last", chunk=None, combine=None, fill_value=None, preserves_dtype=True)
+nanfirst = Aggregation(
+    "nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=dtypes.NA, preserves_dtype=True
+)
+nanlast = Aggregation(
+    "nanlast", chunk="nanlast", combine="nanlast", fill_value=dtypes.NA, preserves_dtype=True
+)
 
 all_ = Aggregation(
     "all",
@@ -579,8 +537,12 @@ nanquantile = Aggregation(
     final_dtype=np.floating,
     new_dims_func=quantile_new_dims_func,
 )
-mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None)
-nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None)
+mode = Aggregation(
+    name="mode", fill_value=dtypes.NA, chunk=None, combine=None, preserves_dtype=True
+)
+nanmode = Aggregation(
+    name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None, preserves_dtype=True
+)
 
 
 @dataclass
@@ -634,7 +596,7 @@ class AlignedArrays:
             # TODO: automate?
             engine="flox",
             dtype=self.array.dtype,
-            fill_value=_get_fill_value(self.array.dtype, dtypes.NA),
+            fill_value=dtypes._get_fill_value(self.array.dtype, dtypes.NA),
             expected_groups=None,
         )
         return AlignedArrays(array=reduced["intermediates"][0], group_idx=reduced["groups"])
@@ -729,6 +691,7 @@ ffill = Scan(
     binary_op=None,
     reduction="nanlast",
     scan="ffill",
+    # Important: this must be NaN, otherwise ffill does not work.
     identity=np.nan,
     mode="concat_then_scan",
 )
@@ -737,6 +700,7 @@ bfill = Scan(
     binary_op=None,
     reduction="nanlast",
     scan="ffill",
+    # Important: this must be NaN, otherwise bfill does not work.
     identity=np.nan,
     mode="concat_then_scan",
     preprocess=reverse,
@@ -815,17 +779,18 @@ def _initialize_aggregation(
     dtype_: np.dtype | None = (
         np.dtype(dtype) if dtype is not None and not isinstance(dtype, np.dtype) else dtype
     )
-
-    final_dtype = _normalize_dtype(dtype_ or agg.dtype_init["final"], array_dtype, fill_value)
-    if agg.name not in ["first", "last", "nanfirst", "nanlast", "min", "max", "nanmin", "nanmax"]:
-        final_dtype = _maybe_promote_int(final_dtype)
+    final_dtype = dtypes._normalize_dtype(
+        dtype_ or agg.dtype_init["final"], array_dtype, fill_value
+    )
+    if not agg.preserves_dtype:
+        final_dtype = dtypes._maybe_promote_int(final_dtype)
     agg.dtype = {
         "user": dtype,  # Save to automatically choose an engine
         "final": final_dtype,
         "numpy": (final_dtype,),
         "intermediate": tuple(
             (
-                _normalize_dtype(int_dtype, np.result_type(array_dtype, final_dtype), int_fv)
+                dtypes._normalize_dtype(int_dtype, np.result_type(array_dtype, final_dtype), int_fv)
                 if int_dtype is None
                 else np.dtype(int_dtype)
             )
@@ -838,10 +803,10 @@ def _initialize_aggregation(
     # Replace sentinel fill values according to dtype
     agg.fill_value["user"] = fill_value
     agg.fill_value["intermediate"] = tuple(
-        _get_fill_value(dt, fv)
+        dtypes._get_fill_value(dt, fv)
         for dt, fv in zip(agg.dtype["intermediate"], agg.fill_value["intermediate"])
     )
-    agg.fill_value[func] = _get_fill_value(agg.dtype["final"], agg.fill_value[func])
+    agg.fill_value[func] = dtypes._get_fill_value(agg.dtype["final"], agg.fill_value[func])
 
     fv = fill_value if fill_value is not None else agg.fill_value[agg.name]
     if _is_arg_reduction(agg):
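
The new preserves_dtype flag replaces the hard-coded name list that
_initialize_aggregation previously consulted: each Aggregation now declares
whether it returns the input dtype unchanged, and only non-preserving
reductions get the platform-integer promotion that mirrors np.sum/np.prod.
A condensed sketch of the resulting dtype decision, following the
_maybe_promote_int helper shown in flox/xrdtypes.py below:

    import numpy as np

    def final_dtype(array_dtype, preserves_dtype: bool) -> np.dtype:
        # Condensed from _initialize_aggregation: dtype-preserving
        # aggregations (min, max, first, last, mode) skip promotion.
        dtype = np.dtype(array_dtype)
        if preserves_dtype:
            return dtype
        if dtype.kind == "i":
            return np.result_type(dtype, np.intp)
        if dtype.kind == "u":
            return np.result_type(dtype, np.uintp)
        return dtype

    print(final_dtype(np.int8, preserves_dtype=False))  # int64 (sum-like)
    print(final_dtype(np.int8, preserves_dtype=True))   # int8 (min-like)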


=====================================
flox/core.py
=====================================
@@ -170,7 +170,9 @@ def _is_minmax_reduction(func: T_Agg) -> bool:
 
 
 def _is_first_last_reduction(func: T_Agg) -> bool:
-    return isinstance(func, str) and func in ["nanfirst", "nanlast", "first", "last"]
+    if isinstance(func, Aggregation):
+        func = func.name
+    return func in ["nanfirst", "nanlast", "first", "last"]
 
 
 def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
@@ -1642,7 +1644,12 @@ def dask_groupby_agg(
     #       This allows us to discover groups at compute time, support argreductions, lower intermediate
     #       memory usage (but method="cohorts" would also work to reduce memory in some cases)
     labels_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
-    do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+    do_grouped_combine = (
+        _is_arg_reduction(agg)
+        or labels_are_unknown
+        or (_is_first_last_reduction(agg) and array.dtype.kind != "f")
+    )
+    do_simple_combine = not do_grouped_combine
 
     if method == "blockwise":
         #  use the "non dask" code path, but applied blockwise
@@ -1698,7 +1705,7 @@ def dask_groupby_agg(
 
         tree_reduce = partial(
             dask.array.reductions._tree_reduce,
-            name=f"{name}-reduce",
+            name=f"{name}-simple-reduce",
             dtype=array.dtype,
             axis=axis,
             keepdims=True,
@@ -1733,14 +1740,20 @@ def dask_groupby_agg(
             groups_ = []
             for blks, cohort in chunks_cohorts.items():
                 cohort_index = pd.Index(cohort)
-                reindexer = partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
+                reindexer = (
+                    partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
+                    if do_simple_combine
+                    else identity
+                )
                 reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer)
                 # now that we have reindexed, we can set reindex=True explicitly
                 reduced_.append(
                     tree_reduce(
                         reindexed,
-                        combine=partial(combine, agg=agg, reindex=True),
-                        aggregate=partial(aggregate, expected_groups=cohort_index, reindex=True),
+                        combine=partial(combine, agg=agg, reindex=do_simple_combine),
+                        aggregate=partial(
+                            aggregate, expected_groups=cohort_index, reindex=do_simple_combine
+                        ),
                     )
                 )
                 # This is done because pandas promotes to 64-bit types when an Index is created
@@ -1986,8 +1999,13 @@ def _validate_reindex(
     expected_groups,
     any_by_dask: bool,
     is_dask_array: bool,
+    array_dtype: Any,
 ) -> bool | None:
     # logger.debug("Entering _validate_reindex: reindex is {}".format(reindex))  # noqa
+    def first_or_last():
+        return func in ["first", "last"] or (
+            _is_first_last_reduction(func) and array_dtype.kind != "f"
+        )
 
     all_numpy = not is_dask_array and not any_by_dask
     if reindex is True and not all_numpy:
@@ -1997,7 +2015,7 @@ def _validate_reindex(
             raise ValueError(
                 "reindex=True is not a valid choice for method='blockwise' or method='cohorts'."
             )
-        if func in ["first", "last"]:
+        if first_or_last():
             raise ValueError("reindex must be None or False when func is 'first' or 'last'.")
 
     if reindex is None:
@@ -2008,9 +2026,10 @@ def _validate_reindex(
         if all_numpy:
             return True
 
-        if func in ["first", "last"]:
+        if first_or_last():
             # have to do the grouped_combine since there's no good fill_value
-            reindex = False
+            # Also needed for nanfirst, nanlast with no-NaN dtypes
+            return False
 
         if method == "blockwise":
             # for grouping by dask arrays, we set reindex=True
@@ -2412,12 +2431,19 @@ def groupby_reduce(
     if method == "cohorts" and any_by_dask:
         raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.")
 
+    if not is_duck_array(array):
+        array = np.asarray(array)
+
     reindex = _validate_reindex(
-        reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array)
+        reindex,
+        func,
+        method,
+        expected_groups,
+        any_by_dask,
+        is_duck_dask_array(array),
+        array.dtype,
     )
 
-    if not is_duck_array(array):
-        array = np.asarray(array)
     is_bool_array = np.issubdtype(array.dtype, bool)
     array = array.astype(np.intp) if is_bool_array else array
 
@@ -2601,7 +2627,7 @@ def groupby_reduce(
 
         # TODO: clean this up
         reindex = _validate_reindex(
-            reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
+            reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array), array.dtype
         )
 
         if TYPE_CHECKING:
@@ -2637,10 +2663,18 @@ def groupby_reduce(
                 groups = (groups[0][sorted_idx],)
 
     if factorize_early:
+        assert len(groups) == 1
+        (groups_,) = groups
         # nan group labels are factorized to -1, and preserved
         # now we get rid of them by reindexing
-        # This also handles bins with no data
-        result = reindex_(result, from_=groups[0], to=expected_, fill_value=fill_value).reshape(
+        # First, for "blockwise", we can have -1 repeated in different blocks
+        # This breaks the reindexing so remove those first.
+        if method == "blockwise" and (mask := groups_ == -1).sum(axis=-1) > 1:
+            result = result[..., ~mask]
+            groups_ = groups_[..., ~mask]
+
+        # This reindex also handles bins with no data
+        result = reindex_(result, from_=groups_, to=expected_, fill_value=fill_value).reshape(
             result.shape[:-1] + grp_shape
         )
         groups = final_groups
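
The do_grouped_combine condition above is the heart of this release's
first/last fix: non-float dtypes have no NaN-like sentinel, so reindexing
intermediate blocks to the full group set would inject fill values that are
indistinguishable from real data, and the tree reduction must instead carry
group labels through the combine step. A toy illustration of the failure mode
the simple combine would otherwise hit:

    import numpy as np

    # A block that only saw groups {0, 1} must be padded out to {0, 1, 2}.
    # With float data, NaN marks "group absent from this block" and is
    # skipped when combining; int8 has no such marker, so any sentinel
    # (here 0) can be mistaken for a genuinely observed value.
    block = np.array([5, 7], dtype=np.int8)
    reindexed = np.full(3, 0, dtype=np.int8)
    reindexed[:2] = block
    print(reindexed)  # [5 7 0]: group 2 now appears to have seen a 0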


=====================================
flox/xrdtypes.py
=====================================
@@ -1,6 +1,7 @@
 import functools
 
 import numpy as np
+from numpy.typing import DTypeLike
 
 from . import xrutils as utils
 
@@ -147,3 +148,57 @@ def get_neg_infinity(dtype, min_for_int=False):
 def is_datetime_like(dtype):
     """Check if a dtype is a subclass of the numpy datetime types"""
     return np.issubdtype(dtype, np.datetime64) or np.issubdtype(dtype, np.timedelta64)
+
+
+def _normalize_dtype(dtype: DTypeLike, array_dtype: np.dtype, fill_value=None) -> np.dtype:
+    if dtype is None:
+        dtype = array_dtype
+    if dtype is np.floating:
+        # mean, std, var always result in floating
+        # but we preserve the array's dtype if it is floating
+        if array_dtype.kind in "fcmM":
+            dtype = array_dtype
+        else:
+            dtype = np.dtype("float64")
+    elif not isinstance(dtype, np.dtype):
+        dtype = np.dtype(dtype)
+    if fill_value not in [None, INF, NINF, NA]:
+        dtype = np.result_type(dtype, fill_value)
+    return dtype
+
+
+def _maybe_promote_int(dtype) -> np.dtype:
+    # https://numpy.org/doc/stable/reference/generated/numpy.prod.html
+    # The dtype of a is used by default unless a has an integer dtype of less precision
+    # than the default platform integer.
+    if not isinstance(dtype, np.dtype):
+        dtype = np.dtype(dtype)
+    if dtype.kind == "i":
+        dtype = np.result_type(dtype, np.intp)
+    elif dtype.kind == "u":
+        dtype = np.result_type(dtype, np.uintp)
+    return dtype
+
+
+def _get_fill_value(dtype, fill_value):
+    """Returns dtype appropriate infinity. Returns +Inf equivalent for None."""
+    if fill_value in [None, NA] and dtype.kind in "US":
+        return ""
+    if fill_value == INF or fill_value is None:
+        return get_pos_infinity(dtype, max_for_int=True)
+    if fill_value == NINF:
+        return get_neg_infinity(dtype, min_for_int=True)
+    if fill_value == NA:
+        if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
+            return np.nan
+        # This is madness, but npg checks that fill_value is compatible
+        # with array dtype even if the fill_value is never used.
+        elif np.issubdtype(dtype, np.integer):
+            return get_neg_infinity(dtype, min_for_int=True)
+        elif np.issubdtype(dtype, np.timedelta64):
+            return np.timedelta64("NaT")
+        elif np.issubdtype(dtype, np.datetime64):
+            return np.datetime64("NaT")
+        else:
+            return None
+    return fill_value
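
_get_fill_value moves here from flox/aggregations.py, and its NA branch now
distinguishes timedelta64 and datetime64 so they receive genuine NaT
sentinels rather than the integer-minimum stand-in from get_neg_infinity.
A short sketch of the NaT behaviour this relies on:

    import numpy as np

    # NaT mirrors NaN semantics: it is detectable with np.isnat and
    # compares unequal to every value, including itself.
    print(np.isnat(np.datetime64("NaT")))                 # True
    print(np.isnat(np.timedelta64("NaT")))                # True
    print(np.datetime64("NaT") == np.datetime64("NaT"))   # False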


=====================================
readthedocs.yml
=====================================
@@ -1,9 +1,9 @@
 version: 2
 
 build:
-  os: "ubuntu-20.04"
+  os: "ubuntu-lts-latest"
   tools:
-    python: "mambaforge-4.10"
+    python: "mambaforge-latest"
 
 conda:
   environment: ci/docs.yml


=====================================
tests/conftest.py
=====================================
@@ -10,11 +10,12 @@ settings.register_profile(
     suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow],
 )
 settings.register_profile(
-    "local",
+    "default",
     max_examples=300,
     suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow],
     verbosity=Verbosity.verbose,
 )
+settings.load_profile("default")
 
 
 @pytest.fixture(
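
Renaming the profile to "default" only matters together with the added
load_profile call: hypothesis profiles are inert until loaded, so without it
the settings registered here would never take effect. A minimal sketch of the
mechanism:

    from hypothesis import Verbosity, settings

    # register_profile defines a named bundle; load_profile activates it.
    settings.register_profile("default", max_examples=300, verbosity=Verbosity.verbose)
    settings.load_profile("default")
    print(settings().max_examples)  # 300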


=====================================
tests/test_asv.py
=====================================
@@ -0,0 +1,24 @@
+# Run asv benchmarks as tests
+
+import pytest
+
+pytest.importorskip("dask")
+
+from asv_bench.benchmarks import reduce
+
+
+@pytest.mark.parametrize(
+    "problem", [reduce.ChunkReduce1D, reduce.ChunkReduce2D, reduce.ChunkReduce2DAllAxes]
+)
+def test_reduce(problem) -> None:
+    testcase = problem()
+    testcase.setup()
+    for args in zip(*testcase.time_reduce.params):
+        testcase.time_reduce(*args)
+
+
+def test_reduce_bare() -> None:
+    testcase = reduce.ChunkReduce1D()
+    testcase.setup()
+    for args in zip(*testcase.time_reduce_bare.params):
+        testcase.time_reduce_bare(*args)
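
The new test module drives asv benchmark classes directly, relying on the asv
convention of a setup() method plus time_* methods that carry their parameter
values in a params attribute. A hypothetical stand-in showing the shape of
that convention (ReduceSuite is invented for illustration; the real classes
live in asv_bench.benchmarks.reduce):

    import numpy as np

    class ReduceSuite:
        # Hypothetical suite mirroring the asv benchmark convention.
        def setup(self):
            self.array = np.ones(1000)

        def time_reduce(self, func):
            getattr(np, func)(self.array)

        # asv reads parameter values from the "params" attribute.
        time_reduce.params = (["sum", "mean"],)

    suite = ReduceSuite()
    suite.setup()
    for args in zip(*suite.time_reduce.params):
        suite.time_reduce(*args)  # run each parameter combination once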


=====================================
tests/test_core.py
=====================================
@@ -13,8 +13,9 @@ import pytest
 from numpy_groupies.aggregate_numpy import aggregate
 
 import flox
+from flox import xrdtypes as dtypes
 from flox import xrutils
-from flox.aggregations import Aggregation, _initialize_aggregation, _maybe_promote_int
+from flox.aggregations import Aggregation, _initialize_aggregation
 from flox.core import (
     HAS_NUMBAGG,
     _choose_engine,
@@ -161,7 +162,7 @@ def test_groupby_reduce(
     if func == "mean" or func == "nanmean":
         expected_result = np.array(expected, dtype=np.float64)
     elif func == "sum":
-        expected_result = np.array(expected, dtype=_maybe_promote_int(array.dtype))
+        expected_result = np.array(expected, dtype=dtypes._maybe_promote_int(array.dtype))
     elif func == "count":
         expected_result = np.array(expected, dtype=np.intp)
 
@@ -389,7 +390,7 @@ def test_groupby_reduce_preserves_dtype(dtype, func):
     array = np.ones((2, 12), dtype=dtype)
     by = np.array([labels] * 2)
     result, _ = groupby_reduce(from_array(array, chunks=(-1, 4)), by, func=func)
-    expect_dtype = _maybe_promote_int(array.dtype)
+    expect_dtype = dtypes._maybe_promote_int(array.dtype)
     assert result.dtype == expect_dtype
 
 
@@ -613,6 +614,33 @@ def test_dask_reduce_axis_subset():
         )
 
 
+@pytest.mark.parametrize("group_idx", [[0, 1, 0], [0, 0, 1], [1, 0, 0], [1, 1, 0]])
+@pytest.mark.parametrize(
+    "func",
+    [
+        # "first", "last",
+        "nanfirst",
+        "nanlast",
+    ],
+)
+@pytest.mark.parametrize(
+    "chunks",
+    [
+        None,
+        pytest.param(1, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+        pytest.param(2, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+        pytest.param(3, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+    ],
+)
+def test_first_last_useless(func, chunks, group_idx):
+    array = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int8)
+    if chunks is not None:
+        array = dask.array.from_array(array, chunks=chunks)
+    actual, _ = groupby_reduce(array, np.array(group_idx), func=func, engine="numpy")
+    expected = np.array([[0, 0], [0, 0]], dtype=np.int8)
+    assert_equal(actual, expected)
+
+
 @pytest.mark.parametrize("func", ["first", "last", "nanfirst", "nanlast"])
 @pytest.mark.parametrize("axis", [(0, 1)])
 def test_first_last_disallowed(axis, func):
@@ -1027,7 +1055,7 @@ def test_dtype_preservation(dtype, func, engine):
         # https://github.com/numbagg/numbagg/issues/121
         pytest.skip()
     if func == "sum":
-        expected = _maybe_promote_int(dtype)
+        expected = dtypes._maybe_promote_int(dtype)
     elif func == "mean" and "int" in dtype:
         expected = np.float64
     else:
@@ -1058,7 +1086,7 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype):
     actual, actual_groups = groupby_reduce(array, labels, func="sum", method=method)
     assert_equal(actual_groups, np.arange(6, dtype=labels.dtype))
 
-    expect_dtype = _maybe_promote_int(dtype)
+    expect_dtype = dtypes._maybe_promote_int(dtype)
     assert_equal(actual, np.array([0, 4, 24, 6, 12, 20], dtype=expect_dtype))
 
 
@@ -1563,18 +1591,36 @@ def test_validate_reindex_map_reduce(
     dask_expected, reindex, func, expected_groups, any_by_dask
 ) -> None:
     actual = _validate_reindex(
-        reindex, func, "map-reduce", expected_groups, any_by_dask, is_dask_array=True
+        reindex,
+        func,
+        "map-reduce",
+        expected_groups,
+        any_by_dask,
+        is_dask_array=True,
+        array_dtype=np.dtype("int32"),
     )
     assert actual is dask_expected
 
     # always reindex with all numpy inputs
     actual = _validate_reindex(
-        reindex, func, "map-reduce", expected_groups, any_by_dask=False, is_dask_array=False
+        reindex,
+        func,
+        "map-reduce",
+        expected_groups,
+        any_by_dask=False,
+        is_dask_array=False,
+        array_dtype=np.dtype("int32"),
     )
     assert actual
 
     actual = _validate_reindex(
-        True, func, "map-reduce", expected_groups, any_by_dask=False, is_dask_array=False
+        True,
+        func,
+        "map-reduce",
+        expected_groups,
+        any_by_dask=False,
+        is_dask_array=False,
+        array_dtype=np.dtype("int32"),
     )
     assert actual
 
@@ -1584,19 +1630,37 @@ def test_validate_reindex() -> None:
     for method in methods:
         with pytest.raises(NotImplementedError):
             _validate_reindex(
-                True, "argmax", method, expected_groups=None, any_by_dask=False, is_dask_array=True
+                True,
+                "argmax",
+                method,
+                expected_groups=None,
+                any_by_dask=False,
+                is_dask_array=True,
+                array_dtype=np.dtype("int32"),
             )
 
     methods: list[T_Method] = ["blockwise", "cohorts"]
     for method in methods:
         with pytest.raises(ValueError):
             _validate_reindex(
-                True, "sum", method, expected_groups=None, any_by_dask=False, is_dask_array=True
+                True,
+                "sum",
+                method,
+                expected_groups=None,
+                any_by_dask=False,
+                is_dask_array=True,
+                array_dtype=np.dtype("int32"),
             )
 
         for func in ["sum", "argmax"]:
             actual = _validate_reindex(
-                None, func, method, expected_groups=None, any_by_dask=False, is_dask_array=True
+                None,
+                func,
+                method,
+                expected_groups=None,
+                any_by_dask=False,
+                is_dask_array=True,
+                array_dtype=np.dtype("int32"),
             )
             assert actual is False
 
@@ -1608,6 +1672,7 @@ def test_validate_reindex() -> None:
             expected_groups=np.array([1, 2, 3]),
             any_by_dask=False,
             is_dask_array=True,
+            array_dtype=np.dtype("int32"),
         )
 
     assert _validate_reindex(
@@ -1617,6 +1682,7 @@ def test_validate_reindex() -> None:
         expected_groups=np.array([1, 2, 3]),
         any_by_dask=True,
         is_dask_array=True,
+        array_dtype=np.dtype("int32"),
     )
     assert _validate_reindex(
         None,
@@ -1625,8 +1691,24 @@ def test_validate_reindex() -> None:
         expected_groups=np.array([1, 2, 3]),
         any_by_dask=True,
         is_dask_array=True,
+        array_dtype=np.dtype("int32"),
     )
 
+    kwargs = dict(
+        method="blockwise",
+        expected_groups=np.array([1, 2, 3]),
+        any_by_dask=True,
+        is_dask_array=True,
+    )
+
+    for func in ["nanfirst", "nanlast"]:
+        assert not _validate_reindex(None, func, array_dtype=np.dtype("int32"), **kwargs)  # type: ignore[arg-type]
+        assert _validate_reindex(None, func, array_dtype=np.dtype("float32"), **kwargs)  # type: ignore[arg-type]
+
+    for func in ["first", "last"]:
+        assert not _validate_reindex(None, func, array_dtype=np.dtype("int32"), **kwargs)  # type: ignore[arg-type]
+        assert not _validate_reindex(None, func, array_dtype=np.dtype("float32"), **kwargs)  # type: ignore[arg-type]
+
 
 @requires_dask
 def test_1d_blockwise_sort_optimization():
@@ -1847,3 +1929,17 @@ def test_ffill_bfill(chunks, size, add_nan_by, func):
     expected = flox.groupby_scan(array.compute(), by, func=func)
     actual = flox.groupby_scan(array, by, func=func)
     assert_equal(expected, actual)
+
+
+@requires_dask
+def test_blockwise_nans():
+    array = dask.array.ones((1, 10), chunks=2)
+    by = np.array([-1, 0, -1, 1, -1, 2, -1, 3, 4, 4])
+    actual, actual_groups = flox.groupby_reduce(
+        array, by, func="sum", expected_groups=pd.RangeIndex(0, 5)
+    )
+    expected, expected_groups = flox.groupby_reduce(
+        array.compute(), by, func="sum", expected_groups=pd.RangeIndex(0, 5)
+    )
+    assert_equal(expected_groups, actual_groups)
+    assert_equal(expected, actual)


=====================================
tests/test_properties.py
=====================================
@@ -9,6 +9,7 @@ pytest.importorskip("dask")
 pytest.importorskip("cftime")
 
 import dask
+import hypothesis.extra.numpy as npst
 import hypothesis.strategies as st
 import numpy as np
 from hypothesis import assume, given, note
@@ -19,6 +20,7 @@ from flox.xrutils import notnull
 
 from . import assert_equal
 from .strategies import by_arrays, chunked_arrays, func_st, numeric_arrays
+from .strategies import chunks as chunks_strategy
 
 dask.config.set(scheduler="sync")
 
@@ -59,7 +61,11 @@ def not_overflowing_array(array: np.ndarray[Any, Any]) -> bool:
     return result
 
 
-@given(data=st.data(), array=numeric_arrays, func=func_st)
+@given(
+    data=st.data(),
+    array=st.one_of(numeric_arrays, chunked_arrays(arrays=numeric_arrays)),
+    func=func_st,
+)
 def test_groupby_reduce(data, array, func: str) -> None:
     # overflow behaviour differs between bincount and sum (for example)
     assume(not_overflowing_array(array))
@@ -91,7 +97,13 @@ def test_groupby_reduce(data, array, func: str) -> None:
 
         # numpy-groupies always does the calculation in float64
         if (
-            ("var" in func or "std" in func or "sum" in func or "mean" in func)
+            (
+                "var" in func
+                or "std" in func
+                or "sum" in func
+                or "mean" in func
+                or "quantile" in func
+            )
             and array.dtype.kind == "f"
             and array.dtype.itemsize != 8
         ):
@@ -208,3 +220,16 @@ def test_first_last(data, array: dask.array.Array, func: str) -> None:
         first, *_ = groupby_reduce(array, by, func=func, engine="flox")
         second, *_ = groupby_reduce(array, by, func=mate, engine="flox")
         assert_equal(first, second)
+
+
+@given(data=st.data(), func=st.sampled_from(["nanfirst", "nanlast"]))
+def test_first_last_useless(data, func):
+    shape = data.draw(npst.array_shapes())
+    by = data.draw(by_arrays(shape=shape[slice(-1, None)]))
+    chunks = data.draw(chunks_strategy(shape=shape))
+    array = np.zeros(shape, dtype=np.int8)
+    if chunks is not None:
+        array = dask.array.from_array(array, chunks=chunks)
+    actual, groups = groupby_reduce(array, by, axis=-1, func=func, engine="numpy")
+    expected = np.zeros(shape[:-1] + (len(groups),), dtype=array.dtype)
+    assert_equal(actual, expected)



View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/9fbeaa3c30ae8cb0998fbde29e4a8a6985027ffd
