[Git][debian-gis-team/flox][upstream] New upstream version 0.9.0
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Sat Jan 27 11:20:28 GMT 2024
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
6907084c by Antonio Valentino at 2024-01-27T10:53:24+00:00
New upstream version 0.9.0
- - - - -
22 changed files:
- .github/workflows/benchmarks.yml
- .github/workflows/upstream-dev-ci.yaml
- .pre-commit-config.yaml
- asv_bench/benchmarks/cohorts.py
- asv_bench/benchmarks/reduce.py
- ci/environment.yml
- ci/minimal-requirements.yml
- ci/no-dask.yml
- ci/no-numba.yml
- ci/no-xarray.yml
- ci/upstream-dev-env.yml
- + docs/diagrams/bitmask-patterns-perfect.png
- + docs/diagrams/containment.png
- + docs/diagrams/counties-bitmask-containment.png
- + docs/diagrams/nwm-cohorts.png
- docs/source/implementation.md
- docs/source/user-stories/climatology.ipynb
- flox/core.py
- flox/visualize.py
- tests/__init__.py
- tests/test_core.py
- tests/test_xarray.py
Changes:
=====================================
.github/workflows/benchmarks.yml
=====================================
@@ -41,15 +41,19 @@ jobs:
OMP_NUM_THREADS: 1
ASV_FACTOR: 1.5
ASV_SKIP_SLOW: 1
+ BASE_SHA: ${{ github.event.pull_request.base.sha }}
+ LAST_HEAD_SHA: ${{ github.event.pull_request.head.sha }}
+ HEAD_LABEL: ${{ github.event.pull_request.head.label }}
+ BASE_LABEL: ${{ github.event.pull_request.base.label }}
run: |
# set -x
# ID this runner
asv machine --yes
- echo "Baseline: ${{ github.event.pull_request.base.sha }} (${{ github.event.pull_request.base.label }})"
- echo "Contender: ${GITHUB_SHA} (${{ github.event.pull_request.head.label }})"
+ echo "Baseline: $LAST_HEAD_SHA ($BASE_LABEL)"
+ echo "Contender: ${GITHUB_SHA} ($HEAD_LABEL)"
# Run benchmarks for current commit against base
ASV_OPTIONS="--split --show-stderr --factor $ASV_FACTOR"
- asv continuous $ASV_OPTIONS ${{ github.event.pull_request.base.sha }} ${GITHUB_SHA} \
+ asv continuous $ASV_OPTIONS $BASE_SHA ${GITHUB_SHA} \
| sed "/Traceback \|failed$\|PERFORMANCE DECREASED/ s/^/::error::/" \
| tee benchmarks.log
# Report and export results for subsequent steps
=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -7,6 +7,9 @@ on:
types: [opened, reopened, synchronize, labeled]
branches:
- main
+ paths:
+ - ".github/workflows/upstream-dev-ci.yaml"
+ - "ci/upstream-dev-env.yml"
schedule:
- cron: "0 0 * * *" # Daily “At 00:00” UTC
workflow_dispatch: # allows you to trigger the workflow run manually
@@ -41,16 +44,49 @@ jobs:
- name: Set up conda environment
uses: mamba-org/setup-micromamba@v1
with:
- environment-file: ci/upstream-dev-env.yml
environment-name: flox-tests
init-shell: bash
- cache-environment: true
+ # cache-environment: true
+ # micromamba list does not list pip dependencies, so install mamba
create-args: >-
+ mamba
+ pip
python=${{ matrix.python-version }}
pytest-reportlog
+
+ - name: Install upstream dev dependencies
+ run: |
+ # install cython for building cftime without build isolation
+ micromamba install -f ci/upstream-dev-env.yml
+ micromamba remove --force numpy scipy pandas cftime
+ python -m pip install \
+ -i https://pypi.anaconda.org/scientific-python-nightly-wheels/simple \
+ --no-deps \
+ --pre \
+ --upgrade \
+ numpy \
+ scipy \
+ pandas \
+ xarray
+ # without build isolation for packages compiling against numpy
+ # TODO: remove once there are `numpy>=2.0` builds for cftime
+ python -m pip install \
+ --no-deps \
+ --upgrade \
+ --no-build-isolation \
+ git+https://github.com/Unidata/cftime
+ python -m pip install \
+ git+https://github.com/dask/dask \
+ git+https://github.com/ml31415/numpy-groupies
+
- name: Install flox
run: |
python -m pip install --no-deps -e .
+
+ - name: List deps
+ run: |
+ # micromamba list does not list pip dependencies
+ mamba list
- name: Run Tests
if: success()
id: status
=====================================
.pre-commit-config.yaml
=====================================
@@ -59,3 +59,22 @@ repos:
rev: v0.15
hooks:
- id: validate-pyproject
+
+ - repo: https://github.com/rhysd/actionlint
+ rev: v1.6.26
+ hooks:
+ - id: actionlint
+ files: ".github/workflows/"
+ args:
+ [
+ "-ignore",
+ "SC1090",
+ "-ignore",
+ "SC2046",
+ "-ignore",
+ "SC2086",
+ "-ignore",
+ "SC2129",
+ "-ignore",
+ "SC2155",
+ ]
=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -11,12 +11,19 @@ class Cohorts:
def setup(self, *args, **kwargs):
raise NotImplementedError
+ def containment(self):
+ asfloat = self.bitmask().astype(float)
+ chunks_per_label = asfloat.sum(axis=0)
+ containment = (asfloat.T @ asfloat) / chunks_per_label
+ print(containment.nnz / np.prod(containment.shape))
+ return containment.todense()
+
def chunks_cohorts(self):
return flox.core.find_group_cohorts(
self.by,
[self.array.chunks[ax] for ax in self.axis],
expected_groups=self.expected,
- )
+ )[1]
def bitmask(self):
chunks = [self.array.chunks[ax] for ax in self.axis]
@@ -75,9 +82,9 @@ class NWMMidwest(Cohorts):
y = np.repeat(np.arange(30), 60)
by = x[np.newaxis, :] * y[:, np.newaxis]
- self.by = flox.core._factorize_multiple(
- (by,), expected_groups=(None,), any_by_dask=False, reindex=False
- )[0][0]
+ self.by = flox.core._factorize_multiple((by,), expected_groups=(None,), any_by_dask=False)[
+ 0
+ ][0]
self.array = dask.array.ones(self.by.shape, chunks=(350, 350))
self.axis = (-2, -1)
@@ -119,8 +126,7 @@ class ERA5MonthHour(ERA5Dataset, Cohorts):
ret = flox.core._factorize_multiple(
by,
(pd.Index(np.arange(1, 13)), pd.Index(np.arange(1, 25))),
- False,
- reindex=False,
+ any_by_dask=False,
)
# Add one so the rechunk code is simpler and makes sense
self.by = ret[0][0]
@@ -185,3 +191,13 @@ class PerfectBlockwiseResampling(Cohorts):
self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, 10))
self.by = codes_for_resampling(index, freq="5D")
self.expected = pd.RangeIndex(self.by.max() + 1)
+
+
+class OISST(Cohorts):
+ def setup(self, *args, **kwargs):
+ self.array = dask.array.ones((1, 14532), chunks=(1, 10))
+ self.axis = (1,)
+ index = pd.date_range("1981-09-01 12:00", "2021-06-14 12:00", freq="D")
+ self.time = pd.Series(index)
+ self.by = self.time.dt.dayofyear.values - 1
+ self.expected = pd.RangeIndex(self.by.max() + 1)
=====================================
asv_bench/benchmarks/reduce.py
=====================================
@@ -7,7 +7,7 @@ import flox.aggregations
N = 3000
funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "count"]
-engines = [None, "flox", "numpy", "numbagg"]
+engines = [None, "flox", "numpy"] # numbagg is disabled for now since it takes ages in CI
expected_groups = {
"None": None,
"bins": pd.IntervalIndex.from_breaks([1, 2, 4]),
=====================================
ci/environment.yml
=====================================
@@ -4,9 +4,9 @@ channels:
dependencies:
- asv
- cachey
+ - cftime
- codecov
- dask-core
- - netcdf4
- pandas
- numpy>=1.22
- scipy
=====================================
ci/minimal-requirements.yml
=====================================
@@ -3,7 +3,6 @@ channels:
- conda-forge
dependencies:
- codecov
- - netcdf4
- pip
- pytest
- pytest-cov
=====================================
ci/no-dask.yml
=====================================
@@ -3,8 +3,8 @@ channels:
- conda-forge
dependencies:
- codecov
- - netcdf4
- pandas
+ - cftime
- numpy>=1.22
- scipy
- pip
=====================================
ci/no-numba.yml
=====================================
@@ -4,9 +4,9 @@ channels:
dependencies:
- asv
- cachey
+ - cftime
- codecov
- dask-core
- - netcdf4
- pandas
- numpy>=1.22
- scipy
=====================================
ci/no-xarray.yml
=====================================
@@ -3,7 +3,6 @@ channels:
- conda-forge
dependencies:
- codecov
- - netcdf4
- pandas
- numpy>=1.22
- scipy
=====================================
ci/upstream-dev-env.yml
=====================================
@@ -4,19 +4,25 @@ channels:
dependencies:
- cachey
- codecov
- - netcdf4
- pooch
- toolz
- - numba
- - scipy
- - pytest
- - pytest-cov
+ # - numpy
+ # - pandas
+ # - scipy
- pytest-pretty
- pytest-xdist
- pip
- - pip:
- - git+https://github.com/pydata/xarray
- - git+https://github.com/pandas-dev/pandas
- - git+https://github.com/dask/dask
- - git+https://github.com/ml31415/numpy-groupies
- - git+https://github.com/numbagg/numbagg
+ # for cftime
+ - cython>=0.29.20
+ - py-cpuinfo
+ # - numba
+ - pytest
+ - pytest-cov
+ # for upstream pandas
+ - python-dateutil
+ - pytz
+ # - pip:
+ # - git+https://github.com/pydata/xarray
+ # - git+https://github.com/dask/dask
+ # - git+https://github.com/ml31415/numpy-groupies
+ # # - git+https://github.com/numbagg/numbagg
=====================================
docs/diagrams/bitmask-patterns-perfect.png
=====================================
Binary files /dev/null and b/docs/diagrams/bitmask-patterns-perfect.png differ
=====================================
docs/diagrams/containment.png
=====================================
Binary files /dev/null and b/docs/diagrams/containment.png differ
=====================================
docs/diagrams/counties-bitmask-containment.png
=====================================
Binary files /dev/null and b/docs/diagrams/counties-bitmask-containment.png differ
=====================================
docs/diagrams/nwm-cohorts.png
=====================================
Binary files /dev/null and b/docs/diagrams/nwm-cohorts.png differ
=====================================
docs/source/implementation.md
=====================================
@@ -1,3 +1,12 @@
+---
+jupytext:
+ text_representation:
+ format_name: myst
+kernelspec:
+ display_name: Python 3
+ name: python3
+---
+
(algorithms)=
# Parallel Algorithms
@@ -7,10 +16,14 @@
can be hard. Performance strongly depends on how the groups are distributed amongst the blocks of an array.
`flox` implements 4 strategies for grouped reductions, each appropriate for a particular distribution of groups
-among the blocks of a dask array. Switch between the various strategies by passing `method`
-and/or `reindex` to either {py:func}`flox.groupby_reduce` or {py:func}`flox.xarray.xarray_reduce`.
+among the blocks of a dask array.
+
+```{tip}
+By default, `flox >= 0.9.0` will use [heuristics](method-heuristics) to choose a `method`.
+```
-Your options are:
+Switch between the various strategies by passing `method` and/or `reindex` to either {py:func}`flox.groupby_reduce`
+or {py:func}`flox.xarray.xarray_reduce`. Your options are:
1. [`method="map-reduce"` with `reindex=False`](map-reindex-false)
1. [`method="map-reduce"` with `reindex=True`](map-reindex-True)
@@ -20,18 +33,17 @@ Your options are:
The most appropriate strategy for your problem will depend on the chunking of your dataset,
and the distribution of group labels across those chunks.
-```{tip}
Currently these strategies are implemented for dask. We would like to generalize to other parallel array types
as appropriate (e.g. Ramba, cubed, arkouda). Please open an issue to discuss if you are interested.
-```
(xarray-split)=
-## Background: Xarray's current GroupBy strategy
+## Background
-Xarray's current strategy is to find all unique group labels, index out each group,
-and then apply the reduction operation. Note that this only works if we know the group
-labels (i.e. you cannot use this strategy to group by a dask array).
+Without `flox` installed, Xarray's GroupBy strategy is to find all unique group labels,
+index out each group, and then apply the reduction operation. Note that this only works
+if we know the group labels (i.e. you cannot use this strategy to group by a dask array),
+and is essentially a slow, unvectorized for-loop over groups.
Schematically, this looks like (colors indicate group labels; separated groups of colors
indicate different blocks of an array):
@@ -208,23 +220,91 @@ One annoyance is that if the chunksize doesn't evenly divide the number of group
Consider our earlier example, `groupby("time.month")` with monthly frequency data and chunksize of 4 along `time`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk4.png)
+```{code-cell}
+import flox
+import numpy as np
+
+labels = np.tile(np.arange(12), 12)
+chunks = (tuple(np.repeat(4, labels.size // 4)),)
+```
+
`flox` can find these cohorts, below it identifies the cohorts with labels `1,2,3,4`; `5,6,7,8`, and `9,10,11,12`.
-```python
->>> flox.find_group_cohorts(labels, array.chunks[-1]).values()
-[[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]] # 3 cohorts
+```{code-cell}
+preferred_method, chunks_cohorts = flox.core.find_group_cohorts(labels, chunks)
+chunks_cohorts.values()
```
Now consider `chunksize=5`.
![cohorts-schematic](/../diagrams/cohorts-month-chunk5.png)
-```python
->>> flox.core.find_group_cohorts(labels, array.chunks[-1]).values()
-[[1], [2, 3], [4, 5], [6], [7, 8], [9, 10], [11], [12]] # 8 cohorts
+```{code-cell}
+labels = np.tile(np.arange(12), 12)
+chunks = (tuple(np.repeat(5, labels.size // 5)) + (4,),)
+preferred_method, chunks_cohorts = flox.core.find_group_cohorts(labels, chunks, merge=True)
+chunks_cohorts.values()
```
-We find 8 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
-In this case, it seems to better to rechunk to a size of `4` along `time`.
-If you have ideas for improving this case, please open an issue.
+We find 7 cohorts (note the original xarray strategy is equivalent to constructing 12 cohorts).
+In this case, it seems better to rechunk to a size of `4` (or `6`) along `time`.
+
+Indeed, flox's heuristics think `"map-reduce"` is better for this case:
+
+```{code-cell}
+preferred_method
+```
### Example : spatial grouping
+
+Spatial groupings are particularly interesting for the `"cohorts"` strategy. Consider the problem of computing county-level
+aggregated statistics ([example blog post](https://xarray.dev/blog/flox)). There are ~3100 groups (counties), each marked by
+a different color. There are ~2300 chunks of size (350, 350) in (lat, lon). Many groups are contained within a small number of chunks:
+see the left panel, where the grid lines mark chunk boundaries.
+
+![cohorts-schematic](/../diagrams/nwm-cohorts.png)
+
+This seems like a good fit for `'cohorts'`: to get the answer for a county in the Northwest US, we needn't look at values
+for the Southwest US. How do we decide that automatically for the user?
+
+(method-heuristics)=
+
+## Heuristics
+
+`flox >=0.9` will automatically choose `method` for you. To do so, we need to detect how each group
+label is distributed across the chunks of the array, and the degree to which the chunk distribution for a particular
+label overlaps with all other labels. The algorithm is as follows.
+
+1. First determine which labels are present in each chunk. The distribution of labels across chunks
+ is represented internally as a 2D boolean sparse array `S[chunks, labels]`. `S[i, j] = 1` when
+ label `j` is present in chunk `i`.
+
+1. Then we look for patterns in `S` to decide if we can use `"blockwise"`. In the figure below, the dark cells are the
+ entries of `S` that equal `1`.
+ ![bitmask-patterns](/../diagrams/bitmask-patterns-perfect.png)
+
+ - On the left is a monthly grouping for a monthly time series with chunk size 4. There are 3 non-overlapping cohorts, so
+ `method="cohorts"` is perfect.
+ - On the right is a daily time series with chunk size 10 resampled to 5-daily frequency. Two 5-day periods
+ are exactly contained in one chunk, so `method="blockwise"` is perfect.
+
+1. The metric used for determining the degree of overlap between the chunks occupied by different labels is
+ [containment](http://ekzhu.com/datasketch/lshensemble.html). For each label `i` we can quickly compute containment against
+ all other labels `j` as `C = S.T @ S / number_chunks_per_label`. Here is `C` for a range of chunk sizes from 1 to 12, for computing
+ the monthly mean of a monthly time series problem, \[the title on each image is `(chunk size, sparsity)`\].
+
+ ```python
+ chunks = np.arange(1, 13)
+ labels = np.tile(np.arange(1, 13), 30)
+ ```
+
+ ![cohorts-schematic](/../diagrams/containment.png)
+
+1. To choose between `"map-reduce"` and `"cohorts"`, we need a summary measure of the degree to which the labels overlap with
+ each other. We use _sparsity_ --- the number of non-zero elements in `C` divided by the number of elements in `C`, `C.nnz/C.size`.
+ When sparsity > 0.6, we choose `"map-reduce"` since there is decent overlap between (any) cohorts. Otherwise we use `"cohorts"`.
+
+Cool, isn't it?!
+
+For reference, here are `S` and `C` for the US county groupby problem:
+![county-bitmask](/../diagrams/counties-bitmask-containment.png)
+The sparsity of `C` is 0.006, so `"cohorts"` seems a good strategy here.
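
To make the containment computation above concrete, here is a minimal standalone sketch (an illustration assuming only `numpy` and `scipy.sparse`, not flox's internal code) that builds the boolean bitmask `S[chunks, labels]` for the monthly example and derives the containment matrix and its sparsity:

```python
import numpy as np
from scipy.sparse import csr_array

labels = np.tile(np.arange(12), 12)              # 12 years of monthly labels
chunksize = 4
starts = np.arange(0, labels.size, chunksize)

# S[i, j] = True when label j occurs in chunk i
S = csr_array(np.stack([np.isin(np.arange(12), labels[s : s + chunksize]) for s in starts]))

asfloat = S.astype(float)
chunks_per_label = asfloat.sum(axis=0)           # number of chunks each label touches
prod = asfloat.T @ asfloat                       # shares its sparsity pattern with C
sparsity = prod.nnz / np.prod(prod.shape)
C = prod.toarray() / chunks_per_label            # containment, C = S.T @ S / chunks_per_label

print(sparsity)  # ~0.33 here: below the 0.6 threshold, so "cohorts" is preferred
```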
=====================================
docs/source/user-stories/climatology.ipynb
=====================================
@@ -84,7 +84,7 @@
"\n",
"The default\n",
"[method=\"map-reduce\"](https://flox.readthedocs.io/en/latest/implementation.html#method-map-reduce)\n",
- "doesn't work so well. We aggregate all days in a single chunk.\n",
+ "doesn't work so well. We aggregate all days in a single ~3GB chunk.\n",
"\n",
"For this to work well, we'd want smaller chunks in space and bigger chunks in\n",
"time.\n"
@@ -191,7 +191,7 @@
"source": [
"# integer codes for each \"day\"\n",
"codes, _ = pd.factorize(day.data)\n",
- "cohorts = flox.core.find_group_cohorts(\n",
+ "preferred_method, cohorts = flox.core.find_group_cohorts(\n",
" labels=codes,\n",
" chunks=(oisst.chunksizes[\"time\"],),\n",
")\n",
@@ -281,7 +281,7 @@
"metadata": {},
"outputs": [],
"source": [
- "new_cohorts = flox.core.find_group_cohorts(\n",
+ "preferrd_method, new_cohorts = flox.core.find_group_cohorts(\n",
" labels=codes,\n",
" chunks=(rechunked.chunksizes[\"time\"],),\n",
")\n",
=====================================
flox/core.py
=====================================
@@ -1,6 +1,7 @@
from __future__ import annotations
import itertools
+import logging
import math
import operator
import sys
@@ -37,6 +38,13 @@ from .aggregations import (
from .cache import memoize
from .xrutils import is_duck_array, is_duck_dask_array, isnull, module_available
+if module_available("numpy", minversion="2.0.0"):
+ from numpy.lib.array_utils import ( # type: ignore[import-not-found]
+ normalize_axis_tuple,
+ )
+else:
+ from numpy.core.numeric import normalize_axis_tuple # type: ignore[attr-defined]
+
HAS_NUMBAGG = module_available("numbagg", minversion="0.3.0")
if TYPE_CHECKING:
@@ -75,6 +83,7 @@ if TYPE_CHECKING:
T_Engine = Literal["flox", "numpy", "numba", "numbagg"]
T_EngineOpt = None | T_Engine
T_Method = Literal["map-reduce", "blockwise", "cohorts"]
+ T_MethodOpt = None | Literal["map-reduce", "blockwise", "cohorts"]
T_IsBins = Union[bool | Sequence[bool]]
@@ -87,6 +96,8 @@ FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask")
# _simple_combine.
DUMMY_AXIS = -2
+logger = logging.getLogger("flox")
+
class FactorizeKwargs(TypedDict, total=False):
"""Used in _factorize_multiple"""
@@ -233,12 +244,11 @@ def _compute_label_chunk_bitmask(labels, chunks, nlabels):
labels = np.broadcast_to(labels, shape[-labels.ndim :])
- rows = []
cols = []
# Add one to handle the -1 sentinel value
label_is_present = np.zeros((nlabels + 1,), dtype=bool)
ilabels = np.arange(nlabels)
- for idx, region in enumerate(slices_from_chunks(chunks)):
+ for region in slices_from_chunks(chunks):
# This is a quite fast way to find unique integers, when we know how many there are
# inspired by a similar idea in numpy_groupies for first, last
# instead of explicitly finding uniques, repeatedly write True to the same location
@@ -248,10 +258,9 @@ def _compute_label_chunk_bitmask(labels, chunks, nlabels):
# skip the -1 sentinel by slicing
# Faster than np.argwhere by a lot
uniques = ilabels[label_is_present[:-1]]
- rows.append(np.full_like(uniques, idx))
cols.append(uniques)
label_is_present[:] = False
- rows_array = np.concatenate(rows)
+ rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols))
cols_array = np.concatenate(cols)
data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape)
bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels))
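
The rewrite above drops the per-chunk `rows` bookkeeping and rebuilds the row indices with a single `np.repeat` over the per-chunk column counts. A tiny standalone illustration of that construction, using made-up toy labels and chunk slices (a hypothetical example, not flox internals):

```python
import numpy as np
from scipy.sparse import csc_array

labels = np.array([0, 0, 1, 3, 3, 2, 2, 2])             # one label per element
chunk_slices = [slice(0, 3), slice(3, 5), slice(5, 8)]  # three chunks along the axis
nchunks, nlabels = len(chunk_slices), 4

cols = [np.unique(labels[sl]) for sl in chunk_slices]   # unique labels present per chunk
rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols))
cols_array = np.concatenate(cols)
data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape)

bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels))
print(bitmask.toarray().astype(int))
# [[1 1 0 0]
#  [0 0 0 1]
#  [0 0 1 0]]
```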
@@ -260,7 +269,9 @@ def _compute_label_chunk_bitmask(labels, chunks, nlabels):
# @memoize
-def find_group_cohorts(labels, chunks, expected_groups: None | pd.RangeIndex = None) -> dict:
+def find_group_cohorts(
+ labels, chunks, expected_groups: None | pd.RangeIndex = None, merge: bool = False
+) -> tuple[T_Method, dict]:
"""
Finds groups labels that occur together aka "cohorts"
@@ -277,9 +288,13 @@ def find_group_cohorts(labels, chunks, expected_groups: None | pd.RangeIndex = N
chunks of the array being reduced
expected_groups: pd.RangeIndex (optional)
Used to extract the largest label expected
+ merge: bool (optional)
+ Whether to merge cohorts or not. Set to True if a user
+ specifies "cohorts" but other methods are preferable.
Returns
-------
+ preferred_method: {"blockwise", cohorts", "map-reduce"}
cohorts: dict_values
Iterable of cohorts
"""
@@ -321,31 +336,67 @@ def find_group_cohorts(labels, chunks, expected_groups: None | pd.RangeIndex = N
chunks_cohorts = tlz.groupby(invert, label_chunks.keys())
- # No merging is possible when
- # 1. Our dataset has chunksize one along the axis,
+ # 1. Every group is contained to one block, use blockwise here.
+ if bitmask.shape[CHUNK_AXIS] == 1 or (chunks_per_label == 1).all():
+ logger.info("find_group_cohorts: blockwise is preferred.")
+ return "blockwise", chunks_cohorts
+
+ # 2. Perfectly chunked so there is only a single cohort
+ if len(chunks_cohorts) == 1:
+ logger.info("Only found a single cohort. 'map-reduce' is preferred.")
+ return "map-reduce", chunks_cohorts if merge else {}
+
+ # 3. Our dataset has chunksize one along the axis,
single_chunks = all(all(a == 1 for a in ac) for ac in chunks)
- # 2. Every chunk only has a single group, but that group might extend across multiple chunks
+ # 4. Every chunk only has a single group, but that group might extend across multiple chunks
one_group_per_chunk = (bitmask.sum(axis=LABEL_AXIS) == 1).all()
- # 3. Every group is contained to one block, we should be using blockwise here.
- every_group_one_block = (chunks_per_label == 1).all()
- # 4. Existing cohorts don't overlap, great for time grouping with perfect chunking
+ # 5. Existing cohorts don't overlap, great for time grouping with perfect chunking
no_overlapping_cohorts = (np.bincount(np.concatenate(tuple(chunks_cohorts.keys()))) == 1).all()
-
- if every_group_one_block or one_group_per_chunk or single_chunks or no_overlapping_cohorts:
- return chunks_cohorts
+ if one_group_per_chunk or single_chunks or no_overlapping_cohorts:
+ logger.info("find_group_cohorts: cohorts is preferred, chunking is perfect.")
+ return "cohorts", chunks_cohorts
# Containment = |Q & S| / |Q|
# - |X| is the cardinality of set X
# - Q is the query set being tested
# - S is the existing set
- MIN_CONTAINMENT = 0.75 # arbitrary
+ # We'll use containment to measure degree of overlap between labels. The bitmask
+ # matrix allows us to calculate this pretty efficiently.
asfloat = bitmask.astype(float)
+ # Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
+ # makes it non-symmetric.
containment = csr_array((asfloat.T @ asfloat) / chunks_per_label)
+
+ # The containment matrix is a measure of how much the labels overlap
+ # with each other. We treat the sparsity = (nnz/size) as a summary measure of the net overlap.
+ # 1. For high enough sparsity, there is a lot of overlap and we should use "map-reduce".
+ # 2. When labels are uniformly distributed amongst all chunks
+ # (and number of labels < chunk size), sparsity is 1.
+ # 3. Time grouping cohorts (e.g. dayofyear) appear as lines in this matrix.
+ # 4. When there are no overlaps at all between labels, containment is a block diagonal matrix
+ # (approximately).
+ MAX_SPARSITY_FOR_COHORTS = 0.6 # arbitrary
+ sparsity = containment.nnz / math.prod(containment.shape)
+ preferred_method: Literal["map-reduce"] | Literal["cohorts"]
+ if sparsity > MAX_SPARSITY_FOR_COHORTS:
+ logger.info("sparsity is {}".format(sparsity)) # noqa
+ if not merge:
+ logger.info("find_group_cohorts: merge=False, choosing 'map-reduce'")
+ return "map-reduce", {}
+ preferred_method = "map-reduce"
+ else:
+ preferred_method = "cohorts"
+
+ # Use a threshold to force some merging. We do not use the filtered
+ # containment matrix for estimating "sparsity" because it is a bit
+ # hard to reason about.
+ MIN_CONTAINMENT = 0.75 # arbitrary
mask = containment.data < MIN_CONTAINMENT
containment.data[mask] = 0
containment.eliminate_zeros()
# Iterate over labels, beginning with those with most chunks
+ logger.info("find_group_cohorts: merging cohorts")
order = np.argsort(containment.sum(axis=LABEL_AXIS))[::-1]
merged_cohorts = {}
merged_keys = set()
@@ -365,13 +416,14 @@ def find_group_cohorts(labels, chunks, expected_groups: None | pd.RangeIndex = N
merged_cohorts[chunk] = cohort
actual_ngroups = np.concatenate(tuple(merged_cohorts.values())).size
- expected_ngroups = bitmask.shape[LABEL_AXIS]
+ expected_ngroups = present_labels.size
assert expected_ngroups == actual_ngroups, (expected_ngroups, actual_ngroups)
# sort by first label in cohort
# This will help when sort=True (default)
# and we have to resort the dask array
- return dict(sorted(merged_cohorts.items(), key=lambda kv: kv[1][0]))
+ as_sorted = dict(sorted(merged_cohorts.items(), key=lambda kv: kv[1][0]))
+ return preferred_method, as_sorted
def rechunk_for_cohorts(
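
For reference, a short usage sketch of the new two-element return value, mirroring the example added to `implementation.md` above:

```python
import numpy as np
import flox.core

labels = np.tile(np.arange(12), 12)                # monthly labels
chunks = (tuple(np.repeat(4, labels.size // 4)),)  # chunksize 4 along time

preferred_method, chunks_cohorts = flox.core.find_group_cohorts(labels, chunks)
print(preferred_method)                # "cohorts" for this perfectly-chunked case
print(list(chunks_cohorts.values()))   # three non-overlapping cohorts of 4 labels each
```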
@@ -1551,9 +1603,7 @@ def dask_groupby_agg(
group_chunks = ((len(expected_groups),),)
elif method == "cohorts":
- chunks_cohorts = find_group_cohorts(
- by_input, [array.chunks[ax] for ax in axis], expected_groups=expected_groups
- )
+ assert chunks_cohorts
reduced_ = []
groups_ = []
for blks, cohort in chunks_cohorts.items():
@@ -1590,6 +1640,8 @@ def dask_groupby_agg(
groups = (expected_groups,)
group_chunks = ((len(expected_groups),),)
else:
+ # TODO: use chunks_cohorts here; hard because chunks_cohorts does not include all-NaN blocks
+ # but the array after applying the blockwise op does. We'd have to insert a subsetting op.
# Here one input chunk → one output chunks
# find number of groups in each chunk, this is needed for output chunks
# along the reduced axis
@@ -1662,11 +1714,13 @@ def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
def _validate_reindex(
reindex: bool | None,
func,
- method: T_Method,
+ method: T_MethodOpt,
expected_groups,
any_by_dask: bool,
is_dask_array: bool,
-) -> bool:
+) -> bool | None:
+ logger.info("Entering _validate_reindex: reindex is {}".format(reindex)) # noqa
+
all_numpy = not is_dask_array and not any_by_dask
if reindex is True and not all_numpy:
if _is_arg_reduction(func):
@@ -1679,6 +1733,10 @@ def _validate_reindex(
raise ValueError("reindex must be None or False when func is 'first' or 'last.")
if reindex is None:
+ if method is None:
+ logger.info("Leaving _validate_reindex: method = None, returning None")
+ return None
+
if all_numpy:
return True
@@ -1703,6 +1761,7 @@ def _validate_reindex(
reindex = True
assert isinstance(reindex, bool)
+ logger.info("Leaving _validate_reindex: reindex is {}".format(reindex)) # noqa
return reindex
@@ -1868,6 +1927,33 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) ->
return expected_groups
+def _choose_method(
+ method: T_MethodOpt, preferred_method: T_Method, agg: Aggregation, by, nax: int
+) -> T_Method:
+ if method is None:
+ logger.info("_choose_method: method is None")
+ if agg.chunk == (None,):
+ if preferred_method != "blockwise":
+ raise ValueError(
+ f"Aggregation {agg.name} is only supported for `method='blockwise'`, "
+ "but the chunking is not right."
+ )
+ logger.info("_choose_method: choosing 'blockwise'")
+ return "blockwise"
+
+ if nax != by.ndim:
+ logger.info("_choose_method: choosing 'map-reduce'")
+ return "map-reduce"
+
+ if _is_arg_reduction(agg) and preferred_method == "blockwise":
+ return "cohorts"
+
+ logger.info("_choose_method: choosing preferred_method={}".format(preferred_method)) # noqa
+ return preferred_method
+ else:
+ return method
+
+
def _choose_engine(by, agg: Aggregation):
dtype = agg.dtype["user"]
@@ -1882,11 +1968,14 @@ def _choose_engine(by, agg: Aggregation):
if agg.name in ["all", "any"] or (
not_arg_reduce and has_blockwise_nan_skipping and dtype is None
):
+ logger.info("_choose_engine: Choosing 'numbagg'")
return "numbagg"
if not_arg_reduce and (not is_duck_dask_array(by) and _issorted(by)):
+ logger.info("_choose_engine: Choosing 'flox'")
return "flox"
else:
+ logger.info("_choose_engine: Choosing 'numpy'")
return "numpy"
@@ -1901,7 +1990,7 @@ def groupby_reduce(
fill_value=None,
dtype: np.typing.DTypeLike = None,
min_count: int | None = None,
- method: T_Method = "map-reduce",
+ method: T_MethodOpt = None,
engine: T_EngineOpt = None,
reindex: bool | None = None,
finalize_kwargs: dict[Any, Any] | None = None,
@@ -2076,6 +2165,7 @@ def groupby_reduce(
# can't do it if we are grouping by dask array but don't have expected_groups
any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups))
)
+ expected_: pd.RangeIndex | None
if factorize_early:
bys, final_groups, grp_shape = _factorize_multiple(
bys,
@@ -2083,17 +2173,18 @@ def groupby_reduce(
any_by_dask=any_by_dask,
sort=sort,
)
- expected_groups = (pd.RangeIndex(math.prod(grp_shape)),)
+ expected_ = pd.RangeIndex(math.prod(grp_shape))
+ else:
+ assert expected_groups == (None,)
+ expected_ = None
assert len(bys) == 1
(by_,) = bys
- (expected_groups,) = expected_groups
if axis is None:
axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0))
else:
- # TODO: How come this function doesn't exist according to mypy?
- axis_ = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore[attr-defined]
+ axis_ = normalize_axis_tuple(axis, array.ndim)
nax = len(axis_)
has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
@@ -2111,7 +2202,7 @@ def groupby_reduce(
"along a single axis or when reducing across all dimensions of `by`."
)
- if nax == 1 and by_.ndim > 1 and expected_groups is None:
+ if nax == 1 and by_.ndim > 1 and expected_ is None:
# When we reduce along all axes, we are guaranteed to see all
# groups in the final combine stage, so everything works.
# This is not necessarily true when reducing along a subset of axes
@@ -2161,7 +2252,7 @@ def groupby_reduce(
groups: tuple[np.ndarray | DaskArray, ...]
if not has_dask:
results = _reduce_blockwise(
- array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs
+ array, by_, agg, expected_groups=expected_, reindex=reindex, sort=sort, **kwargs
)
groups = (results["groups"],)
result = results[agg.name]
@@ -2177,12 +2268,42 @@ def groupby_reduce(
f"Received method={method!r}"
)
- if method in ["blockwise", "cohorts"] and nax != by_.ndim:
+ if (
+ _is_arg_reduction(agg)
+ and method == "blockwise"
+ and not all(nchunks == 1 for nchunks in array.numblocks[-nax:])
+ ):
+ raise NotImplementedError(
+ "arg-reductions are not supported with method='blockwise', use 'cohorts' instead."
+ )
+
+ if nax != by_.ndim and method in ["blockwise", "cohorts"]:
raise NotImplementedError(
"Must reduce along all dimensions of `by` when method != 'map-reduce'."
f"Received method={method!r}"
)
+ if (not any_by_dask and method is None) or method == "cohorts":
+ preferred_method, chunks_cohorts = find_group_cohorts(
+ by_,
+ [array.chunks[ax] for ax in axis_],
+ expected_groups=expected_,
+ # when provided with cohorts, we *always* 'merge'
+ merge=(method == "cohorts"),
+ )
+ else:
+ preferred_method = "map-reduce"
+ chunks_cohorts = {}
+
+ method = _choose_method(method, preferred_method, agg, by_, nax)
+ # TODO: clean this up
+ reindex = _validate_reindex(
+ reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
+ )
+
+ if TYPE_CHECKING:
+ assert method is not None
+
# TODO: just do this in dask_groupby_agg
# we always need some fill_value (see above) so choose the default if needed
if kwargs["fill_value"] is None:
@@ -2196,10 +2317,11 @@ def groupby_reduce(
result, groups = partial_agg(
array,
by_,
- expected_groups=expected_groups,
+ expected_groups=expected_,
agg=agg,
reindex=reindex,
method=method,
+ chunks_cohorts=chunks_cohorts,
sort=sort,
)
@@ -2215,9 +2337,9 @@ def groupby_reduce(
# nan group labels are factorized to -1, and preserved
# now we get rid of them by reindexing
# This also handles bins with no data
- result = reindex_(
- result, from_=groups[0], to=expected_groups, fill_value=fill_value
- ).reshape(result.shape[:-1] + grp_shape)
+ result = reindex_(result, from_=groups[0], to=expected_, fill_value=fill_value).reshape(
+ result.shape[:-1] + grp_shape
+ )
groups = final_groups
if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)):
=====================================
flox/visualize.py
=====================================
@@ -139,35 +139,31 @@ def visualize_cohorts_2d(by, chunks):
assert by.ndim == 2
print("finding cohorts...")
chunks = [chunks[ax] for ax in range(-by.ndim, 0)]
- before_merged = find_group_cohorts(by, chunks, merge=False)
- merged = find_group_cohorts(by, chunks, merge=True)
+ _, chunks_cohorts = find_group_cohorts(by, chunks)
print("finished cohorts...")
xticks = np.cumsum(chunks[-1])
yticks = np.cumsum(chunks[-2])
- f, ax = plt.subplots(1, 3, constrained_layout=True, sharex=False, sharey=False)
+ f, ax = plt.subplots(1, 2, constrained_layout=True, sharex=False, sharey=False)
ax = ax.ravel()
# ax[1].set_visible(False)
# ax = ax[[0, 2, 3]]
ngroups = len(_unique(by))
h0 = ax[0].imshow(by, vmin=0, cmap=get_colormap(ngroups))
- h1 = _visualize_cohorts(chunks, before_merged, ax=ax[1])
- h2 = _visualize_cohorts(chunks, merged, ax=ax[2])
+ h2 = _visualize_cohorts(chunks, chunks_cohorts, ax=ax[1])
- for axx in ax:
- axx.grid(True, which="both")
+ ax[0].grid(True, which="both")
for axx in ax[:1]:
axx.set_xticks(xticks)
axx.set_yticks(yticks)
- for h, axx in zip([h0, h1, h2], ax):
+ for h, axx in zip([h0, h2], ax):
f.colorbar(h, ax=axx, orientation="horizontal")
ax[0].set_title(f"by: {ngroups} groups")
- ax[1].set_title(f"{len(before_merged)} cohorts")
- ax[2].set_title(f"{len(merged)} merged cohorts")
- f.set_size_inches((12, 6))
+ ax[1].set_title(f"{len(chunks_cohorts)} cohorts")
+ f.set_size_inches((9, 6))
def _visualize_cohorts(chunks, cohorts, ax=None):
=====================================
tests/__init__.py
=====================================
@@ -45,6 +45,7 @@ def LooseVersion(vstring):
return packaging.version.Version(vstring)
+has_cftime, requires_cftime = _importorskip("cftime")
has_dask, requires_dask = _importorskip("dask")
has_numba, requires_numba = _importorskip("numba")
has_numbagg, requires_numbagg = _importorskip("numbagg")
=====================================
tests/test_core.py
=====================================
@@ -1,6 +1,7 @@
from __future__ import annotations
import itertools
+import logging
import warnings
from functools import partial, reduce
from typing import TYPE_CHECKING, Callable
@@ -38,6 +39,9 @@ from . import (
requires_scipy,
)
+logger = logging.getLogger("flox")
+logger.setLevel(logging.DEBUG)
+
labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0])
nan_labels = labels.astype(float) # copy
nan_labels[:5] = np.nan
@@ -293,7 +297,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine):
assert_equal(actual_group, expect)
if "arg" in func:
assert actual.dtype.kind == "i"
- assert_equal(actual, expected, tolerance)
+ assert_equal(expected, actual, tolerance)
if not has_dask or chunks is None or func in BLOCKWISE_FUNCS:
continue
@@ -851,7 +855,9 @@ def test_rechunk_for_blockwise(inchunks, expected):
],
)
def test_find_group_cohorts(expected, labels, chunks: tuple[int]) -> None:
- actual = list(find_group_cohorts(labels, (chunks,)).values())
+ # force merging of cohorts for the test
+ _, chunks_cohorts = find_group_cohorts(labels, (chunks,), merge=True)
+ actual = list(chunks_cohorts.values())
assert actual == expected, (actual, expected)
@@ -873,13 +879,31 @@ def test_verify_complex_cohorts(chunksize: int) -> None:
if len(by) != sum(chunks):
chunks += (len(by) - sum(chunks),)
- chunk_cohorts = find_group_cohorts(by - 1, (chunks,))
+ _, chunk_cohorts = find_group_cohorts(by - 1, (chunks,))
chunks_ = np.sort(np.concatenate(tuple(chunk_cohorts.keys())))
groups = np.sort(np.concatenate(tuple(chunk_cohorts.values())))
assert_equal(np.unique(chunks_).astype(np.int64), np.arange(len(chunks), dtype=np.int64))
assert_equal(groups.astype(np.int64), np.arange(366, dtype=np.int64))
+@requires_dask
+@pytest.mark.parametrize("chunksize", (12,) + tuple(range(1, 13)) + (-1,))
+def test_method_guessing(chunksize):
+ # just a regression test
+ labels = np.tile(np.arange(1, 13), 30)
+ by = dask.array.from_array(labels, chunks=chunksize) - 1
+ preferred_method, chunks_cohorts = find_group_cohorts(labels, by.chunks[slice(-1, None)])
+ if chunksize == -1:
+ assert preferred_method == "blockwise"
+ assert chunks_cohorts == {(0,): list(range(1, 13))}
+ elif chunksize in (1, 2, 3, 4, 6):
+ assert preferred_method == "cohorts"
+ assert len(chunks_cohorts) == 12 // chunksize
+ else:
+ assert preferred_method == "map-reduce"
+ assert chunks_cohorts == {}
+
+
@requires_dask
@pytest.mark.parametrize(
"chunk_at,expected",
=====================================
tests/test_xarray.py
=====================================
@@ -8,7 +8,13 @@ xr = pytest.importorskip("xarray")
from flox.xarray import rechunk_for_blockwise, xarray_reduce
-from . import assert_equal, has_dask, raise_if_dask_computes, requires_dask
+from . import (
+ assert_equal,
+ has_dask,
+ raise_if_dask_computes,
+ requires_cftime,
+ requires_dask,
+)
if has_dask:
import dask
@@ -178,10 +184,18 @@ def test_validate_expected_groups(expected_groups):
)
+@requires_cftime
@requires_dask
def test_xarray_reduce_single_grouper(engine):
# DataArray
- ds = xr.tutorial.open_dataset("rasm", chunks={"time": 9})
+ ds = xr.Dataset(
+ {"Tair": (("time", "x", "y"), dask.array.ones((36, 205, 275), chunks=(9, -1, -1)))},
+ coords={
+ "time": xr.date_range(
+ "1980-09-01 00:00", "1983-09-18 00:00", freq="ME", calendar="noleap"
+ )
+ },
+ )
actual = xarray_reduce(ds.Tair, ds.time.dt.month, func="mean", engine=engine)
expected = ds.Tair.groupby("time.month").mean()
xr.testing.assert_allclose(actual, expected)
@@ -355,7 +369,14 @@ def test_xarray_groupby_bins(chunks, engine):
def test_func_is_aggregation():
from flox.aggregations import mean
- ds = xr.tutorial.open_dataset("rasm", chunks={"time": 9})
+ ds = xr.Dataset(
+ {"Tair": (("time", "x", "y"), dask.array.ones((36, 205, 275), chunks=(9, -1, -1)))},
+ coords={
+ "time": xr.date_range(
+ "1980-09-01 00:00", "1983-09-18 00:00", freq="ME", calendar="noleap"
+ )
+ },
+ )
expected = xarray_reduce(ds.Tair, ds.time.dt.month, func="mean")
actual = xarray_reduce(ds.Tair, ds.time.dt.month, func=mean)
xr.testing.assert_allclose(actual, expected)
@@ -392,10 +413,18 @@ def test_func_is_aggregation():
@requires_dask
@pytest.mark.parametrize("method", ["cohorts", "map-reduce"])
def test_groupby_bins_indexed_coordinate(method):
- ds = (
- xr.tutorial.open_dataset("air_temperature")
- .isel(time=slice(100))
- .chunk({"time": 20, "lat": 5})
+ ds = xr.Dataset(
+ {
+ "air": (
+ ("time", "lat", "lon"),
+ dask.array.random.random((125, 25, 53), chunks=(20, 5, -1)),
+ )
+ },
+ coords={
+ "time": pd.date_range("2013-01-01", "2013-02-01", freq="6H"),
+ "lat": np.arange(75.0, 14.9, -2.5),
+ "lon": np.arange(200.0, 331.0, 2.5),
+ },
)
bins = [40, 50, 60, 70]
expected = ds.groupby_bins("lat", bins=bins).mean(keep_attrs=True, dim=...)
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/6907084c2fffa5a695c4c77586f71d246c2ae0a5