[Git][debian-gis-team/flox][upstream] New upstream version 0.9.6
Antonio Valentino (@antonio.valentino)
gitlab@salsa.debian.org
Thu Mar 28 08:19:41 GMT 2024
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
2cc87746 by Antonio Valentino at 2024-03-28T08:05:13+00:00
New upstream version 0.9.6
- - - - -
14 changed files:
- .github/workflows/ci-additional.yaml
- .github/workflows/ci.yaml
- .github/workflows/upstream-dev-ci.yaml
- asv_bench/benchmarks/cohorts.py
- ci/benchmark.yml
- docs/source/implementation.md
- flox/aggregate_flox.py
- flox/aggregate_npg.py
- flox/aggregate_numbagg.py
- flox/core.py
- flox/xarray.py
- tests/__init__.py
- tests/test_core.py
- tests/test_xarray.py
Changes:
=====================================
.github/workflows/ci-additional.yaml
=====================================
@@ -77,7 +77,7 @@ jobs:
--ignore flox/tests \
--cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v4.0.0
+ uses: codecov/codecov-action@v4.1.0
with:
file: ./coverage.xml
flags: unittests
@@ -131,7 +131,7 @@ jobs:
python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
- uses: codecov/codecov-action@v4.0.0
+ uses: codecov/codecov-action@v4.1.0
with:
file: mypy_report/cobertura.xml
flags: mypy
=====================================
.github/workflows/ci.yaml
=====================================
@@ -49,7 +49,7 @@ jobs:
run: |
pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v4.0.0
+ uses: codecov/codecov-action@v4.1.0
with:
file: ./coverage.xml
flags: unittests
@@ -93,7 +93,7 @@ jobs:
run: |
python -m pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
- uses: codecov/codecov-action@v4.0.0
+ uses: codecov/codecov-action@v4.1.0
with:
file: ./coverage.xml
flags: unittests
=====================================
.github/workflows/upstream-dev-ci.yaml
=====================================
@@ -10,6 +10,7 @@ on:
paths:
- ".github/workflows/upstream-dev-ci.yaml"
- "ci/upstream-dev-env.yml"
+ - "flox/*"
schedule:
- cron: "0 0 * * *" # Daily “At 00:00” UTC
workflow_dispatch: # allows you to trigger the workflow run manually
=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -95,7 +95,7 @@ class ERA5Dataset:
"""ERA5"""
def __init__(self, *args, **kwargs):
- self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+ self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="h"))
self.axis = (-1,)
self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 48))
@@ -143,7 +143,7 @@ class PerfectMonthly(Cohorts):
"""Perfectly chunked for a "cohorts" monthly mean climatology"""
def setup(self, *args, **kwargs):
- self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="M"))
+ self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="ME"))
self.axis = (-1,)
self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4))
self.by = self.time.dt.month.values - 1
@@ -164,7 +164,7 @@ class PerfectMonthly(Cohorts):
class ERA5Google(Cohorts):
def setup(self, *args, **kwargs):
TIME = 900 # 92044 in Google ARCO ERA5
- self.time = pd.Series(pd.date_range("1959-01-01", freq="6H", periods=TIME))
+ self.time = pd.Series(pd.date_range("1959-01-01", freq="6h", periods=TIME))
self.axis = (2,)
self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, 1))
self.by = self.time.dt.day.values - 1
@@ -201,3 +201,12 @@ class OISST(Cohorts):
self.time = pd.Series(index)
self.by = self.time.dt.dayofyear.values - 1
self.expected = pd.RangeIndex(self.by.max() + 1)
+
+
+class RandomBigArray(Cohorts):
+ def setup(self, *args, **kwargs):
+ M, N = 100_000, 20_000
+ self.array = dask.array.random.normal(size=(M, N), chunks=(10_000, N // 5)).T
+ self.by = np.random.choice(5_000, size=M)
+ self.expected = pd.RangeIndex(5000)
+ self.axis = (1,)
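
Note: the frequency-string changes in this benchmark file track pandas 2.2, which deprecated the uppercase aliases "H" and "M" in favour of "h" (hourly) and "ME" (month end). A minimal sketch of the new spellings, assuming pandas >= 2.2 is installed:

    import pandas as pd

    # Hourly range: lowercase "h" replaces the deprecated "H" alias.
    hourly = pd.date_range("2016-01-01", "2016-01-02", freq="h")

    # Month-end range: "ME" replaces the deprecated "M" alias.
    month_end = pd.date_range("1961-01-01", "1962-01-01", freq="ME")

    print(len(hourly), len(month_end))  # 25 12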
=====================================
ci/benchmark.yml
=====================================
@@ -3,6 +3,7 @@ channels:
- conda-forge
dependencies:
- asv
+ - build
- cachey
- dask-core
- numpy>=1.22
=====================================
docs/source/implementation.md
=====================================
@@ -300,8 +300,10 @@ label overlaps with all other labels. The algorithm is as follows.
![cohorts-schematic](/../diagrams/containment.png)
1. To choose between `"map-reduce"` and `"cohorts"`, we need a summary measure of the degree to which the labels overlap with
- each other. We use _sparsity_ --- the number of non-zero elements in `C` divided by the number of elements in `C`, `C.nnz/C.size`.
- When sparsity > 0.6, we choose `"map-reduce"` since there is decent overlap between (any) cohorts. Otherwise we use `"cohorts"`.
+ each other. We can use _sparsity_ --- the number of non-zero elements in `C` divided by the number of elements in `C`, `C.nnz/C.size`.
+ We use sparsity(`S`) as an approximation for the sparsity(`C`) to avoid a potentially expensive sparse matrix dot product when `S`
+ isn't particularly sparse. When sparsity(`S`) > 0.4 (arbitrary), we choose `"map-reduce"` since there is decent overlap between
+ (any) cohorts. Otherwise we use `"cohorts"`.
Cool, isn't it?!
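
To make the heuristic above concrete, here is a small sketch of the sparsity check and the containment computation using scipy.sparse; the bitmask values are made up for illustration and this is not flox's actual helper:

    import numpy as np
    from scipy.sparse import csr_array

    # Bitmask S: rows are chunks, columns are group labels;
    # S[i, j] = 1 when label j occurs in chunk i (toy data: 3 chunks, 4 labels).
    S = csr_array(np.array([
        [1.0, 1.0, 0.0, 0.0],
        [0.0, 1.0, 1.0, 0.0],
        [0.0, 0.0, 1.0, 1.0],
    ]))

    # The docs above call nnz/size "sparsity" (fraction of non-zero entries).
    sparsity_S = S.nnz / np.prod(S.shape)

    # Containment C = (S.T @ S) / chunks_per_label, as described above.
    chunks_per_label = S.sum(axis=0)
    C = csr_array(S.T @ S / chunks_per_label)
    sparsity_C = C.nnz / np.prod(C.shape)

    # Heuristic: lots of overlap (sparsity(S) > 0.4) -> prefer "map-reduce".
    method = "map-reduce" if sparsity_S > 0.4 else "cohorts"
    print(sparsity_S, sparsity_C, method)  # 0.5 0.625 map-reduce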
=====================================
flox/aggregate_flox.py
=====================================
@@ -37,7 +37,8 @@ def _lerp(a, b, *, t, dtype, out=None):
"""
if out is None:
out = np.empty_like(a, dtype=dtype)
- diff_b_a = np.subtract(b, a)
+ with np.errstate(invalid="ignore"):
+ diff_b_a = np.subtract(b, a)
# asanyarray is a stop-gap until gh-13105
np.add(a, diff_b_a * t, out=out)
np.subtract(b, diff_b_a * (1 - t), out=out, where=t >= 0.5)
@@ -95,7 +96,8 @@ def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=Non
# partition the complex array in-place
labels_broadcast = np.broadcast_to(group_idx, array.shape)
- cmplx = labels_broadcast + 1j * array
+ with np.errstate(invalid="ignore"):
+ cmplx = labels_broadcast + 1j * array
cmplx.partition(kth=kth, axis=-1)
if is_scalar_q:
a_ = cmplx.imag
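
The np.errstate guards added above silence the "invalid value encountered" RuntimeWarning that NumPy can emit from these operations (for example inf - inf); the computed values are unchanged. A small, flox-independent illustration of the pattern:

    import numpy as np

    a = np.array([1.0, np.nan, np.inf])
    b = np.array([2.0, 3.0, np.inf])

    # Without the context manager, inf - inf emits
    # "RuntimeWarning: invalid value encountered in subtract".
    with np.errstate(invalid="ignore"):
        diff_b_a = np.subtract(b, a)

    print(diff_b_a)  # [ 1. nan nan]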
=====================================
flox/aggregate_npg.py
=====================================
@@ -88,6 +88,8 @@ def nanprod(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dt
def _len(group_idx, array, engine, *, func, axis=-1, size=None, fill_value=None, dtype=None):
+ if array.dtype.kind in "US":
+ array = np.broadcast_to(np.array([1]), array.shape)
result = _get_aggregate(engine).aggregate(
group_idx,
array,
=====================================
flox/aggregate_numbagg.py
=====================================
@@ -105,11 +105,24 @@ def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None,
)
+def nanlen(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
+ if array.dtype.kind in "US":
+ array = np.broadcast_to(np.array([1]), array.shape)
+ return _numbagg_wrapper(
+ group_idx,
+ array,
+ axis=axis,
+ size=size,
+ func="nancount",
+ # fill_value=fill_value,
+ # dtype=dtype,
+ )
+
+
nansum = partial(_numbagg_wrapper, func="nansum")
nanmean = partial(_numbagg_wrapper, func="nanmean")
nanprod = partial(_numbagg_wrapper, func="nanprod")
nansum_of_squares = partial(_numbagg_wrapper, func="nansum_of_squares")
-nanlen = partial(_numbagg_wrapper, func="nancount")
nanprod = partial(_numbagg_wrapper, func="nanprod")
nanfirst = partial(_numbagg_wrapper, func="nanfirst")
nanlast = partial(_numbagg_wrapper, func="nanlast")
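
Both the numpy_groupies (_len) and numbagg (nanlen) count paths above swap in a broadcast array of ones when the input is a string array, because a count only needs element positions, not values. A tiny, library-independent sketch of the idea:

    import numpy as np

    array = np.array(["ABC", "DEF", "GHI", "JKL"], dtype="U3")
    group_idx = np.array([0, 0, 1, 1])

    # Strings cannot be summed, but a count is just a sum of ones, so use a
    # cheap (read-only) broadcast view of 1s with the same shape instead.
    if array.dtype.kind in "US":
        array = np.broadcast_to(np.array([1]), array.shape)

    counts = np.bincount(group_idx, weights=array)
    print(counts)  # [2. 2.]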
=====================================
flox/core.py
=====================================
@@ -363,37 +363,55 @@ def find_group_cohorts(
logger.info("find_group_cohorts: cohorts is preferred, chunking is perfect.")
return "cohorts", chunks_cohorts
- # Containment = |Q & S| / |Q|
+ # We'll use containment to measure degree of overlap between labels.
+ # Containment C = |Q & S| / |Q|
# - |X| is the cardinality of set X
# - Q is the query set being tested
# - S is the existing set
- # We'll use containment to measure degree of overlap between labels. The bitmask
- # matrix allows us to calculate this pretty efficiently.
- asfloat = bitmask.astype(float)
- # Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
- # makes it non-symmetric.
- containment = csr_array((asfloat.T @ asfloat) / chunks_per_label)
-
- # The containment matrix is a measure of how much the labels overlap
- # with each other. We treat the sparsity = (nnz/size) as a summary measure of the net overlap.
+ # The bitmask matrix S allows us to calculate this pretty efficiently using a dot product.
+ # S.T @ S / chunks_per_label
+ #
+ # We treat the sparsity(C) = (nnz/size) as a summary measure of the net overlap.
# 1. For high enough sparsity, there is a lot of overlap and we should use "map-reduce".
# 2. When labels are uniformly distributed amongst all chunks
# (and number of labels < chunk size), sparsity is 1.
# 3. Time grouping cohorts (e.g. dayofyear) appear as lines in this matrix.
# 4. When there are no overlaps at all between labels, containment is a block diagonal matrix
# (approximately).
- MAX_SPARSITY_FOR_COHORTS = 0.6 # arbitrary
- sparsity = containment.nnz / math.prod(containment.shape)
+ #
+ # However computing S.T @ S can still be the slowest step, especially if S
+ # is not particularly sparse. Empirically the sparsity( S.T @ S ) > min(1, 2 x sparsity(S)).
+ # So we use sparsity(S) as a shortcut.
+ MAX_SPARSITY_FOR_COHORTS = 0.4 # arbitrary
+ sparsity = bitmask.nnz / math.prod(bitmask.shape)
preferred_method: Literal["map-reduce"] | Literal["cohorts"]
+ logger.debug(
+ "sparsity of bitmask is {}, threshold is {}".format( # noqa
+ sparsity, MAX_SPARSITY_FOR_COHORTS
+ )
+ )
if sparsity > MAX_SPARSITY_FOR_COHORTS:
- logger.info("sparsity is {}".format(sparsity)) # noqa
if not merge:
- logger.info("find_group_cohorts: merge=False, choosing 'map-reduce'")
+ logger.info(
+ "find_group_cohorts: bitmask sparsity={}, merge=False, choosing 'map-reduce'".format( # noqa
+ sparsity
+ )
+ )
return "map-reduce", {}
preferred_method = "map-reduce"
else:
preferred_method = "cohorts"
+ # Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
+ # makes it non-symmetric.
+ asfloat = bitmask.astype(float)
+ containment = csr_array(asfloat.T @ asfloat / chunks_per_label)
+
+ logger.debug(
+ "sparsity of containment matrix is {}".format( # noqa
+ containment.nnz / math.prod(containment.shape)
+ )
+ )
# Use a threshold to force some merging. We do not use the filtered
# containment matrix for estimating "sparsity" because it is a bit
# hard to reason about.
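
The sparsity figures and the chosen method added above go through the module-level "flox" logger (logger.debug / logger.info), so they can be surfaced with standard logging configuration, for example:

    import logging

    # Show flox's DEBUG messages, including the bitmask and containment
    # sparsity values logged by find_group_cohorts.
    logging.basicConfig()
    logging.getLogger("flox").setLevel(logging.DEBUG)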
=====================================
flox/xarray.py
=====================================
@@ -201,10 +201,10 @@ def xarray_reduce(
>>> da = da = xr.ones_like(labels)
>>> # Sum all values in da that matches the elements in the group index:
>>> xarray_reduce(da, labels, func="sum")
- <xarray.DataArray 'label' (label: 4)>
+ <xarray.DataArray 'label' (label: 4)> Size: 32B
array([3, 2, 2, 2])
Coordinates:
- * label (label) int64 0 1 2 3
+ * label (label) int64 32B 0 1 2 3
"""
if skipna is not None and isinstance(func, Aggregation):
@@ -259,7 +259,7 @@ def xarray_reduce(
for var in maybe_drop:
maybe_midx = ds._indexes.get(var, None)
if isinstance(maybe_midx, PandasMultiIndex):
- idx_coord_names = set(maybe_midx.index.names + [maybe_midx.dim])
+ idx_coord_names = set(tuple(maybe_midx.index.names) + (maybe_midx.dim,))
idx_other_names = idx_coord_names - set(maybe_drop)
more_drop.update(idx_other_names)
maybe_drop.update(more_drop)
@@ -303,14 +303,17 @@ def xarray_reduce(
# reducing along a dimension along which groups do not vary
# This is really just a normal reduction.
# This is not right when binning so we exclude.
- if isinstance(func, str):
- dsfunc = func[3:] if skipna else func
- else:
+ if isinstance(func, str) and func.startswith("nan"):
+ raise ValueError(f"Specify func={func[3:]}, skipna=True instead of func={func}")
+ elif isinstance(func, Aggregation):
raise NotImplementedError(
"func must be a string when reducing along a dimension not present in `by`"
)
- # TODO: skipna needs test
- result = getattr(ds_broad, dsfunc)(dim=dim_tuple, skipna=skipna)
+ # skipna is not supported for all reductions
+ # https://github.com/pydata/xarray/issues/8819
+ kwargs = {"skipna": skipna} if skipna is not None else {}
+ kwargs.update(finalize_kwargs)
+ result = getattr(ds_broad, func)(dim=dim_tuple, **kwargs)
if isinstance(obj, xr.DataArray):
return obj._from_temp_dataset(result)
else:
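
The reworked fallback above dispatches to the plain xarray reduction by name and only forwards skipna when the caller actually supplied it, since not every reduction accepts that keyword (see the linked xarray issue). A standalone sketch of the same dispatch pattern, using a hypothetical reduce_along helper rather than flox's code:

    import numpy as np
    import xarray as xr

    ds = xr.Dataset({"a": (("x", "y"), np.arange(6.0).reshape(2, 3))})

    def reduce_along(obj, func: str, dim, skipna=None, **finalize_kwargs):
        # Forward skipna only when set, because not every xarray
        # reduction accepts the keyword.
        kwargs = {"skipna": skipna} if skipna is not None else {}
        kwargs.update(finalize_kwargs)
        return getattr(obj, func)(dim=dim, **kwargs)

    print(reduce_along(ds, "mean", dim="y", skipna=True))
    print(reduce_along(ds, "count", dim="y"))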
=====================================
tests/__init__.py
=====================================
@@ -124,3 +124,35 @@ def assert_equal_tuple(a, b):
np.testing.assert_array_equal(a_, b_)
else:
assert a_ == b_
+
+
+SCIPY_STATS_FUNCS = ("mode", "nanmode")
+BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS
+ALL_FUNCS = (
+ "sum",
+ "nansum",
+ "argmax",
+ "nanfirst",
+ "nanargmax",
+ "prod",
+ "nanprod",
+ "mean",
+ "nanmean",
+ "var",
+ "nanvar",
+ "std",
+ "nanstd",
+ "max",
+ "nanmax",
+ "min",
+ "nanmin",
+ "argmin",
+ "nanargmin",
+ "any",
+ "all",
+ "nanlast",
+ "median",
+ "nanmedian",
+ "quantile",
+ "nanquantile",
+) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS)
=====================================
tests/test_core.py
=====================================
@@ -31,12 +31,14 @@ from flox.core import (
)
from . import (
+ ALL_FUNCS,
+ BLOCKWISE_FUNCS,
+ SCIPY_STATS_FUNCS,
assert_equal,
assert_equal_tuple,
has_dask,
raise_if_dask_computes,
requires_dask,
- requires_scipy,
)
logger = logging.getLogger("flox")
@@ -60,36 +62,6 @@ else:
DEFAULT_QUANTILE = 0.9
-SCIPY_STATS_FUNCS = ("mode", "nanmode")
-BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS
-ALL_FUNCS = (
- "sum",
- "nansum",
- "argmax",
- "nanfirst",
- "nanargmax",
- "prod",
- "nanprod",
- "mean",
- "nanmean",
- "var",
- "nanvar",
- "std",
- "nanstd",
- "max",
- "nanmax",
- "min",
- "nanmin",
- "argmin",
- "nanargmin",
- "any",
- "all",
- "nanlast",
- "median",
- "nanmedian",
- "quantile",
- "nanquantile",
-) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS)
if TYPE_CHECKING:
from flox.core import T_Agg, T_Engine, T_ExpectedGroupsOpt, T_Method
@@ -908,7 +880,7 @@ def test_find_cohorts_missing_groups():
@pytest.mark.parametrize("chunksize", [12, 13, 14, 24, 36, 48, 72, 71])
def test_verify_complex_cohorts(chunksize: int) -> None:
- time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H"))
+ time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="h"))
chunks = (chunksize,) * (len(time) // chunksize)
by = np.array(time.dt.dayofyear.values)
@@ -1091,7 +1063,7 @@ def test_empty_bins(func, engine):
def test_datetime_binning():
- time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24H")
+ time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24h")
by = pd.date_range("2010-08-01", "2010-08-15", freq="15min")
(actual,) = _convert_expected_groups_to_index((time_bins,), isbin=(True,), sort=False)
@@ -1153,9 +1125,9 @@ def test_group_by_datetime(engine, method):
if method == "blockwise":
return None
- edges = pd.date_range("1999-12-31", "2000-12-31", freq="M").to_series().to_numpy()
+ edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
actual, _ = groupby_reduce(daskarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs)
- expected = data.resample("M").mean().to_numpy()
+ expected = data.resample("ME").mean().to_numpy()
assert_equal(expected, actual)
actual, _ = groupby_reduce(
@@ -1548,7 +1520,7 @@ def test_validate_reindex() -> None:
@requires_dask
def test_1d_blockwise_sort_optimization():
# Make sure for resampling problems sorting isn't done.
- time = pd.Series(pd.date_range("2020-09-01", "2020-12-31 23:59", freq="3H"))
+ time = pd.Series(pd.date_range("2020-09-01", "2020-12-31 23:59", freq="3h"))
array = dask.array.ones((len(time),), chunks=(224,))
actual, _ = groupby_reduce(array, time.dt.dayofyear.values, method="blockwise", func="count")
@@ -1710,7 +1682,18 @@ def test_multiple_quantiles(q, chunk, func, by_ndim):
actual, _ = groupby_reduce(array, labels, func=func, finalize_kwargs=dict(q=q), axis=axis)
sorted_array = array[..., [0, 1, 2, 4, 3, 5, 6]]
f = partial(getattr(np, func), q=q, axis=axis, keepdims=True)
+ if chunk:
+ sorted_array = sorted_array.compute()
expected = np.concatenate((f(sorted_array[..., :4]), f(sorted_array[..., 4:])), axis=-1)
if by_ndim == 2:
expected = expected.squeeze(axis=-2)
assert_equal(expected, actual, tolerance=1e-14)
+
+
+@pytest.mark.parametrize("dtype", ["U3", "S3"])
+def test_nanlen_string(dtype, engine):
+ array = np.array(["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], dtype=dtype)
+ by = np.array([0, 0, 1, 2, 1, 0])
+ expected = np.array([3, 2, 1], dtype=np.intp)
+ actual, *_ = groupby_reduce(array, by, func="count", engine=engine)
+ assert_equal(expected, actual)
=====================================
tests/test_xarray.py
=====================================
@@ -9,6 +9,7 @@ xr = pytest.importorskip("xarray")
from flox.xarray import rechunk_for_blockwise, xarray_reduce
from . import (
+ ALL_FUNCS,
assert_equal,
has_dask,
raise_if_dask_computes,
@@ -710,3 +711,32 @@ def test_multiple_quantiles(q, chunk, by_ndim, skipna):
with xr.set_options(use_flox=False):
expected = da.groupby(by).quantile(q, skipna=skipna)
xr.testing.assert_allclose(expected, actual)
+
+
+@pytest.mark.parametrize("func", ALL_FUNCS)
+def test_direct_reduction(func):
+ if "arg" in func or "mode" in func:
+ pytest.skip()
+ # regression test for https://github.com/pydata/xarray/issues/8819
+ rand = np.random.choice([True, False], size=(2, 3))
+ if func not in ["any", "all"]:
+ rand = rand.astype(float)
+
+ if "nan" in func:
+ func = func[3:]
+ kwargs = {"skipna": True}
+ else:
+ kwargs = {}
+
+ if "first" not in func and "last" not in func:
+ kwargs["dim"] = "y"
+
+ if "quantile" in func:
+ kwargs["q"] = 0.9
+
+ data = xr.DataArray(rand, dims=("x", "y"), coords={"x": [10, 20], "y": [0, 1, 2]})
+ with xr.set_options(use_flox=True):
+ actual = xarray_reduce(data, "x", func=func, **kwargs)
+ with xr.set_options(use_flox=False):
+ expected = getattr(data.groupby("x", squeeze=False), func)(**kwargs)
+ xr.testing.assert_identical(expected, actual)
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/2cc87746821d9aeff2bb389981cd152e905f68ce