Commits

Jorge C. Leitao authored 84e4b15889b
ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams. Recently, we introduced `async` to `execute`. This allowed us to parallelize multiple partitions as we denote an execution of a part (of a partition) as the unit of work. However, a part is often a large task composing multiple batches and steps. This PR makes all our execution nodes return a dynamically-typed [`Stream<Item = ArrowResult<RecordBatch>>`](https://docs.rs/futures/0.3.6/futures/stream/trait.Stream.html) instead of an Iterator. For reference, a Stream is an iterator of futures, which in this case is a future of a `Result<RecordBatch>`. This effectively breaks the execution in smaller units of work (on which an individual unit is an operation returns a `Result<RecordBatch>`) allowing each task to chew smaller bits. This adds `futures` as a direct dependency of DataFusion (it was only a dev-dependency). This leads to a +2% degradation in aggregates in micro benchmarking, which IMO is expected given that there is more context switching to handle. However, I expect (hope?) this to be independent of the number of batches and partitions, and be offset by any async work we perform to our sources (readers) and sinks (writers). I did not take the time to optimize - the primary goal was to implement the idea, have it compile and pass tests, and have some discussion about it. I expect that we should be able to replace some of our operations by `join_all`, thereby scheduling multiple tasks at once (instead of waiting one by one). <details> <summary>Benchmarks</summary> Aggregates: ``` aggregate_query_no_group_by 15 12 time: [782.11 us 784.12 us 786.19 us] change: [+1.1421% +2.5252% +3.8963%] (p = 0.00 < 0.05) Performance has regressed. Found 10 outliers among 100 measurements (10.00%) 4 (4.00%) high mild 6 (6.00%) high severe aggregate_query_group_by 15 12 time: [5.8751 ms 5.9206 ms 5.9679 ms] change: [+1.0645% +2.0027% +3.0333%] (p = 0.00 < 0.05) Performance has regressed. aggregate_query_group_by_with_filter 15 12 time: [2.7652 ms 2.7983 ms 2.8340 ms] change: [+0.3278% +1.8981% +3.3819%] (p = 0.02 < 0.05) Change within noise threshold. ``` Math: ``` sqrt_20_9 time: [6.9844 ms 7.0582 ms 7.1363 ms] change: [+0.0557% +1.5625% +3.0408%] (p = 0.05 < 0.05) Change within noise threshold. Found 3 outliers among 100 measurements (3.00%) 2 (2.00%) high mild 1 (1.00%) high severe sqrt_20_12 time: [2.8350 ms 2.9504 ms 3.1204 ms] change: [+3.8751% +8.2857% +14.671%] (p = 0.00 < 0.05) Performance has regressed. Found 5 outliers among 100 measurements (5.00%) 2 (2.00%) high mild 3 (3.00%) high severe sqrt_22_12 time: [14.888 ms 15.242 ms 15.620 ms] change: [+7.6388% +10.709% +14.098%] (p = 0.00 < 0.05) Performance has regressed. Found 5 outliers among 100 measurements (5.00%) 3 (3.00%) high mild 2 (2.00%) high severe sqrt_22_14 time: [23.710 ms 23.817 ms 23.953 ms] change: [-4.3401% -3.1824% -2.0952%] (p = 0.00 < 0.05) Performance has improved. Found 11 outliers among 100 measurements (11.00%) 5 (5.00%) high mild 6 (6.00%) high severe ``` </details> I admit that this is a bit outside my comfort zone, and someone with more experience in `async/await` could be of help. IMO this would integrate very nicely with ARROW-10307, ARROW-9275, I _think_ it would also help ARROW-9707, and I _think_ that it also opens the possibility consuming / producing batches from/to sources and sinks from arrow-flight / IPC Closes #8473 from jorgecarleitao/streams Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>