[Git][debian-gis-team/flox][upstream] New upstream version 0.9.8
Antonio Valentino (@antonio.valentino)
gitlab at salsa.debian.org
Thu May 30 07:02:02 BST 2024
Antonio Valentino pushed to branch upstream at Debian GIS Project / flox
Commits:
e46f6d2d by Antonio Valentino at 2024-05-30T05:44:32+00:00
New upstream version 0.9.8
- - - - -
5 changed files:
- asv_bench/benchmarks/cohorts.py
- ci/environment.yml
- docs/source/user-stories/climatology-hourly-cubed.ipynb
- flox/core.py
- tests/test_core.py
Changes:
=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -21,7 +21,6 @@ class Cohorts:
asfloat = self.bitmask().astype(float)
chunks_per_label = asfloat.sum(axis=0)
containment = (asfloat.T @ asfloat) / chunks_per_label
- print(containment.nnz / np.prod(containment.shape))
return containment.todense()
def chunks_cohorts(self):
=====================================
ci/environment.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
- cachey
- cftime
- codecov
- - cubed>=0.14.2
+ - cubed>=0.14.3
- dask-core
- pandas
- numpy>=1.22
=====================================
docs/source/user-stories/climatology-hourly-cubed.ipynb
=====================================
@@ -86,6 +86,37 @@
"source": [
"hourly.compute()"
]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7",
+ "metadata": {},
+ "source": [
+ "## Other climatologies: resampling by month\n",
+ "\n",
+ "This uses the \"blockwise\" strategy."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
+ "monthly"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "monthly.compute()"
+ ]
}
],
"metadata": {
=====================================
flox/core.py
=====================================
@@ -1798,87 +1798,123 @@ def cubed_groupby_agg(
assert isinstance(axis, Sequence)
assert all(ax >= 0 for ax in axis)
- inds = tuple(range(array.ndim))
+ if method == "blockwise":
+ assert by.ndim == 1
+ assert expected_groups is not None
- by_input = by
+ def _reduction_func(a, by, axis, start_group, num_groups):
+ # adjust group labels to start from 0 for each chunk
+ by_for_chunk = by - start_group
+ expected_groups_for_chunk = pd.RangeIndex(num_groups)
- # Unifying chunks is necessary for argreductions.
- # We need to rechunk before zipping up with the index
- # let's always do it anyway
- if not is_chunked_array(by):
- # chunk numpy arrays like the input array
- chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
+ axis = (axis,) # convert integral axis to tuple
- by = cubed.from_array(by, chunks=chunks, spec=array.spec)
- _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+ blockwise_method = partial(
+ _reduce_blockwise,
+ agg=agg,
+ axis=axis,
+ expected_groups=expected_groups_for_chunk,
+ fill_value=fill_value,
+ engine=engine,
+ sort=sort,
+ reindex=reindex,
+ )
+ out = blockwise_method(a, by_for_chunk)
+ return out[agg.name]
- # Cubed's groupby_reduction handles the generation of "intermediates", and the
- # "map-reduce" combination step, so we don't have to do that here.
- # Only the equivalent of "_simple_combine" is supported, there is no
- # support for "_grouped_combine".
- labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
- do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+ num_groups = len(expected_groups)
+ result = cubed.core.groupby.groupby_blockwise(
+ array, by, axis=axis, func=_reduction_func, num_groups=num_groups
+ )
+ groups = (expected_groups.to_numpy(),)
+ return (result, groups)
- assert do_simple_combine
- assert method == "map-reduce"
- assert expected_groups is not None
- assert reindex is True
- assert len(axis) == 1 # one axis/grouping
+ else:
+ inds = tuple(range(array.ndim))
- def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
- blockwise_method = partial(
- _get_chunk_reduction(agg.reduction_type),
- func=agg.chunk,
- fill_value=agg.fill_value["intermediate"],
- dtype=agg.dtype["intermediate"],
- reindex=reindex,
- user_dtype=agg.dtype["user"],
+ by_input = by
+
+ # Unifying chunks is necessary for argreductions.
+ # We need to rechunk before zipping up with the index
+ # let's always do it anyway
+ if not is_chunked_array(by):
+ # chunk numpy arrays like the input array
+ chunks = tuple(
+ array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)
+ )
+
+ by = cubed.from_array(by, chunks=chunks, spec=array.spec)
+ _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+
+ # Cubed's groupby_reduction handles the generation of "intermediates", and the
+ # "map-reduce" combination step, so we don't have to do that here.
+ # Only the equivalent of "_simple_combine" is supported, there is no
+ # support for "_grouped_combine".
+ labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
+ do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+
+ assert do_simple_combine
+ assert method == "map-reduce"
+ assert expected_groups is not None
+ assert reindex is True
+ assert len(axis) == 1 # one axis/grouping
+
+ def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
+ blockwise_method = partial(
+ _get_chunk_reduction(agg.reduction_type),
+ func=agg.chunk,
+ fill_value=agg.fill_value["intermediate"],
+ dtype=agg.dtype["intermediate"],
+ reindex=reindex,
+ user_dtype=agg.dtype["user"],
+ axis=axis,
+ expected_groups=expected_groups,
+ engine=engine,
+ sort=sort,
+ )
+ out = blockwise_method(a, by)
+ # Convert dict to one that cubed understands, dropping groups since they are
+ # known, and the same for every block.
+ return {
+ f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])
+ }
+
+ def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
+ # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
+ # only combine over the dummy axis, to preserve grouping along 'axis'
+ dtype = dict(dtype)
+ out = {}
+ for idx, combine in enumerate(agg.simple_combine):
+ field = f"f{idx}"
+ out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
+ return out
+
+ def _groupby_aggregate(a):
+ # Convert cubed dict to one that _finalize_results works with
+ results = {"groups": expected_groups, "intermediates": a.values()}
+ out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+ return out[agg.name]
+
+ # convert list of dtypes to a structured dtype for cubed
+ intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
+ dtype = agg.dtype["final"]
+ num_groups = len(expected_groups)
+
+ result = cubed.core.groupby.groupby_reduction(
+ array,
+ by,
+ func=_groupby_func,
+ combine_func=_groupby_combine,
+ aggregate_func=_groupby_aggregate,
axis=axis,
- expected_groups=expected_groups,
- engine=engine,
- sort=sort,
+ intermediate_dtype=intermediate_dtype,
+ dtype=dtype,
+ num_groups=num_groups,
)
- out = blockwise_method(a, by)
- # Convert dict to one that cubed understands, dropping groups since they are
- # known, and the same for every block.
- return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}
-
- def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
- # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
- # only combine over the dummy axis, to preserve grouping along 'axis'
- dtype = dict(dtype)
- out = {}
- for idx, combine in enumerate(agg.simple_combine):
- field = f"f{idx}"
- out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
- return out
-
- def _groupby_aggregate(a):
- # Convert cubed dict to one that _finalize_results works with
- results = {"groups": expected_groups, "intermediates": a.values()}
- out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
- return out[agg.name]
-
- # convert list of dtypes to a structured dtype for cubed
- intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
- dtype = agg.dtype["final"]
- num_groups = len(expected_groups)
-
- result = cubed.core.groupby.groupby_reduction(
- array,
- by,
- func=_groupby_func,
- combine_func=_groupby_combine,
- aggregate_func=_groupby_aggregate,
- axis=axis,
- intermediate_dtype=intermediate_dtype,
- dtype=dtype,
- num_groups=num_groups,
- )
- groups = (expected_groups.to_numpy(),)
+ groups = (expected_groups.to_numpy(),)
- return (result, groups)
+ return (result, groups)
def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
@@ -2467,9 +2503,9 @@ def groupby_reduce(
if method is None:
method = "map-reduce"
- if method != "map-reduce":
+ if method not in ("map-reduce", "blockwise"):
raise NotImplementedError(
- "Reduction for Cubed arrays is only implemented for method 'map-reduce'."
+ "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
)
partial_agg = partial(cubed_groupby_agg, **kwargs)
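
For reference, a minimal usage sketch (not part of the commit) of the new "blockwise" path for Cubed arrays, adapted from the test added in tests/test_core.py below; it assumes cubed and pandas are installed and uses the groupby_reduce function modified above:

# Sketch mirroring the new test: monthly means of daily data on a
# Cubed-backed array. With method="blockwise" the reduction is dispatched
# to cubed.core.groupby.groupby_blockwise (see the flox/core.py change above)
# instead of the map-reduce path.
import cubed
import pandas as pd
from flox.core import groupby_reduce

t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
data = t.dt.dayofyear
cubedarray = cubed.from_array(data.values, chunks=30)

# Bin by month-end edges and take the mean within each bin.
edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
actual, groups = groupby_reduce(
    cubedarray, t.to_numpy(), isbin=True, expected_groups=edges,
    func="mean", method="blockwise",
)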
=====================================
tests/test_core.py
=====================================
@@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method):
assert_equal(expected, actual)
+@requires_cubed
+@pytest.mark.parametrize("method", ["blockwise", "map-reduce"])
+def test_group_by_datetime_cubed(engine, method):
+ kwargs = dict(
+ func="mean",
+ method=method,
+ engine=engine,
+ )
+ t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
+ data = t.dt.dayofyear
+ cubedarray = cubed.from_array(data.values, chunks=30)
+
+ actual, _ = groupby_reduce(cubedarray, t, **kwargs)
+ expected = data.to_numpy().astype(float)
+ assert_equal(expected, actual)
+
+ edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
+ actual, _ = groupby_reduce(
+ cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs
+ )
+ expected = data.resample("ME").mean().to_numpy()
+ assert_equal(expected, actual)
+
+ actual, _ = groupby_reduce(
+ cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])),
+ t.to_numpy(),
+ isbin=True,
+ expected_groups=edges,
+ **kwargs,
+ )
+ expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
+ assert_equal(expected, actual)
+
+
def test_factorize_values_outside_bins():
# pd.factorize returns intp
vals = factorize_(
View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/e46f6d2d07885a27f00a54571f8e5b753ddd6671