From 87c62ec033ec2888952a8c8a9c683ac08320c9ec Mon Sep 17 00:00:00 2001 From: Techassi Date: Fri, 30 Jan 2026 16:31:12 +0100 Subject: [PATCH 1/3] feat(operator): Add SignalWatcher --- crates/stackable-operator/src/utils/mod.rs | 2 + crates/stackable-operator/src/utils/signal.rs | 73 +++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 crates/stackable-operator/src/utils/signal.rs diff --git a/crates/stackable-operator/src/utils/mod.rs b/crates/stackable-operator/src/utils/mod.rs index a08b24a43..7f51eda2c 100644 --- a/crates/stackable-operator/src/utils/mod.rs +++ b/crates/stackable-operator/src/utils/mod.rs @@ -3,6 +3,8 @@ pub mod cluster_info; pub mod crds; pub mod kubelet; pub mod logging; +pub mod signal; + mod option; mod url; diff --git a/crates/stackable-operator/src/utils/signal.rs b/crates/stackable-operator/src/utils/signal.rs new file mode 100644 index 000000000..c585e1ea7 --- /dev/null +++ b/crates/stackable-operator/src/utils/signal.rs @@ -0,0 +1,73 @@ +use snafu::{ResultExt, Snafu}; +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::watch, +}; + +#[derive(Debug, Snafu)] +#[snafu(display("failed to construct signal listener"))] +pub struct SignalError { + source: std::io::Error, +} + +/// Watches for the incoming signal and multiplies it by sending it to all acquired handles. +pub struct SignalWatcher +where + T: Send + Sync + 'static, +{ + watch_rx: watch::Receiver, +} + +impl SignalWatcher +where + T: Default + Send + Sync + 'static, +{ + /// Watches the provided `signal` and multiplies the signal by sending it to all acquired handles + /// constructed through [`SignalWatcher::handle`]. + pub fn new(signal: F) -> Self + where + F: Future + Send + Sync + 'static, + { + let (watch_tx, watch_rx) = watch::channel(T::default()); + + tokio::spawn(async move { + let value = signal.await; + watch_tx.send(value) + }); + + Self { watch_rx } + } + + /// Acquire a new handle which will complete once a `SIGTERM` signal is received. + /// + /// This handle can be cheaply cloned to be able to gracefully shutdown multiple concurrent + /// tasks. + pub fn handle(&self) -> impl Future + use { + let mut watch_rx = self.watch_rx.clone(); + + async move { + watch_rx.changed().await.ok(); + } + } +} + +impl SignalWatcher<()> { + /// Watches the `SIGTERM` signal and multiplies the signal by sending it to all acquired handlers + /// constructed through [`SignalWatcher::handle`]. + // + // NOTE (@Techassi): Note Accepting a generic Future here is possible, but + // `Signal::recv` borrows instead of consuming which clashes with the 'static lifetime + // requirement of `tokio::spawn`. That's why I opted for watching for a particular signal + // internally instead of requiring users to pass the signal to this function. + pub fn sigterm() -> Result { + let mut sigterm = signal(SignalKind::terminate()).context(SignalSnafu)?; + let (watch_tx, watch_rx) = watch::channel(()); + + tokio::spawn(async move { + sigterm.recv().await; + watch_tx.send(()) + }); + + Ok(Self { watch_rx }) + } +} From 9d62ad422499c338a7e348180728334008479589 Mon Sep 17 00:00:00 2001 From: Techassi Date: Fri, 30 Jan 2026 16:35:18 +0100 Subject: [PATCH 2/3] chore(operator): Add changelog entry --- crates/stackable-operator/CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 1739c9a9c..c07b7bbde 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Add `SignalWatcher` which can be used to watch signals and multiply them to gracefully shutdown + multiple concurrent tasks/futures ([#1147]). + +[#1147]: https://github.com/stackabletech/operator-rs/pull/1147 + ## [0.104.0] - 2026-01-26 ### Added From 9ccb2e486b41a61c3a8172f9c84cdc8197c5132a Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 3 Feb 2026 09:44:06 +0100 Subject: [PATCH 3/3] chore: Apply suggestion Co-authored-by: Malte Sander --- crates/stackable-operator/src/utils/signal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-operator/src/utils/signal.rs b/crates/stackable-operator/src/utils/signal.rs index c585e1ea7..59c68e17f 100644 --- a/crates/stackable-operator/src/utils/signal.rs +++ b/crates/stackable-operator/src/utils/signal.rs @@ -5,7 +5,7 @@ use tokio::{ }; #[derive(Debug, Snafu)] -#[snafu(display("failed to construct signal listener"))] +#[snafu(display("failed to construct signal watcher"))] pub struct SignalError { source: std::io::Error, }