From 0692ab188147440edad89b0f5ec3b5b98348cdf9 Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 29 Jan 2026 16:28:01 +0100 Subject: [PATCH 1/6] feat(webhook): Add graceful shutdown --- crates/stackable-webhook/src/lib.rs | 74 ++++++------------- .../src/webhooks/conversion_webhook.rs | 5 +- 2 files changed, 25 insertions(+), 54 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 43b16d816..7e087f52c 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -15,15 +15,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; use axum::{Router, routing::get}; -use futures_util::{FutureExt as _, TryFutureExt, select}; +use futures_util::TryFutureExt; use k8s_openapi::ByteString; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; -use tokio::{ - signal::unix::{SignalKind, signal}, - sync::mpsc, - try_join, -}; +use tokio::{sync::mpsc, try_join}; use tower::ServiceBuilder; use webhooks::{Webhook, WebhookError}; use x509_cert::der::{EncodePem, pem::LineEnding}; @@ -59,6 +55,7 @@ pub enum WebhookServerError { /// /// ``` /// use stackable_webhook::{WebhookServer, WebhookServerOptions, webhooks::Webhook}; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// let mut webhooks: Vec> = vec![]; @@ -69,8 +66,9 @@ pub enum WebhookServerError { /// webhook_service_name: "my-operator".to_owned(), /// }; /// let webhook_server = WebhookServer::new(webhooks, webhook_options).await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); /// -/// webhook_server.run().await.unwrap(); +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct WebhookServer { @@ -154,52 +152,16 @@ impl WebhookServer { }) } - /// Runs the Webhook server and sets up signal handlers for shutting down. + /// Runs the [`WebhookServer`] and handles underlying certificate rotations of the [`TlsServer`]. /// - /// This does not implement graceful shutdown of the underlying server. Additionally, the server - /// is never started in cases where no [`Webhook`] is registered. Callers of this function need - /// to ensure to choose the correct joining mechanism for their use-case to for example avoid - /// unexpected shutdowns of the whole Kubernetes controller. - pub async fn run(self) -> Result<()> { - let future_server = self.run_server(); - let future_signal = async { - let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener"); - let mut sigterm = signal(SignalKind::terminate()).expect("create SIGTERM listener"); - - tracing::debug!("created unix signal handlers"); - - select! { - signal = sigint.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGINT"); - } - }, - signal = sigterm.recv().fuse() => { - if signal.is_some() { - tracing::debug!( "received SIGTERM"); - } - }, - }; - }; - - // select requires Future + Unpin - tokio::pin!(future_server); - tokio::pin!(future_signal); - - tokio::select! { - res = &mut future_server => { - // If the server future errors, propagate the error - res?; - } - _ = &mut future_signal => { - tracing::info!("shutdown signal received, stopping webhook server"); - } - } - - Ok(()) - } - - async fn run_server(self) -> Result<()> { + /// It should be noted that the server is never started in cases where no [`Webhook`] is + /// registered. Callers of this function need to ensure to choose the correct joining mechanism + /// for their use-case to for example avoid unexpected shutdowns of the whole Kubernetes + /// controller. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { tracing::debug!("run webhook server"); let Self { @@ -217,10 +179,14 @@ impl WebhookServer { } let tls_server = tls_server - .run() + .run(shutdown_signal) .map_err(|err| WebhookServerError::RunTlsServer { source: err }); let cert_update_loop = async { + // Once the shutdown signal is triggered, the TlsServer above should be dropped as the + // run associated function consumes self. This in turn means that when the receiver is + // polled, it will return `Ok(Ready(None))`, which will cause this while loop to break + // and the future to complete. while let Some(cert) = cert_rx.recv().await { // The caBundle needs to be provided as a base64-encoded PEM envelope. let ca_bundle = cert @@ -243,6 +209,8 @@ impl WebhookServer { Ok(()) }; + // This either returns if one of the two futures complete with Err(_) or when both complete + // with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received. try_join!(cert_update_loop, tls_server).map(|_| ()) } } diff --git a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs index c0f11b7d0..f7870e41c 100644 --- a/crates/stackable-webhook/src/webhooks/conversion_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs @@ -51,6 +51,7 @@ pub enum ConversionWebhookError { /// WebhookServer, /// webhooks::{ConversionWebhook, ConversionWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -75,7 +76,9 @@ pub enum ConversionWebhookError { /// let webhook_server = WebhookServer::new(vec![Box::new(conversion_webhook)], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// ``` pub struct ConversionWebhook { From 1a57c358822d1996dee8c5bcf3fe9a17d72d7130 Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 29 Jan 2026 16:40:30 +0100 Subject: [PATCH 2/6] test(webhook): Update doc tests --- crates/stackable-webhook/src/tls/mod.rs | 56 +++++++++++++------ .../src/webhooks/mutating_webhook.rs | 5 +- 2 files changed, 44 insertions(+), 17 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index fa493159d..3118392fe 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -7,9 +7,10 @@ use axum::{ extract::{ConnectInfo, Request}, middleware::AddExtension, }; +use futures_util::FutureExt as _; use hyper::{body::Incoming, service::service_fn}; use hyper_util::rt::{TokioExecutor, TokioIo}; -use opentelemetry::trace::{FutureExt, SpanKind}; +use opentelemetry::trace::{FutureExt as _, SpanKind}; use opentelemetry_semantic_conventions as semconv; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_shared::time::Duration; @@ -138,17 +139,27 @@ impl TlsServer { /// router. /// /// It also starts a background task to rotate the certificate as needed. - pub async fn run(self) -> Result<()> { + /// + /// The `shutdown_signal` can be used to notify the [`TlsServer`] to + /// gracefully shutdown. + pub async fn run(self, shutdown_signal: F) -> Result<()> + where + F: Future, + { + let Self { + cert_resolver, + socket_addr, + config, + router, + } = self; + let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL; let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL); - let tls_acceptor = TlsAcceptor::from(Arc::new(self.config)); - let tcp_listener = - TcpListener::bind(self.socket_addr) - .await - .context(BindTcpListenerSnafu { - socket_addr: self.socket_addr, - })?; + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); + let tcp_listener = TcpListener::bind(socket_addr) + .await + .context(BindTcpListenerSnafu { socket_addr })?; // To be able to extract the connect info from incoming requests, it is // required to turn the router into a Tower service which is capable of @@ -161,24 +172,35 @@ impl TlsServer { // - https://github.com/tokio-rs/axum/discussions/2397 // - https://github.com/tokio-rs/axum/blob/b02ce307371a973039018a13fa012af14775948c/examples/serve-with-hyper/src/main.rs#L98 - let mut router = self - .router - .into_make_service_with_connect_info::(); + let mut router = router.into_make_service_with_connect_info::(); + + // Fuse the future so that it only yields `Poll::Ready` once. The future + // additionally needs to be pinned to be able to be used in the select! + // macro below. + let shutdown_signal = shutdown_signal.fuse(); + tokio::pin!(shutdown_signal); loop { let tls_acceptor = tls_acceptor.clone(); // Wait for either a new TCP connection or the certificate rotation interval tick tokio::select! { - // We opt for a biased execution of arms to make sure we always check if the - // certificate needs rotation based on the interval. This ensures, we always use - // a valid certificate for the TLS connection. + // We opt for a biased execution of arms to make sure we always check if a + // shutdown signal was received or the certificate needs rotation based on the + // interval. This ensures, we always use a valid certificate for the TLS connection. biased; + // As soon as this future because `Poll::Ready`, break out of the loop which cancels + // the certification rotation interval and stops accepting new TCP connections. + _ = &mut shutdown_signal => { + tracing::trace!("received shutdown signal"); + break; + } + // This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed. // As such, we will not miss rotating the certificate. _ = interval.tick() => { - self.cert_resolver + cert_resolver .rotate_certificate() .await .context(RotateCertificateSnafu)? @@ -210,6 +232,8 @@ impl TlsServer { } }; } + + Ok(()) } async fn handle_request( diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs index 1beb12cde..67172fc80 100644 --- a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -51,6 +51,7 @@ pub enum MutatingWebhookError { /// WebhookServer, /// webhooks::{MutatingWebhook, MutatingWebhookOptions}, /// }; +/// use tokio::time::{Duration, sleep}; /// /// # async fn docs() { /// // The Kubernetes client @@ -76,7 +77,9 @@ pub enum MutatingWebhookError { /// let webhook_server = WebhookServer::new(vec![mutating_webhook], webhook_options) /// .await /// .unwrap(); -/// webhook_server.run().await.unwrap(); +/// let shutdown_signal = sleep(Duration::from_millis(100)); +/// +/// webhook_server.run(shutdown_signal).await.unwrap(); /// # } /// /// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration { From 44c93c9f1c8f72800de3bb15216506be09d56633 Mon Sep 17 00:00:00 2001 From: Techassi Date: Thu, 29 Jan 2026 16:45:46 +0100 Subject: [PATCH 3/6] chore: Add changelog entry --- crates/stackable-webhook/CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index cd9d460c1..a8fe9fe2e 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- BREAKING: Add support to gracefully shutdown `WebhookServer` and `TlsServer` ([#1144]). + Both `WebhookServer::run` and `TlsServer::run` now require passing a shutdown signal, which is any + `Future`. + +[#1144]: https://github.com/stackabletech/operator-rs/pull/1144 + ## [0.8.1] - 2026-01-07 ### Fixed From 1bc7e67358d0d479c452d7bbe2b9403bdd4cddfc Mon Sep 17 00:00:00 2001 From: Techassi Date: Fri, 30 Jan 2026 13:49:20 +0100 Subject: [PATCH 4/6] chore(webhhok): Adjust changelog --- crates/stackable-webhook/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index a8fe9fe2e..e48c77b18 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -6,9 +6,9 @@ All notable changes to this project will be documented in this file. ### Added -- BREAKING: Add support to gracefully shutdown `WebhookServer` and `TlsServer` ([#1144]). +- BREAKING: Add support to gracefully shutdown `WebhookServer` and `TlsServer`. Both `WebhookServer::run` and `TlsServer::run` now require passing a shutdown signal, which is any - `Future`. + `Future` ([#1144]). [#1144]: https://github.com/stackabletech/operator-rs/pull/1144 From cafe46dbc714e3ab4c2248e061e1dbefa470a3b0 Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 3 Feb 2026 08:47:58 +0100 Subject: [PATCH 5/6] chore: Adjust dev comment about try_join! Co-authored-by: Malte Sander --- crates/stackable-webhook/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 7e087f52c..00dfaba7c 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -209,7 +209,7 @@ impl WebhookServer { Ok(()) }; - // This either returns if one of the two futures complete with Err(_) or when both complete + // This either returns if one of the two futures completes with Err(_) or when both complete // with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received. try_join!(cert_update_loop, tls_server).map(|_| ()) } From 2242930a5c6f527eca618b4cd575252eeea4589e Mon Sep 17 00:00:00 2001 From: Techassi Date: Tue, 3 Feb 2026 08:52:24 +0100 Subject: [PATCH 6/6] chore: Update comment about shutdown signal --- crates/stackable-webhook/src/tls/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 3118392fe..3c5d36482 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -190,8 +190,9 @@ impl TlsServer { // interval. This ensures, we always use a valid certificate for the TLS connection. biased; - // As soon as this future because `Poll::Ready`, break out of the loop which cancels - // the certification rotation interval and stops accepting new TCP connections. + // Once a shutdown signal is received (this future becomes `Poll::Ready`), break out + // of the main loop which cancels the certification rotation interval and stops + // accepting new TCP connections. _ = &mut shutdown_signal => { tracing::trace!("received shutdown signal"); break;