Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- BREAKING: Add support for gracefully shutting down `WebhookServer` and `TlsServer`.
Both `WebhookServer::run` and `TlsServer::run` now require a shutdown signal, which can be any
`Future<Output = ()>` ([#1144]).

[#1144]: https://github.com/stackabletech/operator-rs/pull/1144

## [0.8.1] - 2026-01-07

### Fixed
Expand Down
74 changes: 21 additions & 53 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use ::x509_cert::Certificate;
use axum::{Router, routing::get};
use futures_util::{FutureExt as _, TryFutureExt, select};
use futures_util::TryFutureExt;
use k8s_openapi::ByteString;
use snafu::{ResultExt, Snafu};
use stackable_telemetry::AxumTraceLayer;
use tokio::{
signal::unix::{SignalKind, signal},
sync::mpsc,
try_join,
};
use tokio::{sync::mpsc, try_join};
use tower::ServiceBuilder;
use webhooks::{Webhook, WebhookError};
use x509_cert::der::{EncodePem, pem::LineEnding};
Expand Down Expand Up @@ -59,6 +55,7 @@ pub enum WebhookServerError {
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookServerOptions, webhooks::Webhook};
/// use tokio::time::{Duration, sleep};
///
/// # async fn docs() {
/// let mut webhooks: Vec<Box<dyn Webhook>> = vec![];
Expand All @@ -69,8 +66,9 @@ pub enum WebhookServerError {
/// webhook_service_name: "my-operator".to_owned(),
/// };
/// let webhook_server = WebhookServer::new(webhooks, webhook_options).await.unwrap();
/// let shutdown_signal = sleep(Duration::from_millis(100));
///
/// webhook_server.run().await.unwrap();
/// webhook_server.run(shutdown_signal).await.unwrap();
/// # }
/// ```
pub struct WebhookServer {
Expand Down Expand Up @@ -154,52 +152,16 @@ impl WebhookServer {
})
}

/// Runs the Webhook server and sets up signal handlers for shutting down.
/// Runs the [`WebhookServer`] and handles underlying certificate rotations of the [`TlsServer`].
///
/// This does not implement graceful shutdown of the underlying server. Additionally, the server
/// is never started in cases where no [`Webhook`] is registered. Callers of this function need
/// to ensure to choose the correct joining mechanism for their use-case to for example avoid
/// unexpected shutdowns of the whole Kubernetes controller.
pub async fn run(self) -> Result<()> {
let future_server = self.run_server();
let future_signal = async {
let mut sigint = signal(SignalKind::interrupt()).expect("create SIGINT listener");
let mut sigterm = signal(SignalKind::terminate()).expect("create SIGTERM listener");

tracing::debug!("created unix signal handlers");

select! {
signal = sigint.recv().fuse() => {
if signal.is_some() {
tracing::debug!( "received SIGINT");
}
},
signal = sigterm.recv().fuse() => {
if signal.is_some() {
tracing::debug!( "received SIGTERM");
}
},
};
};

// select requires Future + Unpin
tokio::pin!(future_server);
tokio::pin!(future_signal);

tokio::select! {
res = &mut future_server => {
// If the server future errors, propagate the error
res?;
}
_ = &mut future_signal => {
tracing::info!("shutdown signal received, stopping webhook server");
}
}

Ok(())
}

async fn run_server(self) -> Result<()> {
/// Note that the server is never started if no [`Webhook`] is registered. Callers of this
/// function need to choose the correct joining mechanism for their use-case, for example to
/// avoid unexpected shutdowns of the whole Kubernetes controller.
pub async fn run<F>(self, shutdown_signal: F) -> Result<()>
where
F: Future<Output = ()>,
{
tracing::debug!("run webhook server");

let Self {
Expand All @@ -217,10 +179,14 @@ impl WebhookServer {
}

let tls_server = tls_server
.run()
.run(shutdown_signal)
.map_err(|err| WebhookServerError::RunTlsServer { source: err });

let cert_update_loop = async {
// Once the shutdown signal is triggered, the TlsServer above should be dropped, as its
// `run` associated function consumes self. This in turn means that when the receiver is
// polled, it will return `Poll::Ready(None)` (so `recv().await` yields `None`), which will
// cause this while loop to end and the future to complete.
while let Some(cert) = cert_rx.recv().await {
// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = cert
Expand All @@ -243,6 +209,8 @@ impl WebhookServer {
Ok(())
};

// This returns either when one of the two futures completes with Err(_) or when both complete
// with Ok(_). Both futures complete with Ok(_) when a shutdown signal is received.
try_join!(cert_update_loop, tls_server).map(|_| ())
}
}
56 changes: 40 additions & 16 deletions crates/stackable-webhook/src/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use axum::{
extract::{ConnectInfo, Request},
middleware::AddExtension,
};
use futures_util::FutureExt as _;
use hyper::{body::Incoming, service::service_fn};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::trace::{FutureExt, SpanKind};
use opentelemetry::trace::{FutureExt as _, SpanKind};
use opentelemetry_semantic_conventions as semconv;
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_shared::time::Duration;
Expand Down Expand Up @@ -138,17 +139,27 @@ impl TlsServer {
/// router.
///
/// It also starts a background task to rotate the certificate as needed.
pub async fn run(self) -> Result<()> {
///
/// The `shutdown_signal` can be used to notify the [`TlsServer`] to
/// shut down gracefully.
pub async fn run<F>(self, shutdown_signal: F) -> Result<()>
where
F: Future<Output = ()>,
{
let Self {
cert_resolver,
socket_addr,
config,
router,
} = self;

let start = tokio::time::Instant::now() + *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL;
let mut interval = tokio::time::interval_at(start, *WEBHOOK_CERTIFICATE_ROTATION_INTERVAL);

let tls_acceptor = TlsAcceptor::from(Arc::new(self.config));
let tcp_listener =
TcpListener::bind(self.socket_addr)
.await
.context(BindTcpListenerSnafu {
socket_addr: self.socket_addr,
})?;
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let tcp_listener = TcpListener::bind(socket_addr)
.await
.context(BindTcpListenerSnafu { socket_addr })?;

// To be able to extract the connect info from incoming requests, it is
// required to turn the router into a Tower service which is capable of
Expand All @@ -161,24 +172,35 @@ impl TlsServer {
// - https://github.com/tokio-rs/axum/discussions/2397
// - https://github.com/tokio-rs/axum/blob/b02ce307371a973039018a13fa012af14775948c/examples/serve-with-hyper/src/main.rs#L98

let mut router = self
.router
.into_make_service_with_connect_info::<SocketAddr>();
let mut router = router.into_make_service_with_connect_info::<SocketAddr>();

// Fuse the future so that it only yields `Poll::Ready` once. The future
// additionally needs to be pinned to be able to be used in the select!
// macro below.
let shutdown_signal = shutdown_signal.fuse();
tokio::pin!(shutdown_signal);

loop {
let tls_acceptor = tls_acceptor.clone();

// Wait for either a new TCP connection or the certificate rotation interval tick
tokio::select! {
// We opt for a biased execution of arms to make sure we always check if the
// certificate needs rotation based on the interval. This ensures, we always use
// a valid certificate for the TLS connection.
// We opt for a biased execution of arms to make sure we always check if a
// shutdown signal was received or the certificate needs rotation based on the
// interval. This ensures we always use a valid certificate for the TLS connection.
biased;

// As soon as this future becomes `Poll::Ready`, break out of the loop which cancels
// the certificate rotation interval and stops accepting new TCP connections.
_ = &mut shutdown_signal => {
tracing::trace!("received shutdown signal");
break;
}

// This is cancellation-safe. If this branch is cancelled, the tick is NOT consumed.
// As such, we will not miss rotating the certificate.
_ = interval.tick() => {
self.cert_resolver
cert_resolver
.rotate_certificate()
.await
.context(RotateCertificateSnafu)?
Expand Down Expand Up @@ -210,6 +232,8 @@ impl TlsServer {
}
};
}

Ok(())
}

async fn handle_request(
Expand Down
5 changes: 4 additions & 1 deletion crates/stackable-webhook/src/webhooks/conversion_webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum ConversionWebhookError {
/// WebhookServer,
/// webhooks::{ConversionWebhook, ConversionWebhookOptions},
/// };
/// use tokio::time::{Duration, sleep};
///
/// # async fn docs() {
/// // The Kubernetes client
Expand All @@ -75,7 +76,9 @@ pub enum ConversionWebhookError {
/// let webhook_server = WebhookServer::new(vec![Box::new(conversion_webhook)], webhook_options)
/// .await
/// .unwrap();
/// webhook_server.run().await.unwrap();
/// let shutdown_signal = sleep(Duration::from_millis(100));
///
/// webhook_server.run(shutdown_signal).await.unwrap();
/// # }
/// ```
pub struct ConversionWebhook<H> {
Expand Down
5 changes: 4 additions & 1 deletion crates/stackable-webhook/src/webhooks/mutating_webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum MutatingWebhookError {
/// WebhookServer,
/// webhooks::{MutatingWebhook, MutatingWebhookOptions},
/// };
/// use tokio::time::{Duration, sleep};
///
/// # async fn docs() {
/// // The Kubernetes client
Expand All @@ -76,7 +77,9 @@ pub enum MutatingWebhookError {
/// let webhook_server = WebhookServer::new(vec![mutating_webhook], webhook_options)
/// .await
/// .unwrap();
/// webhook_server.run().await.unwrap();
/// let shutdown_signal = sleep(Duration::from_millis(100));
///
/// webhook_server.run(shutdown_signal).await.unwrap();
/// # }
///
/// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration {
Expand Down
Loading