diff --git a/ext/duckdb/duckdb.c b/ext/duckdb/duckdb.c index cdb9b492..753e503e 100644 --- a/ext/duckdb/duckdb.c +++ b/ext/duckdb/duckdb.c @@ -56,6 +56,7 @@ Init_duckdb_native(void) { rbduckdb_init_duckdb_extracted_statements(); rbduckdb_init_duckdb_instance_cache(); rbduckdb_init_duckdb_value_impl(); + rbduckdb_init_executor(); rbduckdb_init_duckdb_scalar_function(); rbduckdb_init_duckdb_scalar_function_set(); rbduckdb_init_duckdb_expression(); diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c new file mode 100644 index 00000000..b3dac283 --- /dev/null +++ b/ext/duckdb/executor.c @@ -0,0 +1,495 @@ +#include "ruby-duckdb.h" + +/* + * Cross-platform threading primitives. + * MSVC (mswin) does not provide <pthread.h>. + * MinGW-w64 (mingw, ucrt) provides <pthread.h> via winpthreads. + */ +#ifdef _MSC_VER +#include +#else +#include +#endif + +#include "executor.h" + +/* + * Thread detection functions (available since Ruby 2.3). + * Used by scalar_function.c and table_function.c to determine dispatch path. + * Not exported via executor.h, so each of those files declares them itself. + */ +extern int ruby_thread_has_gvl_p(void); +extern int ruby_native_thread_p(void); + +/* ============================================================================ + * Global Executor Thread + * ============================================================================ + * + * A single Ruby thread that processes callback requests from non-Ruby threads. + * DuckDB worker threads enqueue requests and block until completion. 
+ * + * Modeled after FFI gem's async callback dispatcher: + * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c + */ + +/* Per-callback request, stack-allocated on the DuckDB worker thread */ +struct executor_request { + rbduckdb_callback_fn callback_func; + void *callback_data; + int request_done; +#ifdef _MSC_VER + CRITICAL_SECTION request_done_lock; + CONDITION_VARIABLE request_done_cond; +#else + pthread_mutex_t request_done_mutex; + pthread_cond_t request_done_cond; +#endif + struct executor_request *next; +}; + +/* Global executor state */ +#ifdef _MSC_VER +static CRITICAL_SECTION g_executor_lock; +static CONDITION_VARIABLE g_executor_cond; +static int g_sync_initialized = 0; +#else +static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; +#endif +static struct executor_request *g_request_list = NULL; +static VALUE g_executor_thread = Qnil; +static int g_executor_started = 0; + +/* GC protection array for proxy Ruby threads */ +static VALUE g_proxy_threads = Qnil; + +/* Data passed to the executor wait function */ +struct executor_wait_data { + struct executor_request *request; + int stop; +}; + +/* Runs without GVL: blocks on condvar waiting for a callback request */ +static void *executor_wait_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + + w->request = NULL; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + while (!w->stop && g_request_list == NULL) { + SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = g_request_list->next; + } + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + while (!w->stop && g_request_list == NULL) { + pthread_cond_wait(&g_executor_cond, &g_executor_mutex); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = 
g_request_list->next; + } + pthread_mutex_unlock(&g_executor_mutex); +#endif + + return NULL; +} + +/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ +static void executor_stop_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + w->stop = 1; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + w->stop = 1; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); +#endif +} + +/* The executor thread main loop (Ruby thread) */ +static VALUE executor_thread_func(void *data) { + struct executor_wait_data w; + w.stop = 0; + + while (!w.stop) { + /* Release GVL and wait for a callback request */ + rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); + + if (w.request != NULL) { + struct executor_request *req = w.request; + + /* Execute the callback with the GVL */ + req->callback_func(req->callback_data); + + /* Signal the DuckDB worker thread that the callback is done */ +#ifdef _MSC_VER + EnterCriticalSection(&req->request_done_lock); + req->request_done = 1; + WakeConditionVariable(&req->request_done_cond); + LeaveCriticalSection(&req->request_done_lock); +#else + pthread_mutex_lock(&req->request_done_mutex); + req->request_done = 1; + pthread_cond_signal(&req->request_done_cond); + pthread_mutex_unlock(&req->request_done_mutex); +#endif + } + } + + return Qnil; +} + +/* + * Start the global executor thread (must be called with GVL held). + * + * Thread safety: This function is called from Ruby methods that always run + * with the GVL held. The GVL serializes all calls, so the g_executor_started + * check-then-set is safe without an extra mutex. 
+ */ +void rbduckdb_executor_ensure_started(void) { + if (g_executor_started) return; + +#ifdef _MSC_VER + if (!g_sync_initialized) { + InitializeCriticalSection(&g_executor_lock); + InitializeConditionVariable(&g_executor_cond); + g_sync_initialized = 1; + } +#endif + + g_executor_thread = rb_thread_create(executor_thread_func, NULL); + rb_global_variable(&g_executor_thread); + g_executor_started = 1; +} + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. + */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data) { + struct executor_request req; + + req.callback_func = callback_func; + req.callback_data = callback_data; + req.request_done = 0; + req.next = NULL; + +#ifdef _MSC_VER + InitializeCriticalSection(&req.request_done_lock); + InitializeConditionVariable(&req.request_done_cond); + + /* Enqueue the request */ + EnterCriticalSection(&g_executor_lock); + req.next = g_request_list; + g_request_list = &req; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); + + /* Wait for the executor to process our callback */ + EnterCriticalSection(&req.request_done_lock); + while (!req.request_done) { + SleepConditionVariableCS(&req.request_done_cond, &req.request_done_lock, INFINITE); + } + LeaveCriticalSection(&req.request_done_lock); + + DeleteCriticalSection(&req.request_done_lock); +#else + pthread_mutex_init(&req.request_done_mutex, NULL); + pthread_cond_init(&req.request_done_cond, NULL); + + /* Enqueue the request */ + pthread_mutex_lock(&g_executor_mutex); + req.next = g_request_list; + g_request_list = &req; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); + + /* Wait for the executor to process our callback */ + pthread_mutex_lock(&req.request_done_mutex); + while (!req.request_done) { + pthread_cond_wait(&req.request_done_cond, &req.request_done_mutex); + } + 
pthread_mutex_unlock(&req.request_done_mutex); + + pthread_cond_destroy(&req.request_done_cond); + pthread_mutex_destroy(&req.request_done_mutex); +#endif +} + +/* ============================================================================ + * Per-Worker Proxy Threads + * ============================================================================ + * + * Each DuckDB worker thread can be assigned a dedicated Ruby proxy thread. + * The proxy waits for callback requests via OS condvar, acquires the GVL, + * executes the callback, and signals completion. + * + * This eliminates the global executor bottleneck — each worker has its own + * condvar and its own Ruby thread, so GVL acquisition is distributed. + */ + +struct worker_proxy { + VALUE ruby_thread; + volatile int stop_requested; + rbduckdb_callback_fn callback_func; + void *callback_data; + volatile int has_request; + volatile int request_done; + volatile int thread_exited; +#ifdef _MSC_VER + CRITICAL_SECTION lock; + CONDITION_VARIABLE request_cond; + CONDITION_VARIABLE request_done_cond; + CONDITION_VARIABLE thread_exit_cond; +#else + pthread_mutex_t lock; + pthread_cond_t request_cond; + pthread_cond_t request_done_cond; + pthread_cond_t thread_exit_cond; +#endif +}; + +/* Runs without GVL: proxy waits for a callback request */ +static void *proxy_wait_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + pthread_cond_wait(&proxy->request_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif + + return NULL; +} + +/* Unblock function for proxy thread (VM shutdown or Thread#kill) */ +static void proxy_stop_func(void *data) { + struct 
worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* The proxy thread main loop (Ruby thread) */ +static VALUE proxy_thread_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + + while (!proxy->stop_requested) { + /* Release GVL and wait for a request */ + rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy); + + if (proxy->stop_requested) break; + + if (proxy->has_request) { + /* Execute the callback with the GVL held */ + proxy->callback_func(proxy->callback_data); + + /* Signal completion to the DuckDB worker thread */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + WakeConditionVariable(&proxy->request_done_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + pthread_cond_signal(&proxy->request_done_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + } + } + + /* Remove ourselves from the GC protection array */ + if (g_proxy_threads != Qnil) { + rb_ary_delete(g_proxy_threads, proxy->ruby_thread); + } + + /* + * Signal that this thread has finished and no longer references + * the proxy struct. After this signal, rbduckdb_worker_proxy_destroy + * may free the struct. 
+ */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->thread_exited = 1; + WakeConditionVariable(&proxy->thread_exit_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->thread_exited = 1; + pthread_cond_signal(&proxy->thread_exit_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + + return Qnil; +} + +/* + * Create a per-worker proxy thread. + * Must be called with GVL held (e.g., from the global executor callback). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void) { + /* + * Use calloc (not xcalloc) because rbduckdb_worker_proxy_destroy + * frees the struct from a non-Ruby thread where xfree is unsafe. + */ + struct worker_proxy *proxy = calloc(1, sizeof(struct worker_proxy)); + if (proxy == NULL) { + rb_raise(rb_eNoMemError, "failed to allocate worker_proxy"); + } + + proxy->stop_requested = 0; + proxy->has_request = 0; + proxy->request_done = 0; + proxy->thread_exited = 0; + +#ifdef _MSC_VER + InitializeCriticalSection(&proxy->lock); + InitializeConditionVariable(&proxy->request_cond); + InitializeConditionVariable(&proxy->request_done_cond); + InitializeConditionVariable(&proxy->thread_exit_cond); +#else + pthread_mutex_init(&proxy->lock, NULL); + pthread_cond_init(&proxy->request_cond, NULL); + pthread_cond_init(&proxy->request_done_cond, NULL); + pthread_cond_init(&proxy->thread_exit_cond, NULL); +#endif + + proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); + + /* Protect from GC */ + if (g_proxy_threads != Qnil) { + rb_ary_push(g_proxy_threads, proxy->ruby_thread); + } + + return proxy; +} + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread (non-Ruby thread). + * Blocks until the proxy completes the callback. 
+ */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn callback_func, + void *callback_data) { +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; + proxy->has_request = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + /* Wait for completion */ + EnterCriticalSection(&proxy->lock); + while (!proxy->request_done) { + SleepConditionVariableCS(&proxy->request_done_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; + proxy->has_request = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + /* Wait for completion */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->request_done) { + pthread_cond_wait(&proxy->request_done_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t: void (*)(void *). + * Safe to call from non-Ruby threads — uses only OS primitives. + * + * Blocks until the proxy thread has exited and no longer references the + * struct, then destroys OS synchronization primitives and frees memory. 
+ */ +void rbduckdb_worker_proxy_destroy(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + if (proxy == NULL) return; + + /* Signal the proxy thread to stop */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + /* Wait for the proxy thread to finish */ + EnterCriticalSection(&proxy->lock); + while (!proxy->thread_exited) { + SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); + + DeleteCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + /* Wait for the proxy thread to finish */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->thread_exited) { + pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); + + pthread_cond_destroy(&proxy->thread_exit_cond); + pthread_cond_destroy(&proxy->request_done_cond); + pthread_cond_destroy(&proxy->request_cond); + pthread_mutex_destroy(&proxy->lock); +#endif + + free(proxy); +} + +/* + * Initialize the executor subsystem. + * Called once from Init_duckdb_native. + */ +void rbduckdb_init_executor(void) { + g_proxy_threads = rb_ary_new(); + rb_global_variable(&g_proxy_threads); +} diff --git a/ext/duckdb/executor.h b/ext/duckdb/executor.h new file mode 100644 index 00000000..50fa15d3 --- /dev/null +++ b/ext/duckdb/executor.h @@ -0,0 +1,63 @@ +#ifndef RUBY_DUCKDB_EXECUTOR_H +#define RUBY_DUCKDB_EXECUTOR_H + +/* + * Shared executor infrastructure for dispatching callbacks from non-Ruby + * threads (DuckDB worker threads) to Ruby threads that can safely hold + * the GVL. + * + * Two mechanisms are provided: + * + * 1. Global executor thread — a single Ruby thread that processes callbacks + * from a shared queue. Used as bootstrap and fallback. 
+ * + * 2. Per-worker proxy threads — each DuckDB worker thread gets a dedicated + * Ruby thread. Created via init_local_state and stored in DuckDB's + * per-thread local state. Eliminates the global executor bottleneck. + */ + +/* Generic callback function signature */ +typedef void (*rbduckdb_callback_fn)(void *data); + +/* Initialize the executor subsystem (call from Init_duckdb_native) */ +void rbduckdb_init_executor(void); + +/* Ensure the global executor thread is running (call with GVL held) */ +void rbduckdb_executor_ensure_started(void); + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. + */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data); + +/* + * Per-worker proxy thread. + * Opaque structure — callers use the functions below. + */ +struct worker_proxy; + +/* + * Create a per-worker proxy thread. + * Must be called from a context where GVL can be acquired (typically + * dispatched through the global executor from init_local_state). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void); + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread that owns this proxy. + * Blocks until the callback completes. + */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn callback_func, + void *callback_data); + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t signature (void (*)(void *)). + * Safe to call from non-Ruby threads — uses only OS primitives. Blocks until the proxy thread exits, then frees it. 
+ */ +void rbduckdb_worker_proxy_destroy(void *proxy); + +#endif diff --git a/ext/duckdb/ruby-duckdb.h b/ext/duckdb/ruby-duckdb.h index 76594592..703ae70d 100644 --- a/ext/duckdb/ruby-duckdb.h +++ b/ext/duckdb/ruby-duckdb.h @@ -41,6 +41,7 @@ #include "./data_chunk.h" #include "./memory_helper.h" #include "./table_function.h" +#include "./executor.h" extern VALUE mDuckDB; extern VALUE cDuckDBDatabase; diff --git a/ext/duckdb/scalar_function.c b/ext/duckdb/scalar_function.c index d02188e6..1e5ddc4c 100644 --- a/ext/duckdb/scalar_function.c +++ b/ext/duckdb/scalar_function.c @@ -1,19 +1,5 @@ #include "ruby-duckdb.h" -/* - * Cross-platform threading primitives. - * MSVC (mswin) does not provide . - * MinGW-w64 (mingw, ucrt) provides via winpthreads. - * - * See also: FFI gem's approach in ext/ffi_c/Function.c - * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c - */ -#ifdef _MSC_VER -#include -#else -#include -#endif - /* * Thread detection functions (available since Ruby 2.3). * Used to determine the correct dispatch path for scalar function callbacks. @@ -57,106 +43,6 @@ static VALUE process_rows(VALUE arg); static VALUE process_no_param_rows(VALUE arg); static VALUE cleanup_callback(VALUE arg); -/* - * ============================================================================ - * Global Executor Thread - * ============================================================================ - * - * DuckDB calls scalar function callbacks from its own worker threads, which - * are NOT Ruby threads. Ruby's GVL (Global VM Lock) cannot be acquired from - * non-Ruby threads (rb_thread_call_with_gvl crashes with rb_bug). - * - * Solution (modeled after FFI gem's async callback dispatcher): - * - A global Ruby "executor" thread waits for callback requests. - * - DuckDB worker threads enqueue requests via pthread mutex/condvar and block. - * - The executor thread processes callbacks with the GVL, then signals completion. 
- * - * When the callback is invoked from a Ruby thread (e.g., threads=1 where DuckDB - * uses the calling thread), we use rb_thread_call_with_gvl directly, avoiding - * the executor overhead. - */ - -/* Per-callback request, stack-allocated on the DuckDB worker thread */ -struct callback_request { - struct callback_arg *cb_arg; - int done; -#ifdef _MSC_VER - CRITICAL_SECTION done_lock; - CONDITION_VARIABLE done_cond; -#else - pthread_mutex_t done_mutex; - pthread_cond_t done_cond; -#endif - struct callback_request *next; -}; - -/* Global executor state */ -#ifdef _MSC_VER -static CRITICAL_SECTION g_executor_lock; -static CONDITION_VARIABLE g_executor_cond; -static int g_sync_initialized = 0; -#else -static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; -#endif -static struct callback_request *g_request_list = NULL; -static VALUE g_executor_thread = Qnil; -static int g_executor_started = 0; - -/* Data passed to the executor wait function */ -struct executor_wait_data { - struct callback_request *request; - int stop; -}; - -/* Runs without GVL: blocks on condvar waiting for a callback request */ -static void *executor_wait_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - - w->request = NULL; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - while (!w->stop && g_request_list == NULL) { - SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - while (!w->stop && g_request_list == NULL) { - pthread_cond_wait(&g_executor_cond, &g_executor_mutex); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - pthread_mutex_unlock(&g_executor_mutex); -#endif - - return 
NULL; -} - -/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ -static void executor_stop_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - w->stop = 1; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - w->stop = 1; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); -#endif -} - /* Execute a callback (called with GVL held) */ static VALUE execute_callback(VALUE varg) { struct callback_arg *arg = (struct callback_arg *)varg; @@ -188,115 +74,12 @@ static void execute_callback_protected(struct callback_arg *arg) { } } -/* The executor thread main loop (Ruby thread) */ -static VALUE executor_thread_func(void *data) { - struct executor_wait_data w; - w.stop = 0; - - while (!w.stop) { - /* Release GVL and wait for a callback request */ - rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); - - if (w.request != NULL) { - struct callback_request *req = w.request; - - /* Execute the Ruby callback with the GVL */ - execute_callback_protected(req->cb_arg); - - /* Signal the DuckDB worker thread that the callback is done */ -#ifdef _MSC_VER - EnterCriticalSection(&req->done_lock); - req->done = 1; - WakeConditionVariable(&req->done_cond); - LeaveCriticalSection(&req->done_lock); -#else - pthread_mutex_lock(&req->done_mutex); - req->done = 1; - pthread_cond_signal(&req->done_cond); - pthread_mutex_unlock(&req->done_mutex); -#endif - } - } - - return Qnil; -} - -/* - * Start the global executor thread (must be called from a Ruby thread). - * - * Thread safety: This function is only called from - * rbduckdb_scalar_function_set_function(), which is a Ruby method and - * always runs with the GVL held. The GVL serializes all calls, so the - * g_executor_started check-then-set is safe without an extra mutex. 
- */ -static void ensure_executor_started(void) { - if (g_executor_started) return; - -#ifdef _MSC_VER - if (!g_sync_initialized) { - InitializeCriticalSection(&g_executor_lock); - InitializeConditionVariable(&g_executor_cond); - g_sync_initialized = 1; - } -#endif - - g_executor_thread = rb_thread_create(executor_thread_func, NULL); - rb_global_variable(&g_executor_thread); - g_executor_started = 1; -} - /* - * Dispatch a callback to the global executor thread. - * Called from a DuckDB worker thread (non-Ruby thread). - * The caller blocks until the callback is processed. + * Wrapper for dispatching through the shared executor. + * Adapts the generic callback signature to scalar function's callback_arg. */ -static void dispatch_callback_to_executor(struct callback_arg *arg) { - struct callback_request req; - - req.cb_arg = arg; - req.done = 0; - req.next = NULL; - -#ifdef _MSC_VER - InitializeCriticalSection(&req.done_lock); - InitializeConditionVariable(&req.done_cond); - - /* Enqueue the request */ - EnterCriticalSection(&g_executor_lock); - req.next = g_request_list; - g_request_list = &req; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); - - /* Wait for the executor to process our callback */ - EnterCriticalSection(&req.done_lock); - while (!req.done) { - SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); - } - LeaveCriticalSection(&req.done_lock); - - DeleteCriticalSection(&req.done_lock); -#else - pthread_mutex_init(&req.done_mutex, NULL); - pthread_cond_init(&req.done_cond, NULL); - - /* Enqueue the request */ - pthread_mutex_lock(&g_executor_mutex); - req.next = g_request_list; - g_request_list = &req; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); - - /* Wait for the executor to process our callback */ - pthread_mutex_lock(&req.done_mutex); - while (!req.done) { - pthread_cond_wait(&req.done_cond, &req.done_mutex); - } - pthread_mutex_unlock(&req.done_mutex); - - 
pthread_cond_destroy(&req.done_cond); - pthread_mutex_destroy(&req.done_mutex); -#endif +static void scalar_execute_via_executor(void *data) { + execute_callback_protected((struct callback_arg *)data); } /* @@ -308,10 +91,6 @@ static void *callback_with_gvl(void *data) { return NULL; } -/* ============================================================================ - * End of Executor Thread - * ============================================================================ */ - static const rb_data_type_t scalar_function_data_type = { "DuckDB/ScalarFunction", {mark, deallocate, memsize, compact}, @@ -419,13 +198,49 @@ static VALUE rbduckdb_scalar_function_add_parameter(VALUE self, VALUE logical_ty return self; } +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* + * init_local_state callback for scalar functions. + * + * Called by DuckDB once per worker thread before the first UDF invocation. + * For non-Ruby threads, creates a per-worker proxy thread so that callbacks + * can be dispatched without going through the global executor bottleneck. + * + * Requires DuckDB >= 1.5.0 (duckdb_scalar_function_set_init). + */ +struct proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void create_proxy_callback(void *data) { + struct proxy_create_arg *arg = (struct proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void scalar_function_init_local_state(duckdb_init_info info) { + if (ruby_native_thread_p()) { + /* Ruby thread — no proxy needed (Case 1/2 handles it) */ + return; + } + + /* Non-Ruby thread — create a proxy via the global executor */ + struct proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_scalar_function_init_set_state(info, arg.proxy, rbduckdb_worker_proxy_destroy); + } +} +#endif + /* * The DuckDB callback entry point. * * Three dispatch paths (modeled after FFI gem): * 1. Ruby thread WITH GVL -> call directly * 2. 
Ruby thread WITHOUT GVL -> rb_thread_call_with_gvl - * 3. Non-Ruby thread -> dispatch to global executor thread + * 3. Non-Ruby thread -> use per-worker proxy (or fallback to global executor) */ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chunk input, duckdb_vector output) { rubyDuckDBScalarFunction *ctx; @@ -467,8 +282,18 @@ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chun rb_thread_call_with_gvl(callback_with_gvl, &arg); } } else { - /* Case 3: Non-Ruby thread - dispatch to executor */ - dispatch_callback_to_executor(&arg); + /* Case 3: Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_scalar_function_get_state(info); + if (proxy) { + rbduckdb_worker_proxy_dispatch(proxy, scalar_execute_via_executor, &arg); + } else { + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); + } +#else + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); +#endif } } @@ -693,6 +518,9 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_extra_info(p->scalar_function, p, NULL); duckdb_scalar_function_set_function(p->scalar_function, scalar_function_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + duckdb_scalar_function_set_init(p->scalar_function, scalar_function_init_local_state); +#endif /* * Mark as volatile to prevent constant folding during query optimization. 
@@ -701,7 +529,7 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_volatile(p->scalar_function); /* Ensure the global executor thread is running for multi-thread dispatch */ - ensure_executor_started(); + rbduckdb_executor_ensure_started(); return self; } diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index 5391481b..32ceaed8 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -1,5 +1,11 @@ #include "ruby-duckdb.h" +/* + * Thread detection functions (available since Ruby 2.3). + */ +extern int ruby_thread_has_gvl_p(void); +extern int ruby_native_thread_p(void); + VALUE cDuckDBTableFunction; extern VALUE cDuckDBTableFunctionBindInfo; extern VALUE cDuckDBTableFunctionInitInfo; @@ -21,6 +27,9 @@ static VALUE rbduckdb_table_function_set_init(VALUE self); static void table_function_init_callback(duckdb_init_info info); static VALUE rbduckdb_table_function_set_execute(VALUE self); static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +static void table_function_local_init_callback(duckdb_init_info info); +#endif static const rb_data_type_t table_function_data_type = { "DuckDB/TableFunction", @@ -217,32 +226,55 @@ static VALUE call_bind_proc(VALUE arg) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_bind_callback(duckdb_bind_info info) { +/* Bind callback core logic — must be called with GVL held */ +struct table_bind_arg { + duckdb_bind_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_bind_with_gvl(void *data) { + struct table_bind_arg *arg = (struct table_bind_arg *)data; rubyDuckDBBindInfo *bind_info_ctx; VALUE bind_info_obj; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); - if (!ctx || ctx->bind_proc == Qnil) { + if (!arg->ctx || arg->ctx->bind_proc 
== Qnil) { return; } - // Create BindInfo wrapper bind_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionBindInfo); bind_info_ctx = get_struct_bind_info(bind_info_obj); - bind_info_ctx->bind_info = info; + bind_info_ctx->bind_info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->bind_proc, bind_info_obj }; + VALUE call_args[2] = { arg->ctx->bind_proc, bind_info_obj }; rb_protect(call_bind_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_bind_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_bind_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_bind_gvl_wrapper(void *data) { + table_bind_with_gvl(data); + return NULL; +} + +static void table_function_bind_callback(duckdb_bind_info info) { + struct table_bind_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_bind_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_bind_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_bind_with_gvl, &arg); } } @@ -281,32 +313,55 @@ static VALUE call_init_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_init_callback(duckdb_init_info info) { +/* Init callback core logic — must be called with GVL held */ +struct table_init_arg { + duckdb_init_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_init_with_gvl(void *data) { + struct table_init_arg *arg = (struct table_init_arg *)data; VALUE init_info_obj; rubyDuckDBInitInfo *init_info_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); - if (!ctx || ctx->init_proc == Qnil) { + if (!arg->ctx || 
arg->ctx->init_proc == Qnil) { return; } - // Create InitInfo wrapper init_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionInitInfo); init_info_ctx = get_struct_init_info(init_info_obj); - init_info_ctx->info = info; + init_info_ctx->info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->init_proc, init_info_obj }; + VALUE call_args[2] = { arg->ctx->init_proc, init_info_obj }; rb_protect(call_init_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_init_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_init_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_init_gvl_wrapper(void *data) { + table_init_with_gvl(data); + return NULL; +} + +static void table_function_init_callback(duckdb_init_info info) { + struct table_init_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_init_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_init_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_init_with_gvl, &arg); } } @@ -334,6 +389,12 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback); +#endif + + /* Ensure the global executor thread is running for multi-thread dispatch */ + rbduckdb_executor_ensure_started(); return self; } @@ -343,41 +404,106 @@ static VALUE call_execute_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 2, args[1], args[2]); } -static void table_function_execute_callback(duckdb_function_info info, 
duckdb_data_chunk output) { +/* Execute callback core logic — must be called with GVL held */ +struct table_execute_arg { + duckdb_function_info info; + duckdb_data_chunk output; rubyDuckDBTableFunction *ctx; +}; + +static void table_execute_with_gvl(void *data) { + struct table_execute_arg *arg = (struct table_execute_arg *)data; VALUE func_info_obj; VALUE data_chunk_obj; rubyDuckDBFunctionInfo *func_info_ctx; rubyDuckDBDataChunk *data_chunk_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); - if (!ctx || ctx->execute_proc == Qnil) { + if (!arg->ctx || arg->ctx->execute_proc == Qnil) { return; } - // Create FunctionInfo wrapper func_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionFunctionInfo); func_info_ctx = get_struct_function_info(func_info_obj); - func_info_ctx->info = info; + func_info_ctx->info = arg->info; - // Create DataChunk wrapper data_chunk_obj = rb_class_new_instance(0, NULL, cDuckDBDataChunk); data_chunk_ctx = get_struct_data_chunk(data_chunk_obj); - data_chunk_ctx->data_chunk = output; + data_chunk_ctx->data_chunk = arg->output; - // Call Ruby block with exception protection - VALUE call_args[3] = { ctx->execute_proc, func_info_obj, data_chunk_obj }; + VALUE call_args[3] = { arg->ctx->execute_proc, func_info_obj, data_chunk_obj }; rb_protect(call_execute_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_function_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_function_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_execute_gvl_wrapper(void *data) { + table_execute_with_gvl(data); + return NULL; +} + +static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { + struct table_execute_arg arg; + arg.info = info; + arg.output = 
output; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_execute_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg); + } + } else { + /* Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info); + if (proxy) { + rbduckdb_worker_proxy_dispatch(proxy, table_execute_with_gvl, &arg); + } else { + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); + } +#else + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); +#endif + } +} + +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* + * local_init callback for table functions. + * Creates a per-worker proxy for non-Ruby threads. + * Requires DuckDB >= 1.5.0 (duckdb_table_function_set_local_init). + */ +struct table_proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void table_create_proxy_callback(void *data) { + struct table_proxy_create_arg *arg = (struct table_proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void table_function_local_init_callback(duckdb_init_info info) { + if (ruby_native_thread_p()) { + return; + } + + struct table_proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(table_create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy); } } +#endif rubyDuckDBTableFunction *get_struct_table_function(VALUE self) { rubyDuckDBTableFunction *ctx; diff --git a/lib/duckdb/connection.rb b/lib/duckdb/connection.rb index a29f2bc0..cd8b135a 100644 --- a/lib/duckdb/connection.rb +++ b/lib/duckdb/connection.rb @@ -236,12 +236,11 @@ def register_table_function(table_function) # @param columns [Hash{String => DuckDB::LogicalType}, nil] optional column schema override; # if omitted, the adapter 
determines the columns (e.g. from headers or inference) # @raise [ArgumentError] if no adapter is registered for the object's class - # @raise [DuckDB::Error] if threads setting is not 1 + # @raise [DuckDB::Error] if threads > 1 on DuckDB < 1.5.0 # @return [void] # # @example Expose a CSV as a table # require 'csv' - # con.execute('SET threads=1') # DuckDB::TableFunction.add_table_adapter(CSV, CSVTableAdapter.new) # csv = CSV.new(File.read('data.csv'), headers: true) # con.expose_as_table(csv, 'csv_table') @@ -263,16 +262,22 @@ def expose_as_table(object, name, columns: nil) private + # DuckDB >= 1.5.0 provides per-worker proxy threads via init_local_state, + # making table function callbacks thread-safe with multiple DuckDB threads. + # On older versions, the global executor serializes all callbacks and can + # deadlock under concurrent workloads, so we enforce threads=1. def check_threads + return if Gem::Version.new(LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + result = execute("SELECT current_setting('threads')") thread_count = result.first.first.to_i return unless thread_count > 1 raise DuckDB::Error, - 'Functions with Ruby callbacks require single-threaded execution. ' \ - "Current threads setting: #{thread_count}. " \ - "Execute 'SET threads=1' before registering functions." + 'Table functions with Ruby callbacks require single-threaded execution ' \ + "on DuckDB < 1.5.0. Current threads setting: #{thread_count}. " \ + "Execute 'SET threads=1' before registering table functions." end def run_appender_block(appender, &) diff --git a/test/duckdb_test/gc_stress_test.rb b/test/duckdb_test/gc_stress_test.rb index 00013716..4d11bb47 100644 --- a/test/duckdb_test/gc_stress_test.rb +++ b/test/duckdb_test/gc_stress_test.rb @@ -83,7 +83,7 @@ def test_table_function_with_gc_compaction skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? 
- @con.execute('SET threads=1') # Table functions still require single-threaded execution + @con.execute('SET threads=1') # Capture local variables multiplier = 3 @@ -136,7 +136,7 @@ def test_mixed_functions_gc_stress skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? - @con.execute('SET threads=1') # Table functions still require single-threaded execution + @con.execute('SET threads=1') # Register both scalar and table functions @con.register_scalar_function(DuckDB::ScalarFunction.new.tap do |sf| diff --git a/test/duckdb_test/scalar_function_test.rb b/test/duckdb_test/scalar_function_test.rb index 96c78cab..2fc697a3 100644 --- a/test/duckdb_test/scalar_function_test.rb +++ b/test/duckdb_test/scalar_function_test.rb @@ -674,7 +674,11 @@ def test_create_with_different_types # rubocop:disable Metrics/MethodLength assert_equal 'Hello - World', rows[0][0] end - def test_scalar_function_with_multithread + def test_scalar_function_with_multithread # rubocop:disable Metrics/MethodLength + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end + @con.execute('SET threads=4') @con.execute('CREATE TABLE large_test AS SELECT range::INTEGER AS value FROM range(10000)') diff --git a/test/duckdb_test/table_function_csv_test.rb b/test/duckdb_test/table_function_csv_test.rb index a93ece42..bd4f2dd8 100644 --- a/test/duckdb_test/table_function_csv_test.rb +++ b/test/duckdb_test/table_function_csv_test.rb @@ -50,7 +50,7 @@ def csv_to_duckdb_data(csv, output) def setup @db = DuckDB::Database.open @con = @db.connect - @con.execute('SET threads=1') # Required for Ruby callbacks + @con.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_integration_test.rb b/test/duckdb_test/table_function_integration_test.rb index 88eb0825..619a6f83 100644 --- 
a/test/duckdb_test/table_function_integration_test.rb +++ b/test/duckdb_test/table_function_integration_test.rb @@ -7,7 +7,7 @@ class TableFunctionIntegrationTest < Minitest::Test def setup @database = DuckDB::Database.open @connection = @database.connect - @connection.execute('SET threads=1') # Required for Ruby callbacks + @connection.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index 249c0fa2..16399986 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -206,6 +206,48 @@ def test_symbol_columns end # rubocop:enable Metrics/AbcSize, Metrics/MethodLength + # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + def test_table_function_with_multithread + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end + + db = DuckDB::Database.open + conn = db.connect + conn.execute('SET threads=4') + + row_count = 100 + called = 0 + tf = DuckDB::TableFunction.new + tf.name = 'mt_generate' + + tf.bind do |bind_info| + bind_info.add_result_column('value', DuckDB::LogicalType::BIGINT) + end + + tf.init { |_init_info| nil } + + tf.execute do |_func_info, output| + called += 1 + if called > 1 + output.size = 0 + else + row_count.times { |i| output.set_value(0, i, i * 2) } + output.size = row_count + end + end + + conn.register_table_function(tf) + result = conn.execute('SELECT SUM(value) FROM mt_generate()') + expected_sum = (0...row_count).sum { |i| i * 2 } + + assert_equal expected_sum, result.first.first + + conn.disconnect + db.close + end + # rubocop:enable Metrics/AbcSize, Metrics/MethodLength + private def setup_incomplete_function