Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ pub struct Config {

/// Enable additional metrics for the sqlite.
pub enable_sqlite_status_metrics: bool,

/// The number of worker threads to use for the consumer runtime.
pub consumer_runtime_threads: usize,

/// The number of worker threads to use for the server runtime.
pub server_runtime_threads: usize,
}

impl Default for Config {
Expand Down Expand Up @@ -307,6 +313,8 @@ impl Default for Config {
full_vacuum_on_upkeep: true,
vacuum_interval_ms: 30000,
enable_sqlite_status_metrics: true,
consumer_runtime_threads: 0,
server_runtime_threads: 0,
}
}
}
Expand Down
26 changes: 24 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,21 @@ async fn main() -> Result<(), Error> {
}
});

let tokio_worker_threads = std::env::var("TOKIO_WORKER_THREADS")
.unwrap_or("0".to_string())
.parse()
.unwrap_or(0);
let mut consumer_builder = tokio::runtime::Builder::new_multi_thread();
consumer_builder.thread_name("consumer-worker");
if config.consumer_runtime_threads > 0 {
consumer_builder.worker_threads(config.consumer_runtime_threads);
} else if tokio_worker_threads > 0 {
consumer_builder.worker_threads(tokio_worker_threads);
}
let consumer_runtime = consumer_builder.enable_all().build().unwrap();

// Consumer from kafka
let consumer_task = tokio::spawn({
let consumer_task = consumer_runtime.spawn({
let consumer_store = store.clone();
let consumer_config = config.clone();
let runtime_config_manager = runtime_config_manager.clone();
Expand Down Expand Up @@ -189,7 +202,16 @@ async fn main() -> Result<(), Error> {
});

// GRPC server
let grpc_server_task = tokio::spawn({
let mut server_builder = tokio::runtime::Builder::new_multi_thread();
server_builder.thread_name("server-worker");
if config.server_runtime_threads > 0 {
server_builder.worker_threads(config.server_runtime_threads);
} else if tokio_worker_threads > 0 {
server_builder.worker_threads(tokio_worker_threads);
}
let server_runtime = server_builder.enable_all().build().unwrap();

let grpc_server_task = server_runtime.spawn({
let grpc_store = store.clone();
let grpc_config = config.clone();
async move {
Expand Down
Loading