diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 125660dc9..58d169fc6 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -8,8 +8,11 @@ All notable changes to this project will be documented in this file. - BREAKING: Add support to gracefully shutdown `EosChecker`. `EosChecker::run` now requires passing a shutdown signal, which is any `Future` ([#1146]). +- Add `SignalWatcher` which can be used to watch signals and multiply them to gracefully shutdown + multiple concurrent tasks/futures ([#1147]). [#1146]: https://github.com/stackabletech/operator-rs/pull/1146 +[#1147]: https://github.com/stackabletech/operator-rs/pull/1147 ## [0.104.0] - 2026-01-26 diff --git a/crates/stackable-operator/src/utils/mod.rs b/crates/stackable-operator/src/utils/mod.rs index a08b24a43..7f51eda2c 100644 --- a/crates/stackable-operator/src/utils/mod.rs +++ b/crates/stackable-operator/src/utils/mod.rs @@ -3,6 +3,8 @@ pub mod cluster_info; pub mod crds; pub mod kubelet; pub mod logging; +pub mod signal; + mod option; mod url; diff --git a/crates/stackable-operator/src/utils/signal.rs b/crates/stackable-operator/src/utils/signal.rs new file mode 100644 index 000000000..59c68e17f --- /dev/null +++ b/crates/stackable-operator/src/utils/signal.rs @@ -0,0 +1,73 @@ +use snafu::{ResultExt, Snafu}; +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::watch, +}; + +#[derive(Debug, Snafu)] +#[snafu(display("failed to construct signal watcher"))] +pub struct SignalError { + source: std::io::Error, +} + +/// Watches for the incoming signal and multiplies it by sending it to all acquired handles. +pub struct SignalWatcher +where + T: Send + Sync + 'static, +{ + watch_rx: watch::Receiver, +} + +impl SignalWatcher +where + T: Default + Send + Sync + 'static, +{ + /// Watches the provided `signal` and multiplies the signal by sending it to all acquired handles + /// constructed through [`SignalWatcher::handle`]. + pub fn new(signal: F) -> Self + where + F: Future + Send + Sync + 'static, + { + let (watch_tx, watch_rx) = watch::channel(T::default()); + + tokio::spawn(async move { + let value = signal.await; + watch_tx.send(value) + }); + + Self { watch_rx } + } + + /// Acquire a new handle which will complete once a `SIGTERM` signal is received. + /// + /// This handle can be cheaply cloned to be able to gracefully shutdown multiple concurrent + /// tasks. + pub fn handle(&self) -> impl Future + use { + let mut watch_rx = self.watch_rx.clone(); + + async move { + watch_rx.changed().await.ok(); + } + } +} + +impl SignalWatcher<()> { + /// Watches the `SIGTERM` signal and multiplies the signal by sending it to all acquired handlers + /// constructed through [`SignalWatcher::handle`]. + // + // NOTE (@Techassi): Note Accepting a generic Future here is possible, but + // `Signal::recv` borrows instead of consuming which clashes with the 'static lifetime + // requirement of `tokio::spawn`. That's why I opted for watching for a particular signal + // internally instead of requiring users to pass the signal to this function. + pub fn sigterm() -> Result { + let mut sigterm = signal(SignalKind::terminate()).context(SignalSnafu)?; + let (watch_tx, watch_rx) = watch::channel(()); + + tokio::spawn(async move { + sigterm.recv().await; + watch_tx.send(()) + }); + + Ok(Self { watch_rx }) + } +}