[Git][debian-gis-team/flox][upstream] New upstream version 0.9.10
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Thu Aug 15 08:09:40 BST 2024
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
9fbeaa3c by Antonio Valentino at 2024-08-15T06:20:43+00:00
New upstream version 0.9.10
- - - - -
12 changed files:
- ci/benchmark.yml
- ci/docs.yml
- flox/__init__.py
- flox/aggregate_flox.py
- flox/aggregations.py
- flox/core.py
- flox/xrdtypes.py
- readthedocs.yml
- tests/conftest.py
- + tests/test_asv.py
- tests/test_core.py
- tests/test_properties.py
Changes:
=====================================
ci/benchmark.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
- build
- cachey
- dask-core
- - numpy>=1.22
+ - numpy<2
- mamba
- pip
- python=3.10
=====================================
ci/docs.yml
=====================================
@@ -16,7 +16,7 @@ dependencies:
- myst-parser
- myst-nb
- sphinx
- - furo
+ - furo>=2024.08
- ipykernel
- jupyter
- sphinx-codeautolink
=====================================
flox/__init__.py
=====================================
@@ -2,7 +2,7 @@
# flake8: noqa
"""Top-level module for flox ."""
from . import cache
-from .aggregations import Aggregation # noqa
+from .aggregations import Aggregation, Scan # noqa
from .core import groupby_reduce, groupby_scan, rechunk_for_blockwise, rechunk_for_cohorts # noqa
=====================================
flox/aggregate_flox.py
=====================================
@@ -2,6 +2,7 @@ from functools import partial
import numpy as np
+from . import xrdtypes as dtypes
from .xrutils import is_scalar, isnull, notnull
@@ -98,7 +99,7 @@ def quantile_(array, inv_idx, *, q, axis, skipna, group_idx, dtype=None, out=Non
# partition the complex array in-place
labels_broadcast = np.broadcast_to(group_idx, array.shape)
with np.errstate(invalid="ignore"):
- cmplx = labels_broadcast + 1j * array
+ cmplx = labels_broadcast + 1j * (array.view(int) if array.dtype.kind in "Mm" else array)
cmplx.partition(kth=kth, axis=-1)
if is_scalar_q:
a_ = cmplx.imag
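
[For context: the change above feeds datetimes through an integer view because this quantile kernel packs (group, value) pairs into complex numbers, and datetime64/timedelta64 cannot be cast to complex. A minimal NumPy-only sketch of the underlying trick, with illustrative data:

    import numpy as np

    group_idx = np.array([0, 0, 0, 1, 1, 1])
    array = np.array([3.0, 1.0, 2.0, 9.0, 7.0, 8.0])

    # NumPy compares complex numbers lexicographically (real part first,
    # then imaginary), so putting the group label in the real part and the
    # value in the imaginary part lets one in-place partition order values
    # within every group at once.
    cmplx = group_idx + 1j * array
    cmplx.partition(kth=(1, 4))   # the median slot of each 3-element group
    print(cmplx.imag[[1, 4]])     # [2. 8.], the per-group medians
]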
@@ -158,6 +159,8 @@ def _np_grouped_op(
def _nan_grouped_op(group_idx, array, func, fillna, *args, **kwargs):
+ if fillna in [dtypes.INF, dtypes.NINF]:
+ fillna = dtypes._get_fill_value(kwargs.get("dtype", None) or array.dtype, fillna)
result = func(group_idx, np.where(isnull(array), fillna, array), *args, **kwargs)
# np.nanmax([np.nan, np.nan]) = np.nan
# To recover this behaviour, we need to search for the fillna value
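
[For context: _nan_grouped_op implements the standard NaN-skipping recipe, i.e. substitute the op's identity for NaN, run the plain grouped op, then restore NaN for all-NaN groups. The change above resolves the INF/NINF sentinels to a dtype-appropriate identity (for example the integer min/max, or NaT) instead of hard-coding float infinities. A rough NumPy-only sketch of the recipe, not flox's actual kernel:

    import numpy as np

    def grouped_nanmax(group_idx, array, ngroups):
        # NaN -> identity of max, so missing values can never win
        filled = np.where(np.isnan(array), -np.inf, array)
        out = np.full(ngroups, -np.inf)
        np.maximum.at(out, group_idx, filled)
        out[out == -np.inf] = np.nan   # group was empty or all-NaN
        return out

    print(grouped_nanmax(np.array([0, 0, 1]), np.array([1.0, np.nan, np.nan]), 2))
    # [ 1. nan]
]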
@@ -175,9 +178,9 @@ nansum = partial(_nan_grouped_op, func=sum, fillna=0)
prod = partial(_np_grouped_op, op=np.multiply.reduceat)
nanprod = partial(_nan_grouped_op, func=prod, fillna=1)
max = partial(_np_grouped_op, op=np.maximum.reduceat)
-nanmax = partial(_nan_grouped_op, func=max, fillna=-np.inf)
+nanmax = partial(_nan_grouped_op, func=max, fillna=dtypes.NINF)
min = partial(_np_grouped_op, op=np.minimum.reduceat)
-nanmin = partial(_nan_grouped_op, func=min, fillna=np.inf)
+nanmin = partial(_nan_grouped_op, func=min, fillna=dtypes.INF)
quantile = partial(_np_grouped_op, op=partial(quantile_, skipna=False))
nanquantile = partial(_np_grouped_op, op=partial(quantile_, skipna=True))
median = partial(partial(_np_grouped_op, q=0.5), op=partial(quantile_, skipna=False))
=====================================
flox/aggregations.py
=====================================
@@ -115,60 +115,6 @@ def generic_aggregate(
return result
-def _normalize_dtype(dtype: DTypeLike, array_dtype: np.dtype, fill_value=None) -> np.dtype:
- if dtype is None:
- dtype = array_dtype
- if dtype is np.floating:
- # mean, std, var always result in floating
- # but we preserve the array's dtype if it is floating
- if array_dtype.kind in "fcmM":
- dtype = array_dtype
- else:
- dtype = np.dtype("float64")
- elif not isinstance(dtype, np.dtype):
- dtype = np.dtype(dtype)
- if fill_value not in [None, dtypes.INF, dtypes.NINF, dtypes.NA]:
- dtype = np.result_type(dtype, fill_value)
- return dtype
-
-
-def _maybe_promote_int(dtype) -> np.dtype:
- # https://numpy.org/doc/stable/reference/generated/numpy.prod.html
- # The dtype of a is used by default unless a has an integer dtype of less precision
- # than the default platform integer.
- if not isinstance(dtype, np.dtype):
- dtype = np.dtype(dtype)
- if dtype.kind == "i":
- dtype = np.result_type(dtype, np.intp)
- elif dtype.kind == "u":
- dtype = np.result_type(dtype, np.uintp)
- return dtype
-
-
-def _get_fill_value(dtype, fill_value):
- """Returns dtype appropriate infinity. Returns +Inf equivalent for None."""
- if fill_value in [None, dtypes.NA] and dtype.kind in "US":
- return ""
- if fill_value == dtypes.INF or fill_value is None:
- return dtypes.get_pos_infinity(dtype, max_for_int=True)
- if fill_value == dtypes.NINF:
- return dtypes.get_neg_infinity(dtype, min_for_int=True)
- if fill_value == dtypes.NA:
- if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
- return np.nan
- # This is madness, but npg checks that fill_value is compatible
- # with array dtype even if the fill_value is never used.
- elif (
- np.issubdtype(dtype, np.integer)
- or np.issubdtype(dtype, np.timedelta64)
- or np.issubdtype(dtype, np.datetime64)
- ):
- return dtypes.get_neg_infinity(dtype, min_for_int=True)
- else:
- return None
- return fill_value
-
-
def _atleast_1d(inp, min_length: int = 1):
if xrutils.is_scalar(inp):
inp = (inp,) * min_length
@@ -210,6 +156,7 @@ class Aggregation:
final_dtype: DTypeLike | None = None,
reduction_type: Literal["reduce", "argreduce"] = "reduce",
new_dims_func: Callable | None = None,
+ preserves_dtype: bool = False,
):
"""
Blueprint for computing grouped aggregations.
@@ -256,6 +203,8 @@ class Aggregation:
Function that receives finalize_kwargs and returns a tuple of sizes of any new dimensions
added by the reduction. For e.g. quantile for q=(0.5, 0.85) adds a new dimension of size 2,
so returns (2,)
+ preserves_dtype: bool,
+ Whether the function preserves the dtype on return, e.g. min, max, first, last, mode.
"""
self.name = name
# preprocess before blockwise
@@ -292,6 +241,7 @@ class Aggregation:
self.new_dims_func: Callable = (
returns_empty_tuple if new_dims_func is None else new_dims_func
)
+ self.preserves_dtype = preserves_dtype
@cached_property
def new_dims(self) -> tuple[Dim]:
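
[The new preserves_dtype flag mirrors NumPy's own behaviour: accumulating reductions promote small integer dtypes to the platform integer, while selecting reductions keep the input dtype. A quick illustration:

    import numpy as np

    x = np.ones(4, dtype=np.int8)
    print(np.sum(x).dtype)   # int64 on 64-bit platforms: sum promotes
    print(np.max(x).dtype)   # int8: min/max/first/last keep the dtype
]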
@@ -434,10 +384,14 @@ nanstd = Aggregation(
)
-min_ = Aggregation("min", chunk="min", combine="min", fill_value=dtypes.INF)
-nanmin = Aggregation("nanmin", chunk="nanmin", combine="nanmin", fill_value=np.nan)
-max_ = Aggregation("max", chunk="max", combine="max", fill_value=dtypes.NINF)
-nanmax = Aggregation("nanmax", chunk="nanmax", combine="nanmax", fill_value=np.nan)
+min_ = Aggregation("min", chunk="min", combine="min", fill_value=dtypes.INF, preserves_dtype=True)
+nanmin = Aggregation(
+ "nanmin", chunk="nanmin", combine="nanmin", fill_value=dtypes.NA, preserves_dtype=True
+)
+max_ = Aggregation("max", chunk="max", combine="max", fill_value=dtypes.NINF, preserves_dtype=True)
+nanmax = Aggregation(
+ "nanmax", chunk="nanmax", combine="nanmax", fill_value=dtypes.NA, preserves_dtype=True
+)
def argreduce_preprocess(array, axis):
@@ -525,10 +479,14 @@ nanargmin = Aggregation(
final_dtype=np.intp,
)
-first = Aggregation("first", chunk=None, combine=None, fill_value=None)
-last = Aggregation("last", chunk=None, combine=None, fill_value=None)
-nanfirst = Aggregation("nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=dtypes.NA)
-nanlast = Aggregation("nanlast", chunk="nanlast", combine="nanlast", fill_value=dtypes.NA)
+first = Aggregation("first", chunk=None, combine=None, fill_value=None, preserves_dtype=True)
+last = Aggregation("last", chunk=None, combine=None, fill_value=None, preserves_dtype=True)
+nanfirst = Aggregation(
+ "nanfirst", chunk="nanfirst", combine="nanfirst", fill_value=dtypes.NA, preserves_dtype=True
+)
+nanlast = Aggregation(
+ "nanlast", chunk="nanlast", combine="nanlast", fill_value=dtypes.NA, preserves_dtype=True
+)
all_ = Aggregation(
"all",
@@ -579,8 +537,12 @@ nanquantile = Aggregation(
final_dtype=np.floating,
new_dims_func=quantile_new_dims_func,
)
-mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None)
-nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None)
+mode = Aggregation(
+ name="mode", fill_value=dtypes.NA, chunk=None, combine=None, preserves_dtype=True
+)
+nanmode = Aggregation(
+ name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None, preserves_dtype=True
+)
@dataclass
@@ -634,7 +596,7 @@ class AlignedArrays:
# TODO: automate?
engine="flox",
dtype=self.array.dtype,
- fill_value=_get_fill_value(self.array.dtype, dtypes.NA),
+ fill_value=dtypes._get_fill_value(self.array.dtype, dtypes.NA),
expected_groups=None,
)
return AlignedArrays(array=reduced["intermediates"][0], group_idx=reduced["groups"])
@@ -729,6 +691,7 @@ ffill = Scan(
binary_op=None,
reduction="nanlast",
scan="ffill",
+ # Important: this must be NaN, otherwise ffill does not work.
identity=np.nan,
mode="concat_then_scan",
)
@@ -737,6 +700,7 @@ bfill = Scan(
binary_op=None,
reduction="nanlast",
scan="ffill",
+ # Important: this must be NaN, otherwise bfill does not work.
identity=np.nan,
mode="concat_then_scan",
preprocess=reverse,
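
[The comments added above note that the ffill/bfill identity must be NaN: in the scan, NaN means "no value seen yet" and is the only carry that a real value may overwrite. A scalar toy version of the scan step, illustrative only and not flox's implementation:

    import numpy as np

    data = [np.nan, 1.0, np.nan, np.nan, 2.0, np.nan]
    carry = np.nan                 # the scan's identity
    filled = []
    for v in data:
        # a real value replaces the carry; NaN leaves it untouched
        carry = carry if np.isnan(v) else v
        filled.append(carry)
    print(filled)                  # [nan, 1.0, 1.0, 1.0, 2.0, 2.0]
]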
@@ -815,17 +779,18 @@ def _initialize_aggregation(
dtype_: np.dtype | None = (
np.dtype(dtype) if dtype is not None and not isinstance(dtype, np.dtype) else dtype
)
-
- final_dtype = _normalize_dtype(dtype_ or agg.dtype_init["final"], array_dtype, fill_value)
- if agg.name not in ["first", "last", "nanfirst", "nanlast", "min", "max", "nanmin", "nanmax"]:
- final_dtype = _maybe_promote_int(final_dtype)
+ final_dtype = dtypes._normalize_dtype(
+ dtype_ or agg.dtype_init["final"], array_dtype, fill_value
+ )
+ if not agg.preserves_dtype:
+ final_dtype = dtypes._maybe_promote_int(final_dtype)
agg.dtype = {
"user": dtype, # Save to automatically choose an engine
"final": final_dtype,
"numpy": (final_dtype,),
"intermediate": tuple(
(
- _normalize_dtype(int_dtype, np.result_type(array_dtype, final_dtype), int_fv)
+ dtypes._normalize_dtype(int_dtype, np.result_type(array_dtype, final_dtype), int_fv)
if int_dtype is None
else np.dtype(int_dtype)
)
@@ -838,10 +803,10 @@ def _initialize_aggregation(
# Replace sentinel fill values according to dtype
agg.fill_value["user"] = fill_value
agg.fill_value["intermediate"] = tuple(
- _get_fill_value(dt, fv)
+ dtypes._get_fill_value(dt, fv)
for dt, fv in zip(agg.dtype["intermediate"], agg.fill_value["intermediate"])
)
- agg.fill_value[func] = _get_fill_value(agg.dtype["final"], agg.fill_value[func])
+ agg.fill_value[func] = dtypes._get_fill_value(agg.dtype["final"], agg.fill_value[func])
fv = fill_value if fill_value is not None else agg.fill_value[agg.name]
if _is_arg_reduction(agg):
=====================================
flox/core.py
=====================================
@@ -170,7 +170,9 @@ def _is_minmax_reduction(func: T_Agg) -> bool:
def _is_first_last_reduction(func: T_Agg) -> bool:
- return isinstance(func, str) and func in ["nanfirst", "nanlast", "first", "last"]
+ if isinstance(func, Aggregation):
+ func = func.name
+ return func in ["nanfirst", "nanlast", "first", "last"]
def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
@@ -1642,7 +1644,12 @@ def dask_groupby_agg(
# This allows us to discover groups at compute time, support argreductions, lower intermediate
# memory usage (but method="cohorts" would also work to reduce memory in some cases)
labels_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
- do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+ do_grouped_combine = (
+ _is_arg_reduction(agg)
+ or labels_are_unknown
+ or (_is_first_last_reduction(agg) and array.dtype.kind != "f")
+ )
+ do_simple_combine = not do_grouped_combine
if method == "blockwise":
# use the "non dask" code path, but applied blockwise
@@ -1698,7 +1705,7 @@ def dask_groupby_agg(
tree_reduce = partial(
dask.array.reductions._tree_reduce,
- name=f"{name}-reduce",
+ name=f"{name}-simple-reduce",
dtype=array.dtype,
axis=axis,
keepdims=True,
@@ -1733,14 +1740,20 @@ def dask_groupby_agg(
groups_ = []
for blks, cohort in chunks_cohorts.items():
cohort_index = pd.Index(cohort)
- reindexer = partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
+ reindexer = (
+ partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
+ if do_simple_combine
+ else identity
+ )
reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer)
# now that we have reindexed, we can set reindex=True explicitly
reduced_.append(
tree_reduce(
reindexed,
- combine=partial(combine, agg=agg, reindex=True),
- aggregate=partial(aggregate, expected_groups=cohort_index, reindex=True),
+ combine=partial(combine, agg=agg, reindex=do_simple_combine),
+ aggregate=partial(
+ aggregate, expected_groups=cohort_index, reindex=do_simple_combine
+ ),
)
)
# This is done because pandas promotes to 64-bit types when an Index is created
@@ -1986,8 +1999,13 @@ def _validate_reindex(
expected_groups,
any_by_dask: bool,
is_dask_array: bool,
+ array_dtype: Any,
) -> bool | None:
# logger.debug("Entering _validate_reindex: reindex is {}".format(reindex)) # noqa
+ def first_or_last():
+ return func in ["first", "last"] or (
+ _is_first_last_reduction(func) and array_dtype.kind != "f"
+ )
all_numpy = not is_dask_array and not any_by_dask
if reindex is True and not all_numpy:
@@ -1997,7 +2015,7 @@ def _validate_reindex(
raise ValueError(
"reindex=True is not a valid choice for method='blockwise' or method='cohorts'."
)
- if func in ["first", "last"]:
+ if first_or_last():
raise ValueError("reindex must be None or False when func is 'first' or 'last.")
if reindex is None:
@@ -2008,9 +2026,10 @@ def _validate_reindex(
if all_numpy:
return True
- if func in ["first", "last"]:
+ if first_or_last():
# have to do the grouped_combine since there's no good fill_value
- reindex = False
+ # Also needed for nanfirst, nanlast with no-NaN dtypes
+ return False
if method == "blockwise":
# for grouping by dask arrays, we set reindex=True
@@ -2412,12 +2431,19 @@ def groupby_reduce(
if method == "cohorts" and any_by_dask:
raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.")
+ if not is_duck_array(array):
+ array = np.asarray(array)
+
reindex = _validate_reindex(
- reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array)
+ reindex,
+ func,
+ method,
+ expected_groups,
+ any_by_dask,
+ is_duck_dask_array(array),
+ array.dtype,
)
- if not is_duck_array(array):
- array = np.asarray(array)
is_bool_array = np.issubdtype(array.dtype, bool)
array = array.astype(np.intp) if is_bool_array else array
@@ -2601,7 +2627,7 @@ def groupby_reduce(
# TODO: clean this up
reindex = _validate_reindex(
- reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
+ reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array), array.dtype
)
if TYPE_CHECKING:
@@ -2637,10 +2663,18 @@ def groupby_reduce(
groups = (groups[0][sorted_idx],)
if factorize_early:
+ assert len(groups) == 1
+ (groups_,) = groups
# nan group labels are factorized to -1, and preserved
# now we get rid of them by reindexing
- # This also handles bins with no data
- result = reindex_(result, from_=groups[0], to=expected_, fill_value=fill_value).reshape(
+ # First, for "blockwise", we can have -1 repeated in different blocks
+ # This breaks the reindexing so remove those first.
+ if method == "blockwise" and (mask := groups_ == -1).sum(axis=-1) > 1:
+ result = result[..., ~mask]
+ groups_ = groups_[..., ~mask]
+
+ # This reindex also handles bins with no data
+ result = reindex_(result, from_=groups_, to=expected_, fill_value=fill_value).reshape(
result.shape[:-1] + grp_shape
)
groups = final_groups
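
[The masking added above handles a blockwise quirk: NaN group labels factorize to -1 independently in every chunk, so the per-block group labels gathered at the end can contain -1 more than once, which breaks reindexing. Schematically (a simplified sketch, not the actual internals):

    import numpy as np

    by = np.array([-1, 0, -1, 1])             # NaN labels already -> -1
    block_groups = [by[:2], by[2:]]           # two blockwise chunks
    groups_ = np.concatenate([np.unique(b) for b in block_groups])
    print(groups_)                            # [-1  0 -1  1]: -1 repeated
    print(groups_[groups_ != -1])             # [0 1]: safe to reindex
]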
=====================================
flox/xrdtypes.py
=====================================
@@ -1,6 +1,7 @@
import functools
import numpy as np
+from numpy.typing import DTypeLike
from . import xrutils as utils
@@ -147,3 +148,57 @@ def get_neg_infinity(dtype, min_for_int=False):
def is_datetime_like(dtype):
"""Check if a dtype is a subclass of the numpy datetime types"""
return np.issubdtype(dtype, np.datetime64) or np.issubdtype(dtype, np.timedelta64)
+
+
+def _normalize_dtype(dtype: DTypeLike, array_dtype: np.dtype, fill_value=None) -> np.dtype:
+ if dtype is None:
+ dtype = array_dtype
+ if dtype is np.floating:
+ # mean, std, var always result in floating
+ # but we preserve the array's dtype if it is floating
+ if array_dtype.kind in "fcmM":
+ dtype = array_dtype
+ else:
+ dtype = np.dtype("float64")
+ elif not isinstance(dtype, np.dtype):
+ dtype = np.dtype(dtype)
+ if fill_value not in [None, INF, NINF, NA]:
+ dtype = np.result_type(dtype, fill_value)
+ return dtype
+
+
+def _maybe_promote_int(dtype) -> np.dtype:
+ # https://numpy.org/doc/stable/reference/generated/numpy.prod.html
+ # The dtype of a is used by default unless a has an integer dtype of less precision
+ # than the default platform integer.
+ if not isinstance(dtype, np.dtype):
+ dtype = np.dtype(dtype)
+ if dtype.kind == "i":
+ dtype = np.result_type(dtype, np.intp)
+ elif dtype.kind == "u":
+ dtype = np.result_type(dtype, np.uintp)
+ return dtype
+
+
+def _get_fill_value(dtype, fill_value):
+ """Returns dtype appropriate infinity. Returns +Inf equivalent for None."""
+ if fill_value in [None, NA] and dtype.kind in "US":
+ return ""
+ if fill_value == INF or fill_value is None:
+ return get_pos_infinity(dtype, max_for_int=True)
+ if fill_value == NINF:
+ return get_neg_infinity(dtype, min_for_int=True)
+ if fill_value == NA:
+ if np.issubdtype(dtype, np.floating) or np.issubdtype(dtype, np.complexfloating):
+ return np.nan
+ # This is madness, but npg checks that fill_value is compatible
+ # with array dtype even if the fill_value is never used.
+ elif np.issubdtype(dtype, np.integer):
+ return get_neg_infinity(dtype, min_for_int=True)
+ elif np.issubdtype(dtype, np.timedelta64):
+ return np.timedelta64("NaT")
+ elif np.issubdtype(dtype, np.datetime64):
+ return np.datetime64("NaT")
+ else:
+ return None
+ return fill_value
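
[These helpers, moved here from flox/aggregations.py, resolve the INF/NINF/NA sentinels into concrete, dtype-appropriate values; the new branches return NaT for datetime64/timedelta64 rather than reusing the integer minimum. Assuming flox is importable, roughly:

    import numpy as np
    from flox import xrdtypes as dtypes

    print(dtypes._get_fill_value(np.dtype("float64"), dtypes.NA))         # nan
    print(dtypes._get_fill_value(np.dtype("int32"), dtypes.INF))          # 2147483647
    print(dtypes._get_fill_value(np.dtype("datetime64[ns]"), dtypes.NA))  # NaT
]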
=====================================
readthedocs.yml
=====================================
@@ -1,9 +1,9 @@
version: 2
build:
- os: "ubuntu-20.04"
+ os: "ubuntu-lts-latest"
tools:
- python: "mambaforge-4.10"
+ python: "mambaforge-latest"
conda:
environment: ci/docs.yml
=====================================
tests/conftest.py
=====================================
@@ -10,11 +10,12 @@ settings.register_profile(
suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow],
)
settings.register_profile(
- "local",
+ "default",
max_examples=300,
suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow],
verbosity=Verbosity.verbose,
)
+settings.load_profile("default")
@pytest.fixture(
=====================================
tests/test_asv.py
=====================================
@@ -0,0 +1,24 @@
+# Run asv benchmarks as tests
+
+import pytest
+
+pytest.importorskip("dask")
+
+from asv_bench.benchmarks import reduce
+
+
+@pytest.mark.parametrize(
+ "problem", [reduce.ChunkReduce1D, reduce.ChunkReduce2D, reduce.ChunkReduce2DAllAxes]
+)
+def test_reduce(problem) -> None:
+ testcase = problem()
+ testcase.setup()
+ for args in zip(*testcase.time_reduce.params):
+ testcase.time_reduce(*args)
+
+
+def test_reduce_bare() -> None:
+ testcase = reduce.ChunkReduce1D()
+ testcase.setup()
+ for args in zip(*testcase.time_reduce_bare.params):
+ testcase.time_reduce_bare(*args)
=====================================
tests/test_core.py
=====================================
@@ -13,8 +13,9 @@ import pytest
from numpy_groupies.aggregate_numpy import aggregate
import flox
+from flox import xrdtypes as dtypes
from flox import xrutils
-from flox.aggregations import Aggregation, _initialize_aggregation, _maybe_promote_int
+from flox.aggregations import Aggregation, _initialize_aggregation
from flox.core import (
HAS_NUMBAGG,
_choose_engine,
@@ -161,7 +162,7 @@ def test_groupby_reduce(
if func == "mean" or func == "nanmean":
expected_result = np.array(expected, dtype=np.float64)
elif func == "sum":
- expected_result = np.array(expected, dtype=_maybe_promote_int(array.dtype))
+ expected_result = np.array(expected, dtype=dtypes._maybe_promote_int(array.dtype))
elif func == "count":
expected_result = np.array(expected, dtype=np.intp)
@@ -389,7 +390,7 @@ def test_groupby_reduce_preserves_dtype(dtype, func):
array = np.ones((2, 12), dtype=dtype)
by = np.array([labels] * 2)
result, _ = groupby_reduce(from_array(array, chunks=(-1, 4)), by, func=func)
- expect_dtype = _maybe_promote_int(array.dtype)
+ expect_dtype = dtypes._maybe_promote_int(array.dtype)
assert result.dtype == expect_dtype
@@ -613,6 +614,33 @@ def test_dask_reduce_axis_subset():
)
+ at pytest.mark.parametrize("group_idx", [[0, 1, 0], [0, 0, 1], [1, 0, 0], [1, 1, 0]])
+@pytest.mark.parametrize(
+ "func",
+ [
+ # "first", "last",
+ "nanfirst",
+ "nanlast",
+ ],
+)
+@pytest.mark.parametrize(
+ "chunks",
+ [
+ None,
+ pytest.param(1, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+ pytest.param(2, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+ pytest.param(3, marks=pytest.mark.skipif(not has_dask, reason="no dask")),
+ ],
+)
+def test_first_last_useless(func, chunks, group_idx):
+ array = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int8)
+ if chunks is not None:
+ array = dask.array.from_array(array, chunks=chunks)
+ actual, _ = groupby_reduce(array, np.array(group_idx), func=func, engine="numpy")
+ expected = np.array([[0, 0], [0, 0]], dtype=np.int8)
+ assert_equal(actual, expected)
+
+
@pytest.mark.parametrize("func", ["first", "last", "nanfirst", "nanlast"])
@pytest.mark.parametrize("axis", [(0, 1)])
def test_first_last_disallowed(axis, func):
@@ -1027,7 +1055,7 @@ def test_dtype_preservation(dtype, func, engine):
# https://github.com/numbagg/numbagg/issues/121
pytest.skip()
if func == "sum":
- expected = _maybe_promote_int(dtype)
+ expected = dtypes._maybe_promote_int(dtype)
elif func == "mean" and "int" in dtype:
expected = np.float64
else:
@@ -1058,7 +1086,7 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype):
actual, actual_groups = groupby_reduce(array, labels, func="sum", method=method)
assert_equal(actual_groups, np.arange(6, dtype=labels.dtype))
- expect_dtype = _maybe_promote_int(dtype)
+ expect_dtype = dtypes._maybe_promote_int(dtype)
assert_equal(actual, np.array([0, 4, 24, 6, 12, 20], dtype=expect_dtype))
@@ -1563,18 +1591,36 @@ def test_validate_reindex_map_reduce(
dask_expected, reindex, func, expected_groups, any_by_dask
) -> None:
actual = _validate_reindex(
- reindex, func, "map-reduce", expected_groups, any_by_dask, is_dask_array=True
+ reindex,
+ func,
+ "map-reduce",
+ expected_groups,
+ any_by_dask,
+ is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
assert actual is dask_expected
# always reindex with all numpy inputs
actual = _validate_reindex(
- reindex, func, "map-reduce", expected_groups, any_by_dask=False, is_dask_array=False
+ reindex,
+ func,
+ "map-reduce",
+ expected_groups,
+ any_by_dask=False,
+ is_dask_array=False,
+ array_dtype=np.dtype("int32"),
)
assert actual
actual = _validate_reindex(
- True, func, "map-reduce", expected_groups, any_by_dask=False, is_dask_array=False
+ True,
+ func,
+ "map-reduce",
+ expected_groups,
+ any_by_dask=False,
+ is_dask_array=False,
+ array_dtype=np.dtype("int32"),
)
assert actual
@@ -1584,19 +1630,37 @@ def test_validate_reindex() -> None:
for method in methods:
with pytest.raises(NotImplementedError):
_validate_reindex(
- True, "argmax", method, expected_groups=None, any_by_dask=False, is_dask_array=True
+ True,
+ "argmax",
+ method,
+ expected_groups=None,
+ any_by_dask=False,
+ is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
methods: list[T_Method] = ["blockwise", "cohorts"]
for method in methods:
with pytest.raises(ValueError):
_validate_reindex(
- True, "sum", method, expected_groups=None, any_by_dask=False, is_dask_array=True
+ True,
+ "sum",
+ method,
+ expected_groups=None,
+ any_by_dask=False,
+ is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
for func in ["sum", "argmax"]:
actual = _validate_reindex(
- None, func, method, expected_groups=None, any_by_dask=False, is_dask_array=True
+ None,
+ func,
+ method,
+ expected_groups=None,
+ any_by_dask=False,
+ is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
assert actual is False
@@ -1608,6 +1672,7 @@ def test_validate_reindex() -> None:
expected_groups=np.array([1, 2, 3]),
any_by_dask=False,
is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
assert _validate_reindex(
@@ -1617,6 +1682,7 @@ def test_validate_reindex() -> None:
expected_groups=np.array([1, 2, 3]),
any_by_dask=True,
is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
assert _validate_reindex(
None,
@@ -1625,8 +1691,24 @@ def test_validate_reindex() -> None:
expected_groups=np.array([1, 2, 3]),
any_by_dask=True,
is_dask_array=True,
+ array_dtype=np.dtype("int32"),
)
+ kwargs = dict(
+ method="blockwise",
+ expected_groups=np.array([1, 2, 3]),
+ any_by_dask=True,
+ is_dask_array=True,
+ )
+
+ for func in ["nanfirst", "nanlast"]:
+ assert not _validate_reindex(None, func, array_dtype=np.dtype("int32"), **kwargs) # type: ignore[arg-type]
+ assert _validate_reindex(None, func, array_dtype=np.dtype("float32"), **kwargs) # type: ignore[arg-type]
+
+ for func in ["first", "last"]:
+ assert not _validate_reindex(None, func, array_dtype=np.dtype("int32"), **kwargs) # type: ignore[arg-type]
+ assert not _validate_reindex(None, func, array_dtype=np.dtype("float32"), **kwargs) # type: ignore[arg-type]
+
@requires_dask
def test_1d_blockwise_sort_optimization():
@@ -1847,3 +1929,17 @@ def test_ffill_bfill(chunks, size, add_nan_by, func):
expected = flox.groupby_scan(array.compute(), by, func=func)
actual = flox.groupby_scan(array, by, func=func)
assert_equal(expected, actual)
+
+
+@requires_dask
+def test_blockwise_nans():
+ array = dask.array.ones((1, 10), chunks=2)
+ by = np.array([-1, 0, -1, 1, -1, 2, -1, 3, 4, 4])
+ actual, actual_groups = flox.groupby_reduce(
+ array, by, func="sum", expected_groups=pd.RangeIndex(0, 5)
+ )
+ expected, expected_groups = flox.groupby_reduce(
+ array.compute(), by, func="sum", expected_groups=pd.RangeIndex(0, 5)
+ )
+ assert_equal(expected_groups, actual_groups)
+ assert_equal(expected, actual)
=====================================
tests/test_properties.py
=====================================
@@ -9,6 +9,7 @@ pytest.importorskip("dask")
pytest.importorskip("cftime")
import dask
+import hypothesis.extra.numpy as npst
import hypothesis.strategies as st
import numpy as np
from hypothesis import assume, given, note
@@ -19,6 +20,7 @@ from flox.xrutils import notnull
from . import assert_equal
from .strategies import by_arrays, chunked_arrays, func_st, numeric_arrays
+from .strategies import chunks as chunks_strategy
dask.config.set(scheduler="sync")
@@ -59,7 +61,11 @@ def not_overflowing_array(array: np.ndarray[Any, Any]) -> bool:
return result
-@given(data=st.data(), array=numeric_arrays, func=func_st)
+@given(
+ data=st.data(),
+ array=st.one_of(numeric_arrays, chunked_arrays(arrays=numeric_arrays)),
+ func=func_st,
+)
def test_groupby_reduce(data, array, func: str) -> None:
# overflow behaviour differs between bincount and sum (for example)
assume(not_overflowing_array(array))
@@ -91,7 +97,13 @@ def test_groupby_reduce(data, array, func: str) -> None:
# numpy-groupies always does the calculation in float64
if (
- ("var" in func or "std" in func or "sum" in func or "mean" in func)
+ (
+ "var" in func
+ or "std" in func
+ or "sum" in func
+ or "mean" in func
+ or "quantile" in func
+ )
and array.dtype.kind == "f"
and array.dtype.itemsize != 8
):
@@ -208,3 +220,16 @@ def test_first_last(data, array: dask.array.Array, func: str) -> None:
first, *_ = groupby_reduce(array, by, func=func, engine="flox")
second, *_ = groupby_reduce(array, by, func=mate, engine="flox")
assert_equal(first, second)
+
+
+@given(data=st.data(), func=st.sampled_from(["nanfirst", "nanlast"]))
+def test_first_last_useless(data, func):
+ shape = data.draw(npst.array_shapes())
+ by = data.draw(by_arrays(shape=shape[slice(-1, None)]))
+ chunks = data.draw(chunks_strategy(shape=shape))
+ array = np.zeros(shape, dtype=np.int8)
+ if chunks is not None:
+ array = dask.array.from_array(array, chunks=chunks)
+ actual, groups = groupby_reduce(array, by, axis=-1, func=func, engine="numpy")
+ expected = np.zeros(shape[:-1] + (len(groups),), dtype=array.dtype)
+ assert_equal(actual, expected)
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/9fbeaa3c30ae8cb0998fbde29e4a8a6985027ffd