Commits


Daniel Russo authored and Andy Grove committed 1c7581cf777
ARROW-10043: [Rust][DataFusion] Implement COUNT(DISTINCT col)

This is a proposal for an initial and partial implementation of the `DISTINCT` keyword. Only `COUNT(DISTINCT)` is supported, with the following conditions:

(a) only one argument, i.e. `COUNT(DISTINCT col)`, but not `COUNT(DISTINCT col, other)`,
(b) the argument is an integer type, and
(c) the query must have a `GROUP BY` clause.

**Implementation Overview:**

The `Expr::AggregateFunction` variant has a new field, `distinct`, which mirrors the `distinct` flag from `SQLExpr::Function` (up until now this flag was unused). Any `Expr::AggregateFunction` may have its `distinct` flag switched to `true` if the keyword is present in the SQL query. However, the physical planner respects it only for `COUNT` expressions.

The count distinct aggregation slots into the existing physical plans as a new set of `AggregateExpr`. To demonstrate, below are examples of the physical plans for the following query, where `c1` may be any data type, and `c2` is a `UInt8` column:

```
SELECT c1, COUNT(DISTINCT c2) FROM t1 GROUP BY c1
```

(a) Multiple Partitions:

    HashAggregateExec:
      mode: Final
      group_expr:
        Column(c1)
      aggr_expr:
        DistinctCountReduce(Column(c2))
      schema:
        c1: any
        c2: UInt64
      input:
        MergeExec:
          input:
            HashAggregateExec:
              mode: Partial
              group_expr:
                Column(c1)
              aggr_expr:
                DistinctCount(Column(c2))
              schema:
                c1: any
                c2: LargeList(UInt8)
              input:
                CsvExec:
                  schema:
                    c1: any
                    c2: UInt8

The `DistinctCount` accumulates each `UInt8` into a list of distinct `UInt8`. No counts are collected yet; this stage produces only a partial result: lists of distinct values. In the `RecordBatch`, this is a `LargeListArray<UInt8>` column.

After the `MergeExec`, each list in `LargeListArray<UInt8>` is accumulated by `DistinctCountReduce` (via `accumulate_batch()`), producing the _final_ sets of distinct values. Finally, given the finalized sets of distinct values, the counts are computed (always as `UInt64`).
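The two-stage flow in (a) can be sketched in plain Rust, independent of DataFusion. This is only an illustration under stated assumptions: `partial_distinct` and `final_distinct_count` are hypothetical names standing in for the roles of `DistinctCount` and `DistinctCountReduce`, group keys are simplified to strings, and standard-library collections replace Arrow arrays.

```rust
use std::collections::{BTreeSet, HashMap};

/// Partial stage (hypothetical stand-in for the role of `DistinctCount`):
/// for one partition, collect the distinct `u8` values seen per group key.
/// No counts are produced here, only lists of distinct values.
fn partial_distinct(rows: &[(&str, u8)]) -> HashMap<String, Vec<u8>> {
    let mut sets: HashMap<String, BTreeSet<u8>> = HashMap::new();
    for (key, value) in rows {
        sets.entry(key.to_string()).or_default().insert(*value);
    }
    // Emit each set as a list, mirroring the list-typed partial column.
    sets.into_iter()
        .map(|(key, set)| (key, set.into_iter().collect()))
        .collect()
}

/// Final stage (hypothetical stand-in for the role of `DistinctCountReduce`):
/// union the partial lists of every partition per group key, then count.
fn final_distinct_count(partials: &[HashMap<String, Vec<u8>>]) -> HashMap<String, u64> {
    let mut merged: HashMap<String, BTreeSet<u8>> = HashMap::new();
    for partial in partials {
        for (key, list) in partial {
            merged
                .entry(key.clone())
                .or_default()
                .extend(list.iter().copied());
        }
    }
    // Counts are computed only once the sets are final, always as u64.
    merged
        .into_iter()
        .map(|(key, set)| (key, set.len() as u64))
        .collect()
}
```

Calling `partial_distinct` once per partition and passing the results to `final_distinct_count` reproduces the Partial, merge, Final sequence. The sketch also shows why the partial stage cannot emit counts: a value seen in two partitions must be counted only once, so the sets themselves have to cross the merge.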
(b) Single Partition:

    HashAggregateExec:
      mode: NoPartial
      group_expr:
        Column(c1)
      aggr_expr:
        DistinctCountReduce(Column(c2))
      schema:
        c1: any
        c2: UInt64
      input:
        CsvExec:
          schema:
            c1: any
            c2: UInt8

This scenario is unlike the multiple partition scenario: `DistinctCount` is _not_ used, and there are no partial sets of distinct values. Rather, in a single `HashAggregateExec` stage, each `UInt8` is accumulated into a distinct value set, then the counts are computed at the end of the stage. `DistinctCountReduce` is used, but note that unlike the multiple partition case, it accumulates scalars via `accumulate_scalar()`.

There is a new aggregation mode: `NoPartial`. In summary, the modes are:

- `NoPartial`: used in single-stage aggregations
- `Partial`: used as the first stage of two-stage aggregations
- `Final`: used as the second stage of two-stage aggregations

Before the `NoPartial` mode was introduced, `Partial` covered the responsibilities that are now split between `Partial` and `NoPartial`. No distinction was required because, for _non-distinct_ aggregations (such as count, sum, min, max, and avg), the first aggregation stage is always the same, regardless of whether the aggregation is one-stage or two-stage. This is not the case for a _distinct_ count aggregation, as the physical plans above show.

Closes #8222 from drusso/ARROW-10043

Authored-by: Daniel Russo <danrusso@gmail.com>
Signed-off-by: Andy Grove <andygrove@nvidia.com>