[Git][debian-gis-team/flox][upstream] New upstream version 0.10.5

Antonio Valentino (@antonio.valentino) gitlab at salsa.debian.org
Mon Aug 18 10:22:03 BST 2025



Antonio Valentino pushed to branch upstream at Debian GIS Project / flox


Commits:
df73f5f7 by Antonio Valentino at 2025-08-18T09:02:47+00:00
New upstream version 0.10.5
- - - - -


25 changed files:

- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .gitignore
- .pre-commit-config.yaml
- + CLAUDE.md
- ci/docs.yml
- ci/environment.yml
- ci/minimal-requirements.yml
- ci/no-dask.yml
- ci/no-numba.yml
- ci/no-xarray.yml
- docs/source/conf.py
- docs/source/xarray.md
- flox/__init__.py
- flox/aggregate_flox.py
- flox/aggregate_sparse.py
- flox/aggregations.py
- flox/core.py
- + flox/options.py
- flox/xarray.py
- flox/xrdtypes.py
- flox/xrutils.py
- pyproject.toml
- tests/test_core.py
- tests/test_properties.py


Changes:

=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -77,7 +77,7 @@ jobs:
           --ignore flox/tests \
           --cov=./ --cov-report=xml
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v5.4.2
+        uses: codecov/codecov-action@v5.4.3
         with:
           file: ./coverage.xml
           flags: unittests
@@ -132,7 +132,7 @@ jobs:
           python -m mypy --install-types --non-interactive --cache-dir=.mypy_cache/ --cobertura-xml-report mypy_report
 
       - name: Upload mypy coverage to Codecov
-        uses: codecov/codecov-action@v5.4.2
+        uses: codecov/codecov-action@v5.4.3
         with:
           file: mypy_report/cobertura.xml
           flags: mypy


=====================================
.github/workflows/ci.yaml
=====================================
@@ -26,7 +26,7 @@ jobs:
       matrix:
         os: ["ubuntu-latest"]
         env: ["environment"]
-        python-version: ["3.10", "3.13"]
+        python-version: ["3.11", "3.13"]
         include:
           - os: "windows-latest"
             env: "environment"
@@ -36,10 +36,10 @@ jobs:
             python-version: "3.13"
           - os: "ubuntu-latest"
             env: "minimal-requirements"
-            python-version: "3.10"
+            python-version: "3.11"
           - os: "windows-latest"
             env: "env-numpy1"
-            python-version: "3.10"
+            python-version: "3.11"
     steps:
       - uses: actions/checkout@v4
         with:
@@ -76,7 +76,7 @@ jobs:
           python -c "import xarray; xarray.show_versions()"
           pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
       - name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v5.4.2
+        uses: codecov/codecov-action@v5.4.3
         with:
           file: ./coverage.xml
           flags: unittests


=====================================
.gitignore
=====================================
@@ -110,3 +110,6 @@ venv.bak/
 .mypy_cache/
 
 .DS_Store
+
+# Git worktrees
+worktrees/


=====================================
.pre-commit-config.yaml
=====================================
@@ -4,17 +4,12 @@ ci:
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: "v0.11.4"
+    rev: "v0.12.2"
     hooks:
       - id: ruff
         args: ["--fix", "--show-fixes"]
       - id: ruff-format
 
-  - repo: https://github.com/pre-commit/mirrors-prettier
-    rev: "v4.0.0-alpha.8"
-    hooks:
-      - id: prettier
-
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v5.0.0
     hooks:


=====================================
CLAUDE.md
=====================================
@@ -0,0 +1,151 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+**flox** is a Python library providing fast GroupBy reduction operations for `dask.array`. It implements parallel-friendly GroupBy reductions using the MapReduce paradigm and integrates with xarray for labeled multidimensional arrays.
+
+## Development Commands
+
+### Environment Setup
+
+```bash
+# Create and activate development environment
+mamba env create -f ci/environment.yml
+conda activate flox-tests
+python -m pip install --no-deps -e .
+```
+
+### Testing
+
+```bash
+# Run full test suite (as used in CI)
+pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
+
+# Run tests without coverage
+pytest -n auto
+
+# Run single test file
+pytest tests/test_core.py
+
+# Run specific test
+pytest tests/test_core.py::test_function_name
+```
+
+### Code Quality
+
+```bash
+# Run all pre-commit hooks
+pre-commit run --all-files
+
+# Format code with ruff
+ruff format .
+
+# Lint and fix with ruff
+ruff check --fix .
+
+# Type checking
+mypy flox/
+
+# Spell checking
+codespell
+```
+
+### Benchmarking
+
+```bash
+# Performance benchmarking (from asv_bench/ directory)
+cd asv_bench
+asv run
+asv publish
+asv preview
+```
+
+## CI Configuration
+
+### GitHub Workflows (`.github/workflows/`)
+
+- **`ci.yaml`** - Main CI pipeline with test matrix across Python versions (3.11, 3.13) and operating systems (Ubuntu, Windows)
+- **`ci-additional.yaml`** - Additional CI jobs including doctests and mypy type checking
+- **`upstream-dev-ci.yaml`** - Tests against development versions of upstream dependencies
+- **`pypi.yaml`** - PyPI publishing workflow
+- **`testpypi-release.yaml`** - Test PyPI release workflow
+- **`benchmarks.yml`** - Performance benchmarking workflow
+
+### Environment Files (`ci/`)
+
+- **`environment.yml`** - Main test environment with all dependencies
+- **`minimal-requirements.yml`** - Minimal requirements testing (pandas==1.5, numpy==1.22, etc.)
+- **`no-dask.yml`** - Testing without dask dependency
+- **`no-numba.yml`** - Testing without numba dependency
+- **`no-xarray.yml`** - Testing without xarray dependency
+- **`env-numpy1.yml`** - Testing with numpy\<2 constraint
+- **`docs.yml`** - Documentation building environment
+- **`upstream-dev-env.yml`** - Development versions of dependencies
+- **`benchmark.yml`** - Benchmarking environment
+
+### ReadTheDocs Configuration
+
+- **`.readthedocs.yml`** - ReadTheDocs configuration using `ci/docs.yml` environment
+
+## Code Architecture
+
+### Core Modules (`flox/`)
+
+- **`core.py`** - Main reduction logic, central orchestrator of groupby operations
+- **`aggregations.py`** - Defines the `Aggregation` class and built-in aggregation operations
+- **`xarray.py`** - Primary integration with xarray, provides `xarray_reduce()` API
+- **`dask_array_ops.py`** - Dask-specific array operations and optimizations
+
+### Aggregation Backends (`flox/aggregate_*.py`)
+
+- **`aggregate_flox.py`** - Native flox implementation
+- **`aggregate_npg.py`** - numpy-groupies backend
+- **`aggregate_numbagg.py`** - numbagg backend for JIT-compiled operations
+- **`aggregate_sparse.py`** - Support for sparse arrays
+
+### Utilities
+
+- **`cache.py`** - Caching mechanisms for performance
+- **`visualize.py`** - Tools for visualizing groupby operations
+- **`lib.py`** - General utility functions
+- **`xrutils.py`** & **`xrdtypes.py`** - xarray-specific utilities and types
+
+### Main APIs
+
+- `flox.groupby_reduce()` - Pure dask array interface
+- `flox.xarray.xarray_reduce()` - Pure xarray interface
+
+## Key Design Patterns
+
+**Engine Selection**: The library supports multiple computation backends ("flox", "numpy", "numbagg") that can be chosen based on data characteristics and performance requirements.
+
+**MapReduce Strategy**: Implements groupby reductions using a two-stage approach (blockwise + tree reduction) to avoid expensive sort/shuffle operations in parallel computing.
+
+**Chunking Intelligence**: Automatically rechunks data to optimize groupby operations, particularly important for the current `auto-blockwise-rechunk` branch.
+
+**Integration Testing**: Extensive testing against xarray's groupby functionality to ensure compatibility with the broader scientific Python ecosystem.
+
+## Testing Configuration
+
+- **Framework**: pytest with coverage, parallel execution (pytest-xdist), and property-based testing (hypothesis)
+- **Coverage Target**: 95%
+- **Test Environments**: Multiple conda environments test optional dependencies (no-dask, no-numba, no-xarray)
+- **CI Matrices**: Tests across Python 3.11-3.13, Ubuntu/Windows, multiple dependency configurations
+
+## Dependencies
+
+**Core**: pandas>=1.5, numpy>=1.22, numpy_groupies>=0.9.19, scipy>=1.9, toolz, packaging>=21.3
+
+**Optional**: cachey, dask, numba, numbagg, xarray (enable with `pip install flox[all]`)
+
+## Development Notes
+
+- Uses `setuptools_scm` for automatic versioning from git tags
+- Heavy emphasis on performance with ASV benchmarking infrastructure
+- Type hints throughout with mypy checking
+- Pre-commit hooks enforce code quality (ruff, prettier, codespell)
+- Integration testing with xarray upstream development branch
+- **Python Support**: Minimum version 3.11 (updated from 3.10)
+- **Git Worktrees**: `worktrees/` directory is ignored for development workflows
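
[Editor's note] The two entry points listed under "Main APIs" in the CLAUDE.md above are easiest to see with a tiny example. A minimal sketch with synthetic data and the default engine (not taken from the repository):

```python
# Minimal sketch of flox.groupby_reduce; the array and labels are made up.
import numpy as np
import flox

array = np.arange(12, dtype=float).reshape(2, 6)   # reduce along the last axis
by = np.array([0, 0, 1, 1, 2, 2])                  # group labels for that axis

# Returns the reduced values plus the group labels that were found.
result, groups = flox.groupby_reduce(array, by, func="mean", axis=-1)
print(groups)   # [0 1 2]
print(result)   # per-group means, shape (2, 3)

# With xarray installed, the equivalent labelled-array entry point is
# flox.xarray.xarray_reduce(ds, ds.labels, func="mean").
```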


=====================================
ci/docs.yml
=====================================
@@ -7,7 +7,7 @@ dependencies:
   - dask-core
   - pip
   - xarray
-  - numpy>=1.22
+  - numpy>=1.26
   - scipy
   - numpydoc
   - numpy_groupies>=0.9.19
@@ -22,5 +22,6 @@ dependencies:
   - ipykernel
   - jupyter
   - sphinx-codeautolink
+  - sphinx-copybutton
   - pip:
       - -e ..


=====================================
ci/environment.yml
=====================================
@@ -9,7 +9,7 @@ dependencies:
   - cubed>=0.20.0
   - dask-core
   - pandas
-  - numpy>=1.22
+  - numpy>=1.26
   - scipy
   - sparse
   - lxml # for mypy coverage report


=====================================
ci/minimal-requirements.yml
=====================================
@@ -10,9 +10,9 @@ dependencies:
   - pytest-pretty
   - pytest-xdist
   - syrupy
-  - numpy==1.22
-  - scipy==1.9.0
+  - numpy==1.26
+  - scipy==1.12
   - numpy_groupies==0.9.19
-  - pandas==1.5
+  - pandas==2.1
   - pooch
   - toolz


=====================================
ci/no-dask.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
   - pandas
   - hypothesis
   - cftime
-  - numpy>=1.22
+  - numpy>=1.26
   - scipy
   - sparse
   - pip


=====================================
ci/no-numba.yml
=====================================
@@ -9,7 +9,7 @@ dependencies:
   - dask-core
   - hypothesis
   - pandas
-  - numpy>=1.22
+  - numpy>=1.26
   - scipy
   - sparse
   - lxml # for mypy coverage report


=====================================
ci/no-xarray.yml
=====================================
@@ -5,7 +5,7 @@ dependencies:
   - codecov
   - syrupy
   - pandas
-  - numpy>=1.22
+  - numpy>=1.26
   - scipy
   - sparse
   - pip


=====================================
docs/source/conf.py
=====================================
@@ -41,6 +41,7 @@ extensions = [
     "myst_nb",
     "sphinx_codeautolink",
     "sphinx_remove_toctrees",
+    "sphinx_copybutton",
 ]
 
 codeautolink_concat_default = True


=====================================
docs/source/xarray.md
=====================================
@@ -11,22 +11,4 @@ ds.groupby_bins("lon", bins=[0, 10, 20]).mean(method="map-reduce")
 ds.resample(time="M").mean(method="blockwise")
 ```
 
-Xarray's GroupBy operations are currently limited:
-
-1. One can only group by a single variable.
-1. When grouping by a dask array, that array will be computed to discover the unique group labels, and their locations
-
-These limitations can be avoided by using {py:func}`flox.xarray.xarray_reduce` which allows grouping by multiple variables, lazy grouping by dask variables,
-as well as an arbitrary combination of categorical grouping and binning. For example,
-
-```python
-flox.xarray.xarray_reduce(
-    ds,
-    ds.time.dt.month,
-    ds.lon,
-    func="mean",
-    expected_groups=[None, [0, 10, 20]],
-    isbin=[False, True],
-    method="map-reduce",
-)
-```
+{py:func}`flox.xarray.xarray_reduce` used to provide extra functionality, but now Xarray's GroupBy object has been upgraded to match those capabilities with a better API!
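
[Editor's note] For readers of the hunk above: a hedged sketch of what the removed `xarray_reduce` example looks like with present-day xarray GroupBy, assuming a recent xarray that ships the grouper objects API (`xarray.groupers`); `ds`, `labels`, and `lon` are illustrative names:

```python
# Sketch only: multi-variable grouping plus binning via xarray's grouper objects,
# which dispatch to flox when it is installed. `ds` is a hypothetical Dataset
# with a categorical "labels" variable and a "lon" coordinate.
from xarray.groupers import BinGrouper, UniqueGrouper

result = ds.groupby(
    labels=UniqueGrouper(),
    lon=BinGrouper(bins=[0, 10, 20]),
).mean()
```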


=====================================
flox/__init__.py
=====================================
@@ -3,7 +3,7 @@
 """Top-level module for flox ."""
 
 from . import cache
-from .aggregations import Aggregation, Scan  # noqa
+from .aggregations import Aggregation, Scan, is_supported_aggregation
 from .core import (
     groupby_reduce,
     groupby_scan,
@@ -11,7 +11,8 @@ from .core import (
     rechunk_for_cohorts,
     ReindexStrategy,
     ReindexArrayType,
-)  # noqa
+)
+from .options import set_options
 
 
 def _get_version():
@@ -24,3 +25,16 @@ def _get_version():
 
 
 __version__ = _get_version()
+
+__all__ = [
+    "Aggregation",
+    "Scan",
+    "groupby_reduce",
+    "groupby_scan",
+    "rechunk_for_blockwise",
+    "rechunk_for_cohorts",
+    "set_options",
+    "ReindexStrategy",
+    "ReindexArrayType",
+    "is_supported_aggregation",
+]


=====================================
flox/aggregate_flox.py
=====================================
@@ -276,3 +276,39 @@ def ffill(group_idx, array, *, axis, **kwargs):
 
     invert_perm = slice(None) if isinstance(perm, slice) else np.argsort(perm, kind="stable")
     return array[tuple(slc)][..., invert_perm]
+
+
+def _np_grouped_scan(group_idx, array, *, axis: int, skipna: bool, **kwargs):
+    handle_nans = not skipna and array.dtype.kind in "cfO"
+
+    group_idx, array, perm = _prepare_for_flox(group_idx, array)
+    ndim = array.ndim
+    assert axis == (ndim - 1), (axis, ndim - 1)
+
+    flag = np.concatenate((np.asarray([True], like=group_idx), group_idx[1:] != group_idx[:-1]))
+    (inv_idx,) = flag.nonzero()
+    segment_lengths = np.add.reduceat(np.ones(group_idx.shape), inv_idx, dtype=np.int64)
+
+    # TODO: set dtype to float properly for handle_nans?
+    accum = np.nancumsum(array, axis=axis)
+
+    if len(inv_idx) > 1:
+        first_group_idx = inv_idx[1]
+        # extract cumulative sum _before_ start of group
+        prev_group_cumsum = accum[..., inv_idx[1:] - 1]
+        accum[..., first_group_idx:] -= np.repeat(prev_group_cumsum, segment_lengths[1:], axis=axis)
+
+    if handle_nans:
+        mask = isnull(array)
+        accummask = np.cumsum(mask, axis=-1, dtype=np.uint64)
+        if len(inv_idx) > 1:
+            prev_group_cumsum = accummask[..., inv_idx[1:] - 1]
+            accummask[..., first_group_idx:] -= np.repeat(prev_group_cumsum, segment_lengths[1:], axis=axis)
+        accum[accummask > 0] = np.nan
+
+    invert_perm = slice(None) if isinstance(perm, slice) else np.argsort(perm, kind="stable")
+    return accum[..., invert_perm]
+
+
+cumsum = partial(_np_grouped_scan, skipna=False)
+nancumsum = partial(_np_grouped_scan, skipna=True)
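
[Editor's note] The grouped scan added above leans on one global cumulative sum plus a per-group correction. A standalone NumPy illustration of that trick on toy data (not library code):

```python
# Toy illustration of the _np_grouped_scan idea: take one global cumsum over the
# group-sorted values, then subtract the running total accumulated before each
# group's first element.
import numpy as np

group_idx = np.array([0, 0, 0, 1, 1, 2])                 # already sorted by group
array = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

flag = np.concatenate(([True], group_idx[1:] != group_idx[:-1]))
(inv_idx,) = flag.nonzero()                              # group start indices -> [0, 3, 5]
segment_lengths = np.diff(np.append(inv_idx, group_idx.size))  # [3, 2, 1]

accum = np.cumsum(array)                                 # [1, 3, 6, 10, 15, 21]
prev_group_cumsum = accum[inv_idx[1:] - 1]               # totals just before groups 1, 2 -> [6, 15]
accum[inv_idx[1]:] -= np.repeat(prev_group_cumsum, segment_lengths[1:])

print(accum)                                             # [1, 3, 6, 4, 9, 6] == per-group cumsum
```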


=====================================
flox/aggregate_sparse.py
=====================================
@@ -1,7 +1,9 @@
 # Unlike the other aggregate_* submodules, this one simply defines a wrapper function
 # because we run the groupby on the underlying dense data.
 
+from collections.abc import Callable
 from functools import partial
+from typing import Any, TypeAlias
 
 import numpy as np
 import sparse
@@ -18,10 +20,12 @@ def nanadd(a, b):
 
     From https://stackoverflow.com/a/50642947/1707127
     """
-    return np.where(np.isnan(a + b), np.where(np.isnan(a), b, a), a + b)
+    ab = a + b
+    return np.where(np.isnan(ab), np.where(np.isnan(a), b, a), ab)
 
 
-BINARY_OPS = {
+CallableMap: TypeAlias = dict[str, Callable[[np.ndarray, np.ndarray], np.ndarray]]
+BINARY_OPS: CallableMap = {
     "sum": np.add,
     "nansum": nanadd,
     "max": np.maximum,
@@ -29,8 +33,8 @@ BINARY_OPS = {
     "min": np.minimum,
     "nanmin": np.fmin,
 }
-HYPER_OPS = {"sum": np.multiply, "nansum": np.multiply}
-IDENTITY = {
+HYPER_OPS: CallableMap = {"sum": np.multiply, "nansum": np.multiply}
+IDENTITY: dict[str, Any] = {
     "sum": 0,
     "nansum": 0,
     "prod": 1,


=====================================
flox/aggregations.py
=====================================
@@ -14,7 +14,7 @@ from numpy.typing import ArrayLike, DTypeLike
 
 from . import aggregate_flox, aggregate_npg, xrutils
 from . import xrdtypes as dtypes
-from .lib import sparse_array_type
+from .lib import dask_array_type, sparse_array_type
 
 if TYPE_CHECKING:
     FuncTuple = tuple[Callable | str, ...]
@@ -631,7 +631,7 @@ class AlignedArrays:
         reduced = chunk_reduce(
             self.array,
             self.group_idx,
-            func=("nanlast",),
+            func=("last",),
             axis=-1,
             # TODO: automate?
             engine="flox",
@@ -699,6 +699,7 @@ def scan_binary_op(left_state: ScanState, right_state: ScanState, *, agg: Scan)
             fill_value=agg.identity,
         )
         result = AlignedArrays(array=final_value[..., left.group_idx.size :], group_idx=right.group_idx)
+
     else:
         raise ValueError(f"Unknown binary op application mode: {agg.mode!r}")
 
@@ -717,8 +718,7 @@ def scan_binary_op(left_state: ScanState, right_state: ScanState, *, agg: Scan)
     )
 
 
-# TODO: numpy_groupies cumsum is a broken when NaNs are present.
-# cumsum = Scan("cumsum", binary_op=np.add, reduction="sum", scan="cumsum", identity=0)
+cumsum = Scan("cumsum", binary_op=np.add, reduction="sum", scan="cumsum", identity=0)
 nancumsum = Scan("nancumsum", binary_op=np.add, reduction="nansum", scan="nancumsum", identity=0)
 # ffill uses the identity for scan, and then at the binary-op state,
 # we concatenate the blockwise-reduced values with the original block,
@@ -782,7 +782,7 @@ AGGREGATIONS: dict[str, Aggregation | Scan] = {
     "nanquantile": nanquantile,
     "mode": mode,
     "nanmode": nanmode,
-    # "cumsum": cumsum,
+    "cumsum": cumsum,
     "nancumsum": nancumsum,
     "ffill": ffill,
     "bfill": bfill,
@@ -895,3 +895,20 @@ def _initialize_aggregation(
     agg.simple_combine = tuple(simple_combine)
 
     return agg
+
+
+def is_supported_aggregation(array, func: str) -> bool:
+    if isinstance(array, dask_array_type):
+        array = array._meta
+
+    if isinstance(array, sparse_array_type):
+        from flox.core import _is_sparse_supported_reduction
+
+        return _is_sparse_supported_reduction(func)
+
+    module, *_ = type(array).__module__.split(".")
+
+    if module in ["numpy", "cubed"]:
+        return func in AGGREGATIONS
+    else:
+        return False


=====================================
flox/core.py
=====================================
@@ -6,7 +6,6 @@ import itertools
 import logging
 import math
 import operator
-import sys
 import warnings
 from collections import namedtuple
 from collections.abc import Callable, Sequence
@@ -49,6 +48,7 @@ from .aggregations import (
 )
 from .cache import memoize
 from .lib import ArrayLayer, dask_array_type, sparse_array_type
+from .options import OPTIONS
 from .xrutils import (
     _contains_cftime_datetimes,
     _to_pytimedelta,
@@ -71,13 +71,6 @@ HAS_NUMBAGG = module_available("numbagg", minversion="0.3.0")
 HAS_SPARSE = module_available("sparse")
 
 if TYPE_CHECKING:
-    try:
-        if sys.version_info < (3, 11):
-            from typing_extensions import Unpack
-        else:
-            from typing import Unpack
-    except (ModuleNotFoundError, ImportError):
-        Unpack: Any  # type: ignore[no-redef]
     from .types import CubedArray, DaskArray, Graph
 
     T_DuckArray: TypeAlias = np.ndarray | DaskArray | CubedArray  # Any ?
@@ -119,6 +112,7 @@ FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask")
 # _simple_combine.
 DUMMY_AXIS = -2
 
+
 logger = logging.getLogger("flox")
 
 
@@ -215,7 +209,10 @@ def _postprocess_numbagg(result, *, func, fill_value, size, seen_groups):
     if needs_masking:
         mask = np.isin(groups, seen_groups, assume_unique=True, invert=True)
         if mask.any():
-            result[..., groups[mask]] = fill_value
+            if isinstance(result, sparse_array_type):
+                result.fill_value = fill_value
+            else:
+                result[..., groups[mask]] = fill_value
     return result
 
 
@@ -223,8 +220,11 @@ def identity(x: T) -> T:
     return x
 
 
-def _issorted(arr: np.ndarray) -> bool:
-    return bool((arr[:-1] <= arr[1:]).all())
+def _issorted(arr: np.ndarray, ascending=True) -> bool:
+    if ascending:
+        return bool((arr[:-1] <= arr[1:]).all())
+    else:
+        return bool((arr[:-1] >= arr[1:]).all())
 
 
 def _is_arg_reduction(func: T_Agg) -> bool:
@@ -307,7 +307,7 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray:
 def _get_optimal_chunks_for_groups(chunks, labels):
     chunkidx = np.cumsum(chunks) - 1
     # what are the groups at chunk boundaries
-    labels_at_chunk_bounds = _unique(labels[chunkidx])
+    labels_at_chunk_bounds = pd.unique(labels[chunkidx])
     # what's the last index of all groups
     last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last")
     # what's the last index of groups at the chunk boundaries.
@@ -325,6 +325,8 @@ def _get_optimal_chunks_for_groups(chunks, labels):
         Δl = abs(c - l)
         if c == 0 or newchunkidx[-1] > l:
             continue
+        f = f.item()  # noqa
+        l = l.item()  # noqa
         if Δf < Δl and f > newchunkidx[-1]:
             newchunkidx.append(f)
         else:
@@ -716,7 +718,9 @@ def rechunk_for_cohorts(
         return array.rechunk({axis: newchunks})
 
 
-def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
+def rechunk_for_blockwise(
+    array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True
+) -> tuple[T_MethodOpt, DaskArray]:
     """
     Rechunks array so that group boundaries line up with chunk boundaries, allowing
     embarrassingly parallel group reductions.
@@ -739,14 +743,47 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) ->
     DaskArray
         Rechunked array
     """
-    # TODO: this should be unnecessary?
-    labels = factorize_((labels,), axes=())[0]
+
     chunks = array.chunks[axis]
-    newchunks = _get_optimal_chunks_for_groups(chunks, labels)
+    if len(chunks) == 1:
+        return "blockwise", array
+
+    # import dask
+    # from dask.utils import parse_bytes
+    # factor = parse_bytes(dask.config.get("array.chunk-size")) / (
+    #     math.prod(array.chunksize) * array.dtype.itemsize
+    # )
+    # if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR:
+    #     new_constant_chunks = math.ceil(factor) * max(chunks)
+    #     q, r = divmod(array.shape[axis], new_constant_chunks)
+    #     new_input_chunks = (new_constant_chunks,) * q + (r,)
+    # else:
+    new_input_chunks = chunks
+
+    # FIXME: this should be unnecessary?
+    labels = factorize_((labels,), axes=())[0]
+    newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels)
     if newchunks == chunks:
-        return array
+        return "blockwise", array
+
+    Δn = abs(len(newchunks) - len(new_input_chunks))
+    if pass_num_chunks_threshold := (
+        Δn / len(new_input_chunks) < OPTIONS["rechunk_blockwise_num_chunks_threshold"]
+    ):
+        logger.debug("blockwise rechunk passes num chunks threshold")
+    if pass_chunk_size_threshold := (
+        # we just pick the max because number of chunks may have changed.
+        (abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks))
+        < OPTIONS["rechunk_blockwise_chunk_size_threshold"]
+    ):
+        logger.debug("blockwise rechunk passes chunk size change threshold")
+
+    if force or (pass_num_chunks_threshold and pass_chunk_size_threshold):
+        logger.debug("Rechunking to enable blockwise.")
+        return "blockwise", array.rechunk({axis: newchunks})
     else:
-        return array.rechunk({axis: newchunks})
+        logger.debug("Didn't meet thresholds to do automatic rechunking for blockwise reductions.")
+        return None, array
 
 
 def reindex_numpy(array, from_: pd.Index, to: pd.Index, fill_value, dtype, axis: int):
@@ -2500,7 +2537,7 @@ def groupby_reduce(
     engine: T_EngineOpt = None,
     reindex: ReindexStrategy | bool | None = None,
     finalize_kwargs: dict[Any, Any] | None = None,
-) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]:
+) -> tuple[DaskArray, *tuple[np.ndarray | DaskArray, ...]]:
     """
     GroupBy reductions using tree reductions for dask.array
 
@@ -2712,6 +2749,11 @@ def groupby_reduce(
     has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
     has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
 
+    if method is None and is_duck_dask_array(array) and not any_by_dask and by_.ndim == 1 and _issorted(by_):
+        # Let's try rechunking for sorted 1D by.
+        (single_axis,) = axis_
+        method, array = rechunk_for_blockwise(array, single_axis, by_, force=False)
+
     is_first_last = _is_first_last_reduction(func)
     if is_first_last:
         if has_dask and nax != 1:
@@ -2899,7 +2941,7 @@ def groupby_reduce(
 
         # if preferred method is already blockwise, no need to rechunk
         if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1:
-            array = rechunk_for_blockwise(array, axis=-1, labels=by_)
+            _, array = rechunk_for_blockwise(array, axis=-1, labels=by_)
 
         result, groups = partial_agg(
             array=array,
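
[Editor's note] `rechunk_for_blockwise` now returns a `(method, array)` tuple rather than just the array, as the hunks above show. A minimal sketch of the new calling convention, assuming a dask-backed array with sorted 1-D labels (sizes here are illustrative):

```python
# Minimal sketch of the new rechunk_for_blockwise return value.
import dask.array
import numpy as np
from flox.core import rechunk_for_blockwise

labels = np.repeat(np.arange(4), 25)          # sorted 1-D group labels
darr = dask.array.ones(labels.size, chunks=30)

# With force=False the function only rechunks when the change stays within the
# thresholds in flox.options.OPTIONS; otherwise it returns (None, original array).
method, darr = rechunk_for_blockwise(darr, -1, labels, force=False)
if method == "blockwise":
    print("chunk boundaries now line up with group boundaries:", darr.chunks)
```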


=====================================
flox/options.py
=====================================
@@ -0,0 +1,64 @@
+"""
+Started from xarray options.py; vendored from cf-xarray
+"""
+
+import copy
+from collections.abc import MutableMapping
+from typing import Any
+
+OPTIONS: MutableMapping[str, Any] = {
+    # Thresholds below which we will automatically rechunk to blockwise if it makes sense
+    # 1. Fractional change in number of chunks after rechunking
+    "rechunk_blockwise_num_chunks_threshold": 0.25,
+    # 2. Fractional change in max chunk size after rechunking
+    "rechunk_blockwise_chunk_size_threshold": 1.5,
+    # 3. If input arrays have chunk size smaller than `dask.array.chunk-size`,
+    #    then adjust chunks to meet that size first.
+    # "rechunk.blockwise.chunk_size_factor": 1.5,
+}
+
+
+class set_options:  # numpydoc ignore=PR01,PR02
+    """
+    Set options for flox in a controlled context.
+
+    Parameters
+    ----------
+    rechunk_blockwise_num_chunks_threshold : float
+        Rechunk if fractional change in number of chunks after rechunking
+        is less than this amount.
+    rechunk_blockwise_chunk_size_threshold: float
+        Rechunk if fractional change in max chunk size after rechunking
+        is less than this threshold.
+
+    Examples
+    --------
+
+    You can use ``set_options`` either as a context manager:
+
+    >>> import flox
+    >>> with flox.set_options(rechunk_blockwise_num_chunks_threshold=1):
+    ...     pass
+
+    Or to set global options:
+
+    >>> flox.set_options(rechunk_blockwise_num_chunks_threshold=1)
+    """
+
+    def __init__(self, **kwargs):
+        self.old = {}
+        for k in kwargs:
+            if k not in OPTIONS:
+                raise ValueError(f"argument name {k!r} is not in the set of valid options {set(OPTIONS)!r}")
+            self.old[k] = OPTIONS[k]
+        self._apply_update(kwargs)
+
+    def _apply_update(self, options_dict):
+        options_dict = copy.deepcopy(options_dict)
+        OPTIONS.update(options_dict)
+
+    def __enter__(self):
+        return
+
+    def __exit__(self, type, value, traceback):
+        self._apply_update(self.old)
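
[Editor's note] A short usage sketch of the new options together with `groupby_reduce`; the array and labels are illustrative. Per the threshold checks in `rechunk_for_blockwise` above, setting a threshold to 0 can never be satisfied, which effectively disables the automatic rechunk-to-blockwise step for sorted labels:

```python
# Sketch: temporarily disable the automatic blockwise rechunk via set_options.
import dask.array
import numpy as np
import flox

by = np.repeat([0, 1, 2, 3], 25)                           # sorted 1-D labels
darr = dask.array.random.random((10, by.size), chunks=(10, 30))

with flox.set_options(
    rechunk_blockwise_num_chunks_threshold=0,
    rechunk_blockwise_chunk_size_threshold=0,
):
    result, groups = flox.groupby_reduce(darr, by, func="sum", axis=-1)
```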


=====================================
flox/xarray.py
=====================================
@@ -1,10 +1,11 @@
 from __future__ import annotations
 
 from collections.abc import Hashable, Iterable, Sequence
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
 
 import numpy as np
 import pandas as pd
+import toolz
 import xarray as xr
 from packaging.version import Version
 
@@ -249,7 +250,7 @@ def xarray_reduce(
                 grouper_dims.append(d)
 
     if isinstance(obj, xr.Dataset):
-        ds = obj
+        ds = cast(xr.Dataset, obj)
     else:
         ds = obj._to_temp_dataset()
 
@@ -295,7 +296,7 @@ def xarray_reduce(
         not set(grouper_dims).issubset(set(variable.dims)) for variable in ds.data_vars.values()
     )
     if needs_broadcast:
-        ds_broad = xr.broadcast(ds, *by_da, exclude=exclude_dims)[0]
+        ds_broad = cast(xr.Dataset, xr.broadcast(ds, *by_da, exclude=exclude_dims)[0])
     else:
         ds_broad = ds
 
@@ -589,7 +590,7 @@ def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_Data
     DataArray or Dataset
         Xarray object with rechunked arrays.
     """
-    return _rechunk(rechunk_array_for_blockwise, obj, dim, labels)
+    return _rechunk(toolz.compose(toolz.last, rechunk_array_for_blockwise), obj, dim, labels)
 
 
 def _rechunk(func, obj, dim, labels, **kwargs):


=====================================
flox/xrdtypes.py
=====================================
@@ -196,14 +196,14 @@ def _get_fill_value(dtype, fill_value):
     if fill_value == NA:
         if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
             return np.nan
-        # This is madness, but npg checks that fill_value is compatible
-        # with array dtype even if the fill_value is never used.
-        elif np.issubdtype(dtype, np.integer):
-            return get_neg_infinity(dtype, min_for_int=True)
         elif np.issubdtype(dtype, np.timedelta64):
             return np.timedelta64("NaT")
         elif np.issubdtype(dtype, np.datetime64):
             return np.datetime64("NaT")
+        # This is madness, but npg checks that fill_value is compatible
+        # with array dtype even if the fill_value is never used.
+        elif np.issubdtype(dtype, np.integer):
+            return get_neg_infinity(dtype, min_for_int=True)
         else:
             return None
     return fill_value


=====================================
flox/xrutils.py
=====================================
@@ -4,6 +4,7 @@
 import datetime
 import importlib
 from collections.abc import Iterable
+from types import ModuleType
 from typing import Any
 
 import numpy as np
@@ -45,13 +46,20 @@ try:
 except ImportError:
     cftime = None
 
+cubed: ModuleType | None
+try:
+    import cubed  # type: ignore[no-redef]
+except ImportError:
+    cubed = None
 
+dask: ModuleType | None
 try:
     import dask.array
 
-    dask_array_type = dask.array.Array
+    dask_array_type = dask.array.Array  # type: ignore[union-attr]
 except ImportError:
-    dask_array_type = ()  # type: ignore[assignment, misc]
+    dask = None
+    dask_array_type = ()
 
 
 def asarray(data, xp=np):
@@ -79,13 +87,9 @@ def is_chunked_array(x) -> bool:
 
 
 def is_dask_collection(x):
-    try:
-        import dask
-
-        return dask.is_dask_collection(x)
-
-    except ImportError:
+    if dask is None:
         return False
+    return dask.is_dask_collection(x)
 
 
 def is_duck_dask_array(x):
@@ -93,12 +97,9 @@ def is_duck_dask_array(x):
 
 
 def is_duck_cubed_array(x):
-    try:
-        import cubed
-
-        return is_duck_array(x) and isinstance(x, cubed.Array)
-    except ImportError:
+    if cubed is None:
         return False
+    return is_duck_array(x) and isinstance(x, cubed.Array)
 
 
 class ReprObject:
@@ -140,7 +141,7 @@ def is_scalar(value: Any, include_0d: bool = True) -> bool:
         or isinstance(value, str | bytes | dict)
         or not (
             isinstance(value, (Iterable,) + NON_NUMPY_SUPPORTED_ARRAY_TYPES)
-            or hasattr(value, "__array_function__")
+            or hasattr(value, "__array_function__")  # type: ignore[unreachable]
         )
     )
 
@@ -182,13 +183,13 @@ def isnull(data: Any):
     else:
         # at this point, array should have dtype=object
         if isinstance(data, (np.ndarray, dask_array_type)):  # noqa
-            return pd.isnull(data)  # type: ignore[arg-type]
+            return pd.isnull(data)
         else:
             # Not reachable yet, but intended for use with other duck array
             # types. For full consistency with pandas, we should accept None as
             # a null value as well as NaN, but it isn't clear how to do this
             # with duck typing.
-            return data != data
+            return data != data  # type: ignore[unreachable]
 
 
 def datetime_to_numeric(array, offset=None, datetime_unit=None, dtype=float):


=====================================
pyproject.toml
=====================================
@@ -3,7 +3,7 @@ name = "flox"
 description = "GroupBy operations for dask.array"
 license = {file = "LICENSE"}
 readme = "README.md"
-requires-python = ">=3.10"
+requires-python = ">=3.11"
 keywords = ["xarray", "dask", "groupby"]
 classifiers = [
     "Development Status :: 4 - Beta",
@@ -11,18 +11,17 @@ classifiers = [
     "Natural Language :: English",
     "Operating System :: OS Independent",
     "Programming Language :: Python",
-    "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
     "Programming Language :: Python :: 3.12",
     "Programming Language :: Python :: 3.13",
 ]
 dependencies = [
-    "pandas>=1.5",
+    "pandas>=2.1",
     "packaging>=21.3",
-    "numpy>=1.22",
+    "numpy>=1.26",
     "numpy_groupies>=0.9.19",
     "toolz",
-    "scipy>=1.9",
+    "scipy>=1.12",
 ]
 dynamic=["version"]
 
@@ -39,10 +38,10 @@ test = ["netCDF4"]
 
 [build-system]
 requires = [
-    "pandas>=1.5",
-    "numpy>=1.22",
+    "pandas>=2.1",
+    "numpy>=1.26",
     "numpy_groupies>=0.9.19",
-    "scipy>=1.9",
+    "scipy>=1.12",
     "toolz",
     "setuptools>=61.0.0",
     "setuptools_scm[toml]>=7.0",
@@ -62,7 +61,7 @@ write_to_template= '__version__ = "{version}"'
 
 [tool.ruff]
 line-length = 110
-target-version = "py310"
+target-version = "py311"
 builtins = ["ellipsis"]
 exclude = [
     ".eggs",
@@ -90,6 +89,8 @@ select = [
     "I",
     # Pyupgrade
     "UP",
+    # flake8-tidy-imports
+    "TID",
 ]
 
 [tool.ruff.lint.isort]


=====================================
tests/test_core.py
=====================================
@@ -14,8 +14,8 @@ import pytest
 from numpy_groupies.aggregate_numpy import aggregate
 
 import flox
+from flox import set_options, xrutils
 from flox import xrdtypes as dtypes
-from flox import xrutils
 from flox.aggregations import Aggregation, _initialize_aggregation
 from flox.core import (
     HAS_NUMBAGG,
@@ -31,6 +31,7 @@ from flox.core import (
     find_group_cohorts,
     groupby_reduce,
     groupby_scan,
+    rechunk_for_blockwise,
     rechunk_for_cohorts,
     reindex_,
     subset_to_blocks,
@@ -979,26 +980,39 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None:
     assert_equal(actual, expected)
 
 
+@requires_dask
 @pytest.mark.parametrize(
-    "inchunks, expected",
+    "inchunks, expected, expected_method",
     [
-        [(1,) * 10, (3, 2, 2, 3)],
-        [(2,) * 5, (3, 2, 2, 3)],
-        [(3, 3, 3, 1), (3, 2, 5)],
-        [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3)],
-        [(3, 2, 2, 3), (3, 2, 2, 3)],
-        [(4, 4, 2), (3, 4, 3)],
-        [(5, 5), (5, 5)],
-        [(6, 4), (5, 5)],
-        [(7, 3), (7, 3)],
-        [(8, 2), (7, 3)],
-        [(9, 1), (10,)],
-        [(10,), (10,)],
+        [(1,) * 10, (3, 2, 2, 3), None],
+        [(2,) * 5, (3, 2, 2, 3), None],
+        [(3, 3, 3, 1), (3, 2, 5), None],
+        [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3), None],
+        [(3, 2, 2, 3), (3, 2, 2, 3), "blockwise"],
+        [(4, 4, 2), (3, 4, 3), None],
+        [(5, 5), (5, 5), "blockwise"],
+        [(6, 4), (5, 5), None],
+        [(7, 3), (7, 3), "blockwise"],
+        [(8, 2), (7, 3), None],
+        [(9, 1), (10,), None],
+        [(10,), (10,), "blockwise"],
     ],
 )
-def test_rechunk_for_blockwise(inchunks, expected):
+def test_rechunk_for_blockwise(inchunks, expected, expected_method):
     labels = np.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5])
     assert _get_optimal_chunks_for_groups(inchunks, labels) == expected
+    # reversed
+    assert _get_optimal_chunks_for_groups(inchunks, labels[::-1]) == expected
+
+    with set_options(rechunk_blockwise_chunk_size_threshold=-1):
+        array = dask.array.ones(labels.size, chunks=(inchunks,))
+        method, array = rechunk_for_blockwise(array, -1, labels, force=False)
+        assert method == expected_method
+        assert array.chunks == (inchunks,)
+
+        method, array = rechunk_for_blockwise(array, -1, labels[::-1], force=False)
+        assert method == expected_method
+        assert array.chunks == (inchunks,)
 
 
 @requires_dask
@@ -1974,18 +1988,56 @@ def test_nanlen_string(dtype, engine) -> None:
     assert_equal(expected, actual)
 
 
-def test_cumsum() -> None:
-    array = np.array([1, 1, 1], dtype=np.uint64)
+@pytest.mark.parametrize(
+    "array",
+    [
+        np.array([1, 1, 1, 2, 3, 4, 5], dtype=np.uint64),
+        np.array([1, 1, 1, 2, np.nan, 4, 5], dtype=np.float64),
+    ],
+)
+@pytest.mark.parametrize("func", ["cumsum", "nancumsum"])
+def test_cumsum_simple(array, func) -> None:
     by = np.array([0] * array.shape[-1])
-    expected = np.nancumsum(array, axis=-1)
+    expected = getattr(np, func)(array, axis=-1)
+
+    actual = groupby_scan(array, by, func=func, axis=-1)
+    assert_equal(actual, expected)
+
+    if has_dask:
+        da = dask.array.from_array(array, chunks=2)
+        actual = groupby_scan(da, by, func=func, axis=-1)
+        assert_equal(actual, expected)
 
-    actual = groupby_scan(array, by, func="nancumsum", axis=-1)
-    assert_equal(expected, actual)
 
+def test_cumsum() -> None:
+    array = np.array(
+        [
+            [1, 2, np.nan, 4, 5],
+            [3, np.nan, 4, 6, 6],
+        ]
+    )
+    by = [0, 1, 1, 0, 1]
+
+    expected = np.array(
+        [
+            [1, 2, np.nan, 5, np.nan],
+            [3, np.nan, np.nan, 9, np.nan],
+        ]
+    )
+    actual = groupby_scan(array, by, func="cumsum", axis=-1)
+    assert_equal(actual, expected)
+    if has_dask:
+        da = dask.array.from_array(array, chunks=2)
+        actual = groupby_scan(da, by, func="cumsum", axis=-1)
+        assert_equal(actual, expected)
+
+    expected = np.array([[1, 2, 2, 5, 7], [3, 0, 4, 9, 10]], dtype=np.float64)
+    actual = groupby_scan(array, by, func="nancumsum", axis=-1)
+    assert_equal(actual, expected)
     if has_dask:
         da = dask.array.from_array(array, chunks=2)
         actual = groupby_scan(da, by, func="nancumsum", axis=-1)
-        assert_equal(expected, actual)
+        assert_equal(actual, expected)
 
 
 @pytest.mark.parametrize(


=====================================
tests/test_properties.py
=====================================
@@ -56,10 +56,11 @@ def bfill(array, axis, dtype=None):
 
 
 NUMPY_SCAN_FUNCS: dict[str, Callable] = {
+    "cumsum": np.cumsum,
     "nancumsum": np.nancumsum,
     "ffill": ffill,
     "bfill": bfill,
-}  # "cumsum": np.cumsum,
+}
 
 
 def not_overflowing_array(array: np.ndarray[Any, Any]) -> bool:
@@ -210,7 +211,7 @@ def test_groupby_reduce_numpy_vs_other(data, array, func: str) -> None:
     array=chunked_arrays(arrays=numeric_like_arrays),
     func=st.sampled_from(tuple(NUMPY_SCAN_FUNCS)),
 )
-def test_scans(data, array: dask.array.Array, func: str) -> None:
+def test_scans_against_numpy(data, array: dask.array.Array, func: str) -> None:
     if "cum" in func:
         assume(not_overflowing_array(np.asarray(array)))
 



View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/df73f5f722bb9dfba7054035d4089cf4e484f10c


