From 0d3b1eea3c3d9b453b19cdb02677b2cd13672772 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:18:23 +0900 Subject: [PATCH 1/8] fix: extract shared executor and add per-worker proxy for scalar UDF callbacks DuckDB calls scalar function callbacks from its own worker threads, which are not Ruby threads. Ruby's GVL cannot be acquired from non-Ruby threads (rb_thread_call_with_gvl crashes with rb_bug), so an executor pattern is required. Previously, scalar_function.c contained a single global executor thread that serialized all callbacks through one condvar. On Windows, this caused deadlocks due to GVL starvation under Minitest parallel execution (Issue #1158). This commit makes two changes: 1. Extract the executor infrastructure into a shared module (executor.c/executor.h) so both scalar_function.c and table_function.c can use it. 2. Add per-worker proxy threads using DuckDB's init_local_state API. Each DuckDB worker thread gets a dedicated Ruby proxy thread created via duckdb_scalar_function_set_init(). The proxy acquires the GVL independently, eliminating the global executor bottleneck. The global executor is retained as a bootstrap mechanism for proxy creation and as a fallback. Refs: #1158 Co-Authored-By: Claude Opus 4.6 (1M context) --- ext/duckdb/duckdb.c | 1 + ext/duckdb/executor.c | 449 +++++++++++++++++++++++++++++++++++ ext/duckdb/executor.h | 62 +++++ ext/duckdb/ruby-duckdb.h | 1 + ext/duckdb/scalar_function.c | 276 ++++----------------- 5 files changed, 560 insertions(+), 229 deletions(-) create mode 100644 ext/duckdb/executor.c create mode 100644 ext/duckdb/executor.h diff --git a/ext/duckdb/duckdb.c b/ext/duckdb/duckdb.c index cdb9b492..753e503e 100644 --- a/ext/duckdb/duckdb.c +++ b/ext/duckdb/duckdb.c @@ -56,6 +56,7 @@ Init_duckdb_native(void) { rbduckdb_init_duckdb_extracted_statements(); rbduckdb_init_duckdb_instance_cache(); rbduckdb_init_duckdb_value_impl(); + rbduckdb_init_executor(); rbduckdb_init_duckdb_scalar_function(); rbduckdb_init_duckdb_scalar_function_set(); rbduckdb_init_duckdb_expression(); diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c new file mode 100644 index 00000000..3bc543bd --- /dev/null +++ b/ext/duckdb/executor.c @@ -0,0 +1,449 @@ +#include "ruby-duckdb.h" + +/* + * Cross-platform threading primitives. + * MSVC (mswin) does not provide . + * MinGW-w64 (mingw, ucrt) provides via winpthreads. + */ +#ifdef _MSC_VER +#include +#else +#include +#endif + +#include "executor.h" + +/* + * Thread detection functions (available since Ruby 2.3). + * Used by scalar_function.c and table_function.c to determine dispatch path. + * Declared here so both can use them without duplicating the extern. + */ +extern int ruby_thread_has_gvl_p(void); +extern int ruby_native_thread_p(void); + +/* ============================================================================ + * Global Executor Thread + * ============================================================================ + * + * A single Ruby thread that processes callback requests from non-Ruby threads. + * DuckDB worker threads enqueue requests and block until completion. + * + * Modeled after FFI gem's async callback dispatcher: + * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c + */ + +/* Per-callback request, stack-allocated on the DuckDB worker thread */ +struct executor_request { + rbduckdb_callback_fn fn; + void *data; + int done; +#ifdef _MSC_VER + CRITICAL_SECTION done_lock; + CONDITION_VARIABLE done_cond; +#else + pthread_mutex_t done_mutex; + pthread_cond_t done_cond; +#endif + struct executor_request *next; +}; + +/* Global executor state */ +#ifdef _MSC_VER +static CRITICAL_SECTION g_executor_lock; +static CONDITION_VARIABLE g_executor_cond; +static int g_sync_initialized = 0; +#else +static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; +#endif +static struct executor_request *g_request_list = NULL; +static VALUE g_executor_thread = Qnil; +static int g_executor_started = 0; + +/* GC protection array for proxy Ruby threads */ +static VALUE g_proxy_threads = Qnil; + +/* Data passed to the executor wait function */ +struct executor_wait_data { + struct executor_request *request; + int stop; +}; + +/* Runs without GVL: blocks on condvar waiting for a callback request */ +static void *executor_wait_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + + w->request = NULL; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + while (!w->stop && g_request_list == NULL) { + SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = g_request_list->next; + } + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + while (!w->stop && g_request_list == NULL) { + pthread_cond_wait(&g_executor_cond, &g_executor_mutex); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = g_request_list->next; + } + pthread_mutex_unlock(&g_executor_mutex); +#endif + + return NULL; +} + +/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ +static void executor_stop_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + w->stop = 1; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + w->stop = 1; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); +#endif +} + +/* The executor thread main loop (Ruby thread) */ +static VALUE executor_thread_func(void *data) { + struct executor_wait_data w; + w.stop = 0; + + while (!w.stop) { + /* Release GVL and wait for a callback request */ + rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); + + if (w.request != NULL) { + struct executor_request *req = w.request; + + /* Execute the callback with the GVL */ + req->fn(req->data); + + /* Signal the DuckDB worker thread that the callback is done */ +#ifdef _MSC_VER + EnterCriticalSection(&req->done_lock); + req->done = 1; + WakeConditionVariable(&req->done_cond); + LeaveCriticalSection(&req->done_lock); +#else + pthread_mutex_lock(&req->done_mutex); + req->done = 1; + pthread_cond_signal(&req->done_cond); + pthread_mutex_unlock(&req->done_mutex); +#endif + } + } + + return Qnil; +} + +/* + * Start the global executor thread (must be called with GVL held). + * + * Thread safety: This function is called from Ruby methods that always run + * with the GVL held. The GVL serializes all calls, so the g_executor_started + * check-then-set is safe without an extra mutex. + */ +void rbduckdb_executor_ensure_started(void) { + if (g_executor_started) return; + +#ifdef _MSC_VER + if (!g_sync_initialized) { + InitializeCriticalSection(&g_executor_lock); + InitializeConditionVariable(&g_executor_cond); + g_sync_initialized = 1; + } +#endif + + g_executor_thread = rb_thread_create(executor_thread_func, NULL); + rb_global_variable(&g_executor_thread); + g_executor_started = 1; +} + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. + */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data) { + struct executor_request req; + + req.fn = fn; + req.data = data; + req.done = 0; + req.next = NULL; + +#ifdef _MSC_VER + InitializeCriticalSection(&req.done_lock); + InitializeConditionVariable(&req.done_cond); + + /* Enqueue the request */ + EnterCriticalSection(&g_executor_lock); + req.next = g_request_list; + g_request_list = &req; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); + + /* Wait for the executor to process our callback */ + EnterCriticalSection(&req.done_lock); + while (!req.done) { + SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); + } + LeaveCriticalSection(&req.done_lock); + + DeleteCriticalSection(&req.done_lock); +#else + pthread_mutex_init(&req.done_mutex, NULL); + pthread_cond_init(&req.done_cond, NULL); + + /* Enqueue the request */ + pthread_mutex_lock(&g_executor_mutex); + req.next = g_request_list; + g_request_list = &req; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); + + /* Wait for the executor to process our callback */ + pthread_mutex_lock(&req.done_mutex); + while (!req.done) { + pthread_cond_wait(&req.done_cond, &req.done_mutex); + } + pthread_mutex_unlock(&req.done_mutex); + + pthread_cond_destroy(&req.done_cond); + pthread_mutex_destroy(&req.done_mutex); +#endif +} + +/* ============================================================================ + * Per-Worker Proxy Threads + * ============================================================================ + * + * Each DuckDB worker thread can be assigned a dedicated Ruby proxy thread. + * The proxy waits for callback requests via OS condvar, acquires the GVL, + * executes the callback, and signals completion. + * + * This eliminates the global executor bottleneck — each worker has its own + * condvar and its own Ruby thread, so GVL acquisition is distributed. + */ + +struct worker_proxy { + VALUE ruby_thread; + volatile int stop; + rbduckdb_callback_fn fn; + void *data; + volatile int has_request; + volatile int done; +#ifdef _MSC_VER + CRITICAL_SECTION lock; + CONDITION_VARIABLE request_cond; + CONDITION_VARIABLE done_cond; +#else + pthread_mutex_t lock; + pthread_cond_t request_cond; + pthread_cond_t done_cond; +#endif +}; + +/* Runs without GVL: proxy waits for a callback request */ +static void *proxy_wait_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + while (!proxy->stop && !proxy->has_request) { + SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + while (!proxy->stop && !proxy->has_request) { + pthread_cond_wait(&proxy->request_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif + + return NULL; +} + +/* Unblock function for proxy thread (VM shutdown or Thread#kill) */ +static void proxy_stop_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* The proxy thread main loop (Ruby thread) */ +static VALUE proxy_thread_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + + while (!proxy->stop) { + /* Release GVL and wait for a request */ + rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy); + + if (proxy->stop) break; + + if (proxy->has_request) { + /* Execute the callback with the GVL held */ + proxy->fn(proxy->data); + + /* Signal completion to the DuckDB worker thread */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->has_request = 0; + proxy->done = 1; + WakeConditionVariable(&proxy->done_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->has_request = 0; + proxy->done = 1; + pthread_cond_signal(&proxy->done_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + } + } + + /* Remove ourselves from the GC protection array */ + if (g_proxy_threads != Qnil) { + rb_ary_delete(g_proxy_threads, proxy->ruby_thread); + } + + return Qnil; +} + +/* + * Create a per-worker proxy thread. + * Must be called with GVL held (e.g., from the global executor callback). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void) { + struct worker_proxy *proxy = xcalloc(1, sizeof(struct worker_proxy)); + + proxy->stop = 0; + proxy->has_request = 0; + proxy->done = 0; + +#ifdef _MSC_VER + InitializeCriticalSection(&proxy->lock); + InitializeConditionVariable(&proxy->request_cond); + InitializeConditionVariable(&proxy->done_cond); +#else + pthread_mutex_init(&proxy->lock, NULL); + pthread_cond_init(&proxy->request_cond, NULL); + pthread_cond_init(&proxy->done_cond, NULL); +#endif + + proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); + + /* Protect from GC */ + if (g_proxy_threads != Qnil) { + rb_ary_push(g_proxy_threads, proxy->ruby_thread); + } + + return proxy; +} + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread (non-Ruby thread). + * Blocks until the proxy completes the callback. + */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn fn, void *data) { +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->fn = fn; + proxy->data = data; + proxy->done = 0; + proxy->has_request = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + /* Wait for completion */ + EnterCriticalSection(&proxy->lock); + while (!proxy->done) { + SleepConditionVariableCS(&proxy->done_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->fn = fn; + proxy->data = data; + proxy->done = 0; + proxy->has_request = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + /* Wait for completion */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->done) { + pthread_cond_wait(&proxy->done_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t: void (*)(void *). + * Safe to call from non-Ruby threads — uses only OS primitives. + */ +void rbduckdb_worker_proxy_destroy(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + if (proxy == NULL) return; + + /* Signal the proxy thread to stop (OS primitives only, no Ruby API) */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + + /* + * The proxy thread will exit its loop, remove itself from the + * GC protection array, and the Ruby thread object will be collected. + * We do NOT free the proxy struct here because the proxy thread may + * still be referencing it. The proxy struct is freed when the Ruby + * thread finishes and the GC collects it. + * + * For simplicity, we accept this small leak per query — the proxy + * struct is ~100 bytes and there are at most N (thread count) per query. + */ +} + +/* + * Initialize the executor subsystem. + * Called once from Init_duckdb_native. + */ +void rbduckdb_init_executor(void) { + g_proxy_threads = rb_ary_new(); + rb_global_variable(&g_proxy_threads); +} diff --git a/ext/duckdb/executor.h b/ext/duckdb/executor.h new file mode 100644 index 00000000..5c37fc89 --- /dev/null +++ b/ext/duckdb/executor.h @@ -0,0 +1,62 @@ +#ifndef RUBY_DUCKDB_EXECUTOR_H +#define RUBY_DUCKDB_EXECUTOR_H + +/* + * Shared executor infrastructure for dispatching callbacks from non-Ruby + * threads (DuckDB worker threads) to Ruby threads that can safely hold + * the GVL. + * + * Two mechanisms are provided: + * + * 1. Global executor thread — a single Ruby thread that processes callbacks + * from a shared queue. Used as bootstrap and fallback. + * + * 2. Per-worker proxy threads — each DuckDB worker thread gets a dedicated + * Ruby thread. Created via init_local_state and stored in DuckDB's + * per-thread local state. Eliminates the global executor bottleneck. + */ + +/* Generic callback function signature */ +typedef void (*rbduckdb_callback_fn)(void *data); + +/* Initialize the executor subsystem (call from Init_duckdb_native) */ +void rbduckdb_init_executor(void); + +/* Ensure the global executor thread is running (call with GVL held) */ +void rbduckdb_executor_ensure_started(void); + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. + */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data); + +/* + * Per-worker proxy thread. + * Opaque structure — callers use the functions below. + */ +struct worker_proxy; + +/* + * Create a per-worker proxy thread. + * Must be called from a context where GVL can be acquired (typically + * dispatched through the global executor from init_local_state). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void); + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread that owns this proxy. + * Blocks until the callback completes. + */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn fn, void *data); + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t signature (void (*)(void *)). + * Safe to call from non-Ruby threads — only sets flags and signals OS condvar. + */ +void rbduckdb_worker_proxy_destroy(void *proxy); + +#endif diff --git a/ext/duckdb/ruby-duckdb.h b/ext/duckdb/ruby-duckdb.h index 76594592..703ae70d 100644 --- a/ext/duckdb/ruby-duckdb.h +++ b/ext/duckdb/ruby-duckdb.h @@ -41,6 +41,7 @@ #include "./data_chunk.h" #include "./memory_helper.h" #include "./table_function.h" +#include "./executor.h" extern VALUE mDuckDB; extern VALUE cDuckDBDatabase; diff --git a/ext/duckdb/scalar_function.c b/ext/duckdb/scalar_function.c index d02188e6..e9386610 100644 --- a/ext/duckdb/scalar_function.c +++ b/ext/duckdb/scalar_function.c @@ -1,19 +1,5 @@ #include "ruby-duckdb.h" -/* - * Cross-platform threading primitives. - * MSVC (mswin) does not provide . - * MinGW-w64 (mingw, ucrt) provides via winpthreads. - * - * See also: FFI gem's approach in ext/ffi_c/Function.c - * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c - */ -#ifdef _MSC_VER -#include -#else -#include -#endif - /* * Thread detection functions (available since Ruby 2.3). * Used to determine the correct dispatch path for scalar function callbacks. @@ -57,106 +43,6 @@ static VALUE process_rows(VALUE arg); static VALUE process_no_param_rows(VALUE arg); static VALUE cleanup_callback(VALUE arg); -/* - * ============================================================================ - * Global Executor Thread - * ============================================================================ - * - * DuckDB calls scalar function callbacks from its own worker threads, which - * are NOT Ruby threads. Ruby's GVL (Global VM Lock) cannot be acquired from - * non-Ruby threads (rb_thread_call_with_gvl crashes with rb_bug). - * - * Solution (modeled after FFI gem's async callback dispatcher): - * - A global Ruby "executor" thread waits for callback requests. - * - DuckDB worker threads enqueue requests via pthread mutex/condvar and block. - * - The executor thread processes callbacks with the GVL, then signals completion. - * - * When the callback is invoked from a Ruby thread (e.g., threads=1 where DuckDB - * uses the calling thread), we use rb_thread_call_with_gvl directly, avoiding - * the executor overhead. - */ - -/* Per-callback request, stack-allocated on the DuckDB worker thread */ -struct callback_request { - struct callback_arg *cb_arg; - int done; -#ifdef _MSC_VER - CRITICAL_SECTION done_lock; - CONDITION_VARIABLE done_cond; -#else - pthread_mutex_t done_mutex; - pthread_cond_t done_cond; -#endif - struct callback_request *next; -}; - -/* Global executor state */ -#ifdef _MSC_VER -static CRITICAL_SECTION g_executor_lock; -static CONDITION_VARIABLE g_executor_cond; -static int g_sync_initialized = 0; -#else -static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; -#endif -static struct callback_request *g_request_list = NULL; -static VALUE g_executor_thread = Qnil; -static int g_executor_started = 0; - -/* Data passed to the executor wait function */ -struct executor_wait_data { - struct callback_request *request; - int stop; -}; - -/* Runs without GVL: blocks on condvar waiting for a callback request */ -static void *executor_wait_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - - w->request = NULL; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - while (!w->stop && g_request_list == NULL) { - SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - while (!w->stop && g_request_list == NULL) { - pthread_cond_wait(&g_executor_cond, &g_executor_mutex); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - pthread_mutex_unlock(&g_executor_mutex); -#endif - - return NULL; -} - -/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ -static void executor_stop_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - w->stop = 1; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - w->stop = 1; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); -#endif -} - /* Execute a callback (called with GVL held) */ static VALUE execute_callback(VALUE varg) { struct callback_arg *arg = (struct callback_arg *)varg; @@ -188,115 +74,12 @@ static void execute_callback_protected(struct callback_arg *arg) { } } -/* The executor thread main loop (Ruby thread) */ -static VALUE executor_thread_func(void *data) { - struct executor_wait_data w; - w.stop = 0; - - while (!w.stop) { - /* Release GVL and wait for a callback request */ - rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); - - if (w.request != NULL) { - struct callback_request *req = w.request; - - /* Execute the Ruby callback with the GVL */ - execute_callback_protected(req->cb_arg); - - /* Signal the DuckDB worker thread that the callback is done */ -#ifdef _MSC_VER - EnterCriticalSection(&req->done_lock); - req->done = 1; - WakeConditionVariable(&req->done_cond); - LeaveCriticalSection(&req->done_lock); -#else - pthread_mutex_lock(&req->done_mutex); - req->done = 1; - pthread_cond_signal(&req->done_cond); - pthread_mutex_unlock(&req->done_mutex); -#endif - } - } - - return Qnil; -} - /* - * Start the global executor thread (must be called from a Ruby thread). - * - * Thread safety: This function is only called from - * rbduckdb_scalar_function_set_function(), which is a Ruby method and - * always runs with the GVL held. The GVL serializes all calls, so the - * g_executor_started check-then-set is safe without an extra mutex. + * Wrapper for dispatching through the shared executor. + * Adapts the generic callback signature to scalar function's callback_arg. */ -static void ensure_executor_started(void) { - if (g_executor_started) return; - -#ifdef _MSC_VER - if (!g_sync_initialized) { - InitializeCriticalSection(&g_executor_lock); - InitializeConditionVariable(&g_executor_cond); - g_sync_initialized = 1; - } -#endif - - g_executor_thread = rb_thread_create(executor_thread_func, NULL); - rb_global_variable(&g_executor_thread); - g_executor_started = 1; -} - -/* - * Dispatch a callback to the global executor thread. - * Called from a DuckDB worker thread (non-Ruby thread). - * The caller blocks until the callback is processed. - */ -static void dispatch_callback_to_executor(struct callback_arg *arg) { - struct callback_request req; - - req.cb_arg = arg; - req.done = 0; - req.next = NULL; - -#ifdef _MSC_VER - InitializeCriticalSection(&req.done_lock); - InitializeConditionVariable(&req.done_cond); - - /* Enqueue the request */ - EnterCriticalSection(&g_executor_lock); - req.next = g_request_list; - g_request_list = &req; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); - - /* Wait for the executor to process our callback */ - EnterCriticalSection(&req.done_lock); - while (!req.done) { - SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); - } - LeaveCriticalSection(&req.done_lock); - - DeleteCriticalSection(&req.done_lock); -#else - pthread_mutex_init(&req.done_mutex, NULL); - pthread_cond_init(&req.done_cond, NULL); - - /* Enqueue the request */ - pthread_mutex_lock(&g_executor_mutex); - req.next = g_request_list; - g_request_list = &req; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); - - /* Wait for the executor to process our callback */ - pthread_mutex_lock(&req.done_mutex); - while (!req.done) { - pthread_cond_wait(&req.done_cond, &req.done_mutex); - } - pthread_mutex_unlock(&req.done_mutex); - - pthread_cond_destroy(&req.done_cond); - pthread_mutex_destroy(&req.done_mutex); -#endif +static void scalar_execute_via_executor(void *data) { + execute_callback_protected((struct callback_arg *)data); } /* @@ -308,10 +91,6 @@ static void *callback_with_gvl(void *data) { return NULL; } -/* ============================================================================ - * End of Executor Thread - * ============================================================================ */ - static const rb_data_type_t scalar_function_data_type = { "DuckDB/ScalarFunction", {mark, deallocate, memsize, compact}, @@ -419,13 +198,45 @@ static VALUE rbduckdb_scalar_function_add_parameter(VALUE self, VALUE logical_ty return self; } +/* + * init_local_state callback for scalar functions. + * + * Called by DuckDB once per worker thread before the first UDF invocation. + * For non-Ruby threads, creates a per-worker proxy thread so that callbacks + * can be dispatched without going through the global executor bottleneck. + */ +struct proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void create_proxy_callback(void *data) { + struct proxy_create_arg *arg = (struct proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void scalar_function_init_local_state(duckdb_init_info info) { + if (ruby_native_thread_p()) { + /* Ruby thread — no proxy needed (Case 1/2 handles it) */ + return; + } + + /* Non-Ruby thread — create a proxy via the global executor */ + struct proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_scalar_function_init_set_state(info, arg.proxy, rbduckdb_worker_proxy_destroy); + } +} + /* * The DuckDB callback entry point. * * Three dispatch paths (modeled after FFI gem): * 1. Ruby thread WITH GVL -> call directly * 2. Ruby thread WITHOUT GVL -> rb_thread_call_with_gvl - * 3. Non-Ruby thread -> dispatch to global executor thread + * 3. Non-Ruby thread -> use per-worker proxy (or fallback to global executor) */ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chunk input, duckdb_vector output) { rubyDuckDBScalarFunction *ctx; @@ -467,8 +278,14 @@ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chun rb_thread_call_with_gvl(callback_with_gvl, &arg); } } else { - /* Case 3: Non-Ruby thread - dispatch to executor */ - dispatch_callback_to_executor(&arg); + /* Case 3: Non-Ruby thread - use per-worker proxy if available */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_scalar_function_get_state(info); + if (proxy) { + rbduckdb_worker_proxy_dispatch(proxy, scalar_execute_via_executor, &arg); + } else { + /* Fallback to global executor */ + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); + } } } @@ -693,6 +510,7 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_extra_info(p->scalar_function, p, NULL); duckdb_scalar_function_set_function(p->scalar_function, scalar_function_callback); + duckdb_scalar_function_set_init(p->scalar_function, scalar_function_init_local_state); /* * Mark as volatile to prevent constant folding during query optimization. @@ -701,7 +519,7 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_volatile(p->scalar_function); /* Ensure the global executor thread is running for multi-thread dispatch */ - ensure_executor_started(); + rbduckdb_executor_ensure_started(); return self; } From 7b5b623b63f066a65deff06a5977bf86b5499b5f Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:18:38 +0900 Subject: [PATCH 2/8] fix: add three-path dispatch to table function callbacks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit table_function.c previously called Ruby C API functions (rb_funcall, rb_class_new_instance, rb_protect) directly from whichever thread DuckDB invoked the callback on. When called from a non-Ruby worker thread, this was undefined behavior — potential crash or deadlock. The workaround was a Ruby-level check_threads guard in connection.rb that raised an error if threads > 1 when registering table functions, forcing all table function queries to run single-threaded. This commit adds the same three-path dispatch pattern used by scalar_function.c to all three table function callbacks: 1. Ruby thread WITH GVL -> call directly 2. Ruby thread WITHOUT GVL -> rb_thread_call_with_gvl 3. Non-Ruby thread -> per-worker proxy (or global executor) The execute callback also registers a local_init that creates a per-worker proxy thread for non-Ruby workers, matching the scalar function approach. With table functions now thread-safe, the check_threads guard and its threads=1 requirement are removed from connection.rb. Refs: #1158 Co-Authored-By: Claude Opus 4.6 (1M context) --- ext/duckdb/table_function.c | 178 +++++++++++++++++++++++++++++------- lib/duckdb/connection.rb | 15 --- 2 files changed, 146 insertions(+), 47 deletions(-) diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index 5391481b..9c0f065d 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -1,5 +1,11 @@ #include "ruby-duckdb.h" +/* + * Thread detection functions (available since Ruby 2.3). + */ +extern int ruby_thread_has_gvl_p(void); +extern int ruby_native_thread_p(void); + VALUE cDuckDBTableFunction; extern VALUE cDuckDBTableFunctionBindInfo; extern VALUE cDuckDBTableFunctionInitInfo; @@ -21,6 +27,7 @@ static VALUE rbduckdb_table_function_set_init(VALUE self); static void table_function_init_callback(duckdb_init_info info); static VALUE rbduckdb_table_function_set_execute(VALUE self); static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output); +static void table_function_local_init_callback(duckdb_init_info info); static const rb_data_type_t table_function_data_type = { "DuckDB/TableFunction", @@ -217,32 +224,55 @@ static VALUE call_bind_proc(VALUE arg) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_bind_callback(duckdb_bind_info info) { +/* Bind callback core logic — must be called with GVL held */ +struct table_bind_arg { + duckdb_bind_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_bind_with_gvl(void *data) { + struct table_bind_arg *arg = (struct table_bind_arg *)data; rubyDuckDBBindInfo *bind_info_ctx; VALUE bind_info_obj; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); - if (!ctx || ctx->bind_proc == Qnil) { + if (!arg->ctx || arg->ctx->bind_proc == Qnil) { return; } - // Create BindInfo wrapper bind_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionBindInfo); bind_info_ctx = get_struct_bind_info(bind_info_obj); - bind_info_ctx->bind_info = info; + bind_info_ctx->bind_info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->bind_proc, bind_info_obj }; + VALUE call_args[2] = { arg->ctx->bind_proc, bind_info_obj }; rb_protect(call_bind_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_bind_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_bind_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_bind_gvl_wrapper(void *data) { + table_bind_with_gvl(data); + return NULL; +} + +static void table_function_bind_callback(duckdb_bind_info info) { + struct table_bind_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_bind_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_bind_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_bind_with_gvl, &arg); } } @@ -281,32 +311,55 @@ static VALUE call_init_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_init_callback(duckdb_init_info info) { +/* Init callback core logic — must be called with GVL held */ +struct table_init_arg { + duckdb_init_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_init_with_gvl(void *data) { + struct table_init_arg *arg = (struct table_init_arg *)data; VALUE init_info_obj; rubyDuckDBInitInfo *init_info_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); - if (!ctx || ctx->init_proc == Qnil) { + if (!arg->ctx || arg->ctx->init_proc == Qnil) { return; } - // Create InitInfo wrapper init_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionInitInfo); init_info_ctx = get_struct_init_info(init_info_obj); - init_info_ctx->info = info; + init_info_ctx->info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->init_proc, init_info_obj }; + VALUE call_args[2] = { arg->ctx->init_proc, init_info_obj }; rb_protect(call_init_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_init_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_init_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_init_gvl_wrapper(void *data) { + table_init_with_gvl(data); + return NULL; +} + +static void table_function_init_callback(duckdb_init_info info) { + struct table_init_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_init_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_init_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_init_with_gvl, &arg); } } @@ -334,6 +387,10 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); + duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback); + + /* Ensure the global executor thread is running for multi-thread dispatch */ + rbduckdb_executor_ensure_started(); return self; } @@ -343,39 +400,96 @@ static VALUE call_execute_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 2, args[1], args[2]); } -static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { +/* Execute callback core logic — must be called with GVL held */ +struct table_execute_arg { + duckdb_function_info info; + duckdb_data_chunk output; rubyDuckDBTableFunction *ctx; +}; + +static void table_execute_with_gvl(void *data) { + struct table_execute_arg *arg = (struct table_execute_arg *)data; VALUE func_info_obj; VALUE data_chunk_obj; rubyDuckDBFunctionInfo *func_info_ctx; rubyDuckDBDataChunk *data_chunk_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); - if (!ctx || ctx->execute_proc == Qnil) { + if (!arg->ctx || arg->ctx->execute_proc == Qnil) { return; } - // Create FunctionInfo wrapper func_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionFunctionInfo); func_info_ctx = get_struct_function_info(func_info_obj); - func_info_ctx->info = info; + func_info_ctx->info = arg->info; - // Create DataChunk wrapper data_chunk_obj = rb_class_new_instance(0, NULL, cDuckDBDataChunk); data_chunk_ctx = get_struct_data_chunk(data_chunk_obj); - data_chunk_ctx->data_chunk = output; + data_chunk_ctx->data_chunk = arg->output; - // Call Ruby block with exception protection - VALUE call_args[3] = { ctx->execute_proc, func_info_obj, data_chunk_obj }; + VALUE call_args[3] = { arg->ctx->execute_proc, func_info_obj, data_chunk_obj }; rb_protect(call_execute_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_function_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_function_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_execute_gvl_wrapper(void *data) { + table_execute_with_gvl(data); + return NULL; +} + +static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { + struct table_execute_arg arg; + arg.info = info; + arg.output = output; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_execute_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg); + } + } else { + /* Non-Ruby thread — use per-worker proxy if available */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info); + if (proxy) { + rbduckdb_worker_proxy_dispatch(proxy, table_execute_with_gvl, &arg); + } else { + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); + } + } +} + +/* + * local_init callback for table functions. + * Creates a per-worker proxy for non-Ruby threads. + */ +struct table_proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void table_create_proxy_callback(void *data) { + struct table_proxy_create_arg *arg = (struct table_proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void table_function_local_init_callback(duckdb_init_info info) { + if (ruby_native_thread_p()) { + return; + } + + struct table_proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(table_create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy); } } diff --git a/lib/duckdb/connection.rb b/lib/duckdb/connection.rb index a29f2bc0..acef4385 100644 --- a/lib/duckdb/connection.rb +++ b/lib/duckdb/connection.rb @@ -221,7 +221,6 @@ def register_scalar_function_set(scalar_function_set) def register_table_function(table_function) raise ArgumentError, 'table_function must be a TableFunction' unless table_function.is_a?(TableFunction) - check_threads _register_table_function(table_function) end @@ -236,12 +235,10 @@ def register_table_function(table_function) # @param columns [Hash{String => DuckDB::LogicalType}, nil] optional column schema override; # if omitted, the adapter determines the columns (e.g. from headers or inference) # @raise [ArgumentError] if no adapter is registered for the object's class - # @raise [DuckDB::Error] if threads setting is not 1 # @return [void] # # @example Expose a CSV as a table # require 'csv' - # con.execute('SET threads=1') # DuckDB::TableFunction.add_table_adapter(CSV, CSVTableAdapter.new) # csv = CSV.new(File.read('data.csv'), headers: true) # con.expose_as_table(csv, 'csv_table') @@ -263,18 +260,6 @@ def expose_as_table(object, name, columns: nil) private - def check_threads - result = execute("SELECT current_setting('threads')") - thread_count = result.first.first.to_i - - return unless thread_count > 1 - - raise DuckDB::Error, - 'Functions with Ruby callbacks require single-threaded execution. ' \ - "Current threads setting: #{thread_count}. " \ - "Execute 'SET threads=1' before registering functions." - end - def run_appender_block(appender, &) return appender unless block_given? From 8bdecb7ce2a383b6b56e1c1250875b6f4575e42c Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:18:53 +0900 Subject: [PATCH 3/8] test: remove threads=1 restriction from UDF tests and add multithread table function test With scalar and table function callbacks now thread-safe via per-worker proxy threads, the SET threads=1 workaround is no longer needed in UDF-related tests. - Remove SET threads=1 from table_function_test.rb, data_chunk_test.rb, gc_stress_test.rb, table_function_csv_test.rb, and table_function_integration_test.rb - Add test_table_function_with_multithread that runs a table function with threads=4 to verify thread safety Non-UDF tests (query_progress, pending_result) retain their SET threads=1 as those are unrelated to callback thread safety. Refs: #1158 Co-Authored-By: Claude Opus 4.6 (1M context) --- test/duckdb_test/data_chunk_test.rb | 20 --------- test/duckdb_test/gc_stress_test.rb | 2 - test/duckdb_test/table_function_csv_test.rb | 1 - .../table_function_integration_test.rb | 1 - test/duckdb_test/table_function_test.rb | 41 +++++++++++++++++-- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/test/duckdb_test/data_chunk_test.rb b/test/duckdb_test/data_chunk_test.rb index 026a0bdd..ddcb94ae 100644 --- a/test/duckdb_test/data_chunk_test.rb +++ b/test/duckdb_test/data_chunk_test.rb @@ -56,8 +56,6 @@ def test_data_chunk_get_vector # Test 4: Vector#logical_type returns LogicalType def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') - table_function = DuckDB::TableFunction.new table_function.name = 'test_vector_type' @@ -98,8 +96,6 @@ def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLe # Test 5: DataChunk#set_value with INTEGER def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_int' @@ -133,8 +129,6 @@ def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics # Test 6: DataChunk#set_value with BIGINT def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_bigint' @@ -165,8 +159,6 @@ def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/ # Test 7: DataChunk#set_value with VARCHAR def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_varchar' @@ -199,8 +191,6 @@ def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics # Test 8: DataChunk#set_value with DOUBLE def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_double' @@ -233,8 +223,6 @@ def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/ # Test 9: DataChunk#set_value with NULL def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_null' @@ -269,8 +257,6 @@ def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 10: DataChunk#set_value with BLOB def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_blob' @@ -306,8 +292,6 @@ def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 11: DataChunk#set_value with multiple columns # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions def test_data_chunk_set_value_multiple_columns - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_multi' @@ -361,8 +345,6 @@ def test_data_chunk_set_value_multiple_columns # Test 12: DataChunk#set_value with TIMESTAMP def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_timestamp' @@ -398,8 +380,6 @@ def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metri # Test 13: DataChunk#set_value with TIMESTAMP_TZ def test_data_chunk_set_value_timestamp_tz # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') - done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_timestamp_tz' diff --git a/test/duckdb_test/gc_stress_test.rb b/test/duckdb_test/gc_stress_test.rb index 00013716..2f711b6a 100644 --- a/test/duckdb_test/gc_stress_test.rb +++ b/test/duckdb_test/gc_stress_test.rb @@ -83,7 +83,6 @@ def test_table_function_with_gc_compaction skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? - @con.execute('SET threads=1') # Table functions still require single-threaded execution # Capture local variables multiplier = 3 @@ -136,7 +135,6 @@ def test_mixed_functions_gc_stress skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? - @con.execute('SET threads=1') # Table functions still require single-threaded execution # Register both scalar and table functions @con.register_scalar_function(DuckDB::ScalarFunction.new.tap do |sf| diff --git a/test/duckdb_test/table_function_csv_test.rb b/test/duckdb_test/table_function_csv_test.rb index a93ece42..a1defe5b 100644 --- a/test/duckdb_test/table_function_csv_test.rb +++ b/test/duckdb_test/table_function_csv_test.rb @@ -50,7 +50,6 @@ def csv_to_duckdb_data(csv, output) def setup @db = DuckDB::Database.open @con = @db.connect - @con.execute('SET threads=1') # Required for Ruby callbacks end def teardown diff --git a/test/duckdb_test/table_function_integration_test.rb b/test/duckdb_test/table_function_integration_test.rb index 88eb0825..8f111287 100644 --- a/test/duckdb_test/table_function_integration_test.rb +++ b/test/duckdb_test/table_function_integration_test.rb @@ -7,7 +7,6 @@ class TableFunctionIntegrationTest < Minitest::Test def setup @database = DuckDB::Database.open @connection = @database.connect - @connection.execute('SET threads=1') # Required for Ruby callbacks end def teardown diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index 249c0fa2..dd8b05c6 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -16,7 +16,6 @@ def test_new def test_create_with_set_value db = DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') called = 0 @@ -92,7 +91,6 @@ def test_gc_compaction_safety db = DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') # Capture local variable in callbacks row_multiplier = 2 @@ -158,7 +156,6 @@ def test_gc_compaction_safety def test_symbol_columns db = DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') # Capture local variable in callbacks row_multiplier = 2 @@ -206,6 +203,44 @@ def test_symbol_columns end # rubocop:enable Metrics/AbcSize, Metrics/MethodLength + # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + def test_table_function_with_multithread + db = DuckDB::Database.open + conn = db.connect + conn.execute('SET threads=4') + + row_count = 100 + called = 0 + tf = DuckDB::TableFunction.new + tf.name = 'mt_generate' + + tf.bind do |bind_info| + bind_info.add_result_column('value', DuckDB::LogicalType::BIGINT) + end + + tf.init { |_init_info| } + + tf.execute do |_func_info, output| + called += 1 + if called > 1 + output.size = 0 + else + row_count.times { |i| output.set_value(0, i, i * 2) } + output.size = row_count + end + end + + conn.register_table_function(tf) + result = conn.execute('SELECT SUM(value) FROM mt_generate()') + expected_sum = (0...row_count).sum { |i| i * 2 } + + assert_equal expected_sum, result.first.first + + conn.disconnect + db.close + end + # rubocop:enable Metrics/AbcSize, Metrics/MethodLength + private def setup_incomplete_function From ad85871fb00549060180c147f98b385270024f53 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:26:14 +0900 Subject: [PATCH 4/8] fix: version-gate per-worker proxy behind DuckDB >= 1.5.0 duckdb_scalar_function_set_init, duckdb_scalar_function_get_state, and duckdb_table_function_set_local_init were introduced in DuckDB 1.5.0. CI tests against DuckDB 1.4.4, which lacks these symbols. Wrap per-worker proxy creation and dispatch behind HAVE_DUCKDB_H_GE_V1_5_0 (#ifdef). On DuckDB < 1.5.0, the global executor thread handles all non-Ruby thread callbacks (the previous behavior). The three-path dispatch (Ruby+GVL / Ruby-GVL / non-Ruby) remains active on all versions. Refs: #1158 Co-Authored-By: Claude Opus 4.6 (1M context) --- ext/duckdb/scalar_function.c | 14 ++++++++++++-- ext/duckdb/table_function.c | 14 +++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/ext/duckdb/scalar_function.c b/ext/duckdb/scalar_function.c index e9386610..1e5ddc4c 100644 --- a/ext/duckdb/scalar_function.c +++ b/ext/duckdb/scalar_function.c @@ -198,12 +198,15 @@ static VALUE rbduckdb_scalar_function_add_parameter(VALUE self, VALUE logical_ty return self; } +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 /* * init_local_state callback for scalar functions. * * Called by DuckDB once per worker thread before the first UDF invocation. * For non-Ruby threads, creates a per-worker proxy thread so that callbacks * can be dispatched without going through the global executor bottleneck. + * + * Requires DuckDB >= 1.5.0 (duckdb_scalar_function_set_init). */ struct proxy_create_arg { struct worker_proxy *proxy; @@ -229,6 +232,7 @@ static void scalar_function_init_local_state(duckdb_init_info info) { duckdb_scalar_function_init_set_state(info, arg.proxy, rbduckdb_worker_proxy_destroy); } } +#endif /* * The DuckDB callback entry point. @@ -278,14 +282,18 @@ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chun rb_thread_call_with_gvl(callback_with_gvl, &arg); } } else { - /* Case 3: Non-Ruby thread - use per-worker proxy if available */ + /* Case 3: Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ struct worker_proxy *proxy = (struct worker_proxy *)duckdb_scalar_function_get_state(info); if (proxy) { rbduckdb_worker_proxy_dispatch(proxy, scalar_execute_via_executor, &arg); } else { - /* Fallback to global executor */ rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); } +#else + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); +#endif } } @@ -510,7 +518,9 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_extra_info(p->scalar_function, p, NULL); duckdb_scalar_function_set_function(p->scalar_function, scalar_function_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 duckdb_scalar_function_set_init(p->scalar_function, scalar_function_init_local_state); +#endif /* * Mark as volatile to prevent constant folding during query optimization. diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index 9c0f065d..32ceaed8 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -27,7 +27,9 @@ static VALUE rbduckdb_table_function_set_init(VALUE self); static void table_function_init_callback(duckdb_init_info info); static VALUE rbduckdb_table_function_set_execute(VALUE self); static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 static void table_function_local_init_callback(duckdb_init_info info); +#endif static const rb_data_type_t table_function_data_type = { "DuckDB/TableFunction", @@ -387,7 +389,9 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback); +#endif /* Ensure the global executor thread is running for multi-thread dispatch */ rbduckdb_executor_ensure_started(); @@ -456,19 +460,26 @@ static void table_function_execute_callback(duckdb_function_info info, duckdb_da rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg); } } else { - /* Non-Ruby thread — use per-worker proxy if available */ + /* Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ struct worker_proxy *proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info); if (proxy) { rbduckdb_worker_proxy_dispatch(proxy, table_execute_with_gvl, &arg); } else { rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); } +#else + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); +#endif } } +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 /* * local_init callback for table functions. * Creates a per-worker proxy for non-Ruby threads. + * Requires DuckDB >= 1.5.0 (duckdb_table_function_set_local_init). */ struct table_proxy_create_arg { struct worker_proxy *proxy; @@ -492,6 +503,7 @@ static void table_function_local_init_callback(duckdb_init_info info) { duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy); } } +#endif rubyDuckDBTableFunction *get_struct_table_function(VALUE self) { rubyDuckDBTableFunction *ctx; From 76e8d1ccb08441179984f3edd413de0ea3c247ea Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:34:13 +0900 Subject: [PATCH 5/8] fix: preserve backward compatibility for DuckDB < 1.5.0 Per-worker proxy threads require DuckDB >= 1.5.0 APIs (duckdb_scalar_function_set_init, duckdb_scalar_function_get_state, duckdb_table_function_set_local_init). On older versions, these symbols are unavailable and the global executor is the only dispatch path for non-Ruby thread callbacks. Restore backward-compatible behavior: - connection.rb: Restore check_threads for DuckDB < 1.5.0. On >= 1.5.0 the check is skipped (per-worker proxy handles thread safety). On < 1.5.0 the threads=1 restriction remains to prevent the global executor deadlock. - Tests: Keep SET threads=1 in all table function tests for compatibility with DuckDB 1.4.x. Version-gate the new multithread tests behind DuckDB >= 1.5.0. Refs: #1158 Co-Authored-By: Claude Opus 4.6 (1M context) --- lib/duckdb/connection.rb | 20 +++++++++++++++++++ test/duckdb_test/data_chunk_test.rb | 20 +++++++++++++++++++ test/duckdb_test/gc_stress_test.rb | 2 ++ test/duckdb_test/scalar_function_test.rb | 2 ++ test/duckdb_test/table_function_csv_test.rb | 1 + .../table_function_integration_test.rb | 1 + test/duckdb_test/table_function_test.rb | 5 +++++ 7 files changed, 51 insertions(+) diff --git a/lib/duckdb/connection.rb b/lib/duckdb/connection.rb index acef4385..cd8b135a 100644 --- a/lib/duckdb/connection.rb +++ b/lib/duckdb/connection.rb @@ -221,6 +221,7 @@ def register_scalar_function_set(scalar_function_set) def register_table_function(table_function) raise ArgumentError, 'table_function must be a TableFunction' unless table_function.is_a?(TableFunction) + check_threads _register_table_function(table_function) end @@ -235,6 +236,7 @@ def register_table_function(table_function) # @param columns [Hash{String => DuckDB::LogicalType}, nil] optional column schema override; # if omitted, the adapter determines the columns (e.g. from headers or inference) # @raise [ArgumentError] if no adapter is registered for the object's class + # @raise [DuckDB::Error] if threads > 1 on DuckDB < 1.5.0 # @return [void] # # @example Expose a CSV as a table @@ -260,6 +262,24 @@ def expose_as_table(object, name, columns: nil) private + # DuckDB >= 1.5.0 provides per-worker proxy threads via init_local_state, + # making table function callbacks thread-safe with multiple DuckDB threads. + # On older versions, the global executor serializes all callbacks and can + # deadlock under concurrent workloads, so we enforce threads=1. + def check_threads + return if Gem::Version.new(LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + + result = execute("SELECT current_setting('threads')") + thread_count = result.first.first.to_i + + return unless thread_count > 1 + + raise DuckDB::Error, + 'Table functions with Ruby callbacks require single-threaded execution ' \ + "on DuckDB < 1.5.0. Current threads setting: #{thread_count}. " \ + "Execute 'SET threads=1' before registering table functions." + end + def run_appender_block(appender, &) return appender unless block_given? diff --git a/test/duckdb_test/data_chunk_test.rb b/test/duckdb_test/data_chunk_test.rb index ddcb94ae..026a0bdd 100644 --- a/test/duckdb_test/data_chunk_test.rb +++ b/test/duckdb_test/data_chunk_test.rb @@ -56,6 +56,8 @@ def test_data_chunk_get_vector # Test 4: Vector#logical_type returns LogicalType def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions + @conn.execute('SET threads=1') + table_function = DuckDB::TableFunction.new table_function.name = 'test_vector_type' @@ -96,6 +98,8 @@ def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLe # Test 5: DataChunk#set_value with INTEGER def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_int' @@ -129,6 +133,8 @@ def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics # Test 6: DataChunk#set_value with BIGINT def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_bigint' @@ -159,6 +165,8 @@ def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/ # Test 7: DataChunk#set_value with VARCHAR def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_varchar' @@ -191,6 +199,8 @@ def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics # Test 8: DataChunk#set_value with DOUBLE def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_double' @@ -223,6 +233,8 @@ def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/ # Test 9: DataChunk#set_value with NULL def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_null' @@ -257,6 +269,8 @@ def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 10: DataChunk#set_value with BLOB def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_blob' @@ -292,6 +306,8 @@ def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 11: DataChunk#set_value with multiple columns # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions def test_data_chunk_set_value_multiple_columns + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_multi' @@ -345,6 +361,8 @@ def test_data_chunk_set_value_multiple_columns # Test 12: DataChunk#set_value with TIMESTAMP def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_timestamp' @@ -380,6 +398,8 @@ def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metri # Test 13: DataChunk#set_value with TIMESTAMP_TZ def test_data_chunk_set_value_timestamp_tz # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions + @conn.execute('SET threads=1') + done = false table_function = DuckDB::TableFunction.new table_function.name = 'test_set_value_timestamp_tz' diff --git a/test/duckdb_test/gc_stress_test.rb b/test/duckdb_test/gc_stress_test.rb index 2f711b6a..4d11bb47 100644 --- a/test/duckdb_test/gc_stress_test.rb +++ b/test/duckdb_test/gc_stress_test.rb @@ -83,6 +83,7 @@ def test_table_function_with_gc_compaction skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? + @con.execute('SET threads=1') # Capture local variables multiplier = 3 @@ -135,6 +136,7 @@ def test_mixed_functions_gc_stress skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? + @con.execute('SET threads=1') # Register both scalar and table functions @con.register_scalar_function(DuckDB::ScalarFunction.new.tap do |sf| diff --git a/test/duckdb_test/scalar_function_test.rb b/test/duckdb_test/scalar_function_test.rb index 96c78cab..8578c1c5 100644 --- a/test/duckdb_test/scalar_function_test.rb +++ b/test/duckdb_test/scalar_function_test.rb @@ -675,6 +675,8 @@ def test_create_with_different_types # rubocop:disable Metrics/MethodLength end def test_scalar_function_with_multithread + skip 'per-worker proxy requires duckdb >= 1.5.0' if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + @con.execute('SET threads=4') @con.execute('CREATE TABLE large_test AS SELECT range::INTEGER AS value FROM range(10000)') diff --git a/test/duckdb_test/table_function_csv_test.rb b/test/duckdb_test/table_function_csv_test.rb index a1defe5b..bd4f2dd8 100644 --- a/test/duckdb_test/table_function_csv_test.rb +++ b/test/duckdb_test/table_function_csv_test.rb @@ -50,6 +50,7 @@ def csv_to_duckdb_data(csv, output) def setup @db = DuckDB::Database.open @con = @db.connect + @con.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_integration_test.rb b/test/duckdb_test/table_function_integration_test.rb index 8f111287..619a6f83 100644 --- a/test/duckdb_test/table_function_integration_test.rb +++ b/test/duckdb_test/table_function_integration_test.rb @@ -7,6 +7,7 @@ class TableFunctionIntegrationTest < Minitest::Test def setup @database = DuckDB::Database.open @connection = @database.connect + @connection.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index dd8b05c6..7423d2a0 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -16,6 +16,7 @@ def test_new def test_create_with_set_value db = DuckDB::Database.open conn = db.connect + conn.query('SET threads=1') called = 0 @@ -91,6 +92,7 @@ def test_gc_compaction_safety db = DuckDB::Database.open conn = db.connect + conn.query('SET threads=1') # Capture local variable in callbacks row_multiplier = 2 @@ -156,6 +158,7 @@ def test_gc_compaction_safety def test_symbol_columns db = DuckDB::Database.open conn = db.connect + conn.query('SET threads=1') # Capture local variable in callbacks row_multiplier = 2 @@ -205,6 +208,8 @@ def test_symbol_columns # rubocop:disable Metrics/AbcSize, Metrics/MethodLength def test_table_function_with_multithread + skip 'per-worker proxy requires duckdb >= 1.5.0' if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + db = DuckDB::Database.open conn = db.connect conn.execute('SET threads=4') From 50b14776d06b72b717d744d418970d2af6eaed1d Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 17:37:04 +0900 Subject: [PATCH 6/8] style: fix RuboCop offenses in multithread UDF tests - Break long skip lines into unless/skip form - Add rubocop:disable for MethodLength on scalar multithread test - Replace empty block with nil body in table function init Co-Authored-By: Claude Opus 4.6 (1M context) --- test/duckdb_test/scalar_function_test.rb | 6 ++++-- test/duckdb_test/table_function_test.rb | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/test/duckdb_test/scalar_function_test.rb b/test/duckdb_test/scalar_function_test.rb index 8578c1c5..2fc697a3 100644 --- a/test/duckdb_test/scalar_function_test.rb +++ b/test/duckdb_test/scalar_function_test.rb @@ -674,8 +674,10 @@ def test_create_with_different_types # rubocop:disable Metrics/MethodLength assert_equal 'Hello - World', rows[0][0] end - def test_scalar_function_with_multithread - skip 'per-worker proxy requires duckdb >= 1.5.0' if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + def test_scalar_function_with_multithread # rubocop:disable Metrics/MethodLength + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end @con.execute('SET threads=4') @con.execute('CREATE TABLE large_test AS SELECT range::INTEGER AS value FROM range(10000)') diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index 7423d2a0..16399986 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -208,7 +208,9 @@ def test_symbol_columns # rubocop:disable Metrics/AbcSize, Metrics/MethodLength def test_table_function_with_multithread - skip 'per-worker proxy requires duckdb >= 1.5.0' if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end db = DuckDB::Database.open conn = db.connect @@ -223,7 +225,7 @@ def test_table_function_with_multithread bind_info.add_result_column('value', DuckDB::LogicalType::BIGINT) end - tf.init { |_init_info| } + tf.init { |_init_info| nil } tf.execute do |_func_info, output| called += 1 From 8ad909cabd4ebef915173bb999da09187e568307 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 18:41:38 +0900 Subject: [PATCH 7/8] fix: free per-worker proxy struct and OS primitives on destroy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, rbduckdb_worker_proxy_destroy only signaled the proxy thread to stop but never waited for it to exit or freed the struct. This leaked the worker_proxy struct, its mutex, and condvars for every DuckDB worker thread on each UDF query — unbounded growth in long-lived processes. Add an exit_cond condvar so destroy can block until the proxy thread has fully exited, then safely destroy all OS primitives and free the struct. Switch from xcalloc/xfree to calloc/free since destroy runs on a non-Ruby thread where xfree is unsafe. Co-Authored-By: Claude Opus 4.6 (1M context) --- ext/duckdb/executor.c | 69 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c index 3bc543bd..5f7738f8 100644 --- a/ext/duckdb/executor.c +++ b/ext/duckdb/executor.c @@ -246,14 +246,17 @@ struct worker_proxy { void *data; volatile int has_request; volatile int done; + volatile int exited; #ifdef _MSC_VER CRITICAL_SECTION lock; CONDITION_VARIABLE request_cond; CONDITION_VARIABLE done_cond; + CONDITION_VARIABLE exit_cond; #else pthread_mutex_t lock; pthread_cond_t request_cond; pthread_cond_t done_cond; + pthread_cond_t exit_cond; #endif }; @@ -331,6 +334,23 @@ static VALUE proxy_thread_func(void *data) { rb_ary_delete(g_proxy_threads, proxy->ruby_thread); } + /* + * Signal that this thread has finished and no longer references + * the proxy struct. After this signal, rbduckdb_worker_proxy_destroy + * may free the struct. + */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->exited = 1; + WakeConditionVariable(&proxy->exit_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->exited = 1; + pthread_cond_signal(&proxy->exit_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + return Qnil; } @@ -339,20 +359,30 @@ static VALUE proxy_thread_func(void *data) { * Must be called with GVL held (e.g., from the global executor callback). */ struct worker_proxy *rbduckdb_worker_proxy_create(void) { - struct worker_proxy *proxy = xcalloc(1, sizeof(struct worker_proxy)); + /* + * Use calloc (not xcalloc) because rbduckdb_worker_proxy_destroy + * frees the struct from a non-Ruby thread where xfree is unsafe. + */ + struct worker_proxy *proxy = calloc(1, sizeof(struct worker_proxy)); + if (proxy == NULL) { + rb_raise(rb_eNoMemError, "failed to allocate worker_proxy"); + } proxy->stop = 0; proxy->has_request = 0; proxy->done = 0; + proxy->exited = 0; #ifdef _MSC_VER InitializeCriticalSection(&proxy->lock); InitializeConditionVariable(&proxy->request_cond); InitializeConditionVariable(&proxy->done_cond); + InitializeConditionVariable(&proxy->exit_cond); #else pthread_mutex_init(&proxy->lock, NULL); pthread_cond_init(&proxy->request_cond, NULL); pthread_cond_init(&proxy->done_cond, NULL); + pthread_cond_init(&proxy->exit_cond, NULL); #endif proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); @@ -409,34 +439,49 @@ void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, * Destroy a per-worker proxy. * Compatible with duckdb_delete_callback_t: void (*)(void *). * Safe to call from non-Ruby threads — uses only OS primitives. + * + * Blocks until the proxy thread has exited and no longer references the + * struct, then destroys OS synchronization primitives and frees memory. */ void rbduckdb_worker_proxy_destroy(void *data) { struct worker_proxy *proxy = (struct worker_proxy *)data; if (proxy == NULL) return; - /* Signal the proxy thread to stop (OS primitives only, no Ruby API) */ + /* Signal the proxy thread to stop */ #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); proxy->stop = 1; WakeConditionVariable(&proxy->request_cond); LeaveCriticalSection(&proxy->lock); + + /* Wait for the proxy thread to finish */ + EnterCriticalSection(&proxy->lock); + while (!proxy->exited) { + SleepConditionVariableCS(&proxy->exit_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); + + DeleteCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); proxy->stop = 1; pthread_cond_signal(&proxy->request_cond); pthread_mutex_unlock(&proxy->lock); + + /* Wait for the proxy thread to finish */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->exited) { + pthread_cond_wait(&proxy->exit_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); + + pthread_cond_destroy(&proxy->exit_cond); + pthread_cond_destroy(&proxy->done_cond); + pthread_cond_destroy(&proxy->request_cond); + pthread_mutex_destroy(&proxy->lock); #endif - /* - * The proxy thread will exit its loop, remove itself from the - * GC protection array, and the Ruby thread object will be collected. - * We do NOT free the proxy struct here because the proxy thread may - * still be referencing it. The proxy struct is freed when the Ruby - * thread finishes and the GC collects it. - * - * For simplicity, we accept this small leak per query — the proxy - * struct is ~100 bytes and there are at most N (thread count) per query. - */ + free(proxy); } /* From f9ab7e18a07345c8de971c3b6cf62cf0177a775c Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 18:49:23 +0900 Subject: [PATCH 8/8] refactor: rename executor struct fields for clarity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename terse field names in executor_request and worker_proxy structs to be more descriptive, following Arrow C GLib naming conventions: - fn/data → callback_func/callback_data - done → request_done - done_lock/done_mutex/done_cond → request_done_lock/request_done_mutex/request_done_cond - stop → stop_requested - exited → thread_exited - exit_cond → thread_exit_cond Co-Authored-By: Claude Opus 4.6 (1M context) --- ext/duckdb/executor.c | 171 +++++++++++++++++++++--------------------- ext/duckdb/executor.h | 5 +- 2 files changed, 89 insertions(+), 87 deletions(-) diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c index 5f7738f8..b3dac283 100644 --- a/ext/duckdb/executor.c +++ b/ext/duckdb/executor.c @@ -34,15 +34,15 @@ extern int ruby_native_thread_p(void); /* Per-callback request, stack-allocated on the DuckDB worker thread */ struct executor_request { - rbduckdb_callback_fn fn; - void *data; - int done; + rbduckdb_callback_fn callback_func; + void *callback_data; + int request_done; #ifdef _MSC_VER - CRITICAL_SECTION done_lock; - CONDITION_VARIABLE done_cond; + CRITICAL_SECTION request_done_lock; + CONDITION_VARIABLE request_done_cond; #else - pthread_mutex_t done_mutex; - pthread_cond_t done_cond; + pthread_mutex_t request_done_mutex; + pthread_cond_t request_done_cond; #endif struct executor_request *next; }; @@ -130,19 +130,19 @@ static VALUE executor_thread_func(void *data) { struct executor_request *req = w.request; /* Execute the callback with the GVL */ - req->fn(req->data); + req->callback_func(req->callback_data); /* Signal the DuckDB worker thread that the callback is done */ #ifdef _MSC_VER - EnterCriticalSection(&req->done_lock); - req->done = 1; - WakeConditionVariable(&req->done_cond); - LeaveCriticalSection(&req->done_lock); + EnterCriticalSection(&req->request_done_lock); + req->request_done = 1; + WakeConditionVariable(&req->request_done_cond); + LeaveCriticalSection(&req->request_done_lock); #else - pthread_mutex_lock(&req->done_mutex); - req->done = 1; - pthread_cond_signal(&req->done_cond); - pthread_mutex_unlock(&req->done_mutex); + pthread_mutex_lock(&req->request_done_mutex); + req->request_done = 1; + pthread_cond_signal(&req->request_done_cond); + pthread_mutex_unlock(&req->request_done_mutex); #endif } } @@ -177,17 +177,17 @@ void rbduckdb_executor_ensure_started(void) { * Dispatch a callback to the global executor thread. * Called from a non-Ruby thread. Blocks until the callback completes. */ -void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data) { +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data) { struct executor_request req; - req.fn = fn; - req.data = data; - req.done = 0; + req.callback_func = callback_func; + req.callback_data = callback_data; + req.request_done = 0; req.next = NULL; #ifdef _MSC_VER - InitializeCriticalSection(&req.done_lock); - InitializeConditionVariable(&req.done_cond); + InitializeCriticalSection(&req.request_done_lock); + InitializeConditionVariable(&req.request_done_cond); /* Enqueue the request */ EnterCriticalSection(&g_executor_lock); @@ -197,16 +197,16 @@ void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data) { LeaveCriticalSection(&g_executor_lock); /* Wait for the executor to process our callback */ - EnterCriticalSection(&req.done_lock); - while (!req.done) { - SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); + EnterCriticalSection(&req.request_done_lock); + while (!req.request_done) { + SleepConditionVariableCS(&req.request_done_cond, &req.request_done_lock, INFINITE); } - LeaveCriticalSection(&req.done_lock); + LeaveCriticalSection(&req.request_done_lock); - DeleteCriticalSection(&req.done_lock); + DeleteCriticalSection(&req.request_done_lock); #else - pthread_mutex_init(&req.done_mutex, NULL); - pthread_cond_init(&req.done_cond, NULL); + pthread_mutex_init(&req.request_done_mutex, NULL); + pthread_cond_init(&req.request_done_cond, NULL); /* Enqueue the request */ pthread_mutex_lock(&g_executor_mutex); @@ -216,14 +216,14 @@ void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data) { pthread_mutex_unlock(&g_executor_mutex); /* Wait for the executor to process our callback */ - pthread_mutex_lock(&req.done_mutex); - while (!req.done) { - pthread_cond_wait(&req.done_cond, &req.done_mutex); + pthread_mutex_lock(&req.request_done_mutex); + while (!req.request_done) { + pthread_cond_wait(&req.request_done_cond, &req.request_done_mutex); } - pthread_mutex_unlock(&req.done_mutex); + pthread_mutex_unlock(&req.request_done_mutex); - pthread_cond_destroy(&req.done_cond); - pthread_mutex_destroy(&req.done_mutex); + pthread_cond_destroy(&req.request_done_cond); + pthread_mutex_destroy(&req.request_done_mutex); #endif } @@ -241,22 +241,22 @@ void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data) { struct worker_proxy { VALUE ruby_thread; - volatile int stop; - rbduckdb_callback_fn fn; - void *data; + volatile int stop_requested; + rbduckdb_callback_fn callback_func; + void *callback_data; volatile int has_request; - volatile int done; - volatile int exited; + volatile int request_done; + volatile int thread_exited; #ifdef _MSC_VER CRITICAL_SECTION lock; CONDITION_VARIABLE request_cond; - CONDITION_VARIABLE done_cond; - CONDITION_VARIABLE exit_cond; + CONDITION_VARIABLE request_done_cond; + CONDITION_VARIABLE thread_exit_cond; #else pthread_mutex_t lock; pthread_cond_t request_cond; - pthread_cond_t done_cond; - pthread_cond_t exit_cond; + pthread_cond_t request_done_cond; + pthread_cond_t thread_exit_cond; #endif }; @@ -266,13 +266,13 @@ static void *proxy_wait_func(void *data) { #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); - while (!proxy->stop && !proxy->has_request) { + while (!proxy->stop_requested && !proxy->has_request) { SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE); } LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); - while (!proxy->stop && !proxy->has_request) { + while (!proxy->stop_requested && !proxy->has_request) { pthread_cond_wait(&proxy->request_cond, &proxy->lock); } pthread_mutex_unlock(&proxy->lock); @@ -287,12 +287,12 @@ static void proxy_stop_func(void *data) { #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); - proxy->stop = 1; + proxy->stop_requested = 1; WakeConditionVariable(&proxy->request_cond); LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); - proxy->stop = 1; + proxy->stop_requested = 1; pthread_cond_signal(&proxy->request_cond); pthread_mutex_unlock(&proxy->lock); #endif @@ -302,28 +302,28 @@ static void proxy_stop_func(void *data) { static VALUE proxy_thread_func(void *data) { struct worker_proxy *proxy = (struct worker_proxy *)data; - while (!proxy->stop) { + while (!proxy->stop_requested) { /* Release GVL and wait for a request */ rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy); - if (proxy->stop) break; + if (proxy->stop_requested) break; if (proxy->has_request) { /* Execute the callback with the GVL held */ - proxy->fn(proxy->data); + proxy->callback_func(proxy->callback_data); /* Signal completion to the DuckDB worker thread */ #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); proxy->has_request = 0; - proxy->done = 1; - WakeConditionVariable(&proxy->done_cond); + proxy->request_done = 1; + WakeConditionVariable(&proxy->request_done_cond); LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); proxy->has_request = 0; - proxy->done = 1; - pthread_cond_signal(&proxy->done_cond); + proxy->request_done = 1; + pthread_cond_signal(&proxy->request_done_cond); pthread_mutex_unlock(&proxy->lock); #endif } @@ -341,13 +341,13 @@ static VALUE proxy_thread_func(void *data) { */ #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); - proxy->exited = 1; - WakeConditionVariable(&proxy->exit_cond); + proxy->thread_exited = 1; + WakeConditionVariable(&proxy->thread_exit_cond); LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); - proxy->exited = 1; - pthread_cond_signal(&proxy->exit_cond); + proxy->thread_exited = 1; + pthread_cond_signal(&proxy->thread_exit_cond); pthread_mutex_unlock(&proxy->lock); #endif @@ -368,21 +368,21 @@ struct worker_proxy *rbduckdb_worker_proxy_create(void) { rb_raise(rb_eNoMemError, "failed to allocate worker_proxy"); } - proxy->stop = 0; + proxy->stop_requested = 0; proxy->has_request = 0; - proxy->done = 0; - proxy->exited = 0; + proxy->request_done = 0; + proxy->thread_exited = 0; #ifdef _MSC_VER InitializeCriticalSection(&proxy->lock); InitializeConditionVariable(&proxy->request_cond); - InitializeConditionVariable(&proxy->done_cond); - InitializeConditionVariable(&proxy->exit_cond); + InitializeConditionVariable(&proxy->request_done_cond); + InitializeConditionVariable(&proxy->thread_exit_cond); #else pthread_mutex_init(&proxy->lock, NULL); pthread_cond_init(&proxy->request_cond, NULL); - pthread_cond_init(&proxy->done_cond, NULL); - pthread_cond_init(&proxy->exit_cond, NULL); + pthread_cond_init(&proxy->request_done_cond, NULL); + pthread_cond_init(&proxy->thread_exit_cond, NULL); #endif proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); @@ -401,35 +401,36 @@ struct worker_proxy *rbduckdb_worker_proxy_create(void) { * Blocks until the proxy completes the callback. */ void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, - rbduckdb_callback_fn fn, void *data) { + rbduckdb_callback_fn callback_func, + void *callback_data) { #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); - proxy->fn = fn; - proxy->data = data; - proxy->done = 0; + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; proxy->has_request = 1; WakeConditionVariable(&proxy->request_cond); LeaveCriticalSection(&proxy->lock); /* Wait for completion */ EnterCriticalSection(&proxy->lock); - while (!proxy->done) { - SleepConditionVariableCS(&proxy->done_cond, &proxy->lock, INFINITE); + while (!proxy->request_done) { + SleepConditionVariableCS(&proxy->request_done_cond, &proxy->lock, INFINITE); } LeaveCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); - proxy->fn = fn; - proxy->data = data; - proxy->done = 0; + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; proxy->has_request = 1; pthread_cond_signal(&proxy->request_cond); pthread_mutex_unlock(&proxy->lock); /* Wait for completion */ pthread_mutex_lock(&proxy->lock); - while (!proxy->done) { - pthread_cond_wait(&proxy->done_cond, &proxy->lock); + while (!proxy->request_done) { + pthread_cond_wait(&proxy->request_done_cond, &proxy->lock); } pthread_mutex_unlock(&proxy->lock); #endif @@ -450,33 +451,33 @@ void rbduckdb_worker_proxy_destroy(void *data) { /* Signal the proxy thread to stop */ #ifdef _MSC_VER EnterCriticalSection(&proxy->lock); - proxy->stop = 1; + proxy->stop_requested = 1; WakeConditionVariable(&proxy->request_cond); LeaveCriticalSection(&proxy->lock); /* Wait for the proxy thread to finish */ EnterCriticalSection(&proxy->lock); - while (!proxy->exited) { - SleepConditionVariableCS(&proxy->exit_cond, &proxy->lock, INFINITE); + while (!proxy->thread_exited) { + SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE); } LeaveCriticalSection(&proxy->lock); DeleteCriticalSection(&proxy->lock); #else pthread_mutex_lock(&proxy->lock); - proxy->stop = 1; + proxy->stop_requested = 1; pthread_cond_signal(&proxy->request_cond); pthread_mutex_unlock(&proxy->lock); /* Wait for the proxy thread to finish */ pthread_mutex_lock(&proxy->lock); - while (!proxy->exited) { - pthread_cond_wait(&proxy->exit_cond, &proxy->lock); + while (!proxy->thread_exited) { + pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); } pthread_mutex_unlock(&proxy->lock); - pthread_cond_destroy(&proxy->exit_cond); - pthread_cond_destroy(&proxy->done_cond); + pthread_cond_destroy(&proxy->thread_exit_cond); + pthread_cond_destroy(&proxy->request_done_cond); pthread_cond_destroy(&proxy->request_cond); pthread_mutex_destroy(&proxy->lock); #endif diff --git a/ext/duckdb/executor.h b/ext/duckdb/executor.h index 5c37fc89..50fa15d3 100644 --- a/ext/duckdb/executor.h +++ b/ext/duckdb/executor.h @@ -29,7 +29,7 @@ void rbduckdb_executor_ensure_started(void); * Dispatch a callback to the global executor thread. * Called from a non-Ruby thread. Blocks until the callback completes. */ -void rbduckdb_executor_dispatch(rbduckdb_callback_fn fn, void *data); +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data); /* * Per-worker proxy thread. @@ -50,7 +50,8 @@ struct worker_proxy *rbduckdb_worker_proxy_create(void); * Blocks until the callback completes. */ void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, - rbduckdb_callback_fn fn, void *data); + rbduckdb_callback_fn callback_func, + void *callback_data); /* * Destroy a per-worker proxy.