Commits


Li Jin authored and GitHub committed 916c453edba
ARROW-16083: [C++] Implement AsofJoin execution node (#13028) ## Overview This is a work in progress implementation of the AsofJoin node in Arrow C++ compute. The code needs quite a bit of clean up but I have worked on this long enough that I think I benefit from some inputs/comments from Arrow maintainers about the high levels before I potentially spend too much time in the wrong direction. All Credit to @stmrtn (Steven Martin) who is the original author of the code. ## Implementation There are quite a bit of code and here is how it works at the high level: Classes: * `InputState`: A class that handles queuing for input batches and purging unneeded batches. There are one input state per input table. * `MemoStore`: A class that responsible for advancing row index and getting the latest row for each key for each key given a timestamp. (Latest timestamp that is <= the given timestamp) * `CompositeReferenceTable`: A class that is responsible for storing temporary output rows and produces RecordBatches from those rows. Algorithm: * The node takes one left side table and n right side tables, and produces a joined table * It is currently assumed that each input table will call `InputReceived` with time-ordered batches. `InputReceived` will queue the batches inside `InputState` (it doesn't do any work). There is a separate process thread that wakes up when there is new inputs and attempts to produces a output batch. If the current data is not enough to produce the output batch (i.e., we have not received all the potential right side rows that could be a match for the current left batch), it will wait for new inputs. * The process thread works as follows: 1. Advance left row index for the current batch. Then advance right tables to get the latest right row (i.e., latest right row with timestamp <= left row timestamp) 2. Once advances are done, it will continue to check to produce the output row for the current left row 3. Go to 1 until left batches are processed 4. Output batch for the current left batch 5. Purge batches that are no longer needed 6. Wait until enough batches are received to process the next left batch Entry point for the algorithm is `process()` ## TODO - [x] More Tests - [x] Decide if we can replace `CompositeReferenceTable` with sth that already exits (perhaps `RowEncoder`?) - [x] Life cycle management for the process thread (or whether or not we should have it) - [x] Lint & Code Style - [x] Handle null results properly - [x] Handle errors properly (e.g., unsupported types) - [x] Clean up debug statement ## Follow up - Handle more datatypes (both key and value) - Handle multiple keys - Change from column name to column index for time and key column (Substrait integration) - Look into using `table_builder` to reproduce the materialize logic. Authored-by: Li Jin <ice.xelloss@gmail.com> Signed-off-by: Weston Pace <weston.pace@gmail.com>