[Git][debian-gis-team/flox][upstream] New upstream version 0.10.5
Antonio Valentino (@antonio.valentino)
gitlab@salsa.debian.org
Mon Aug 18 10:22:03 BST 2025
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
df73f5f7 by Antonio Valentino at 2025-08-18T09:02:47+00:00
New upstream version 0.10.5
- - - - -
25 changed files:
- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .gitignore
- .pre-commit-config.yaml
- + CLAUDE.md
- ci/docs.yml
- ci/environment.yml
- ci/minimal-requirements.yml
- ci/no-dask.yml
- ci/no-numba.yml
- ci/no-xarray.yml
- docs/source/conf.py
- docs/source/xarray.md
- flox/__init__.py
- flox/aggregate_flox.py
- flox/aggregate_sparse.py
- flox/aggregations.py
- flox/core.py
- + flox/options.py
- flox/xarray.py
- flox/xrdtypes.py
- flox/xrutils.py
- pyproject.toml
- tests/test_core.py
- tests/test_properties.py
Changes:
=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -77,7 +77,7 @@ jobs:
--ignore flox/tests \
--cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v5.4.2
+ uses: codecov/codecov-action@v5.4.3
with:
file: ./coverage.xml
flags: unittests
@@ -132,7 +132,7 @@ jobs:
python -m mypy --install-types --non-interactive --cache-dir=.mypy_cache/ --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
- uses: codecov/codecov-action@v5.4.2
+ uses: codecov/codecov-action@v5.4.3
with:
file: mypy_report/cobertura.xml
flags: mypy
=====================================
.github/workflows/ci.yaml
=====================================
@@ -26,7 +26,7 @@ jobs:
matrix:
os: ["ubuntu-latest"]
env: ["environment"]
- python-version: ["3.10", "3.13"]
+ python-version: ["3.11", "3.13"]
include:
- os: "windows-latest"
env: "environment"
@@ -36,10 +36,10 @@ jobs:
python-version: "3.13"
- os: "ubuntu-latest"
env: "minimal-requirements"
- python-version: "3.10"
+ python-version: "3.11"
- os: "windows-latest"
env: "env-numpy1"
- python-version: "3.10"
+ python-version: "3.11"
steps:
- uses: actions/checkout@v4
with:
@@ -76,7 +76,7 @@ jobs:
python -c "import xarray; xarray.show_versions()"
pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v5.4.2
+ uses: codecov/codecov-action@v5.4.3
with:
file: ./coverage.xml
flags: unittests
=====================================
.gitignore
=====================================
@@ -110,3 +110,6 @@ venv.bak/
.mypy_cache/
.DS_Store
+
+# Git worktrees
+worktrees/
=====================================
.pre-commit-config.yaml
=====================================
@@ -4,17 +4,12 @@ ci:
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
- rev: "v0.11.4"
+ rev: "v0.12.2"
hooks:
- id: ruff
args: ["--fix", "--show-fixes"]
- id: ruff-format
- - repo: https://github.com/pre-commit/mirrors-prettier
- rev: "v4.0.0-alpha.8"
- hooks:
- - id: prettier
-
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
=====================================
CLAUDE.md
=====================================
@@ -0,0 +1,151 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+**flox** is a Python library providing fast GroupBy reduction operations for `dask.array`. It implements parallel-friendly GroupBy reductions using the MapReduce paradigm and integrates with xarray for labeled multidimensional arrays.
+
+## Development Commands
+
+### Environment Setup
+
+```bash
+# Create and activate development environment
+mamba env create -f ci/environment.yml
+conda activate flox-tests
+python -m pip install --no-deps -e .
+```
+
+### Testing
+
+```bash
+# Run full test suite (as used in CI)
+pytest --durations=20 --durations-min=0.5 -n auto --cov=./ --cov-report=xml --hypothesis-profile ci
+
+# Run tests without coverage
+pytest -n auto
+
+# Run single test file
+pytest tests/test_core.py
+
+# Run specific test
+pytest tests/test_core.py::test_function_name
+```
+
+### Code Quality
+
+```bash
+# Run all pre-commit hooks
+pre-commit run --all-files
+
+# Format code with ruff
+ruff format .
+
+# Lint and fix with ruff
+ruff check --fix .
+
+# Type checking
+mypy flox/
+
+# Spell checking
+codespell
+```
+
+### Benchmarking
+
+```bash
+# Performance benchmarking (from asv_bench/ directory)
+cd asv_bench
+asv run
+asv publish
+asv preview
+```
+
+## CI Configuration
+
+### GitHub Workflows (`.github/workflows/`)
+
+- **`ci.yaml`** - Main CI pipeline with test matrix across Python versions (3.11, 3.13) and operating systems (Ubuntu, Windows)
+- **`ci-additional.yaml`** - Additional CI jobs including doctests and mypy type checking
+- **`upstream-dev-ci.yaml`** - Tests against development versions of upstream dependencies
+- **`pypi.yaml`** - PyPI publishing workflow
+- **`testpypi-release.yaml`** - Test PyPI release workflow
+- **`benchmarks.yml`** - Performance benchmarking workflow
+
+### Environment Files (`ci/`)
+
+- **`environment.yml`** - Main test environment with all dependencies
+- **`minimal-requirements.yml`** - Minimal requirements testing (pandas==1.5, numpy==1.22, etc.)
+- **`no-dask.yml`** - Testing without dask dependency
+- **`no-numba.yml`** - Testing without numba dependency
+- **`no-xarray.yml`** - Testing without xarray dependency
+- **`env-numpy1.yml`** - Testing with numpy\<2 constraint
+- **`docs.yml`** - Documentation building environment
+- **`upstream-dev-env.yml`** - Development versions of dependencies
+- **`benchmark.yml`** - Benchmarking environment
+
+### ReadTheDocs Configuration
+
+- **`.readthedocs.yml`** - ReadTheDocs configuration using `ci/docs.yml` environment
+
+## Code Architecture
+
+### Core Modules (`flox/`)
+
+- **`core.py`** - Main reduction logic, central orchestrator of groupby operations
+- **`aggregations.py`** - Defines the `Aggregation` class and built-in aggregation operations
+- **`xarray.py`** - Primary integration with xarray, provides `xarray_reduce()` API
+- **`dask_array_ops.py`** - Dask-specific array operations and optimizations
+
+### Aggregation Backends (`flox/aggregate_*.py`)
+
+- **`aggregate_flox.py`** - Native flox implementation
+- **`aggregate_npg.py`** - numpy-groupies backend
+- **`aggregate_numbagg.py`** - numbagg backend for JIT-compiled operations
+- **`aggregate_sparse.py`** - Support for sparse arrays
+
+### Utilities
+
+- **`cache.py`** - Caching mechanisms for performance
+- **`visualize.py`** - Tools for visualizing groupby operations
+- **`lib.py`** - General utility functions
+- **`xrutils.py`** & **`xrdtypes.py`** - xarray-specific utilities and types
+
+### Main APIs
+
+- `flox.groupby_reduce()` - Pure dask array interface
+- `flox.xarray.xarray_reduce()` - Pure xarray interface
+
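+A minimal usage sketch (the values and group labels below are made up for illustration):
+
+```python
+import numpy as np
+import flox
+
+array = np.array([1.0, 2.0, 3.0, 4.0])
+labels = np.array([0, 0, 1, 1])
+
+# returns the reduced values and the unique group labels
+result, groups = flox.groupby_reduce(array, labels, func="mean")
+# result -> [1.5, 3.5], groups -> [0, 1]
+```
+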
+## Key Design Patterns
+
+**Engine Selection**: The library supports multiple computation backends ("flox", "numpy", "numbagg") that can be chosen based on data characteristics and performance requirements.
+
+**MapReduce Strategy**: Implements groupby reductions using a two-stage approach (blockwise + tree reduction) to avoid expensive sort/shuffle operations in parallel computing.
+
+**Chunking Intelligence**: Automatically rechunks data to optimize groupby operations, particularly important for the current `auto-blockwise-rechunk` branch.
+
+**Integration Testing**: Extensive testing against xarray's groupby functionality to ensure compatibility with the broader scientific Python ecosystem.
+
+## Testing Configuration
+
+- **Framework**: pytest with coverage, parallel execution (pytest-xdist), and property-based testing (hypothesis)
+- **Coverage Target**: 95%
+- **Test Environments**: Multiple conda environments test optional dependencies (no-dask, no-numba, no-xarray)
+- **CI Matrices**: Tests across Python 3.11-3.13, Ubuntu/Windows, multiple dependency configurations
+
+## Dependencies
+
+**Core**: pandas>=1.5, numpy>=1.22, numpy_groupies>=0.9.19, scipy>=1.9, toolz, packaging>=21.3
+
+**Optional**: cachey, dask, numba, numbagg, xarray (enable with `pip install flox[all]`)
+
+## Development Notes
+
+- Uses `setuptools_scm` for automatic versioning from git tags
+- Heavy emphasis on performance with ASV benchmarking infrastructure
+- Type hints throughout with mypy checking
+- Pre-commit hooks enforce code quality (ruff, prettier, codespell)
+- Integration testing with xarray upstream development branch
+- **Python Support**: Minimum version 3.11 (updated from 3.10)
+- **Git Worktrees**: `worktrees/` directory is ignored for development workflows
=====================================
ci/docs.yml
=====================================
@@ -7,7 +7,7 @@ dependencies:
- dask-core
- pip
- xarray
- - numpy>=1.22
+ - numpy>=1.26
- scipy
- numpydoc
- numpy_groupies>=0.9.19
@@ -22,5 +22,6 @@ dependencies:
- ipykernel
- jupyter
- sphinx-codeautolink
+ - sphinx-copybutton
- pip:
- -e ..
=====================================
ci/environment.yml
=====================================
@@ -9,7 +9,7 @@ dependencies:
- cubed>=0.20.0
- dask-core
- pandas
- - numpy>=1.22
+ - numpy>=1.26
- scipy
- sparse
- lxml # for mypy coverage report
=====================================
ci/minimal-requirements.yml
=====================================
@@ -10,9 +10,9 @@ dependencies:
- pytest-pretty
- pytest-xdist
- syrupy
- - numpy==1.22
- - scipy==1.9.0
+ - numpy==1.26
+ - scipy==1.12
- numpy_groupies==0.9.19
- - pandas==1.5
+ - pandas==2.1
- pooch
- toolz
=====================================
ci/no-dask.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
- pandas
- hypothesis
- cftime
- - numpy>=1.22
+ - numpy>=1.26
- scipy
- sparse
- pip
=====================================
ci/no-numba.yml
=====================================
@@ -9,7 +9,7 @@ dependencies:
- dask-core
- hypothesis
- pandas
- - numpy>=1.22
+ - numpy>=1.26
- scipy
- sparse
- lxml # for mypy coverage report
=====================================
ci/no-xarray.yml
=====================================
@@ -5,7 +5,7 @@ dependencies:
- codecov
- syrupy
- pandas
- - numpy>=1.22
+ - numpy>=1.26
- scipy
- sparse
- pip
=====================================
docs/source/conf.py
=====================================
@@ -41,6 +41,7 @@ extensions = [
"myst_nb",
"sphinx_codeautolink",
"sphinx_remove_toctrees",
+ "sphinx_copybutton",
]
codeautolink_concat_default = True
=====================================
docs/source/xarray.md
=====================================
@@ -11,22 +11,4 @@ ds.groupby_bins("lon", bins=[0, 10, 20]).mean(method="map-reduce")
ds.resample(time="M").mean(method="blockwise")
```
-Xarray's GroupBy operations are currently limited:
-
-1. One can only group by a single variable.
-1. When grouping by a dask array, that array will be computed to discover the unique group labels, and their locations
-
-These limitations can be avoided by using {py:func}`flox.xarray.xarray_reduce` which allows grouping by multiple variables, lazy grouping by dask variables,
-as well as an arbitrary combination of categorical grouping and binning. For example,
-
-```python
-flox.xarray.xarray_reduce(
- ds,
- ds.time.dt.month,
- ds.lon,
- func="mean",
- expected_groups=[None, [0, 10, 20]],
- isbin=[False, True],
- method="map-reduce",
-)
-```
+{py:func}`flox.xarray.xarray_reduce` used to provide extra functionality, but now Xarray's GroupBy object has been upgraded to match those capabilities with better API!
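+
+For example, binned grouping that previously needed `xarray_reduce` can now be written directly on the
+Xarray GroupBy object (a minimal sketch, assuming a recent xarray release that ships `xarray.groupers`):
+
+```python
+from xarray.groupers import BinGrouper
+
+ds.groupby(lon=BinGrouper(bins=[0, 10, 20])).mean()
+```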
=====================================
flox/__init__.py
=====================================
@@ -3,7 +3,7 @@
"""Top-level module for flox ."""
from . import cache
-from .aggregations import Aggregation, Scan # noqa
+from .aggregations import Aggregation, Scan, is_supported_aggregation
from .core import (
groupby_reduce,
groupby_scan,
@@ -11,7 +11,8 @@ from .core import (
rechunk_for_cohorts,
ReindexStrategy,
ReindexArrayType,
-) # noqa
+)
+from .options import set_options
def _get_version():
@@ -24,3 +25,16 @@ def _get_version():
__version__ = _get_version()
+
+__all__ = [
+ "Aggregation",
+ "Scan",
+ "groupby_reduce",
+ "groupby_scan",
+ "rechunk_for_blockwise",
+ "rechunk_for_cohorts",
+ "set_options",
+ "ReindexStrategy",
+ "ReindexArrayType",
+ "is_supported_aggregation",
+]
=====================================
flox/aggregate_flox.py
=====================================
@@ -276,3 +276,39 @@ def ffill(group_idx, array, *, axis, **kwargs):
invert_perm = slice(None) if isinstance(perm, slice) else np.argsort(perm, kind="stable")
return array[tuple(slc)][..., invert_perm]
+
+
+def _np_grouped_scan(group_idx, array, *, axis: int, skipna: bool, **kwargs):
+ handle_nans = not skipna and array.dtype.kind in "cfO"
+
+ group_idx, array, perm = _prepare_for_flox(group_idx, array)
+ ndim = array.ndim
+ assert axis == (ndim - 1), (axis, ndim - 1)
+
+ flag = np.concatenate((np.asarray([True], like=group_idx), group_idx[1:] != group_idx[:-1]))
+ (inv_idx,) = flag.nonzero()
+ segment_lengths = np.add.reduceat(np.ones(group_idx.shape), inv_idx, dtype=np.int64)
+
+ # TODO: set dtype to float properly for handle_nans?
+ accum = np.nancumsum(array, axis=axis)
+
+ if len(inv_idx) > 1:
+ first_group_idx = inv_idx[1]
+ # extract cumulative sum _before_ start of group
+ prev_group_cumsum = accum[..., inv_idx[1:] - 1]
+ accum[..., first_group_idx:] -= np.repeat(prev_group_cumsum, segment_lengths[1:], axis=axis)
+
+ if handle_nans:
+ mask = isnull(array)
+ accummask = np.cumsum(mask, axis=-1, dtype=np.uint64)
+ if len(inv_idx) > 1:
+ prev_group_cumsum = accummask[..., inv_idx[1:] - 1]
+ accummask[..., first_group_idx:] -= np.repeat(prev_group_cumsum, segment_lengths[1:], axis=axis)
+ accum[accummask > 0] = np.nan
+
+ invert_perm = slice(None) if isinstance(perm, slice) else np.argsort(perm, kind="stable")
+ return accum[..., invert_perm]
+
+
+cumsum = partial(_np_grouped_scan, skipna=False)
+nancumsum = partial(_np_grouped_scan, skipna=True)
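
The new `_np_grouped_scan` derives each group's cumulative sum from one global cumsum by subtracting the
running total accumulated before each group starts (the "cumulative sum _before_ start of group" step above).
A standalone numpy sketch of that trick on toy data, assuming `group_idx` is already sorted as
`_prepare_for_flox` guarantees:

```python
import numpy as np

group_idx = np.array([0, 0, 1, 1, 1, 2])               # sorted group labels
array = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

flag = np.concatenate(([True], group_idx[1:] != group_idx[:-1]))
(inv_idx,) = flag.nonzero()                             # first index of each group
segment_lengths = np.add.reduceat(np.ones_like(group_idx), inv_idx)

accum = np.cumsum(array)                                # one global cumulative sum
if len(inv_idx) > 1:
    # subtract the total accumulated before each group begins
    prev_group_cumsum = accum[inv_idx[1:] - 1]
    accum[inv_idx[1]:] -= np.repeat(prev_group_cumsum, segment_lengths[1:])

print(accum)  # [ 1.  3.  3.  7. 12.  6.]  -> the cumsum restarts at every group boundary
```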
=====================================
flox/aggregate_sparse.py
=====================================
@@ -1,7 +1,9 @@
# Unlike the other aggregate_* submodules, this one simply defines a wrapper function
# because we run the groupby on the underlying dense data.
+from collections.abc import Callable
from functools import partial
+from typing import Any, TypeAlias
import numpy as np
import sparse
@@ -18,10 +20,12 @@ def nanadd(a, b):
From https://stackoverflow.com/a/50642947/1707127
"""
- return np.where(np.isnan(a + b), np.where(np.isnan(a), b, a), a + b)
+ ab = a + b
+ return np.where(np.isnan(ab), np.where(np.isnan(a), b, a), ab)
-BINARY_OPS = {
+CallableMap: TypeAlias = dict[str, Callable[[np.ndarray, np.ndarray], np.ndarray]]
+BINARY_OPS: CallableMap = {
"sum": np.add,
"nansum": nanadd,
"max": np.maximum,
@@ -29,8 +33,8 @@ BINARY_OPS = {
"min": np.minimum,
"nanmin": np.fmin,
}
-HYPER_OPS = {"sum": np.multiply, "nansum": np.multiply}
-IDENTITY = {
+HYPER_OPS: CallableMap = {"sum": np.multiply, "nansum": np.multiply}
+IDENTITY: dict[str, Any] = {
"sum": 0,
"nansum": 0,
"prod": 1,
=====================================
flox/aggregations.py
=====================================
@@ -14,7 +14,7 @@ from numpy.typing import ArrayLike, DTypeLike
from . import aggregate_flox, aggregate_npg, xrutils
from . import xrdtypes as dtypes
-from .lib import sparse_array_type
+from .lib import dask_array_type, sparse_array_type
if TYPE_CHECKING:
FuncTuple = tuple[Callable | str, ...]
@@ -631,7 +631,7 @@ class AlignedArrays:
reduced = chunk_reduce(
self.array,
self.group_idx,
- func=("nanlast",),
+ func=("last",),
axis=-1,
# TODO: automate?
engine="flox",
@@ -699,6 +699,7 @@ def scan_binary_op(left_state: ScanState, right_state: ScanState, *, agg: Scan)
fill_value=agg.identity,
)
result = AlignedArrays(array=final_value[..., left.group_idx.size :], group_idx=right.group_idx)
+
else:
raise ValueError(f"Unknown binary op application mode: {agg.mode!r}")
@@ -717,8 +718,7 @@ def scan_binary_op(left_state: ScanState, right_state: ScanState, *, agg: Scan)
)
-# TODO: numpy_groupies cumsum is a broken when NaNs are present.
-# cumsum = Scan("cumsum", binary_op=np.add, reduction="sum", scan="cumsum", identity=0)
+cumsum = Scan("cumsum", binary_op=np.add, reduction="sum", scan="cumsum", identity=0)
nancumsum = Scan("nancumsum", binary_op=np.add, reduction="nansum", scan="nancumsum", identity=0)
# ffill uses the identity for scan, and then at the binary-op state,
# we concatenate the blockwise-reduced values with the original block,
@@ -782,7 +782,7 @@ AGGREGATIONS: dict[str, Aggregation | Scan] = {
"nanquantile": nanquantile,
"mode": mode,
"nanmode": nanmode,
- # "cumsum": cumsum,
+ "cumsum": cumsum,
"nancumsum": nancumsum,
"ffill": ffill,
"bfill": bfill,
@@ -895,3 +895,20 @@ def _initialize_aggregation(
agg.simple_combine = tuple(simple_combine)
return agg
+
+
+def is_supported_aggregation(array, func: str) -> bool:
+ if isinstance(array, dask_array_type):
+ array = array._meta
+
+ if isinstance(array, sparse_array_type):
+ from flox.core import _is_sparse_supported_reduction
+
+ return _is_sparse_supported_reduction(func)
+
+ module, *_ = type(array).__module__.split(".")
+
+ if module in ["numpy", "cubed"]:
+ return func in AGGREGATIONS
+ else:
+ return False
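
The newly exported `flox.is_supported_aggregation` gives a quick capability check before dispatching;
a small illustration on toy inputs:

```python
import numpy as np
import flox

flox.is_supported_aggregation(np.ones(3), "mean")        # True: numpy arrays support every entry in AGGREGATIONS
flox.is_supported_aggregation(np.ones(3), "not-a-func")  # False: unknown aggregation names are rejected
```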
=====================================
flox/core.py
=====================================
@@ -6,7 +6,6 @@ import itertools
import logging
import math
import operator
-import sys
import warnings
from collections import namedtuple
from collections.abc import Callable, Sequence
@@ -49,6 +48,7 @@ from .aggregations import (
)
from .cache import memoize
from .lib import ArrayLayer, dask_array_type, sparse_array_type
+from .options import OPTIONS
from .xrutils import (
_contains_cftime_datetimes,
_to_pytimedelta,
@@ -71,13 +71,6 @@ HAS_NUMBAGG = module_available("numbagg", minversion="0.3.0")
HAS_SPARSE = module_available("sparse")
if TYPE_CHECKING:
- try:
- if sys.version_info < (3, 11):
- from typing_extensions import Unpack
- else:
- from typing import Unpack
- except (ModuleNotFoundError, ImportError):
- Unpack: Any # type: ignore[no-redef]
from .types import CubedArray, DaskArray, Graph
T_DuckArray: TypeAlias = np.ndarray | DaskArray | CubedArray # Any ?
@@ -119,6 +112,7 @@ FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask")
# _simple_combine.
DUMMY_AXIS = -2
+
logger = logging.getLogger("flox")
@@ -215,7 +209,10 @@ def _postprocess_numbagg(result, *, func, fill_value, size, seen_groups):
if needs_masking:
mask = np.isin(groups, seen_groups, assume_unique=True, invert=True)
if mask.any():
- result[..., groups[mask]] = fill_value
+ if isinstance(result, sparse_array_type):
+ result.fill_value = fill_value
+ else:
+ result[..., groups[mask]] = fill_value
return result
@@ -223,8 +220,11 @@ def identity(x: T) -> T:
return x
-def _issorted(arr: np.ndarray) -> bool:
- return bool((arr[:-1] <= arr[1:]).all())
+def _issorted(arr: np.ndarray, ascending=True) -> bool:
+ if ascending:
+ return bool((arr[:-1] <= arr[1:]).all())
+ else:
+ return bool((arr[:-1] >= arr[1:]).all())
def _is_arg_reduction(func: T_Agg) -> bool:
@@ -307,7 +307,7 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray:
def _get_optimal_chunks_for_groups(chunks, labels):
chunkidx = np.cumsum(chunks) - 1
# what are the groups at chunk boundaries
- labels_at_chunk_bounds = _unique(labels[chunkidx])
+ labels_at_chunk_bounds = pd.unique(labels[chunkidx])
# what's the last index of all groups
last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last")
# what's the last index of groups at the chunk boundaries.
@@ -325,6 +325,8 @@ def _get_optimal_chunks_for_groups(chunks, labels):
Δl = abs(c - l)
if c == 0 or newchunkidx[-1] > l:
continue
+ f = f.item() # noqa
+ l = l.item() # noqa
if Δf < Δl and f > newchunkidx[-1]:
newchunkidx.append(f)
else:
@@ -716,7 +718,9 @@ def rechunk_for_cohorts(
return array.rechunk({axis: newchunks})
-def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
+def rechunk_for_blockwise(
+ array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True
+) -> tuple[T_MethodOpt, DaskArray]:
"""
Rechunks array so that group boundaries line up with chunk boundaries, allowing
embarrassingly parallel group reductions.
@@ -739,14 +743,47 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) ->
DaskArray
Rechunked array
"""
- # TODO: this should be unnecessary?
- labels = factorize_((labels,), axes=())[0]
+
chunks = array.chunks[axis]
- newchunks = _get_optimal_chunks_for_groups(chunks, labels)
+ if len(chunks) == 1:
+ return "blockwise", array
+
+ # import dask
+ # from dask.utils import parse_bytes
+ # factor = parse_bytes(dask.config.get("array.chunk-size")) / (
+ # math.prod(array.chunksize) * array.dtype.itemsize
+ # )
+ # if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR:
+ # new_constant_chunks = math.ceil(factor) * max(chunks)
+ # q, r = divmod(array.shape[axis], new_constant_chunks)
+ # new_input_chunks = (new_constant_chunks,) * q + (r,)
+ # else:
+ new_input_chunks = chunks
+
+ # FIXME: this should be unnecessary?
+ labels = factorize_((labels,), axes=())[0]
+ newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels)
if newchunks == chunks:
- return array
+ return "blockwise", array
+
+ Δn = abs(len(newchunks) - len(new_input_chunks))
+ if pass_num_chunks_threshold := (
+ Δn / len(new_input_chunks) < OPTIONS["rechunk_blockwise_num_chunks_threshold"]
+ ):
+ logger.debug("blockwise rechunk passes num chunks threshold")
+ if pass_chunk_size_threshold := (
+ # we just pick the max because number of chunks may have changed.
+ (abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks))
+ < OPTIONS["rechunk_blockwise_chunk_size_threshold"]
+ ):
+ logger.debug("blockwise rechunk passes chunk size change threshold")
+
+ if force or (pass_num_chunks_threshold and pass_chunk_size_threshold):
+ logger.debug("Rechunking to enable blockwise.")
+ return "blockwise", array.rechunk({axis: newchunks})
else:
- return array.rechunk({axis: newchunks})
+ logger.debug("Didn't meet thresholds to do automatic rechunking for blockwise reductions.")
+ return None, array
def reindex_numpy(array, from_: pd.Index, to: pd.Index, fill_value, dtype, axis: int):
@@ -2500,7 +2537,7 @@ def groupby_reduce(
engine: T_EngineOpt = None,
reindex: ReindexStrategy | bool | None = None,
finalize_kwargs: dict[Any, Any] | None = None,
-) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]:
+) -> tuple[DaskArray, *tuple[np.ndarray | DaskArray, ...]]:
"""
GroupBy reductions using tree reductions for dask.array
@@ -2712,6 +2749,11 @@ def groupby_reduce(
has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
+ if method is None and is_duck_dask_array(array) and not any_by_dask and by_.ndim == 1 and _issorted(by_):
+ # Let's try rechunking for sorted 1D by.
+ (single_axis,) = axis_
+ method, array = rechunk_for_blockwise(array, single_axis, by_, force=False)
+
is_first_last = _is_first_last_reduction(func)
if is_first_last:
if has_dask and nax != 1:
@@ -2899,7 +2941,7 @@ def groupby_reduce(
# if preferred method is already blockwise, no need to rechunk
if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1:
- array = rechunk_for_blockwise(array, axis=-1, labels=by_)
+ _, array = rechunk_for_blockwise(array, axis=-1, labels=by_)
result, groups = partial_agg(
array=array,
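
Together with the new `flox.options` module, this adds an opt-in fast path: when `by` is a sorted 1D label
array, `groupby_reduce` now tries a non-forced `rechunk_for_blockwise` and keeps the original chunking if the
rechunk would exceed the configured thresholds. A hedged usage sketch (toy sizes; dask is assumed installed):

```python
import dask.array
import numpy as np
import flox

by = np.repeat(np.arange(4), 25)        # sorted 1D labels, eligible for the blockwise fast path
arr = dask.array.ones(100, chunks=30)

# The threshold below is one of the options introduced in flox/options.py in this release.
with flox.set_options(rechunk_blockwise_num_chunks_threshold=0.5):
    result, groups = flox.groupby_reduce(arr, by, func="sum")
```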
=====================================
flox/options.py
=====================================
@@ -0,0 +1,64 @@
+"""
+Started from xarray options.py; vendored from cf-xarray
+"""
+
+import copy
+from collections.abc import MutableMapping
+from typing import Any
+
+OPTIONS: MutableMapping[str, Any] = {
+ # Thresholds below which we will automatically rechunk to blockwise if it makes sense
+ # 1. Fractional change in number of chunks after rechunking
+ "rechunk_blockwise_num_chunks_threshold": 0.25,
+ # 2. Fractional change in max chunk size after rechunking
+ "rechunk_blockwise_chunk_size_threshold": 1.5,
+ # 3. If input arrays have chunk size smaller than `dask.array.chunk-size`,
+ # then adjust chunks to meet that size first.
+ # "rechunk.blockwise.chunk_size_factor": 1.5,
+}
+
+
+class set_options: # numpydoc ignore=PR01,PR02
+ """
+    Set options for flox in a controlled context.
+
+ Parameters
+ ----------
+ rechunk_blockwise_num_chunks_threshold : float
+ Rechunk if fractional change in number of chunks after rechunking
+ is less than this amount.
+ rechunk_blockwise_chunk_size_threshold: float
+ Rechunk if fractional change in max chunk size after rechunking
+ is less than this threshold.
+
+ Examples
+ --------
+
+ You can use ``set_options`` either as a context manager:
+
+ >>> import flox
+ >>> with flox.set_options(rechunk_blockwise_num_chunks_threshold=1):
+ ... pass
+
+ Or to set global options:
+
+    >>> flox.set_options(rechunk_blockwise_num_chunks_threshold=1)
+ """
+
+ def __init__(self, **kwargs):
+ self.old = {}
+ for k in kwargs:
+ if k not in OPTIONS:
+ raise ValueError(f"argument name {k!r} is not in the set of valid options {set(OPTIONS)!r}")
+ self.old[k] = OPTIONS[k]
+ self._apply_update(kwargs)
+
+ def _apply_update(self, options_dict):
+ options_dict = copy.deepcopy(options_dict)
+ OPTIONS.update(options_dict)
+
+ def __enter__(self):
+ return
+
+ def __exit__(self, type, value, traceback):
+ self._apply_update(self.old)
=====================================
flox/xarray.py
=====================================
@@ -1,10 +1,11 @@
from __future__ import annotations
from collections.abc import Hashable, Iterable, Sequence
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
import numpy as np
import pandas as pd
+import toolz
import xarray as xr
from packaging.version import Version
@@ -249,7 +250,7 @@ def xarray_reduce(
grouper_dims.append(d)
if isinstance(obj, xr.Dataset):
- ds = obj
+ ds = cast(xr.Dataset, obj)
else:
ds = obj._to_temp_dataset()
@@ -295,7 +296,7 @@ def xarray_reduce(
not set(grouper_dims).issubset(set(variable.dims)) for variable in ds.data_vars.values()
)
if needs_broadcast:
- ds_broad = xr.broadcast(ds, *by_da, exclude=exclude_dims)[0]
+ ds_broad = cast(xr.Dataset, xr.broadcast(ds, *by_da, exclude=exclude_dims)[0])
else:
ds_broad = ds
@@ -589,7 +590,7 @@ def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_Data
DataArray or Dataset
Xarray object with rechunked arrays.
"""
- return _rechunk(rechunk_array_for_blockwise, obj, dim, labels)
+ return _rechunk(toolz.compose(toolz.last, rechunk_array_for_blockwise), obj, dim, labels)
def _rechunk(func, obj, dim, labels, **kwargs):
=====================================
flox/xrdtypes.py
=====================================
@@ -196,14 +196,14 @@ def _get_fill_value(dtype, fill_value):
if fill_value == NA:
if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
return np.nan
- # This is madness, but npg checks that fill_value is compatible
- # with array dtype even if the fill_value is never used.
- elif np.issubdtype(dtype, np.integer):
- return get_neg_infinity(dtype, min_for_int=True)
elif np.issubdtype(dtype, np.timedelta64):
return np.timedelta64("NaT")
elif np.issubdtype(dtype, np.datetime64):
return np.datetime64("NaT")
+ # This is madness, but npg checks that fill_value is compatible
+ # with array dtype even if the fill_value is never used.
+ elif np.issubdtype(dtype, np.integer):
+ return get_neg_infinity(dtype, min_for_int=True)
else:
return None
return fill_value
=====================================
flox/xrutils.py
=====================================
@@ -4,6 +4,7 @@
import datetime
import importlib
from collections.abc import Iterable
+from types import ModuleType
from typing import Any
import numpy as np
@@ -45,13 +46,20 @@ try:
except ImportError:
cftime = None
+cubed: ModuleType | None
+try:
+ import cubed # type: ignore[no-redef]
+except ImportError:
+ cubed = None
+dask: ModuleType | None
try:
import dask.array
- dask_array_type = dask.array.Array
+ dask_array_type = dask.array.Array # type: ignore[union-attr]
except ImportError:
- dask_array_type = () # type: ignore[assignment, misc]
+ dask = None
+ dask_array_type = ()
def asarray(data, xp=np):
@@ -79,13 +87,9 @@ def is_chunked_array(x) -> bool:
def is_dask_collection(x):
- try:
- import dask
-
- return dask.is_dask_collection(x)
-
- except ImportError:
+ if dask is None:
return False
+ return dask.is_dask_collection(x)
def is_duck_dask_array(x):
@@ -93,12 +97,9 @@ def is_duck_dask_array(x):
def is_duck_cubed_array(x):
- try:
- import cubed
-
- return is_duck_array(x) and isinstance(x, cubed.Array)
- except ImportError:
+ if cubed is None:
return False
+ return is_duck_array(x) and isinstance(x, cubed.Array)
class ReprObject:
@@ -140,7 +141,7 @@ def is_scalar(value: Any, include_0d: bool = True) -> bool:
or isinstance(value, str | bytes | dict)
or not (
isinstance(value, (Iterable,) + NON_NUMPY_SUPPORTED_ARRAY_TYPES)
- or hasattr(value, "__array_function__")
+ or hasattr(value, "__array_function__") # type: ignore[unreachable]
)
)
@@ -182,13 +183,13 @@ def isnull(data: Any):
else:
# at this point, array should have dtype=object
if isinstance(data, (np.ndarray, dask_array_type)): # noqa
- return pd.isnull(data) # type: ignore[arg-type]
+ return pd.isnull(data)
else:
# Not reachable yet, but intended for use with other duck array
# types. For full consistency with pandas, we should accept None as
# a null value as well as NaN, but it isn't clear how to do this
# with duck typing.
- return data != data
+ return data != data # type: ignore[unreachable]
def datetime_to_numeric(array, offset=None, datetime_unit=None, dtype=float):
=====================================
pyproject.toml
=====================================
@@ -3,7 +3,7 @@ name = "flox"
description = "GroupBy operations for dask.array"
license = {file = "LICENSE"}
readme = "README.md"
-requires-python = ">=3.10"
+requires-python = ">=3.11"
keywords = ["xarray", "dask", "groupby"]
classifiers = [
"Development Status :: 4 - Beta",
@@ -11,18 +11,17 @@ classifiers = [
"Natural Language :: English",
"Operating System :: OS Independent",
"Programming Language :: Python",
- "Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
]
dependencies = [
- "pandas>=1.5",
+ "pandas>=2.1",
"packaging>=21.3",
- "numpy>=1.22",
+ "numpy>=1.26",
"numpy_groupies>=0.9.19",
"toolz",
- "scipy>=1.9",
+ "scipy>=1.12",
]
dynamic=["version"]
@@ -39,10 +38,10 @@ test = ["netCDF4"]
[build-system]
requires = [
- "pandas>=1.5",
- "numpy>=1.22",
+ "pandas>=2.1",
+ "numpy>=1.26",
"numpy_groupies>=0.9.19",
- "scipy>=1.9",
+ "scipy>=1.12",
"toolz",
"setuptools>=61.0.0",
"setuptools_scm[toml]>=7.0",
@@ -62,7 +61,7 @@ write_to_template= '__version__ = "{version}"'
[tool.ruff]
line-length = 110
-target-version = "py310"
+target-version = "py311"
builtins = ["ellipsis"]
exclude = [
".eggs",
@@ -90,6 +89,8 @@ select = [
"I",
# Pyupgrade
"UP",
+ # flake8-tidy-imports
+ "TID",
]
[tool.ruff.lint.isort]
=====================================
tests/test_core.py
=====================================
@@ -14,8 +14,8 @@ import pytest
from numpy_groupies.aggregate_numpy import aggregate
import flox
+from flox import set_options, xrutils
from flox import xrdtypes as dtypes
-from flox import xrutils
from flox.aggregations import Aggregation, _initialize_aggregation
from flox.core import (
HAS_NUMBAGG,
@@ -31,6 +31,7 @@ from flox.core import (
find_group_cohorts,
groupby_reduce,
groupby_scan,
+ rechunk_for_blockwise,
rechunk_for_cohorts,
reindex_,
subset_to_blocks,
@@ -979,26 +980,39 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None:
assert_equal(actual, expected)
+@requires_dask
@pytest.mark.parametrize(
- "inchunks, expected",
+ "inchunks, expected, expected_method",
[
- [(1,) * 10, (3, 2, 2, 3)],
- [(2,) * 5, (3, 2, 2, 3)],
- [(3, 3, 3, 1), (3, 2, 5)],
- [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3)],
- [(3, 2, 2, 3), (3, 2, 2, 3)],
- [(4, 4, 2), (3, 4, 3)],
- [(5, 5), (5, 5)],
- [(6, 4), (5, 5)],
- [(7, 3), (7, 3)],
- [(8, 2), (7, 3)],
- [(9, 1), (10,)],
- [(10,), (10,)],
+ [(1,) * 10, (3, 2, 2, 3), None],
+ [(2,) * 5, (3, 2, 2, 3), None],
+ [(3, 3, 3, 1), (3, 2, 5), None],
+ [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3), None],
+ [(3, 2, 2, 3), (3, 2, 2, 3), "blockwise"],
+ [(4, 4, 2), (3, 4, 3), None],
+ [(5, 5), (5, 5), "blockwise"],
+ [(6, 4), (5, 5), None],
+ [(7, 3), (7, 3), "blockwise"],
+ [(8, 2), (7, 3), None],
+ [(9, 1), (10,), None],
+ [(10,), (10,), "blockwise"],
],
)
-def test_rechunk_for_blockwise(inchunks, expected):
+def test_rechunk_for_blockwise(inchunks, expected, expected_method):
labels = np.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5])
assert _get_optimal_chunks_for_groups(inchunks, labels) == expected
+ # reversed
+ assert _get_optimal_chunks_for_groups(inchunks, labels[::-1]) == expected
+
+ with set_options(rechunk_blockwise_chunk_size_threshold=-1):
+ array = dask.array.ones(labels.size, chunks=(inchunks,))
+ method, array = rechunk_for_blockwise(array, -1, labels, force=False)
+ assert method == expected_method
+ assert array.chunks == (inchunks,)
+
+ method, array = rechunk_for_blockwise(array, -1, labels[::-1], force=False)
+ assert method == expected_method
+ assert array.chunks == (inchunks,)
@requires_dask
@@ -1974,18 +1988,56 @@ def test_nanlen_string(dtype, engine) -> None:
assert_equal(expected, actual)
-def test_cumsum() -> None:
- array = np.array([1, 1, 1], dtype=np.uint64)
+@pytest.mark.parametrize(
+ "array",
+ [
+ np.array([1, 1, 1, 2, 3, 4, 5], dtype=np.uint64),
+ np.array([1, 1, 1, 2, np.nan, 4, 5], dtype=np.float64),
+ ],
+)
+@pytest.mark.parametrize("func", ["cumsum", "nancumsum"])
+def test_cumsum_simple(array, func) -> None:
by = np.array([0] * array.shape[-1])
- expected = np.nancumsum(array, axis=-1)
+ expected = getattr(np, func)(array, axis=-1)
+
+ actual = groupby_scan(array, by, func=func, axis=-1)
+ assert_equal(actual, expected)
+
+ if has_dask:
+ da = dask.array.from_array(array, chunks=2)
+ actual = groupby_scan(da, by, func=func, axis=-1)
+ assert_equal(actual, expected)
- actual = groupby_scan(array, by, func="nancumsum", axis=-1)
- assert_equal(expected, actual)
+def test_cumsum() -> None:
+ array = np.array(
+ [
+ [1, 2, np.nan, 4, 5],
+ [3, np.nan, 4, 6, 6],
+ ]
+ )
+ by = [0, 1, 1, 0, 1]
+
+ expected = np.array(
+ [
+ [1, 2, np.nan, 5, np.nan],
+ [3, np.nan, np.nan, 9, np.nan],
+ ]
+ )
+ actual = groupby_scan(array, by, func="cumsum", axis=-1)
+ assert_equal(actual, expected)
+ if has_dask:
+ da = dask.array.from_array(array, chunks=2)
+ actual = groupby_scan(da, by, func="cumsum", axis=-1)
+ assert_equal(actual, expected)
+
+ expected = np.array([[1, 2, 2, 5, 7], [3, 0, 4, 9, 10]], dtype=np.float64)
+ actual = groupby_scan(array, by, func="nancumsum", axis=-1)
+ assert_equal(actual, expected)
if has_dask:
da = dask.array.from_array(array, chunks=2)
actual = groupby_scan(da, by, func="nancumsum", axis=-1)
- assert_equal(expected, actual)
+ assert_equal(actual, expected)
@pytest.mark.parametrize(
=====================================
tests/test_properties.py
=====================================
@@ -56,10 +56,11 @@ def bfill(array, axis, dtype=None):
NUMPY_SCAN_FUNCS: dict[str, Callable] = {
+ "cumsum": np.cumsum,
"nancumsum": np.nancumsum,
"ffill": ffill,
"bfill": bfill,
-} # "cumsum": np.cumsum,
+}
def not_overflowing_array(array: np.ndarray[Any, Any]) -> bool:
@@ -210,7 +211,7 @@ def test_groupby_reduce_numpy_vs_other(data, array, func: str) -> None:
array=chunked_arrays(arrays=numeric_like_arrays),
func=st.sampled_from(tuple(NUMPY_SCAN_FUNCS)),
)
-def test_scans(data, array: dask.array.Array, func: str) -> None:
+def test_scans_against_numpy(data, array: dask.array.Array, func: str) -> None:
if "cum" in func:
assume(not_overflowing_array(np.asarray(array)))
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/df73f5f722bb9dfba7054035d4089cf4e484f10c