diff --git a/Cargo.lock b/Cargo.lock index 0c1d847db67..776a631edf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10024,6 +10024,7 @@ dependencies = [ "url", "vortex", "vortex-array", + "vortex-ffi", "vortex-runend", "vortex-sequence", "vortex-utils", @@ -10070,6 +10071,7 @@ dependencies = [ name = "vortex-ffi" version = "0.1.0" dependencies = [ + "arrow-array", "async-fs", "cbindgen", "futures", diff --git a/Cargo.toml b/Cargo.toml index 260e5300ab4..6444bf60bba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -263,6 +263,7 @@ vortex-decimal-byte-parts = { version = "0.1.0", path = "encodings/decimal-byte- vortex-error = { version = "0.1.0", path = "./vortex-error", default-features = false } vortex-fastlanes = { version = "0.1.0", path = "./encodings/fastlanes", default-features = false } vortex-file = { version = "0.1.0", path = "./vortex-file", default-features = false } +vortex-ffi = { version = "0.1.0", path = "./vortex-ffi", default-features = false } vortex-flatbuffers = { version = "0.1.0", path = "./vortex-flatbuffers", default-features = false } vortex-fsst = { version = "0.1.0", path = "./encodings/fsst", default-features = false } vortex-io = { version = "0.1.0", path = "./vortex-io", default-features = false } diff --git a/vortex-array/src/dtype/field_names.rs b/vortex-array/src/dtype/field_names.rs index 737ad0cfd52..f148954b30b 100644 --- a/vortex-array/src/dtype/field_names.rs +++ b/vortex-array/src/dtype/field_names.rs @@ -341,6 +341,12 @@ impl From> for FieldNames { } } +impl From> for FieldNames { + fn from(value: Vec) -> Self { + Self(value.into_iter().map(FieldName::from).collect()) + } +} + impl From<&[FieldName]> for FieldNames { fn from(value: &[FieldName]) -> Self { Self(Arc::from(value)) diff --git a/vortex-cxx/cpp/include/vortex/scan.hpp b/vortex-cxx/cpp/include/vortex/scan.hpp index 7debb9ef6aa..b5cd075f9f7 100644 --- a/vortex-cxx/cpp/include/vortex/scan.hpp +++ b/vortex-cxx/cpp/include/vortex/scan.hpp @@ -105,4 +105,4 @@ class ScanBuilder { rust::Box impl_; }; -} // namespace vortex \ No newline at end of file +} // namespace vortex diff --git a/vortex-duckdb/Cargo.toml b/vortex-duckdb/Cargo.toml index 2d095c1c3e5..6c821c2ac4b 100644 --- a/vortex-duckdb/Cargo.toml +++ b/vortex-duckdb/Cargo.toml @@ -37,6 +37,7 @@ parking_lot = { workspace = true } paste = { workspace = true } tracing = { workspace = true } url = { workspace = true } +vortex-ffi = { workspace = true } vortex = { workspace = true, features = ["files", "tokio", "object_store"] } vortex-utils = { workspace = true, features = ["dashmap"] } @@ -55,6 +56,6 @@ workspace = true [build-dependencies] bindgen = { workspace = true } cbindgen = { workspace = true } -cc = { workspace = true } +cc = { workspace = true , features = ["parallel"] } reqwest = { workspace = true } zip = { workspace = true } diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index be20dde4b8a..a15e7854cdd 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -41,6 +41,8 @@ const SOURCE_FILES: [&str; 18] = [ "cpp/vector_buffer.cpp", ]; +const FFI_INCLUDE: &str = "../vortex-ffi/cinclude"; + const DOWNLOAD_MAX_RETRIES: i32 = 3; const DOWNLOAD_TIMEOUT: u64 = 90; @@ -302,6 +304,7 @@ fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) { .rustified_non_exhaustive_enum("DUCKDB_TYPE") .size_t_is_usize(true) .clang_arg(format!("-I{}", duckdb_include_dir.display())) + .clang_arg(format!("-I{}", crate_dir.join(FFI_INCLUDE).display())) .clang_arg(format!("-I{}", crate_dir.join("cpp/include").display())) .generate_comments(true) // Tell cargo to invalidate the built crate whenever any of the @@ -323,13 +326,18 @@ fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) { } fn cpp(duckdb_include_dir: &Path) { + //println!("cargo:rustc-link-arg=-fsanitize=address"); cc::Build::new() .std("c++20") // Duckdb sources fail -Wno-unused-parameter .flags(["-Wall", "-Wextra", "-Wpedantic", "-Wno-unused-parameter"]) + // TODO + //.flag("-fsanitize=address") .cpp(true) + .debug(true) .include(duckdb_include_dir) .include("cpp/include") + .include(FFI_INCLUDE) .files(SOURCE_FILES) .compile("vortex-duckdb-extras"); // bindgen generates rerun-if-changed for .h/.hpp files diff --git a/vortex-duckdb/cpp/CMakeLists.txt b/vortex-duckdb/cpp/CMakeLists.txt index 9671d93dd6d..fdc96e472a5 100644 --- a/vortex-duckdb/cpp/CMakeLists.txt +++ b/vortex-duckdb/cpp/CMakeLists.txt @@ -23,7 +23,7 @@ if (NOT CMAKE_BUILD_TYPE) endif() # Enable compiler warnings (matching build.rs flags). -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic -Wno-unused-parameter") # Find DuckDB include directory via the symlink created by build.rs. # The symlink points to target/duckdb-source-vX.Y.Z which contains duckdb-X.Y.Z/ @@ -39,7 +39,7 @@ else() ) endif() -include_directories(include ${DUCKDB_INCLUDE}) +include_directories(include ${DUCKDB_INCLUDE} ../../vortex-ffi/cinclude) # Auto-discover C++ source files file(GLOB CPP_SOURCES "*.cpp") diff --git a/vortex-duckdb/cpp/include/duckdb_vx/duckdb_diagnostics.h b/vortex-duckdb/cpp/include/duckdb_vx/duckdb_diagnostics.h index e294af29bad..65d4e7fff9d 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/duckdb_diagnostics.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/duckdb_diagnostics.h @@ -18,9 +18,10 @@ _Pragma("GCC diagnostic ignored \"-Wall\"") \ _Pragma("GCC diagnostic ignored \"-Wextra\"") \ _Pragma("GCC diagnostic ignored \"-Wpedantic\"") + _Pragma("GCC diagnostic ignored \"-Wunused-parameter\"") #define DUCKDB_INCLUDES_END _Pragma("GCC diagnostic pop") #else #define DUCKDB_INCLUDES_BEGIN #define DUCKDB_INCLUDES_END #endif -// clang-format on \ No newline at end of file +// clang-format on diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index e8f483514a5..8bd54315da9 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -12,7 +12,22 @@ #include "error.h" #include "table_filter.h" #include "duckdb_vx/data.h" -#include "duckdb_vx/client_context.h" +#include "duckdb_vx/duckdb_diagnostics.h" + +#ifdef __cplusplus + DUCKDB_INCLUDES_BEGIN + #include "duckdb/common/arrow/arrow.hpp" + DUCKDB_INCLUDES_END + + using FFI_ArrowSchema = ArrowSchema; + using FFI_ArrowArrayStream = ArrowArrayStream; +#else + // TODO nanoarrow + typedef void FFI_ArrowSchema; + typedef void FFI_ArrowArrayStream; +#endif + +#include "vortex.h" #ifdef __cplusplus /* If compiled as C++, use C ABI */ extern "C" { @@ -150,8 +165,6 @@ typedef struct { duckdb_vx_string_map (*to_string)(void *bind_data); // void *dynamic_to_string; - double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state); - idx_t (*get_partition_data)(const void *bind_data, void *init_global_data, void *init_local_data, @@ -171,10 +184,18 @@ typedef struct { bool sampling_pushdown; bool late_materialization; idx_t max_threads; + + // PoC hack: retain Rust exporter code + // return local batch id + uint64_t (*export_array)(const vx_array* arr, duckdb_data_chunk chunk); + } duckdb_vx_tfunc_vtab_t; // A single function for configuring the DuckDB table function vtable. -duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const duckdb_vx_tfunc_vtab_t *vtab); +duckdb_state duckdb_vx_tfunc_register( + duckdb_database ffi_db, + const duckdb_vx_tfunc_vtab_t *vtab +); #ifdef __cplusplus /* End C ABI */ } diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index fc4ac1b66da..8405941ea07 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -1,7 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -#include "duckdb_vx/duckdb_diagnostics.h" +#include "duckdb_vx/table_function.h" +#include "vortex.h" DUCKDB_INCLUDES_BEGIN #include "duckdb.h" @@ -13,336 +14,525 @@ DUCKDB_INCLUDES_BEGIN #include "duckdb/parser/parsed_data/create_table_function_info.hpp" DUCKDB_INCLUDES_END -#include "duckdb_vx.h" -#include "duckdb_vx/data.hpp" -#include "duckdb_vx/error.hpp" - using namespace duckdb; -namespace vortex { -struct CTableFunctionInfo final : TableFunctionInfo { - explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) - : vtab(vtab), max_threads(vtab.max_threads) { - } +std::string to_string(const vx_string* str) { + return { vx_string_ptr(str), vx_string_len(str) }; +} - duckdb_vx_tfunc_vtab_t vtab; - idx_t max_threads; -}; +std::string move_vx_err(vx_error* error) { + const vx_string* vx_str = vx_error_get_message(error); + string str{vx_string_ptr(vx_str), vx_string_len(vx_str)}; + vx_error_free(error); + return str; +} -struct CTableBindData final : TableFunctionData { - CTableBindData(unique_ptr info_p, unique_ptr ffi_data_p) - : info(std::move(info_p)), ffi_data(std::move(ffi_data_p)) { +LogicalTypeId from_ptype(vx_ptype ptype) { + using enum LogicalTypeId; + switch (ptype) { + case PTYPE_U8: return UTINYINT; + case PTYPE_U16: return USMALLINT; + case PTYPE_U32: return UINTEGER; + case PTYPE_U64: return UBIGINT; + case PTYPE_I8: return TINYINT; + case PTYPE_I16: return SMALLINT; + case PTYPE_I32: return INTEGER; + case PTYPE_I64: return BIGINT; + case PTYPE_F16: throw BinderException("F16 type not supported in Duckdb"); + case PTYPE_F32: return FLOAT; + case PTYPE_F64: return DOUBLE; } + throw BinderException( + StringUtil::Format("value %d out of range for vx_ptype", ptype)); +} - unique_ptr Copy() const override { - assert(info->vtab.bind_data_clone != nullptr); +LogicalType from_dtype(const vx_dtype* dtype); +LogicalType from_struct(const vx_dtype* dtype) { + const vx_struct_fields* struct_dtype = vx_dtype_struct_dtype(dtype); + uint64_t struct_size = vx_struct_fields_nfields(struct_dtype); + child_list_t children(struct_size); + for (uint64_t i = 0; i < struct_size; ++i) { + const vx_string* field_name = vx_struct_fields_field_name(struct_dtype, i); + const vx_dtype* field_dtype = vx_struct_fields_field_dtype(struct_dtype, i); + children[i] = { to_string(field_name), from_dtype(field_dtype) }; + } + + return LogicalType::STRUCT(children); +} - duckdb_vx_error error_out = nullptr; - const auto copied_ffi_data = info->vtab.bind_data_clone(ffi_data->DataPtr(), &error_out); - if (error_out) { - throw BinderException(IntoErrString(error_out)); +LogicalType from_dtype(const vx_dtype* dtype) { + using enum LogicalTypeId; + switch (vx_dtype_get_variant(dtype)) { + case DTYPE_NULL: return SQLNULL; + case DTYPE_BOOL: return BOOLEAN; + case DTYPE_PRIMITIVE: return from_ptype(vx_dtype_primitive_ptype(dtype)); + case DTYPE_UTF8: return VARCHAR; + case DTYPE_BINARY: return BLOB; + case DTYPE_STRUCT: return from_struct(dtype); + case DTYPE_DECIMAL: { + uint8_t width = vx_dtype_decimal_precision(dtype); + uint8_t scale = vx_dtype_decimal_scale(dtype); + return LogicalType::DECIMAL(width, scale); + }; + case DTYPE_LIST: { + LogicalType child_type = from_dtype(vx_dtype_list_element(dtype)); + return LogicalType::LIST(child_type); } - return make_uniq(make_uniq(info->vtab), - unique_ptr(reinterpret_cast(copied_ffi_data))); + case DTYPE_FIXED_SIZE_LIST: { + LogicalType child_type = from_dtype(vx_dtype_fixed_size_list_element(dtype)); + idx_t idx = vx_dtype_fixed_size_list_size(dtype); + return LogicalType::ARRAY(child_type, idx); + }; + case DTYPE_EXTENSION: { // TODO Temporal + throw BinderException("DTYPE_EXTENSION not supported"); + }; + }; + throw BinderException(StringUtil::Format("value %d out of range for vx_dtype", dtype)); +} + +// TODO This belongs in C++ part + +class Session { +public: + Session(): session(vx_session_new()) {} + + Session(const Session& other): session(vx_session_clone(other.session.get())) { } + Session(Session&& other) noexcept { + std::swap(session, other.session); + } + + Session& operator=(const Session& other) + { + session.reset(vx_session_clone(other.session.get())); + return *this; + } + + Session& operator=(Session&& other) noexcept { + std::swap(session, other.session); + return *this; } - unique_ptr info; - unique_ptr ffi_data; + vx_session* handle() const noexcept { return session.get(); } + +private: + struct deleter { + void operator()(vx_session* session) { + vx_session_free(session); + } + }; + std::unique_ptr session; }; -struct CTableGlobalData final : GlobalTableFunctionState { - explicit CTableGlobalData(unique_ptr ffi_data_p, idx_t max_threads_p) - : ffi_data(std::move(ffi_data_p)), max_threads(max_threads_p) { +class DataSource { +public: + DataSource(const Session& session, vx_data_source_options& opts) { + vx_error* err; + data_source.reset(vx_data_source_new(session.handle(), &opts, &err)); + if (err) { + throw BinderException(move_vx_err(err)); + } } - unique_ptr ffi_data; - idx_t max_threads; + DataSource(const DataSource& other) + : data_source(vx_data_source_clone(other.data_source.get())) { } - idx_t MaxThreads() const override { - return max_threads; + DataSource(DataSource&& other) noexcept { + std::swap(data_source, other.data_source); } -}; -struct CTableLocalData final : LocalTableFunctionState { - explicit CTableLocalData(unique_ptr ffi_data_p) : ffi_data(std::move(ffi_data_p)) { + DataSource& operator=(const DataSource& other) { + data_source.reset(vx_data_source_clone(other.data_source.get())); + return *this; } - unique_ptr ffi_data; -}; + DataSource& operator=(DataSource&& other) noexcept { + std::swap(data_source, other.data_source); + return *this; + } + + vx_data_source_row_count row_count() const noexcept { + vx_data_source_row_count rc; + vx_data_source_get_row_count(handle(), &rc); + return rc; + } + + const vx_data_source* handle() const noexcept { return data_source.get(); } -/** - * Result of the bind function encapsulates the output schema. - */ -struct CTableBindResult { - vector &return_types; - vector &names; +private: + struct deleter { + void operator()(const vx_data_source* data_source) { + vx_data_source_free(data_source); + } + }; + std::unique_ptr data_source; }; -double c_table_scan_progress(ClientContext &context, - const FunctionData *bind_data, - const GlobalTableFunctionState *global_state) { - auto &bind = bind_data->Cast(); - duckdb_client_context c_ctx = reinterpret_cast(&context); - void *const c_bind_data = bind.ffi_data->DataPtr(); - void *const c_global_state = global_state->Cast().ffi_data->DataPtr(); - return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state); -} +class Array { +public: + const vx_array* handle() const noexcept { return array.get(); } +private: + friend class Partition; -unique_ptr c_bind(ClientContext &context, - TableFunctionBindInput &input, - vector &return_types, - vector &names) { - const auto &info = input.table_function.function_info->Cast(); + Array(const vx_array* array): array(array) {} - // Setup bind info to pass into the callback. - CTableBindResult result = { - return_types, - names, + struct deleter { + void operator()(const vx_array* array) { + vx_array_free(array); + } }; + std::unique_ptr array; +}; - duckdb_vx_error error_out = nullptr; - auto ctx = reinterpret_cast(&context); - auto ffi_bind_data = info.vtab.bind(ctx, - reinterpret_cast(&input), - reinterpret_cast(&result), - &error_out); - if (error_out) { - throw BinderException(IntoErrString(error_out)); +class Partition { +public: + Partition(const Partition&) = delete; + Partition& operator=(const Partition&) = delete; + Partition(Partition&& other) noexcept { + std::swap(partition, other.partition); } - return make_uniq(make_uniq(info.vtab), - unique_ptr(reinterpret_cast(ffi_bind_data))); -} + std::optional next_array() { + vx_error* err = nullptr; + const vx_array* array = vx_partition_next(handle(), &err); + if (err) { + throw BinderException(move_vx_err(err)); + } + if (!array) return std::nullopt; + return Array{array}; + } -unique_ptr c_init_global(ClientContext &context, TableFunctionInitInput &input) { - const auto &bind = input.bind_data->Cast(); + vx_partition* handle() const noexcept { return partition.get(); } + +private: + friend class Scan; + Partition(vx_partition* partition): partition(partition) {} - duckdb_vx_tfunc_init_input ffi_input = { - .bind_data = bind.ffi_data->DataPtr(), - .column_ids = input.column_ids.data(), - .column_ids_count = input.column_ids.size(), - .projection_ids = input.projection_ids.data(), - .projection_ids_count = input.projection_ids.size(), - .filters = reinterpret_cast(input.filters.get()), - .client_context = reinterpret_cast(&context), + struct deleter { + void operator()(vx_partition* partition) { + vx_partition_free(partition); + } }; + std::unique_ptr partition; +}; - duckdb_vx_error error_out = nullptr; - auto ffi_global_data = bind.info->vtab.init_global(&ffi_input, &error_out); - if (error_out) { - throw BinderException(IntoErrString(error_out)); - } - return make_uniq( - unique_ptr(reinterpret_cast(ffi_global_data)), - bind.info->max_threads); -} +class Scan { +public: + Scan(const DataSource& data_source, vx_scan_options& options) { + vx_error* err = nullptr; + scan.reset(vx_data_source_scan(data_source.handle(), &options, &err)); + if (err) { + throw BinderException(move_vx_err(err)); + } + } -unique_ptr c_init_local(ExecutionContext &context, - TableFunctionInitInput &input, - GlobalTableFunctionState *global_state) { - const auto &bind = input.bind_data->Cast(); - auto global_data = global_state->Cast().ffi_data->DataPtr(); - - duckdb_vx_tfunc_init_input ffi_input = { - .bind_data = bind.ffi_data->DataPtr(), - .column_ids = input.column_ids.data(), - .column_ids_count = input.column_ids.size(), - .projection_ids = input.projection_ids.data(), - .projection_ids_count = input.projection_ids.size(), - .filters = reinterpret_cast(input.filters.get()), - .client_context = reinterpret_cast(&context), - }; + Scan(const Scan&) = delete; + Scan& operator=(const Scan&) = delete; - duckdb_vx_error error_out = nullptr; - auto ffi_local_data = bind.info->vtab.init_local(&ffi_input, global_data, &error_out); - if (error_out) { - throw BinderException(IntoErrString(error_out)); + Scan(Scan&& other) noexcept { + std::swap(scan, other.scan); } - return make_uniq( - unique_ptr(reinterpret_cast(ffi_local_data))); -} - -void c_function(ClientContext &context, TableFunctionInput &input, DataChunk &output) { - const auto &bind = input.bind_data->Cast(); + Scan& operator= (Scan&& other) noexcept { + std::swap(scan, other.scan); + return *this; + } - auto ctx = reinterpret_cast(&context); - const auto bind_data = bind.ffi_data->DataPtr(); - auto global_data = input.global_state->Cast().ffi_data->DataPtr(); - auto local_data = input.local_state->Cast().ffi_data->DataPtr(); - - duckdb_vx_error error_out = nullptr; - bind.info->vtab.function(ctx, - bind_data, - global_data, - local_data, - reinterpret_cast(&output), - &error_out); - if (error_out) { - throw InvalidInputException(IntoErrString(error_out)); + double progress() const noexcept { + return vx_scan_progress(handle()); } -} -void c_pushdown_complex_filter(ClientContext & /*context*/, - LogicalGet & /*get*/, - FunctionData *bind_data, - vector> &filters) { - if (filters.empty()) { - return; + std::optional next_partition() { + vx_error* err = nullptr; + vx_partition* partition = vx_scan_next(handle(), &err); + if (err) { + throw BinderException(move_vx_err(err)); + } + if (!partition) return std::nullopt; + return Partition{partition}; } - auto &bind = bind_data->Cast(); + vx_scan* handle() const noexcept { return scan.get(); } - for (auto iter = filters.begin(); iter != filters.end();) { - duckdb_vx_error error_out = nullptr; - auto pushed = - bind.info->vtab.pushdown_complex_filter(bind_data->Cast().ffi_data->DataPtr(), - reinterpret_cast(iter->get()), - &error_out); - if (error_out) { - throw BinderException(IntoErrString(error_out)); +private: + struct deleter { + void operator()(vx_scan* scan) { + vx_scan_free(scan); } + }; + std::unique_ptr scan; +}; + +namespace vortex { +struct CTableFunctionInfo final : TableFunctionInfo { + explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {} + duckdb_vx_tfunc_vtab_t vtab; +}; + +struct CTableBindData final : TableFunctionData { + CTableBindData( + unique_ptr info_p, + Session&& session, + DataSource&& data_source, + const vector& column_names) + : info(std::move(info_p)) + , session(std::move(session)) + , data_source(std::move(data_source)) + , column_names(column_names) {} + + ~CTableBindData() override = default; - // If the pushdown complex filter returns true, we can remove the filter from the list. - iter = pushed ? filters.erase(iter) : std::next(iter); + unique_ptr Copy() const override { + return make_uniq( + make_uniq(info->vtab), + Session{session}, + DataSource{data_source}, + column_names); } -} -unique_ptr c_cardinality(ClientContext & /*context*/, const FunctionData *bind_data) { - auto &bind = bind_data->Cast(); + unique_ptr info; // TODO remove + + Session session; + DataSource data_source; + + vector column_names; + vector filters; +}; + +struct CTableGlobalData final : GlobalTableFunctionState { + explicit CTableGlobalData(Scan&& scan, idx_t max_threads) + : scan(std::move(scan)), max_threads(max_threads) {} + + ~CTableGlobalData() override = default; + + idx_t MaxThreads() const override { return max_threads; } - duckdb_vx_node_statistics node_stats_out = { - .estimated_cardinality = 0, - .max_cardinality = 0, - .has_estimated_cardinality = false, - .has_max_cardinality = false, + Scan scan; + idx_t max_threads; +}; + +struct CTableLocalData final : LocalTableFunctionState { + explicit CTableLocalData() {} + std::optional batch_id; +}; + +unique_ptr c_bind( + ClientContext &context, + TableFunctionBindInput &info, + vector &types, + vector &names) +{ + if (info.inputs.size() != 1) + throw BinderException("expected single file glob parameter"); + std::string files_glob = StringValue::Get(info.inputs[0]); + + Session session; + + vx_data_source_options opts = { + // files_glob lives till end of c_bind, vx_data_source_new copies the argument + .files = files_glob.data(), + .fs_use_vortex = nullptr, + .fs_set_userdata = nullptr, + .fs_open = nullptr, + .fs_create = nullptr, + .fs_list = nullptr, + .fs_close = nullptr, + .fs_size = nullptr, + .fs_read = nullptr, + .fs_write = nullptr, + .fs_sync = nullptr, + .glob = nullptr, + .cache_init = nullptr, + .cache_free = nullptr, + .cache_get = nullptr, + .cache_put = nullptr, + .cache_delete = nullptr }; - bind.info->vtab.cardinality(bind_data->Cast().ffi_data->DataPtr(), &node_stats_out); - auto stats = make_uniq(); - stats->has_estimated_cardinality = node_stats_out.has_estimated_cardinality; - stats->estimated_cardinality = node_stats_out.estimated_cardinality; - stats->has_max_cardinality = node_stats_out.has_max_cardinality; - stats->max_cardinality = node_stats_out.max_cardinality; + DataSource data_source{session, opts}; - return stats; -} + const vx_dtype* dtype = vx_data_source_dtype(data_source.handle()); + const vx_struct_fields* struct_dtype = vx_dtype_struct_dtype(dtype); -extern "C" size_t duckdb_vx_tfunc_bind_input_get_parameter_count(duckdb_vx_tfunc_bind_input ffi_input) { - if (!ffi_input) { - return 0; + for (uint64_t i = 0; i < vx_struct_fields_nfields(struct_dtype); ++i) { + const vx_string* field_name = vx_struct_fields_field_name(struct_dtype, i); + const vx_dtype* field_dtype = vx_struct_fields_field_dtype(struct_dtype, i); + if (!field_dtype) + throw BinderException(StringUtil::Format( + "Field dtype %s at index %d can't be parsed", + to_string(field_name), i)); + names.emplace_back(to_string(field_name)); + types.emplace_back(from_dtype(field_dtype)); } - const auto input = reinterpret_cast(ffi_input); - return input->inputs.size(); -} -extern "C" duckdb_value duckdb_vx_tfunc_bind_input_get_parameter(duckdb_vx_tfunc_bind_input ffi_input, - size_t index) { - if (!ffi_input || index >= duckdb_vx_tfunc_bind_input_get_parameter_count(ffi_input)) { - return nullptr; - } - const auto info = reinterpret_cast(ffi_input); - return reinterpret_cast(new Value(info->inputs[index])); + auto& vtab = info.table_function.function_info->Cast().vtab; + return make_uniq( + make_uniq(vtab), + std::move(session), + std::move(data_source), + names); } -extern "C" duckdb_value duckdb_vx_tfunc_bind_input_get_named_parameter(duckdb_vx_tfunc_bind_input ffi_input, - const char *name) { - if (!ffi_input || !name) { - return nullptr; +const vx_expression* make_projection( + const vector& column_ids, + const vector& projection_ids, + const vector& column_names +) { + vector projected_names; + projected_names.reserve(column_names.size()); + + if (projection_ids.empty()) { + for (column_t id : column_ids) { + if (id == COLUMN_IDENTIFIER_EMPTY) continue; + if (column_names.size() < id) + throw InvalidInputException(StringUtil::Format( + "Expected column id %d but there are %d columns", + id, column_names.size())); + // column_names[id] lives till end of projection(). Initialized + // vx_expression copies the buffer, so it's safe to use data() + projected_names.emplace_back(column_names[id].data()); + } + } else { + for (idx_t projection_id : projection_ids) { + if (column_ids.size() < projection_id) + throw InvalidInputException(StringUtil::Format( + "Expected projection id %d but there are %d columns", + projection_id, column_ids.size())); + column_t id = column_ids[projection_id]; + if (column_names.size() < id) + throw InvalidInputException(StringUtil::Format( + "Expected column id %d but there are %d columns", + id, column_names.size())); + projected_names.emplace_back(column_names[id].data()); + } } - const auto info = reinterpret_cast(ffi_input); - if (!info->named_parameters.contains(name)) { - return nullptr; - } - auto value = duckdb::make_uniq(info->named_parameters.at(name)); - return reinterpret_cast(value.release()); + vx_expression* root = vx_expression_root(); + // TODO vx_expression may take a string vx_array + const vx_expression* expr = vx_expression_select( + projected_names.data(), + projected_names.size(), + root); + vx_expression_free(root); + return expr; } -extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_result ffi_result, - const char *name_str, - size_t name_len, - duckdb_logical_type ffi_type) { - if (!name_str || !ffi_type) { - return; - } - const auto result = reinterpret_cast(ffi_result); - const auto logical_type = reinterpret_cast(ffi_type); - - result->names.emplace_back(name_str, name_len); - result->return_types.emplace_back(*logical_type); +const vx_expression* make_filter( + optional_ptr table_filters, + const vector& column_ids, + const vector& column_names, + const vector& additional_filters, + const vx_dtype* dtype +) { + return nullptr; } -virtual_column_map_t c_get_virtual_columns(ClientContext & /*context*/, - optional_ptr bind_data) { - auto &bind = bind_data->Cast(); +unique_ptr c_init_global(ClientContext &, TableFunctionInitInput &input) { + const auto &bind = input.bind_data->Cast(); - auto result = virtual_column_map_t(); - bind.info->vtab.get_virtual_columns(bind_data->Cast().ffi_data->DataPtr(), - reinterpret_cast(&result)); - return result; -} + vx_scan_selection selection = { + .idx = nullptr, + .idx_len = 0, + .include = VX_S_INCLUDE_ALL, + }; -extern "C" void duckdb_vx_tfunc_virtual_columns_push(duckdb_vx_tfunc_virtual_cols_result ffi_result, - idx_t column_idx, - const char *name_str, - size_t name_len, - duckdb_logical_type ffi_type) { - if (!ffi_result || !name_str || !ffi_type) { - return; - } + const vx_dtype* dtype = vx_data_source_dtype(bind.data_source.handle()); + const vx_expression* projection = make_projection( + input.column_ids, input.projection_ids, bind.column_names); + const vx_expression* filter = make_filter(input.filters, + input.column_ids, bind.column_names, bind.filters, dtype); + + vx_scan_options options = { + .projection = projection, + .filter = filter, + .row_range_begin = 0, + .row_range_end = 0, + .selection = selection, + .limit = 0, + .ordered = 0, + }; - auto result = reinterpret_cast(ffi_result); - const auto logical_type = reinterpret_cast(ffi_type); - const auto name = string(name_str, name_len); + Scan scan{bind.data_source, options}; + return make_uniq(std::move(scan), bind.info->vtab.max_threads); +} - auto table_col = TableColumn(std::move(name), *logical_type); - result->emplace(column_idx, std::move(table_col)); +unique_ptr c_init_local(ExecutionContext &, + TableFunctionInitInput &, + GlobalTableFunctionState*) { + return make_uniq(); } -OperatorPartitionData c_get_partition_data(ClientContext & /*context*/, - TableFunctionGetPartitionInput &input) { - if (input.partition_info.RequiresPartitionColumns()) { - throw InternalException("TableScan::GetPartitionData: partition columns not supported"); +void c_function(ClientContext &context, TableFunctionInput &input, DataChunk &output) { + auto& global_state = input.global_state->Cast(); + auto& batch_id = input.local_state->Cast().batch_id; + + auto next_partition = global_state.scan.next_partition(); + if (!next_partition) return; + Partition partition = std::move(*next_partition); + + std::optional array; + auto export_array = input.bind_data->Cast().info->vtab.export_array; + while ((array = partition.next_array())) { + uint64_t export_res = export_array( + array->handle(), + reinterpret_cast(&output)); + if (export_res == std::numeric_limits::max()) + batch_id = std::nullopt; + else + batch_id = export_res; } - auto &bind = input.bind_data->Cast(); - auto &global = input.global_state->Cast(); - auto &local = input.local_state->Cast(); +} + +const vx_expression* from_filter(const Expression& filter) { + return nullptr; +} - duckdb_vx_error error_out = nullptr; - auto index = bind.info->vtab.get_partition_data(bind.ffi_data->DataPtr(), - global.ffi_data->DataPtr(), - local.ffi_data->DataPtr(), - &error_out); - if (error_out) { - throw InvalidInputException(IntoErrString(error_out)); +void c_pushdown_complex_filter(ClientContext &, LogicalGet &, FunctionData *bind_data, vector> &filters) { + auto &bind = bind_data->Cast(); + // We don't erase filters, see Nick's comment in datasource.rs + for (auto iter = filters.begin(); iter != filters.end(); ++iter) { + bind.filters.emplace_back(from_filter(**iter)); } - return OperatorPartitionData(index); } -InsertionOrderPreservingMap c_to_string(TableFunctionToStringInput &input) { - InsertionOrderPreservingMap result; - auto &bind = input.bind_data->Cast(); - - // Call the Rust side to get custom string representation if available - if (bind.info->vtab.to_string) { - auto map = bind.info->vtab.to_string(bind.ffi_data->DataPtr()); - if (map) { - // Copy the map contents to the result - auto *cpp_map = reinterpret_cast *>(map); - for (const auto &[key, value] : *cpp_map) { - result[key] = value; - } - // Free the map allocated by Rust - duckdb_vx_string_map_free(map); +unique_ptr c_cardinality(ClientContext &, const FunctionData *bind_data) { + auto stats = make_uniq(); + vx_data_source_row_count rc = bind_data->Cast().data_source.row_count(); + switch (rc.cardinality) { + case VX_CARD_ESTIMATE: { + stats->has_estimated_cardinality = true; + stats->has_max_cardinality = false; + stats->estimated_cardinality = rc.rows; + return stats; + } + case VX_CARD_MAXIMUM: { + stats->has_estimated_cardinality = true; + stats->has_max_cardinality = true; + stats->estimated_cardinality = rc.rows; + stats->max_cardinality = rc.rows; + return stats; + } + default: { + stats->has_estimated_cardinality = false; + stats->has_max_cardinality = false; + return stats; } } +} - return result; +OperatorPartitionData c_get_partition_data(ClientContext&, + TableFunctionGetPartitionInput &input) { + if (input.partition_info.RequiresPartitionColumns()) + throw InternalException("TableScan::GetPartitionData: partition columns not supported"); + if (auto &batch_id = input.local_state->Cast().batch_id; batch_id) + return OperatorPartitionData(*batch_id); + throw InvalidInputException("Batch id missing, no batches exported"); } -extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const duckdb_vx_tfunc_vtab_t *vtab) { +extern "C" duckdb_state duckdb_vx_tfunc_register( + duckdb_database ffi_db, + const duckdb_vx_tfunc_vtab_t *vtab +) { if (!ffi_db || !vtab) { return DuckDBError; } @@ -353,30 +543,50 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.pushdown_complex_filter = c_pushdown_complex_filter; - tf.projection_pushdown = vtab->projection_pushdown; - tf.filter_pushdown = vtab->filter_pushdown; - tf.filter_prune = vtab->filter_prune; - tf.sampling_pushdown = vtab->sampling_pushdown; - tf.late_materialization = vtab->late_materialization; + //tf.projection_pushdown = vtab->projection_pushdown; + //tf.filter_pushdown = vtab->filter_pushdown; + //tf.filter_prune = vtab->filter_prune; + //tf.sampling_pushdown = vtab->sampling_pushdown; + //tf.late_materialization = vtab->late_materialization; + + tf.projection_pushdown = true; + tf.filter_pushdown = false; + tf.filter_prune = false; + tf.sampling_pushdown = false; + tf.late_materialization = false; + tf.cardinality = c_cardinality; tf.get_partition_data = c_get_partition_data; - tf.get_virtual_columns = c_get_virtual_columns; - tf.to_string = c_to_string; - tf.table_scan_progress = c_table_scan_progress; - // Set up the parameters + tf.to_string = [](TableFunctionToStringInput&) { + InsertionOrderPreservingMap result; + result.insert("Function", "Vortex Scan"); + // TODO filters + return result; + }; + + tf.get_virtual_columns = [](auto&, auto) { + virtual_column_map_t map = { + {COLUMN_IDENTIFIER_EMPTY, TableColumn{"", LogicalType::BOOLEAN}} + }; + return map; + }; + + tf.table_scan_progress = [](auto&, auto*, const GlobalTableFunctionState* state) { + return state->Cast().scan.progress(); + }; + tf.arguments.reserve(vtab->parameter_count); for (size_t i = 0; i < vtab->parameter_count; i++) { auto logical_type = reinterpret_cast(vtab->parameters[i]); tf.arguments.emplace_back(*logical_type); } - // And the named parameters + for (size_t i = 0; i < vtab->named_parameter_count; i++) { auto logical_type = reinterpret_cast(vtab->named_parameter_types[i]); tf.named_parameters.emplace(vtab->named_parameter_names[i], *logical_type); } - // Assign the VTable to the function info so we can access it later to invoke the callbacks. tf.function_info = make_shared_ptr(*vtab); try { @@ -390,6 +600,10 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d return DuckDBSuccess; } +// +// TODO this stuff should be removed +// + extern "C" duckdb_vx_string_map duckdb_vx_string_map_create() { auto map = new InsertionOrderPreservingMap(); return reinterpret_cast(map); @@ -410,4 +624,60 @@ extern "C" void duckdb_vx_string_map_free(duckdb_vx_string_map map) { auto *cpp_map = reinterpret_cast *>(map); delete cpp_map; } + +extern "C" void duckdb_vx_tfunc_virtual_columns_push(duckdb_vx_tfunc_virtual_cols_result ffi_result, + idx_t column_idx, + const char *name_str, + size_t name_len, + duckdb_logical_type ffi_type) { + if (!ffi_result || !name_str || !ffi_type) { + return; + } + + auto result = reinterpret_cast(ffi_result); + const auto logical_type = reinterpret_cast(ffi_type); + const auto name = string(name_str, name_len); + + auto table_col = TableColumn(std::move(name), *logical_type); + result->emplace(column_idx, std::move(table_col)); +} + +extern "C" size_t duckdb_vx_tfunc_bind_input_get_parameter_count(duckdb_vx_tfunc_bind_input ffi_input) { + if (!ffi_input) { + return 0; + } + const auto input = reinterpret_cast(ffi_input); + return input->inputs.size(); +} + +extern "C" duckdb_value duckdb_vx_tfunc_bind_input_get_parameter(duckdb_vx_tfunc_bind_input ffi_input, + size_t index) { + if (!ffi_input || index >= duckdb_vx_tfunc_bind_input_get_parameter_count(ffi_input)) { + return nullptr; + } + const auto info = reinterpret_cast(ffi_input); + return reinterpret_cast(new Value(info->inputs[index])); +} + +extern "C" duckdb_value duckdb_vx_tfunc_bind_input_get_named_parameter(duckdb_vx_tfunc_bind_input ffi_input, + const char *name) { + if (!ffi_input || !name) { + return nullptr; + } + const auto info = reinterpret_cast(ffi_input); + + if (!info->named_parameters.contains(name)) { + return nullptr; + } + auto value = duckdb::make_uniq(info->named_parameters.at(name)); + return reinterpret_cast(value.release()); +} + +extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_result ffi_result, + const char *name_str, + size_t name_len, + duckdb_logical_type ffi_type) { + return; +} + } // namespace vortex diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 9e4d39c677d..dcdc43e11a8 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -320,6 +320,10 @@ impl TableFunction for T { global_state: &Self::GlobalState, chunk: &mut DataChunkRef, ) -> VortexResult<()> { + // TODO create exporter for every chunk, use global + // variable to keep track of exporter till it fully + // consumes generated array + loop { if local_state.exporter.is_none() { let mut ctx = SESSION.create_execution_ctx(); @@ -406,14 +410,6 @@ impl TableFunction for T { Ok(false) } - fn cardinality(bind_data: &Self::BindData) -> Cardinality { - match bind_data.data_source.row_count() { - Some(Precision::Exact(v)) => Cardinality::Maximum(v), - Some(Precision::Inexact(v)) => Cardinality::Estimate(v), - None => Cardinality::Unknown, - } - } - fn partition_data( _bind_data: &Self::BindData, _global_init_data: &Self::GlobalState, diff --git a/vortex-duckdb/src/duckdb/table_function/export_array.rs b/vortex-duckdb/src/duckdb/table_function/export_array.rs new file mode 100644 index 00000000000..69ecef6156f --- /dev/null +++ b/vortex-duckdb/src/duckdb/table_function/export_array.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use vortex::array::Canonical; +use vortex::array::DynArray; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::ScalarFnVTable; +use vortex::array::arrays::Struct; +use vortex::array::arrays::StructArray; +use vortex::array::optimizer::ArrayOptimizer; +use vortex::error::VortexExpect; +use vortex::scalar_fn::fns::pack::Pack; + +use crate::SESSION; +use crate::cpp::duckdb_data_chunk; +use crate::duckdb::DataChunk; +use crate::duckdb::TableFunction; +use crate::exporter::ArrayExporter; +use crate::exporter::ConversionCache; + +// TODO In the original implementation, exporter is initialized in the +// local state, and conversion cache is scoped per partition. + +pub(crate) unsafe extern "C-unwind" fn export_array_callback( + array: *const crate::cpp::vx_array, + chunk: duckdb_data_chunk, +) -> u64 { + let chunk = unsafe { DataChunk::borrow_mut(chunk) }; + let mut batch_id = u64::MAX; + if array.is_null() { + return batch_id; + } + let array_result: Arc = + vortex_ffi::vx_array::as_ref(array as *const vortex_ffi::vx_array).clone(); + + // TODO this will produce incorrect results as exporter may export + // multiple data chunks. This exporter exports only one data chunk + + let conversion_cache = ConversionCache::default(); + + let mut ctx = SESSION.create_execution_ctx(); + let array_result = array_result + .optimize_recursive() + .vortex_expect("failed to optimize array"); + + let array_result = if let Some(array) = array_result.as_opt::() { + array.clone() + } else if let Some(array) = array_result.as_opt::() + && let Some(pack_options) = array.scalar_fn().as_opt::() + { + StructArray::new( + pack_options.names.clone(), + array.children(), + array.len(), + pack_options.nullability.into(), + ) + } else { + array_result + .execute::(&mut ctx) + .vortex_expect("failed to canonicalize array") + .into_struct() + }; + + let mut exporter = ArrayExporter::try_new(&array_result, &conversion_cache, ctx) + .vortex_expect("failed to initialize array exporter"); + + // Relaxed since there is no intra-instruction ordering required. + //batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + batch_id += 1; + + let has_more_data = exporter + .export(chunk) + .vortex_expect("failed to export chunk"); + //global_state + // .bytes_read + // .fetch_add(chunk.len(), Ordering::Relaxed); + + if !has_more_data { + // This exporter is fully consumed. + //EXPORTER = None; + batch_id = u64::MAX; + } + + assert!(!chunk.is_empty()); + batch_id +} diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index f20e844d381..ec16cfc9b97 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -11,10 +11,10 @@ use vortex::error::VortexExpect; use vortex::error::VortexResult; mod bind; mod cardinality; +mod export_array; mod init; mod partition; mod pushdown_complex_filter; -mod table_scan_progress; mod virtual_columns; pub use bind::*; @@ -32,9 +32,9 @@ use crate::duckdb::client_context::ClientContextRef; use crate::duckdb::data_chunk::DataChunkRef; use crate::duckdb::expr::ExpressionRef; use crate::duckdb::table_function::cardinality::cardinality_callback; +use crate::duckdb::table_function::export_array::export_array_callback; use crate::duckdb::table_function::partition::get_partition_data_callback; use crate::duckdb::table_function::pushdown_complex_filter::pushdown_complex_filter_callback; -use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb::table_function::virtual_columns::get_virtual_columns_callback; use crate::duckdb_try; @@ -194,7 +194,6 @@ impl DatabaseRef { pushdown_expression: ptr::null_mut::(), get_virtual_columns: Some(get_virtual_columns_callback::), to_string: Some(to_string_callback::), - table_scan_progress: Some(table_scan_progress_callback::), get_partition_data: Some(get_partition_data_callback::), projection_pushdown: T::PROJECTION_PUSHDOWN, filter_pushdown: T::FILTER_PUSHDOWN, @@ -202,6 +201,7 @@ impl DatabaseRef { sampling_pushdown: false, late_materialization: false, max_threads: T::MAX_THREADS, + export_array: Some(export_array_callback::), }; duckdb_try!( diff --git a/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs b/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs deleted file mode 100644 index cfe3dd43b45..00000000000 --- a/vortex-duckdb/src/duckdb/table_function/table_scan_progress.rs +++ /dev/null @@ -1,19 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex::error::VortexExpect; - -use crate::duckdb::TableFunction; - -pub(crate) unsafe extern "C-unwind" fn table_scan_progress_callback( - ctx: crate::cpp::duckdb_client_context, - bind_data: *mut ::std::os::raw::c_void, - global_state: *mut ::std::os::raw::c_void, -) -> f64 { - let ctx = unsafe { crate::duckdb::ClientContext::borrow(ctx) }; - let bind_data = - unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); - let global_state = unsafe { global_state.cast::().as_ref() } - .vortex_expect("global_init_data null pointer"); - T::table_scan_progress(ctx, bind_data, global_state) -} diff --git a/vortex-ffi/Cargo.toml b/vortex-ffi/Cargo.toml index 377e1125dc1..949cbb0a1d5 100644 --- a/vortex-ffi/Cargo.toml +++ b/vortex-ffi/Cargo.toml @@ -2,8 +2,8 @@ name = "vortex-ffi" description = "Native C FFI bindings for Vortex" readme = "README.md" -# This crate is not meant to be consumed by other Rust code but rather produces a static binary -# that can be linked to by other languages. +# This crate is not meant to be consumed by Rust code outside of Vortex but rather +# producee a static binary that can be linked to by other languages. publish = false version = { workspace = true } homepage = { workspace = true } @@ -31,6 +31,7 @@ tracing = { workspace = true, features = ["std", "log"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } url = { workspace = true, features = [] } vortex = { workspace = true, features = ["object_store"] } +arrow-array = { workspace = true, features = ["ffi"] } [dev-dependencies] tempfile = { workspace = true } @@ -44,7 +45,8 @@ mimalloc = ["dep:mimalloc"] [lib] name = "vortex_ffi" -crate-type = ["staticlib", "cdylib"] +# TODO lib for vortex-duckdb exporter hack +crate-type = ["lib", "staticlib", "cdylib"] [lints] workspace = true diff --git a/vortex-ffi/build.rs b/vortex-ffi/build.rs index 8c14001790f..223a3ab9073 100644 --- a/vortex-ffi/build.rs +++ b/vortex-ffi/build.rs @@ -7,9 +7,28 @@ use std::env; use std::path::PathBuf; use std::process::Command; +const SOURCE_FILES: [&str; 15] = [ + "array.rs", + "array_iterator.rs", + "binary.rs", + "dtype.rs", + "error.rs", + "file.rs", + "lib.rs", + "log.rs", + "macros.rs", + "ptype.rs", + "scan.rs", + "session.rs", + "sink.rs", + "string.rs", + "struct_fields.rs", +]; + fn main() { - // Set up dependency tracking - println!("cargo:rerun-if-changed=src/"); + for f in SOURCE_FILES { + println!("cargo:rerun-if-changed=src/{f}"); + } println!("cargo:rerun-if-changed=cbindgen.toml"); println!("cargo:rerun-if-changed=Cargo.toml"); println!("cargo:rerun-if-changed=build.rs"); @@ -43,6 +62,7 @@ fn main() { let result = cbindgen::Builder::new() .with_crate(&crate_dir) .with_config(config) + .with_documentation(true) .generate(); match result { diff --git a/vortex-ffi/cinclude/vortex.h b/vortex-ffi/cinclude/vortex.h index fc28a952fdc..529b83ad360 100644 --- a/vortex-ffi/cinclude/vortex.h +++ b/vortex-ffi/cinclude/vortex.h @@ -18,6 +18,12 @@ */ #define BinaryView_MAX_INLINED_SIZE 12 +typedef enum { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} vx_cardinality; + /** * Variant enum for Vortex primitive types. */ @@ -144,6 +150,18 @@ typedef enum { LOG_LEVEL_TRACE = 5, } vx_log_level; +typedef enum { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} vx_scan_selection_include; + +typedef enum { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} vx_estimate_boundary; + /** * Physical type enum, represents the in-memory physical layout but might represent a different logical type. */ @@ -235,6 +253,8 @@ typedef struct Nullability Nullability; typedef struct Primitive Primitive; +typedef struct VxFileHandle VxFileHandle; + /** * Base type for all Vortex arrays. * @@ -284,6 +304,14 @@ typedef struct vx_array_sink vx_array_sink; */ typedef struct vx_binary vx_binary; +/** + * A data source is a reference to multiple possibly remote files. When + * created, it opens first file to determine the schema from DType, all + * other operations are deferred till a scan is requested. You can request + * multiple file scans from a data source + */ +typedef struct vx_data_source vx_data_source; + /** * A Vortex data type. * @@ -297,11 +325,23 @@ typedef struct vx_dtype vx_dtype; */ typedef struct vx_error vx_error; +typedef struct vx_expression vx_expression; + /** * A handle to a Vortex file encapsulating the footer and logic for instantiating a reader. */ typedef struct vx_file vx_file; +/** + * A partition is a contiguous chunk of memory from which you can + * interatively get vx_arrays. + * TODO We're going away from exposing partitions to user, revise + * design + */ +typedef struct vx_partition vx_partition; + +typedef struct vx_scan vx_scan; + /** * A handle to a Vortex session. */ @@ -322,6 +362,88 @@ typedef struct vx_struct_fields vx_struct_fields; */ typedef struct vx_struct_fields_builder vx_struct_fields_builder; +typedef int (*vx_fs_use_vortex)(const char *schema, const char *path); + +typedef void (*vx_fs_set_userdata)(void *userdata); + +typedef vx_error **vx_error_out; + +typedef void (*vx_fs_open)(void *userdata, const char *path, vx_error_out err); + +typedef void (*vx_fs_create)(void *userdata, const char *path, vx_error_out err); + +typedef void (*vx_list_callback)(void *userdata, const char *name, int is_dir); + +typedef void (*vx_fs_list)(const void *userdata, + const char *path, + vx_list_callback callback, + vx_error_out error); + +typedef const VxFileHandle *vx_file_handle; + +typedef void (*vx_fs_close)(vx_file_handle handle); + +typedef uint64_t (*vx_fs_size)(vx_file_handle handle, vx_error_out err); + +typedef uint64_t ( + *vx_fs_read)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error_out err); + +typedef uint64_t ( + *vx_fs_write)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error_out err); + +typedef void (*vx_fs_sync)(vx_file_handle handle, vx_error_out err); + +typedef void (*vx_glob_callback)(void *userdata, const char *file); + +typedef void (*vx_glob)(const char *glob, vx_glob_callback callback, vx_error_out err); + +typedef void *vx_cache; + +typedef vx_cache (*vx_cache_init)(vx_error_out err); + +typedef void (*vx_cache_free)(vx_cache cache, vx_error_out err); + +typedef const char *vx_cache_key; + +typedef void (*vx_cache_get)(vx_cache cache, vx_cache_key key, void **value, vx_error_out err); + +typedef void (*vx_cache_put)(vx_cache cache, vx_cache_key key, void *value, vx_error_out err); + +typedef void (*vx_cache_delete)(vx_cache cache, vx_cache_key key, vx_error_out err); + +/** + * Host must either implement all or none of fs_* callbacks. + */ +typedef struct { + const char *files; + /** + * Whether to use Vortex filesystem or host's filesystem. + * Return 1 to use Vortex for a given schema ("file", "s3") and path. + * Return 0 to use host's filesystem. + */ + vx_fs_use_vortex fs_use_vortex; + vx_fs_set_userdata fs_set_userdata; + vx_fs_open fs_open; + vx_fs_create fs_create; + vx_fs_list fs_list; + vx_fs_close fs_close; + vx_fs_size fs_size; + vx_fs_read fs_read; + vx_fs_write fs_write; + vx_fs_sync fs_sync; + vx_glob glob; + vx_cache_init cache_init; + vx_cache_free cache_free; + vx_cache_get cache_get; + vx_cache_put cache_put; + vx_cache_delete cache_delete; +} vx_data_source_options; + +typedef struct { + vx_cardinality cardinality; + uint64_t rows; +} vx_data_source_row_count; + /** * Options supplied for opening a file. */ @@ -384,6 +506,27 @@ typedef struct { unsigned long row_offset; } vx_file_scan_options; +typedef struct { + uint64_t *idx; + size_t idx_len; + vx_scan_selection_include include; +} vx_scan_selection; + +typedef struct { + const vx_expression *projection; + const vx_expression *filter; + uint64_t row_range_begin; + uint64_t row_range_end; + vx_scan_selection selection; + uint64_t limit; + int ordered; +} vx_scan_options; + +typedef struct { + uint64_t estimate; + vx_estimate_boundary boundary; +} vx_estimate; + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -521,6 +664,29 @@ size_t vx_binary_len(const vx_binary *ptr); */ const char *vx_binary_ptr(const vx_binary *ptr); +/** + * Clone a borrowed [`vx_data_source`], returning an owned [`vx_data_source`]. + * + * + * Must be released with [`vx_data_source_free`]. + */ +const vx_data_source *vx_data_source_clone(const vx_data_source *ptr); + +/** + * Free an owned [`vx_data_source`] object. + */ +void vx_data_source_free(const vx_data_source *ptr); + +/** + * Create a new owned datasource which must be freed by the caller + */ +const vx_data_source * +vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err); + +const vx_dtype *vx_data_source_dtype(const vx_data_source *ds); + +void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); + /** * Clone a borrowed [`vx_dtype`], returning an owned [`vx_dtype`]. * @@ -664,6 +830,8 @@ uint8_t vx_dtype_time_unit(const DType *dtype); */ const vx_string *vx_dtype_time_zone(const DType *dtype); +void vx_type_to_arrow_schema(const vx_dtype *_dtype, FFI_ArrowSchema *_schema, vx_error_out _err); + /** * Free an owned [`vx_error`] object. */ @@ -677,6 +845,15 @@ void vx_error_free(vx_error *ptr); */ const vx_string *vx_error_get_message(const vx_error *error); +/** + * Free an owned [`vx_expression`] object. + */ +void vx_expression_free(vx_expression *ptr); + +vx_expression *vx_expression_root(void); + +vx_expression *vx_expression_select(const char *const *names, size_t names_len, const vx_expression *child); + /** * Clone a borrowed [`vx_file`], returning an owned [`vx_file`]. * @@ -735,6 +912,47 @@ vx_array_iterator *vx_file_scan(const vx_session *session, */ void vx_set_log_level(vx_log_level level); +/** + * Free an owned [`vx_scan`] object. + */ +void vx_scan_free(vx_scan *ptr); + +/** + * Free an owned [`vx_partition`] object. + */ +void vx_partition_free(vx_partition *ptr); + +vx_scan * +vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err); + +void vx_scan_partition_count(const vx_scan *scan, vx_estimate *count, vx_error_out err); + +/** + * Get next owned partition out of a scan request. + * Caller must free this partition using vx_partition_free. + * This method is thread-safe. + * If using in a sync multi-thread runtime, users are encouraged to create a + * worker thread per partition. + * Returns NULL and doesn't set err on exhaustion. + * Returns NULL and sets err on error. + */ +vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err); + +void vx_partition_row_count(const vx_partition *partition, vx_estimate *count, vx_error_out err); + +void vx_partition_scan_arrow(const vx_partition *_partition, FFI_ArrowArrayStream *_stream, vx_error_out err); + +/** + * Get next vx_array out of this partition. + * Thread-unsafe. + */ +const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); + +/** + * Scan progress between 0.0 and 1.0 + */ +double vx_scan_progress(const vx_scan *_scan); + /** * Free an owned [`vx_session`] object. */ @@ -747,6 +965,13 @@ void vx_session_free(vx_session *ptr); */ vx_session *vx_session_new(void); +/** + * Clone a Vortex session, returning an owned copy. + * + * The caller is responsible for freeing the session with [`vx_session_free`]. + */ +vx_session *vx_session_clone(vx_session *session); + /** * Opens a writable array stream, where sink is used to push values into the stream. * To close the stream close the sink with `vx_array_sink_close`. diff --git a/vortex-ffi/src/data_source.rs b/vortex-ffi/src/data_source.rs new file mode 100644 index 00000000000..2cbe897eb9d --- /dev/null +++ b/vortex-ffi/src/data_source.rs @@ -0,0 +1,215 @@ +#![allow(non_camel_case_types)] + +use std::ffi::c_char; +use std::ffi::c_int; +use std::ffi::c_void; +use std::sync::Arc; + +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::expr::stats::Precision::Exact; +use vortex::expr::stats::Precision::Inexact; +use vortex::file::multi::MultiFileDataSource; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::api::DataSource; +use vortex::scan::api::DataSourceRef; + +use crate::RUNTIME; +use crate::dtype::vx_dtype; +use crate::error::try_or_default; +use crate::error::vx_error_out; +use crate::session::vx_session; +use crate::to_string; + +crate::arc_dyn_wrapper!( + /// A data source is a reference to multiple possibly remote files. When + /// created, it opens first file to determine the schema from DType, all + /// other operations are deferred till a scan is requested. You can request + /// multiple file scans from a data source + dyn DataSource, + vx_data_source); + +pub struct VxFileHandle; +pub type vx_file_handle = *const VxFileHandle; + +pub type vx_list_callback = + Option; +pub type vx_glob_callback = + Option; + +pub type vx_fs_use_vortex = + Option c_int>; +pub type vx_fs_set_userdata = Option; + +pub type vx_fs_open = + Option; +pub type vx_fs_create = + Option; + +pub type vx_fs_list = Option< + unsafe extern "C" fn( + userdata: *const c_void, + path: *const c_char, + callback: vx_list_callback, + error: vx_error_out, + ), +>; + +pub type vx_fs_close = Option; +pub type vx_fs_size = + Option u64>; + +pub type vx_fs_read = Option< + unsafe extern "C" fn( + handle: vx_file_handle, + offset: u64, + len: usize, + buffer: *mut u8, + err: vx_error_out, + ) -> u64, +>; + +pub type vx_fs_write = Option< + unsafe extern "C" fn( + handle: vx_file_handle, + offset: u64, + len: usize, + buffer: *mut u8, + err: vx_error_out, + ) -> u64, +>; + +pub type vx_fs_sync = Option; + +pub type vx_glob = Option< + unsafe extern "C" fn(glob: *const c_char, callback: vx_glob_callback, err: vx_error_out), +>; + +pub type vx_cache = *mut c_void; +pub type vx_cache_key = *const c_char; + +pub type vx_cache_init = Option vx_cache>; +pub type vx_cache_free = Option; +pub type vx_cache_get = Option< + unsafe extern "C" fn( + cache: vx_cache, + key: vx_cache_key, + value: *mut *mut c_void, + err: vx_error_out, + ), +>; +pub type vx_cache_put = Option< + unsafe extern "C" fn(cache: vx_cache, key: vx_cache_key, value: *mut c_void, err: vx_error_out), +>; +pub type vx_cache_delete = + Option; + +#[repr(C)] +/// Host must either implement all or none of fs_* callbacks. +pub struct vx_data_source_options { + // TODO what if the program wants to read a Vortex file from an existing buffer? + files: *const c_char, + + /// Whether to use Vortex filesystem or host's filesystem. + /// Return 1 to use Vortex for a given schema ("file", "s3") and path. + /// Return 0 to use host's filesystem. + fs_use_vortex: vx_fs_use_vortex, + fs_set_userdata: vx_fs_set_userdata, + fs_open: vx_fs_open, + fs_create: vx_fs_create, + fs_list: vx_fs_list, + fs_close: vx_fs_close, + fs_size: vx_fs_size, + fs_read: vx_fs_read, + fs_write: vx_fs_write, + fs_sync: vx_fs_sync, + + glob: vx_glob, + + cache_init: vx_cache_init, + cache_free: vx_cache_free, + cache_get: vx_cache_get, + cache_put: vx_cache_put, + cache_delete: vx_cache_delete, +} + +unsafe fn data_source_new( + session: *const vx_session, + opts: *const vx_data_source_options, +) -> VortexResult<*const vx_data_source> { + if session.is_null() { + vortex_bail!("empty session"); + } + let session = vx_session::as_ref(session).clone(); + + if opts.is_null() { + vortex_bail!("empty opts"); + } + let opts = unsafe { &*opts }; + + if opts.files.is_null() { + vortex_bail!("empty opts.files"); + } + let glob = unsafe { to_string(opts.files) }; + + RUNTIME.block_on(async { + let data_source = MultiFileDataSource::new(session) + //.with_filesystem(fs) + .with_glob(glob) + .build() + .await?; + Ok(vx_data_source::new(Arc::new(data_source) as DataSourceRef)) + }) +} + +/// Create a new owned datasource which must be freed by the caller +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_data_source_new( + session: *const vx_session, + opts: *const vx_data_source_options, + err: vx_error_out, +) -> *const vx_data_source { + try_or_default(err, || unsafe { data_source_new(session, opts) }) +} + +#[unsafe(no_mangle)] +// Create a non-owned dtype referencing dataframe. +// This dtype's lifetime is bound to underlying data source. +// Caller should not free this dtype manually +pub unsafe extern "C-unwind" fn vx_data_source_dtype(ds: *const vx_data_source) -> *const vx_dtype { + vx_dtype::new_ref(vx_data_source::as_ref(ds).dtype()) +} + +#[repr(C)] +enum vx_cardinality { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} + +#[repr(C)] +pub struct vx_data_source_row_count { + cardinality: vx_cardinality, + rows: u64, +} + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_data_source_get_row_count( + ds: *const vx_data_source, + rc: *mut vx_data_source_row_count, +) { + let rc = unsafe { &mut *rc }; + match vx_data_source::as_ref(ds).row_count() { + Some(Exact(rows)) => { + rc.cardinality = vx_cardinality::VX_CARD_MAXIMUM; + rc.rows = rows; + } + Some(Inexact(rows)) => { + rc.cardinality = vx_cardinality::VX_CARD_ESTIMATE; + rc.rows = rows; + } + None => { + rc.cardinality = vx_cardinality::VX_CARD_UNKNOWN; + } + } +} diff --git a/vortex-ffi/src/dtype.rs b/vortex-ffi/src/dtype.rs index a05a53459ca..98a52f59444 100644 --- a/vortex-ffi/src/dtype.rs +++ b/vortex-ffi/src/dtype.rs @@ -4,6 +4,7 @@ use std::ptr; use std::sync::Arc; +use arrow_array::ffi::FFI_ArrowSchema; use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::error::VortexExpect; @@ -14,6 +15,7 @@ use vortex::extension::datetime::Time; use vortex::extension::datetime::Timestamp; use crate::arc_wrapper; +use crate::error::vx_error_out; use crate::ptype::vx_ptype; use crate::string::vx_string; use crate::struct_fields::vx_struct_fields; @@ -324,6 +326,15 @@ pub unsafe extern "C-unwind" fn vx_dtype_time_zone(dtype: *const DType) -> *cons } } +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_type_to_arrow_schema( + _dtype: *const vx_dtype, + _schema: *mut FFI_ArrowSchema, + _err: vx_error_out, +) { + todo!(); +} + #[cfg(test)] #[allow(clippy::cast_possible_truncation)] mod tests { diff --git a/vortex-ffi/src/error.rs b/vortex-ffi/src/error.rs index 53c91bca7eb..372a4a07775 100644 --- a/vortex-ffi/src/error.rs +++ b/vortex-ffi/src/error.rs @@ -19,9 +19,19 @@ box_wrapper!( vx_error ); +#[allow(non_camel_case_types)] +pub type vx_error_out = *mut *mut vx_error; + +pub(crate) fn write_error(error_out: vx_error_out, error_string: &str) { + let err = vx_error::new(Box::new(VortexError { + message: error_string.into(), + })); + unsafe { error_out.write(err) }; +} + #[inline] pub fn try_or_default( - error_out: *mut *mut vx_error, + error_out: vx_error_out, function: impl FnOnce() -> VortexResult, ) -> T { match function() { @@ -30,10 +40,7 @@ pub fn try_or_default( value } Err(err) => { - let err = vx_error::new(Box::new(VortexError { - message: err.to_string().into(), - })); - unsafe { error_out.write(err) }; + write_error(error_out, &err.to_string()); T::default() } } diff --git a/vortex-ffi/src/expression.rs b/vortex-ffi/src/expression.rs new file mode 100644 index 00000000000..2aa93db21e8 --- /dev/null +++ b/vortex-ffi/src/expression.rs @@ -0,0 +1,27 @@ +use std::ffi::c_char; +use std::ffi::c_int; + +use vortex::expr::Expression; +use vortex::expr::root; +use vortex::expr::select; + +use crate::to_string_vec; + +crate::box_wrapper!(Expression, vx_expression); + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn vx_expression_root() -> *mut vx_expression { + vx_expression::new(Box::new(root())) +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn vx_expression_select( + names: *const *const c_char, + names_len: usize, + child: *const vx_expression, +) -> *mut vx_expression { + // TODO don't allocate, convert to [&str] + let names = unsafe { to_string_vec(names, names_len as c_int) }; + let expr = select(names, vx_expression::as_ref(child).clone()); + vx_expression::new(Box::new(expr)) +} diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index 1d97f29a01b..655b3af2ccc 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -9,12 +9,15 @@ mod array; mod array_iterator; mod binary; +mod data_source; mod dtype; mod error; +mod expression; mod file; mod log; mod macros; mod ptype; +mod scan; mod session; mod sink; mod string; @@ -25,6 +28,8 @@ use std::ffi::c_char; use std::ffi::c_int; use std::sync::LazyLock; +// TODO hack for duckdb exporter +pub use array::vx_array; pub use log::vx_log_level; use vortex::io::runtime::current::CurrentThreadRuntime; diff --git a/vortex-ffi/src/macros.rs b/vortex-ffi/src/macros.rs index a0a0f9b4711..52760e32caa 100644 --- a/vortex-ffi/src/macros.rs +++ b/vortex-ffi/src/macros.rs @@ -66,7 +66,8 @@ macro_rules! arc_dyn_wrapper { } /// Extract a borrowed reference from a const pointer. - pub(crate) fn as_ref<'a>(ptr: *const $ffi_ident) -> &'a std::sync::Arc<$T> { + /// TODO hack for duckdb exporter + pub fn as_ref<'a>(ptr: *const $ffi_ident) -> &'a std::sync::Arc<$T> { use vortex::error::VortexExpect; // TODO(joe): propagate this error up instead of expecting &unsafe { ptr.as_ref() } diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs new file mode 100644 index 00000000000..bab4b888947 --- /dev/null +++ b/vortex-ffi/src/scan.rs @@ -0,0 +1,313 @@ +#![allow(non_camel_case_types)] + +use core::slice; +use std::ffi::c_int; +use std::ops::Range; +use std::ptr; +use std::sync::Arc; +use std::sync::Mutex; + +use arrow_array::ffi_stream::FFI_ArrowArrayStream; +use futures::StreamExt; +use vortex::array::expr::stats::Precision; +use vortex::array::stream::SendableArrayStream; +use vortex::buffer::Buffer; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::io::runtime::BlockingRuntime; +use vortex::scan::Selection; +use vortex::scan::api::DataSourceScan; +use vortex::scan::api::Partition; +use vortex::scan::api::PartitionStream; +use vortex::scan::api::ScanRequest; + +use crate::RUNTIME; +use crate::array::vx_array; +use crate::data_source::vx_data_source; +use crate::error::try_or_default; +use crate::error::vx_error_out; +use crate::error::write_error; +use crate::expression::vx_expression; + +pub enum VxScanState { + Pending(Box), + Started(PartitionStream), + Finished, +} +pub type VxScan = Mutex; +crate::box_wrapper!(VxScan, vx_scan); + +pub enum VxPartitionScan { + Pending(Box), + Started(SendableArrayStream), + Finished, +} +crate::box_wrapper!( + /// A partition is a contiguous chunk of memory from which you can + /// interatively get vx_arrays. + /// TODO We're going away from exposing partitions to user, revise + /// design + VxPartitionScan, + vx_partition); + +#[repr(C)] +pub enum vx_scan_selection_include { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} + +#[repr(C)] +pub struct vx_scan_selection { + pub idx: *mut u64, + pub idx_len: usize, + pub include: vx_scan_selection_include, +} + +// Distinct from ScanRequest for easier option handling from C +#[repr(C)] +pub struct vx_scan_options { + pub projection: *const vx_expression, + pub filter: *const vx_expression, + pub row_range_begin: u64, + pub row_range_end: u64, + pub selection: vx_scan_selection, + pub limit: u64, + pub ordered: c_int, +} + +#[repr(C)] +pub enum vx_estimate_boundary { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} + +#[repr(C)] +pub struct vx_estimate { + estimate: u64, + boundary: vx_estimate_boundary, +} + +fn scan_request(opts: *const vx_scan_options) -> VortexResult { + if opts.is_null() { + return Ok(ScanRequest::default()); + } + let opts = unsafe { &*opts }; + + let projection = if opts.projection.is_null() { + vortex_bail!("empty opts.projection"); + } else { + vx_expression::as_ref(opts.projection).clone() + }; + + let filter = if opts.filter.is_null() { + None + } else { + Some(vx_expression::as_ref(opts.filter).clone()) + }; + + let selection = &opts.selection; + let selection = match selection.include { + vx_scan_selection_include::VX_S_INCLUDE_ALL => Selection::All, + vx_scan_selection_include::VX_S_INCLUDE_RANGE => { + let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) }; + let buf = Buffer::copy_from(buf); + Selection::IncludeByIndex(buf) + } + vx_scan_selection_include::VX_S_EXCLUDE_RANGE => { + let buf = unsafe { slice::from_raw_parts(selection.idx, selection.idx_len) }; + let buf = Buffer::copy_from(buf); + Selection::ExcludeByIndex(buf) + } + }; + + let ordered = opts.ordered == 1; + + let start = opts.row_range_begin; + let end = opts.row_range_end; + let row_range = (start > 0 || end > 0).then_some(Range { start, end }); + + let limit = (opts.limit != 0).then_some(opts.limit); + + Ok(ScanRequest { + projection, + filter, + row_range, + selection, + ordered, + limit, + }) +} + +#[unsafe(no_mangle)] +// Create a new owned data source scan which must be freed by the caller. +// Scan can be consumed only once. +// Returns NULL and sets err on error. +// options may not be NULL. +pub unsafe extern "C-unwind" fn vx_data_source_scan( + data_source: *const vx_data_source, + options: *const vx_scan_options, + err: vx_error_out, +) -> *mut vx_scan { + try_or_default(err, || { + let request = scan_request(options)?; + RUNTIME.block_on(async { + let scan = vx_data_source::as_ref(data_source).scan(request).await?; + Ok(vx_scan::new(Box::new(Mutex::new(VxScanState::Pending(scan))))) + }) + }) +} + +fn estimate>(estimate: Option>, out: &mut vx_estimate) { + match estimate { + Some(Precision::Exact(value)) => { + out.boundary = vx_estimate_boundary::VX_ESTIMATE_EXACT; + out.estimate = value.into(); + } + Some(Precision::Inexact(value)) => { + out.boundary = vx_estimate_boundary::VX_ESTIMATE_INEXACT; + out.estimate = value.into(); + } + None => { + out.boundary = vx_estimate_boundary::VX_ESTIMATE_UNKNOWN; + } + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_scan_partition_count( + scan: *const vx_scan, + count: *mut vx_estimate, + err: vx_error_out, +) { + let count = unsafe { &mut *count }; + let scan = vx_scan::as_ref(scan); + let mut scan = scan.lock().expect("failed to lock mutex"); + let scan = &mut *scan; + let VxScanState::Pending(scan) = scan else { + write_error( + err, + "can't get partition count of a scan that's already started", + ); + return; + }; + estimate(scan.partition_count().map(|x| match x { + Precision::Exact(v) => Precision::Exact(v as u64), + Precision::Inexact(v) => Precision::Inexact(v as u64), + }), count) +} + +#[unsafe(no_mangle)] +/// Get next owned partition out of a scan request. +/// Caller must free this partition using vx_partition_free. +/// This method is thread-safe. +/// If using in a sync multi-thread runtime, users are encouraged to create a +/// worker thread per partition. +/// Returns NULL and doesn't set err on exhaustion. +/// Returns NULL and sets err on error. +pub unsafe extern "C-unwind" fn vx_scan_next( + scan: *mut vx_scan, + err: vx_error_out, +) -> *mut vx_partition { + let scan = vx_scan::as_mut(scan); + let mut scan = scan.lock().expect("failed to lock mutex"); + let scan = &mut *scan; + unsafe { + let ptr = scan as *mut VxScanState; + + let on_finish = || -> VortexResult<*mut vx_partition> { + ptr::write(ptr, VxScanState::Finished); + Ok(ptr::null_mut()) + }; + + let on_stream = |mut stream: PartitionStream| -> VortexResult<*mut vx_partition> { + match RUNTIME.block_on(stream.next()) { + Some(partition) => { + let partition = VxPartitionScan::Pending(partition?); + let partition = vx_partition::new(Box::new(partition)); + ptr::write(ptr, VxScanState::Started(stream)); + Ok(partition) + } + None => on_finish(), + } + }; + + let owned = ptr::read(ptr); + try_or_default(err, || match owned { + VxScanState::Pending(scan) => on_stream(scan.partitions()), + VxScanState::Started(stream) => on_stream(stream), + VxScanState::Finished => on_finish(), + }) + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_partition_row_count( + partition: *const vx_partition, + count: *mut vx_estimate, + err: vx_error_out +) { + let partition = vx_partition::as_ref(partition); + let VxPartitionScan::Pending(partition) = partition else { + write_error( + err, + "can't get row count of a partition that's already started", + ); + return; + }; + estimate(partition.row_count(), unsafe { &mut *count} ) +} + +// TODO export nanoarrow headers? + +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn vx_partition_scan_arrow( + _partition: *const vx_partition, + _stream: *mut FFI_ArrowArrayStream, + err: vx_error_out, +) { + write_error(err, "failed to scan partition to Arrow"); +} + +#[unsafe(no_mangle)] +/// Get next vx_array out of this partition. +/// Thread-unsafe. +pub unsafe extern "C-unwind" fn vx_partition_next( + partition: *mut vx_partition, + err: vx_error_out, +) -> *const vx_array { + let partition = vx_partition::as_mut(partition); + unsafe { + let ptr = partition as *mut VxPartitionScan; + + let on_finish = || -> VortexResult<*const vx_array> { + ptr::write(ptr, VxPartitionScan::Finished); + Ok(ptr::null_mut()) + }; + + let on_stream = |mut stream: SendableArrayStream| -> VortexResult<*const vx_array> { + match RUNTIME.block_on(stream.next()) { + Some(array) => { + let array = vx_array::new(Arc::new(array?)); + ptr::write(ptr, VxPartitionScan::Started(stream)); + Ok(array) + } + None => on_finish(), + } + }; + + let owned = ptr::read(ptr); + try_or_default(err, || match owned { + VxPartitionScan::Pending(partition) => on_stream(partition.execute()?), + VxPartitionScan::Started(stream) => on_stream(stream), + VxPartitionScan::Finished => on_finish(), + }) + } +} + +#[unsafe(no_mangle)] +/// Scan progress between 0.0 and 1.0 +pub unsafe extern "C-unwind" fn vx_scan_progress(_scan: *const vx_scan) -> f64 { + 0.0 +} diff --git a/vortex-ffi/src/session.rs b/vortex-ffi/src/session.rs index 8950abb90d4..721b5a74322 100644 --- a/vortex-ffi/src/session.rs +++ b/vortex-ffi/src/session.rs @@ -24,3 +24,12 @@ pub unsafe extern "C-unwind" fn vx_session_new() -> *mut vx_session { VortexSession::default().with_handle(RUNTIME.handle()), )) } + +#[unsafe(no_mangle)] +/// Clone a Vortex session, returning an owned copy. +/// +/// The caller is responsible for freeing the session with [`vx_session_free`]. +pub unsafe extern "C-unwind" fn vx_session_clone(session: *mut vx_session) -> *mut vx_session { + let session = vx_session::as_mut(session); + vx_session::new(Box::new(session.clone())) +}