Commits


Miguel Pragier authored and GitHub committed 036a22eaff1
GH-40089: [Go] Concurrent Recordset for receiving huge recordset (#40090) ### Rationale for this change Enabling Support for Large Recordsets in Go FlightSQL Driver Replacing **download-all-at-once->read-later** with **download-chunk-as-reading** approach. The primary motivation for these changes is to enhance the driver's capability to handle large recordsets without the need for unnecessary memory pre-allocations. By implementing a concurrent streaming approach, the driver avoids loading the entire dataset into memory at once. ### Description: Implementing Concurrent Record Streaming to Better Support the Handling of Large Recordsets. For retrieving a recordset, the current implementation works as follows: - An SQL query results in a set of [endpoints] and a query ticket. - Each [endpoint] is requested (with the generated ticket), and its response is a [reader]. - Each reader is iterated for records. These records are, in fact, arrays of rows. - All the retrieved rows are stored at once in an array. - This means that data, potentially comprising billions of rows, is synchronously read into an array. - After this array is filled, it is then returned, all at once, to the consumer. - This can result in out-of-memory failures, or at the very least, unnecessary waiting times and huge pre-allocations. ### Proposed Changes: Iterate over [endpoints], [readers], and [records] ad hoc, reading only the necessary data according to consumer demand. ### What changes are included in this PR? **1. Introduction of `sync.Mutex`:** - The `Rows` struct has been updated to include a `currentRecordMux` mutex. This addition ensures that operations involving the release of the current record are thread-safe, thus preventing potential race conditions in a concurrent environment. **2. Channels for Asynchronous Record Fetching:** - A new buffered channel, `recordChan`, has been added to the `Rows` struct. This channel permits the driver to asynchronously fetch and queue records. It provides a non-blocking mechanism to manage incoming records, which is particularly advantageous when dealing with large recordsets. **3. Asynchronous Record Streaming via Goroutines:** - The `streamRecordset` function has been introduced and is designed to run concurrently using goroutines. This modification permits the driver to begin processing records as soon as they are received, without having to wait for the entire recordset to be loaded into memory. **4. Improved Record Management:** - A new method, `releaseRecord`, has been created to manage the lifecycle of the current record. This method ensures that resources are released when a record is no longer needed, thus reducing the memory footprint when processing large datasets. **5. Refactoring of the `Next` Method:** - The `Next` method in the `Rows` struct has been refactored to suit the new streaming model. It now efficiently waits for and retrieves the next available record from the `recordChan` channel, enabling a smooth and memory-efficient iteration over large datasets. ### Are These Changes Tested? The proposed changes have been validated against existing tests. ### Are There Any User-Facing Changes? No, there are no user-facing changes. * Closes: #40089 Lead-authored-by: miguel pragier <miguel.pragier@ebnerstolz.de> Co-authored-by: Miguel Pragier <miguelpragier@gmail.com> Signed-off-by: Matt Topol <zotthewizard@gmail.com>