[Git][debian-gis-team/flox][upstream] New upstream version 0.9.8

Antonio Valentino (@antonio.valentino) gitlab@salsa.debian.org
Thu May 30 07:02:02 BST 2024



Antonio Valentino pushed to branch upstream at Debian GIS Project / flox


Commits:
e46f6d2d by Antonio Valentino at 2024-05-30T05:44:32+00:00
New upstream version 0.9.8
- - - - -


5 changed files:

- asv_bench/benchmarks/cohorts.py
- ci/environment.yml
- docs/source/user-stories/climatology-hourly-cubed.ipynb
- flox/core.py
- tests/test_core.py


Changes:

=====================================
asv_bench/benchmarks/cohorts.py
=====================================
@@ -21,7 +21,6 @@ class Cohorts:
         asfloat = self.bitmask().astype(float)
         chunks_per_label = asfloat.sum(axis=0)
         containment = (asfloat.T @ asfloat) / chunks_per_label
-        print(containment.nnz / np.prod(containment.shape))
         return containment.todense()
 
     def chunks_cohorts(self):
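
For context, the deleted line was a debug print of the containment
matrix's density (nnz over total size). A minimal numpy sketch of the
computation around it, using a hypothetical 3-chunk, 3-label bitmask
(the real benchmark builds a sparse bitmask from the chunk layout):

    import numpy as np

    # rows = chunks, columns = group labels; 1 where a label occurs in a chunk
    bitmask = np.array([[1, 0, 1],
                        [1, 1, 0],
                        [0, 1, 1]], dtype=float)
    chunks_per_label = bitmask.sum(axis=0)
    # containment[i, j]: of the chunks containing label j, the fraction
    # that also contain label i
    containment = (bitmask.T @ bitmask) / chunks_per_label
    # the removed debug line printed containment.nnz / np.prod(containment.shape),
    # i.e. the density of the sparse version of this matrix
    print(containment)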


=====================================
ci/environment.yml
=====================================
@@ -6,7 +6,7 @@ dependencies:
   - cachey
   - cftime
   - codecov
-  - cubed>=0.14.2
+  - cubed>=0.14.3
   - dask-core
   - pandas
   - numpy>=1.22


=====================================
docs/source/user-stories/climatology-hourly-cubed.ipynb
=====================================
@@ -86,6 +86,37 @@
    "source": [
     "hourly.compute()"
    ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7",
+   "metadata": {},
+   "source": [
+    "## Other climatologies: resampling by month\n",
+    "\n",
+    "This uses the \"blockwise\" strategy."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
+    "monthly"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "monthly.compute()"
+   ]
   }
  ],
  "metadata": {


=====================================
flox/core.py
=====================================
@@ -1798,87 +1798,123 @@ def cubed_groupby_agg(
     assert isinstance(axis, Sequence)
     assert all(ax >= 0 for ax in axis)
 
-    inds = tuple(range(array.ndim))
+    if method == "blockwise":
+        assert by.ndim == 1
+        assert expected_groups is not None
 
-    by_input = by
+        def _reduction_func(a, by, axis, start_group, num_groups):
+            # adjust group labels to start from 0 for each chunk
+            by_for_chunk = by - start_group
+            expected_groups_for_chunk = pd.RangeIndex(num_groups)
 
-    # Unifying chunks is necessary for argreductions.
-    # We need to rechunk before zipping up with the index
-    # let's always do it anyway
-    if not is_chunked_array(by):
-        # chunk numpy arrays like the input array
-        chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
+            axis = (axis,)  # convert integral axis to tuple
 
-        by = cubed.from_array(by, chunks=chunks, spec=array.spec)
-    _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+            blockwise_method = partial(
+                _reduce_blockwise,
+                agg=agg,
+                axis=axis,
+                expected_groups=expected_groups_for_chunk,
+                fill_value=fill_value,
+                engine=engine,
+                sort=sort,
+                reindex=reindex,
+            )
+            out = blockwise_method(a, by_for_chunk)
+            return out[agg.name]
 
-    # Cubed's groupby_reduction handles the generation of "intermediates", and the
-    # "map-reduce" combination step, so we don't have to do that here.
-    # Only the equivalent of "_simple_combine" is supported, there is no
-    # support for "_grouped_combine".
-    labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
-    do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+        num_groups = len(expected_groups)
+        result = cubed.core.groupby.groupby_blockwise(
+            array, by, axis=axis, func=_reduction_func, num_groups=num_groups
+        )
+        groups = (expected_groups.to_numpy(),)
+        return (result, groups)
 
-    assert do_simple_combine
-    assert method == "map-reduce"
-    assert expected_groups is not None
-    assert reindex is True
-    assert len(axis) == 1  # one axis/grouping
+    else:
+        inds = tuple(range(array.ndim))
 
-    def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
-        blockwise_method = partial(
-            _get_chunk_reduction(agg.reduction_type),
-            func=agg.chunk,
-            fill_value=agg.fill_value["intermediate"],
-            dtype=agg.dtype["intermediate"],
-            reindex=reindex,
-            user_dtype=agg.dtype["user"],
+        by_input = by
+
+        # Unifying chunks is necessary for argreductions.
+        # We need to rechunk before zipping up with the index
+        # let's always do it anyway
+        if not is_chunked_array(by):
+            # chunk numpy arrays like the input array
+            chunks = tuple(
+                array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)
+            )
+
+            by = cubed.from_array(by, chunks=chunks, spec=array.spec)
+        _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
+
+        # Cubed's groupby_reduction handles the generation of "intermediates", and the
+        # "map-reduce" combination step, so we don't have to do that here.
+        # Only the equivalent of "_simple_combine" is supported, there is no
+        # support for "_grouped_combine".
+        labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
+        do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
+
+        assert do_simple_combine
+        assert method == "map-reduce"
+        assert expected_groups is not None
+        assert reindex is True
+        assert len(axis) == 1  # one axis/grouping
+
+        def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
+            blockwise_method = partial(
+                _get_chunk_reduction(agg.reduction_type),
+                func=agg.chunk,
+                fill_value=agg.fill_value["intermediate"],
+                dtype=agg.dtype["intermediate"],
+                reindex=reindex,
+                user_dtype=agg.dtype["user"],
+                axis=axis,
+                expected_groups=expected_groups,
+                engine=engine,
+                sort=sort,
+            )
+            out = blockwise_method(a, by)
+            # Convert dict to one that cubed understands, dropping groups since they are
+            # known, and the same for every block.
+            return {
+                f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])
+            }
+
+        def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
+            # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
+            # only combine over the dummy axis, to preserve grouping along 'axis'
+            dtype = dict(dtype)
+            out = {}
+            for idx, combine in enumerate(agg.simple_combine):
+                field = f"f{idx}"
+                out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
+            return out
+
+        def _groupby_aggregate(a):
+            # Convert cubed dict to one that _finalize_results works with
+            results = {"groups": expected_groups, "intermediates": a.values()}
+            out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
+            return out[agg.name]
+
+        # convert list of dtypes to a structured dtype for cubed
+        intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
+        dtype = agg.dtype["final"]
+        num_groups = len(expected_groups)
+
+        result = cubed.core.groupby.groupby_reduction(
+            array,
+            by,
+            func=_groupby_func,
+            combine_func=_groupby_combine,
+            aggregate_func=_groupby_aggregate,
             axis=axis,
-            expected_groups=expected_groups,
-            engine=engine,
-            sort=sort,
+            intermediate_dtype=intermediate_dtype,
+            dtype=dtype,
+            num_groups=num_groups,
         )
-        out = blockwise_method(a, by)
-        # Convert dict to one that cubed understands, dropping groups since they are
-        # known, and the same for every block.
-        return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}
-
-    def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
-        # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
-        # only combine over the dummy axis, to preserve grouping along 'axis'
-        dtype = dict(dtype)
-        out = {}
-        for idx, combine in enumerate(agg.simple_combine):
-            field = f"f{idx}"
-            out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
-        return out
-
-    def _groupby_aggregate(a):
-        # Convert cubed dict to one that _finalize_results works with
-        results = {"groups": expected_groups, "intermediates": a.values()}
-        out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
-        return out[agg.name]
-
-    # convert list of dtypes to a structured dtype for cubed
-    intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
-    dtype = agg.dtype["final"]
-    num_groups = len(expected_groups)
-
-    result = cubed.core.groupby.groupby_reduction(
-        array,
-        by,
-        func=_groupby_func,
-        combine_func=_groupby_combine,
-        aggregate_func=_groupby_aggregate,
-        axis=axis,
-        intermediate_dtype=intermediate_dtype,
-        dtype=dtype,
-        num_groups=num_groups,
-    )
 
-    groups = (expected_groups.to_numpy(),)
+        groups = (expected_groups.to_numpy(),)
 
-    return (result, groups)
+        return (result, groups)
 
 
 def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
@@ -2467,9 +2503,9 @@ def groupby_reduce(
         if method is None:
             method = "map-reduce"
 
-        if method != "map-reduce":
+        if method not in ("map-reduce", "blockwise"):
             raise NotImplementedError(
-                "Reduction for Cubed arrays is only implemented for method 'map-reduce'."
+                "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
             )
 
         partial_agg = partial(cubed_groupby_agg, **kwargs)
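
To make the per-chunk label adjustment in _reduction_func concrete, a
small numpy illustration (values are hypothetical; the stand-in sum
plays the role of the _reduce_blockwise call):

    import numpy as np
    import pandas as pd

    # one block's data and the global group labels that fall in it
    a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
    by = np.array([3, 3, 4, 4, 4])

    # this block covers global groups 3..4, so shift labels to start at 0
    start_group, num_groups = 3, 2
    by_for_chunk = by - start_group                  # -> [0, 0, 1, 1, 1]
    expected_groups_for_chunk = pd.RangeIndex(num_groups)

    # per-block reduction against the local groups; cubed concatenates the
    # per-block results in group order, so no combine step is needed
    out = np.zeros(len(expected_groups_for_chunk))
    np.add.at(out, by_for_chunk, a)
    print(out)                                       # [ 3. 12.]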


=====================================
tests/test_core.py
=====================================
@@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method):
     assert_equal(expected, actual)
 
 
+@requires_cubed
+@pytest.mark.parametrize("method", ["blockwise", "map-reduce"])
+def test_group_by_datetime_cubed(engine, method):
+    kwargs = dict(
+        func="mean",
+        method=method,
+        engine=engine,
+    )
+    t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
+    data = t.dt.dayofyear
+    cubedarray = cubed.from_array(data.values, chunks=30)
+
+    actual, _ = groupby_reduce(cubedarray, t, **kwargs)
+    expected = data.to_numpy().astype(float)
+    assert_equal(expected, actual)
+
+    edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
+    actual, _ = groupby_reduce(
+        cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs
+    )
+    expected = data.resample("ME").mean().to_numpy()
+    assert_equal(expected, actual)
+
+    actual, _ = groupby_reduce(
+        cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])),
+        t.to_numpy(),
+        isbin=True,
+        expected_groups=edges,
+        **kwargs,
+    )
+    expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
+    assert_equal(expected, actual)
+
+
 def test_factorize_values_outside_bins():
     # pd.factorize returns intp
     vals = factorize_(
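
To run only the new test (assuming cubed and flox's test dependencies
are installed):

    pytest tests/test_core.py -k test_group_by_datetime_cubed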



View it on GitLab: https://salsa.debian.org/debian-gis-team/flox/-/commit/e46f6d2d07885a27f00a54571f8e5b753ddd6671

