C Scan API #32 (base: develop)
- Start Date: 2026-03-13
- Authors: Mikhail Kot
# C Scan API

There is a scan API for Rust-compatible code available at
https://github.com/vortex-data/vortex/tree/develop/vortex-scan.

The goal of introducing a C scan API is to make integration with non-Rust query
engines like Velox easier. In theory, such engines can use cxx.rs, but that
requires a lot of binding code and runtime bridging (see below).

There exists a partial scan API for C exposed over files [1], but it's limited
to single-file URIs without globs, and it's also not thread-safe. Its main
flaws, however, are:

- inability to export to a well-known format like ArrowArrayStream,
- lack of introspection over produced `vx_array`s, and
- inability to control the scan at a level lower than getting partitions and
  `vx_array`s with filters and projections pre-configured.

Why does the Scan API need to expose `vx_array`s at all? What's the benefit of
using our own format over ArrowArrayStream?

The answer is "compression". Vortex DTypes don't exactly match Arrow physical
encodings, so if you have e.g. a ConstantArray, you need to decompress it into
something Arrow-compatible. This was a major regression in the DuckDB
integration.

The C++ API works around this by allowing an ArrowArrayStream interface to be
produced from a ScanBuilder, but it calls Rust code directly via cxx.rs, which
we want to avoid while adding C interfaces. The future of the C++ API is out of
scope for this proposal, but it is expected to wrap the C API directly over
time, removing vortex-cxx's dependency on cxx.rs.

## Customization points

The main goal of providing customization points is to do as little work as
possible in Vortex code and as much as possible in the query engine. Some
engines may want control over scan execution, e.g. pruning [2]. Engines like
DuckDB have their own remote storage, caching, and globbing implementations.
The API still needs the ability to fall back to its own implementations.

Still, the Scan API is a relatively high-level concept, and if its level is not
sufficient, engines can resort to using a layout reader plan and executing it
directly.

## Datasource

A Datasource is a reference to multiple, possibly remote, files. When created,
it opens the first file to determine the schema from its DType; all other
operations are deferred until a scan is requested. You can request multiple
file scans from a Datasource.

```c
// Opaque handles, generated by bindgen
typedef struct vx_data_source vx_data_source;
typedef struct vx_file_handle vx_file_handle;

typedef void (*vx_list_callback)(void *userdata, const char *name, int is_dir);
typedef void (*vx_glob_callback)(void *userdata, const char *file);

typedef struct vx_data_source_options {
  // (1) Filesystem customization

  bool (*fs_use_vortex)(const char *schema, const char *path);
  void (*fs_set_userdata)(void *userdata);

  // Called after glob expansion, in single-file mode
  vx_file_handle *(*fs_open)(void *userdata, const char *path, vx_error **err);
  vx_file_handle *(*fs_create)(void *userdata, const char *path, vx_error **err);

  // Non-recursive; the callback is invoked once per path
  void (*fs_list)(void *userdata, const char *path, vx_list_callback cb, vx_error **err);

  void (*fs_close)(vx_file_handle *handle);
  uint64_t (*fs_size)(vx_file_handle *handle, vx_error **err);

  // Positional read; doesn't move the file's open cursor
  void (*fs_read)(vx_file_handle *handle, uint64_t offset, size_t len,
                  uint8_t *buffer, vx_error **err);

  // Not needed for scanning but makes the FS API complete
  void (*fs_write)(vx_file_handle *handle, uint64_t offset, size_t len,
                   const uint8_t *buffer, vx_error **err);
  void (*fs_sync)(vx_file_handle *handle, vx_error **err);

  // (2) Globbing customization

  void (*glob)(const char *glob, vx_glob_callback cb, vx_error **err);

  // (3) Cache customization

  void *(*cache_init)(vx_error **err);
  void (*cache_free)(void *cache, vx_error **err);
  void (*cache_get)(void *cache, const char *key, void **value, vx_error **err);
  void (*cache_put)(void *cache, const char *key, void *value, vx_error **err);
  void (*cache_delete)(void *cache, const char *key, vx_error **err);
} vx_data_source_options;

// Addition to the existing DType API. Returns an owned ArrowSchema which the
// caller must free via its release callback. If err is populated, out is not set.
void vx_dtype_to_arrow_schema(const vx_dtype *dtype, ArrowSchema *out, vx_error **err);

// Create a new owned datasource which must be freed by the caller.
const vx_data_source *
vx_data_source_new(const vx_session *session, const vx_data_source_options *opts,
                   vx_error_out err);

// The datasource is Arc'd inside, so a clone creates another reference rather
// than fully cloning the state.
const vx_data_source *vx_data_source_clone(const vx_data_source *ptr);

// vx_dtype's lifetime is bound to the datasource's lifetime; the caller
// doesn't need to free it.
const vx_dtype *vx_data_source_dtype(const vx_data_source *ds);

typedef enum {
  VX_CARD_UNKNOWN = 0,
  VX_CARD_ESTIMATE = 1,
  VX_CARD_MAXIMUM = 2,
} vx_cardinality;

typedef struct {
  vx_cardinality cardinality;
  uint64_t rows;
} vx_data_source_row_count;

void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc);
void vx_data_source_free(const vx_data_source *ptr);
```
1. Open local or remote files. Allow using Vortex's filesystem or the query
   engine's filesystem, e.g. DuckDB's. Allow partial customization, e.g.
   DuckDB's fs for local reads but Vortex's fs for remote reads. The remote
   filesystem customization point has the benefit of not duplicating
   credentials (e.g. an S3 access key) between the query engine and Vortex. A
   local implementation may be more performant; Vortex rolled back full DuckDB
   fs usage due to performance implications.
2. Open a single file or multiple files. Query engines may have their own glob
   expansion [4] which does HTTP IO.
3. Cache intermediate results. The main use case for Vortex is caching schemas
   in memory (the footer cache) and conversion results. The benefit is that
   there is no need to eagerly open the first file in a glob if there's a
   cache hit. Vortex had an integration with the DuckDB object cache which was
   deleted in favor of its own implementation, leading to a performance
   regression.

When all three customization points are implemented, Vortex offloads all IO to
the query engine.

Why not expose an API that emits byte range requests for the query engine to
read and populate buffers, instead of a vtable? That approach requires only
slightly more scaffolding on the query engine side, but a vtable with a
specific implementation is significantly easier to implement on the Vortex
side and is coherent with the current Rust implementation. Similar designs can
be found in Arrow's RandomAccessFile or parquet-rs's AsyncFileReader. However,
as we're thinking of changing our Rust API, we can try to invest time into the
byte-range approach as well. Coupled with
https://github.com/vortex-data/vortex/pull/7012, it also allows the DuckDB
integration to delegate memory allocations to the database.

## Runtime bridging

In the Rust API, a Datasource produces a stream of Partitions, and a Partition
produces a stream of Arrays. The API must be used in an async runtime; the
current runtime for Vortex is tokio.

Velox uses a non-coroutine but async runtime based on Folly executors. Engines
like CoroBase use a coroutine-based runtime. DuckDB and ClickHouse runtimes
are sync, based on thread pools. The Postgres runtime is sync, based on
processes. Some engines may use OS-specific IO like `io_uring`.

All potential usages of our API may be grouped into four cases:

1. sync, single-thread runtime (trivial),
2. sync, multi-thread runtime,
3. async, multi-thread/coroutine,
4. async, single-thread.

The Scan/Partition suggestions outlined below play well with (2) but not with
(3) and (4), because Vortex has its own runtime which will block the current
thread when e.g. getting an Array out of a Partition. An async-friendly API
basically means exposing a coroutine/state machine which hands control over to
the host on IO.

As Joe mentioned, we want to get away from the concept of partitions and emit
chunks of `vx_array`s directly from the scan. In that case, such a state
machine may be expressed with roughly the following states:

```
START -> NEED_IO (offset, len) -> EXECUTE -> DONE
```

- START: the state machine is passed a file handle.
- NEED_IO: instructs the host to read the given byte range into a buffer and
  return a handle to that buffer.
- EXECUTE: decompresses the buffer (executes the Array) one step into another
  buffer.
- DONE: the Array is executed/canonicalized into a form the host can work
  with; the host now transfers data from buffers into its own output format.

However, as the future of this approach is unclear, an async-unfriendly option
is described below.
|
> **Reviewer comment (on lines +205 to +206):** I think it would be valuable
> to actually think through what an async API looks like in practice, because
> it is almost certainly the case that the devil is in the details. Otherwise,
> this should be in an unresolved questions section and we should acknowledge
> we are not going to work on this now.
## Scan

Scan iterators:

```c
```

> **Reviewer comment (on lines +212 to +213):** Empty?

Scan options:

```c
typedef enum {
  VX_S_INCLUDE_ALL = 0,
  VX_S_INCLUDE_RANGE = 1,
  VX_S_EXCLUDE_RANGE = 2,
} vx_scan_selection_include;

typedef struct {
  // Roaring bitmaps won't be supported for now.
  // If include is VX_S_INCLUDE_ALL, idx and idx_len are not read. idx is
  // copied on scan invocation and can be freed after a scan iterator is
  // requested.
  uint64_t *idx;
  size_t idx_len;
  vx_scan_selection_include include;
} vx_scan_selection;

typedef struct vx_scan_options {
  // May be NULL, which means "return all columns"
  const vx_expression *projection;

  // May be NULL, which means "no filter"
  const vx_expression *filter;

  // Set both to 0 to indicate no range request
  uint64_t row_range_begin; // inclusive
  uint64_t row_range_end;   // exclusive

  vx_scan_selection selection;

  // 0 for no limit
  uint64_t limit;
  int ordered;
} vx_scan_options;
```

Scan interface:

```c
typedef struct vx_scan vx_scan;

/**
 * A partition is a contiguous chunk of memory from which you can iteratively
 * get vx_arrays.
 */
typedef struct vx_partition vx_partition;

typedef enum {
  VX_ESTIMATE_UNKNOWN = 0,
  VX_ESTIMATE_EXACT = 1,
  VX_ESTIMATE_INEXACT = 2,
} vx_estimate_boundary;

typedef struct {
  // If boundary is VX_ESTIMATE_UNKNOWN, the estimate field is not populated
  uint64_t estimate;
  vx_estimate_boundary boundary;
} vx_estimate;

// Users are encouraged to create worker threads based on the partition
// estimate to distribute work.
// options may be NULL.
// Requesting a scan doesn't do anything until vx_partition_next is called.
vx_scan *
vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options,
                    vx_error_out err);

/**
 * Get the next owned partition out of a scan request.
 * The caller must free this partition using vx_partition_free.
 * This function is thread-safe.
 * In a sync multi-thread runtime, users are encouraged to create a worker
 * thread per partition.
 * Returns NULL and doesn't set err on exhaustion.
 * Returns NULL and sets err on error.
 */
vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err);

// Request an array stream in Arrow format from a partition, consuming it
// fully. Not thread-safe; should be called once.
// stream is owned and must be freed using its release callback.
void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream,
                             vx_error_out err);

// Not thread-safe. Get the next owned vx_array out of a partition.
// Returns NULL if the iterator is exhausted. The array is owned and must be
// freed by the caller.
const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err);
```
There are examples of APIs also exposing batch reads, but I doubt this is a
good option: for every ArrayRef, the work needed to execute it may be
significant, and if you want to parallelize, you can do so with partitions, so
each thread will still be busy with one ArrayRef at a time. Batch reads can be
introduced in the future.

Scan functions are lazy as they operate on streams, and it is the consumer's
responsibility to apply parallelism to the desired degree.

## What to do with `vx_array`

The main question is how to transform the outputs of iteration, `vx_array`s,
into something query engines can operate on. You need to execute the array
iteratively until you recognize the data and can start exporting it. The
DuckDB integration is mostly written in Rust, with C++ code calling Rust's
vtable functions; the Rust code does all data export. The PoC implementation
moves DuckDB to the C API but leaves the existing Rust code for exporting
`vx_array` into DataChunk.

However, the goal is not to interface with Rust code, so as a baseline the API
provides a way to scan partitions directly into an ArrowArrayStream, which
should be good enough for most consumers.

## Cancellation

There will be no option to cancel a scan, as this isn't possible in the Rust
API either, and it is a low-priority task.

## Testing

The C API doesn't have any tests. I suggest setting up a Catch2 v3 test target
and a CMake library for the C API, using FetchContent to download Catch2. This
way, people not working on the DuckDB integration or FFI wouldn't need CMake
and Catch2. To integrate the C tests with `cargo test`, we can write a
`build.rs` extension which parses C test names and generates Rust test targets
that call into Catch2.

## DuckDB integration PoC

```
before:
Duckdb side                       Vortex side

C++       C++                     Rust
duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation

after:

C++       C++                     C
duckdb -> TableFunction vtable -> ffi wrapping -> vx_scan()*

* - vx_array -> DataChunk reuses existing Rust code
```
https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has an
implementation using the C Scan API for the DuckDB scan integration. DuckDB
has a sync multi-threaded runtime, and the table function is called from
multiple threads simultaneously. Users can save per-thread state.

The integration splits into the following parts:

- DType -> LogicalType conversion: done, sans the Temporal extension.
- Table function binding (creating a DataSource): done.
- Global state initialization (creating a Scan): done, sans filter pushdown.
- Local state initialization (export batch id): done.
- Utility functions like cardinality estimates: done.
- vx_array -> DataChunk export: delegated to existing Rust code.

On filter pushdown: projection pushdown only requires exposing a `select()`
expression. Filter pushdown, on the other hand, requires a `TableFilter ->
Vortex Expression` conversion, which is significant porting work, so it is
left out.

On DataChunk export: it requires exposing array optimization, validity masks,
and other features, so it is left out.

The table function uses Vortex _partitions_ as a work-splitting unit only,
i.e. one worker thread operates on one or more partitions. Each thread pulls
partitions from `vx_scan_next` (which is thread-safe) and then works on its
own partition without synchronization.
[1] `vx_file_scan`
[2] Need to control pruning:
    https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999
[4] e.g. DuckDB MultiFileReader / MultiFileList
> **Reviewer comment (on cache customization):** Is this the way we want to
> expose a cache? I feel like for Vortex arrays specifically this is way too
> generic to be useful. What exactly would we be storing in the cache? Just
> Vortex arrays, or things like stats? Partitions? Array trees? And if we are
> passing around `void *` values then shouldn't there also be a size parameter
> somewhere? Also, things related to ownership and eviction semantics, etc.
> are missing.