diff --git a/src/config.rs b/src/config.rs index 046de4f7..6221e3dd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -238,6 +238,12 @@ pub struct Config { /// Enable additional metrics for the sqlite. pub enable_sqlite_status_metrics: bool, + + /// The number of worker threads to use for the consumer runtime. + pub consumer_runtime_threads: usize, + + /// The number of worker threads to use for the server runtime. + pub server_runtime_threads: usize, } impl Default for Config { @@ -307,6 +313,8 @@ impl Default for Config { full_vacuum_on_upkeep: true, vacuum_interval_ms: 30000, enable_sqlite_status_metrics: true, + consumer_runtime_threads: 0, + server_runtime_threads: 0, } } } diff --git a/src/main.rs b/src/main.rs index 7970939d..b8416a4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -151,8 +151,21 @@ async fn main() -> Result<(), Error> { } }); + let tokio_worker_threads = std::env::var("TOKIO_WORKER_THREADS") + .unwrap_or("0".to_string()) + .parse() + .unwrap_or(0); + let mut consumer_builder = tokio::runtime::Builder::new_multi_thread(); + consumer_builder.thread_name("consumer-worker"); + if config.consumer_runtime_threads > 0 { + consumer_builder.worker_threads(config.consumer_runtime_threads); + } else if tokio_worker_threads > 0 { + consumer_builder.worker_threads(tokio_worker_threads); + } + let consumer_runtime = consumer_builder.enable_all().build().unwrap(); + // Consumer from kafka - let consumer_task = tokio::spawn({ + let consumer_task = consumer_runtime.spawn({ let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); @@ -189,7 +202,16 @@ async fn main() -> Result<(), Error> { }); // GRPC server - let grpc_server_task = tokio::spawn({ + let mut server_builder = tokio::runtime::Builder::new_multi_thread(); + server_builder.thread_name("server-worker"); + if config.server_runtime_threads > 0 { + server_builder.worker_threads(config.server_runtime_threads); + } else if tokio_worker_threads > 0 { + server_builder.worker_threads(tokio_worker_threads); + } + let server_runtime = server_builder.enable_all().build().unwrap(); + + let grpc_server_task = server_runtime.spawn({ let grpc_store = store.clone(); let grpc_config = config.clone(); async move {