Commits


Li Jin authored and GitHub committed baf17a20a35
GH-36252: [Python] Add non decomposable hash aggregate UDF (#36253) ### Rationale for this change In https://github.com/apache/arrow/issues/35515, I have implemented a Scalar version of the non decomposable UDF (Scalar as in SCALAR_AGGREGATE). I would like to support the Hash version of it (Hash as in HASH_AGGREGATE) With this PR, user can register an aggregate UDF once with `pc.register_aggregate_function` and it can be used as both scalar aggregate function and hash aggregate function. Example: ``` def median(x): return pa.scalar(np.nanmedian(x)) pc.register_aggregate_function(func=median, func_name='median_udf', ...) table = ... table.groupby("id").aggregate([("v", 'median_udf')]) ``` ### What changes are included in this PR? The main changes are: * In ResigterAggregateFunction (udf.cc), we now register the function both as a scalar aggregate function and a hash aggregate function (with signature adjustment for hash aggregate kernel because we need to append the grouping key) * Implemented PythonUdfHashAggregateImpl, similar to the PythonUdfScalarAggregateImpl. In Consume, it will accumulate both the input batches and the group id array. In Merge, it will merge the input batches and group id array (with the group_id_mapping). In Finalize, it will apply groupings to the accumulated batches to create one record batch per group, then apply the UDF over each group. * Some code clean up - `UdfWrapperCallback` objects are named `cb` (previously, `agg_cb` or `wrapper`) now and the user defined python function is now just called `function` (previously `agg_function`) For table.groupby().aggregate(...), the space complexity is O(n) where n is the size of the table (and therefore, is not very useful). However, this is more useful in the segmented aggregation case, where the space complexity of O(s), where s the size of the segments. ### Are these changes tested? Added new test in test_udf.py (with table.group_by().aggregate() and test_substrait.py (with segmented aggregation) ### Are there any user-facing changes? Yes with this change, user can call use registered aggregate UDF with `table.group_by().aggregate() ` or Acero's segmented aggregation. ### Checklist - [x] Self Review - [x] API Documentation * Closes: #36252 Lead-authored-by: Li Jin <ice.xelloss@gmail.com> Co-authored-by: Weston Pace <weston.pace@gmail.com> Signed-off-by: Li Jin <ice.xelloss@gmail.com>