From c2c76565afeeb8b79b7507145c0e72a614ea8a66 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 19 Feb 2026 16:05:38 -0500 Subject: [PATCH 1/3] feat: Separate runtimes for taskbroker components Create a separate tokio runtime for each of the consumer and gRPC server. The maintenance and upkeep threads will continue to use the default runtime. The number of threads allocated to each runtime is either specified by the TOKIO_WORKER_THREADS environment variable or defaults to the number of CPUs. --- src/main.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7970939d..e4e559e4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use taskbroker::kafka::inflight_activation_batcher::{ ActivationBatcherConfig, InflightActivationBatcher, }; use taskbroker::upkeep::upkeep; +use tokio::runtime::Runtime; use tokio::signal::unix::SignalKind; use tokio::task::JoinHandle; use tokio::{select, time}; @@ -151,8 +152,19 @@ async fn main() -> Result<(), Error> { } }); + let consumer_runtime_threads = std::env::var("TOKIO_WORKER_THREADS") + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0); + let mut consumer_builder = tokio::runtime::Builder::new_multi_thread(); + consumer_builder.thread_name("consumer-worker"); + if consumer_runtime_threads > 0 { + consumer_builder.worker_threads(consumer_runtime_threads); + } + let consumer_runtime = consumer_builder.build().unwrap(); + // Consumer from kafka - let consumer_task = tokio::spawn({ + let consumer_task = consumer_runtime.spawn({ let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); @@ -189,7 +201,18 @@ async fn main() -> Result<(), Error> { }); // GRPC server - let grpc_server_task = tokio::spawn({ + let server_runtime_threads = std::env::var("TOKIO_WORKER_THREADS") + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0); + let mut server_builder = tokio::runtime::Builder::new_multi_thread(); + server_builder.thread_name("server-worker"); + if server_runtime_threads > 0 { + server_builder.worker_threads(server_runtime_threads); + } + let server_runtime = server_builder.build().unwrap(); + + let grpc_server_task = server_runtime.spawn({ let grpc_store = store.clone(); let grpc_config = config.clone(); async move { From a34dba1f0c98f011dd75c51e093a559fca5d461e Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 20 Feb 2026 10:54:52 -0500 Subject: [PATCH 2/3] use config --- src/config.rs | 8 ++++++++ src/main.rs | 22 +++++++++++----------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 046de4f7..6221e3dd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -238,6 +238,12 @@ pub struct Config { /// Enable additional metrics for the sqlite. pub enable_sqlite_status_metrics: bool, + + /// The number of worker threads to use for the consumer runtime. + pub consumer_runtime_threads: usize, + + /// The number of worker threads to use for the server runtime. + pub server_runtime_threads: usize, } impl Default for Config { @@ -307,6 +313,8 @@ impl Default for Config { full_vacuum_on_upkeep: true, vacuum_interval_ms: 30000, enable_sqlite_status_metrics: true, + consumer_runtime_threads: 0, + server_runtime_threads: 0, } } } diff --git a/src/main.rs b/src/main.rs index e4e559e4..183a5790 100644 --- a/src/main.rs +++ b/src/main.rs @@ -152,16 +152,18 @@ async fn main() -> Result<(), Error> { } }); - let consumer_runtime_threads = std::env::var("TOKIO_WORKER_THREADS") + let tokio_worker_threads = std::env::var("TOKIO_WORKER_THREADS") .unwrap_or("0".to_string()) .parse() .unwrap_or(0); let mut consumer_builder = tokio::runtime::Builder::new_multi_thread(); consumer_builder.thread_name("consumer-worker"); - if consumer_runtime_threads > 0 { - consumer_builder.worker_threads(consumer_runtime_threads); + if config.consumer_runtime_threads > 0 { + consumer_builder.worker_threads(config.consumer_runtime_threads); + } else if tokio_worker_threads > 0 { + consumer_builder.worker_threads(tokio_worker_threads); } - let consumer_runtime = consumer_builder.build().unwrap(); + let consumer_runtime = consumer_builder.enable_all().build().unwrap(); // Consumer from kafka let consumer_task = consumer_runtime.spawn({ @@ -201,16 +203,14 @@ async fn main() -> Result<(), Error> { }); // GRPC server - let server_runtime_threads = std::env::var("TOKIO_WORKER_THREADS") - .unwrap_or("0".to_string()) - .parse() - .unwrap_or(0); let mut server_builder = tokio::runtime::Builder::new_multi_thread(); server_builder.thread_name("server-worker"); - if server_runtime_threads > 0 { - server_builder.worker_threads(server_runtime_threads); + if config.server_runtime_threads > 0 { + server_builder.worker_threads(config.server_runtime_threads); + } else if tokio_worker_threads > 0 { + server_builder.worker_threads(tokio_worker_threads); } - let server_runtime = server_builder.build().unwrap(); + let server_runtime = server_builder.enable_all().build().unwrap(); let grpc_server_task = server_runtime.spawn({ let grpc_store = store.clone(); From c8195732a287b714f4fb9bc86d6fc3defd6383a2 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 20 Feb 2026 10:56:55 -0500 Subject: [PATCH 3/3] lint --- src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 183a5790..b8416a4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,6 @@ use taskbroker::kafka::inflight_activation_batcher::{ ActivationBatcherConfig, InflightActivationBatcher, }; use taskbroker::upkeep::upkeep; -use tokio::runtime::Runtime; use tokio::signal::unix::SignalKind; use tokio::task::JoinHandle; use tokio::{select, time};