From 07c4d4f4a0f4d0f89150bd256eea83bf74ea2938 Mon Sep 17 00:00:00 2001 From: Kurtis Dinelle Date: Wed, 4 Mar 2026 15:35:15 -0800 Subject: [PATCH] Add notification support for relay services --- embedded-service/src/relay/mod.rs | 136 ++++++++++++++++++++++++++++++ espi-service/src/espi_service.rs | 25 +++--- thermal-service/src/lib.rs | 9 ++ 3 files changed, 158 insertions(+), 12 deletions(-) diff --git a/embedded-service/src/relay/mod.rs b/embedded-service/src/relay/mod.rs index 3cc7092e..2cbf87df 100644 --- a/embedded-service/src/relay/mod.rs +++ b/embedded-service/src/relay/mod.rs @@ -164,6 +164,9 @@ pub mod mctp { type ResultEnumType: for<'buf> mctp_rs::MctpMessageTrait<'buf, Header = Self::HeaderType> + RelayResponse; + /// Returns a reference to the notification `Listener`. + fn notification_listener(&self) -> &crate::relay::notifications::Listener<'_>; + /// Process the provided request and yield a result. fn process_request<'a>( &'a self, @@ -449,6 +452,7 @@ pub mod mctp { pub struct $relay_type_name<'hw> { + listener: $crate::relay::notifications::Listener<'hw>, $( [<$service_name:snake _handler>]: &'hw $service_handler_type, )+ @@ -456,11 +460,13 @@ pub mod mctp { impl<'hw> $relay_type_name<'hw> { pub fn new( + listener: $crate::relay::notifications::Listener<'hw>, $( [<$service_name:snake _handler>]: &'hw $service_handler_type, )+ ) -> Self { Self { + listener, $( [<$service_name:snake _handler>], )+ @@ -474,6 +480,10 @@ pub mod mctp { type RequestEnumType = HostRequest; type ResultEnumType = HostResult; + fn notification_listener(&self) -> & $crate::relay::notifications::Listener<'_> { + &self.listener + } + fn process_request<'a>( &'a self, message: HostRequest, @@ -501,3 +511,129 @@ pub mod mctp { pub use impl_odp_mctp_relay_handler; } + +/// Relay service notification support. +pub mod notifications { + use crate::GlobalRawMutex; + use core::sync::atomic::{AtomicBool, Ordering}; + use embassy_sync::channel; + + /// Notification error. + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + #[cfg_attr(feature = "defmt", derive(defmt::Format))] + pub enum Error { + /// A notification is currently being processed. + Busy, + /// A [`Listener`] has already been instantiated. + ListenerInstantiated, + } + + /// Notifier. + /// + /// Used by services to send notifications to the relay service. + pub struct Notifier<'ch> { + id: u8, + sender: channel::Sender<'ch, GlobalRawMutex, u8, 1>, + } + + impl<'ch> Notifier<'ch> { + /// Notify the relay service that the service holding this [`Notifier`] needs attention from the host. + /// + /// This will wait if a different notification is currently being processed. + pub async fn notify(&self) { + self.sender.send(self.id).await + } + + /// Try to notify the relay service that the service holding this [`Notifier`] needs attention from the host. + /// + /// # Errors + /// + /// Returns [`Error::Busy`] if a different notification is currently being processed. + pub fn try_notify(&self) -> Result<(), Error> { + self.sender.try_send(self.id).map_err(|_| Error::Busy) + } + } + + /// Listener. + pub struct Listener<'ch> { + receiver: channel::Receiver<'ch, GlobalRawMutex, u8, 1>, + } + + impl<'ch> Listener<'ch> { + /// Wait for a notification from any service, then returns the id associated with that notification. + pub async fn listen(&self) -> u8 { + self.receiver.receive().await + } + + /// Try to listen for a notification from any service. + /// + /// Returns [`None`] if no notification is currently available. + pub fn try_listen(&self) -> Option { + self.receiver.try_receive().ok() + } + } + + /// Notification handler. + pub struct NotificationHandler { + // The channel size is fixed to 1 (and not exposed as a configurable generic) so that + // it does not need to be exposed to consumers of `Notifier` and `Listener`. + // + // This is reasonable because notifications are expected to be handled quickly, + // and it is unlikely that multiple services will need to notify the host simultaneously. + // + // In the event that they do, the channel still provides backpressure so no notifications + // will be lost and the expected latency is minimal. + channel: channel::Channel, + listener_instantiated: AtomicBool, + } + + impl Default for NotificationHandler { + fn default() -> Self { + Self::new() + } + } + + impl NotificationHandler { + /// Create a new [`NotificationHandler`] instance. + /// + /// This handler allows as many [`Notifier`]s to be created as needed, + /// but only one [`Listener`] may be created. + pub fn new() -> Self { + Self { + channel: channel::Channel::new(), + listener_instantiated: AtomicBool::new(false), + } + } + + /// Create a new [`Notifier`] instance with given id. + /// + /// This [`Notifier`] is then typically passed to services upon instantiation. + /// + /// The meaning of the id is platform specific and is opaque to services, + /// but expectation is that the relay service understands how to interpret the id. + /// + /// Thus, the caller should ensure the given id matches what the relay service expects. + pub fn new_notifier(&self, id: u8) -> Notifier<'_> { + Notifier { + id, + sender: self.channel.sender(), + } + } + + /// Create a new [`Listener`] instance. + /// + /// # Errors + /// + /// Only one [`Listener`] may exist. + /// Attempting to create another will return [`Error::ListenerInstantiated`]. + pub fn new_listener(&self) -> Result, Error> { + if !self.listener_instantiated.swap(true, Ordering::Relaxed) { + Ok(Listener { + receiver: self.channel.receiver(), + }) + } else { + Err(Error::ListenerInstantiated) + } + } + } +} diff --git a/espi-service/src/espi_service.rs b/espi-service/src/espi_service.rs index 0a7c1d93..fcff1ba0 100644 --- a/espi-service/src/espi_service.rs +++ b/espi-service/src/espi_service.rs @@ -1,6 +1,6 @@ use core::slice; -use embassy_futures::select::select; +use embassy_futures::select::select3; use embassy_imxrt::espi; use embassy_sync::channel::Channel; use embassy_sync::mutex::Mutex; @@ -56,31 +56,32 @@ impl<'hw, RelayHandler: embedded_services::relay::mctp::RelayHandler> Service<'h pub(crate) async fn run_service(&self) -> ! { let mut espi = self.espi.lock().await; loop { - let event = select(espi.wait_for_event(), self.host_tx_queue.receive()).await; + let event = select3( + espi.wait_for_event(), + self.host_tx_queue.receive(), + self.relay_handler.notification_listener().listen(), + ) + .await; match event { - embassy_futures::select::Either::First(controller_event) => { + embassy_futures::select::Either3::First(controller_event) => { self.process_controller_event(&mut espi, controller_event) .await .unwrap_or_else(|e| { error!("Critical error processing eSPI controller event: {:?}", e); }); } - embassy_futures::select::Either::Second(host_msg) => { + embassy_futures::select::Either3::Second(host_msg) => { self.process_response_to_host(&mut espi, host_msg).await } + embassy_futures::select::Either3::Third(id) => { + espi.irq_push(id).await; + info!("espi: Notification id {} sent to Host!", id); + } } } } - // TODO The notification system was not actually used, so this is currently dead code. - // We need to implement some interface for triggering notifications from other subsystems, and it may do something like this: - // - // async fn process_notification_to_host(&self, espi: &mut espi::Espi<'_>, notification: &NotificationMsg) { - // espi.irq_push(notification.offset).await; - // info!("espi: Notification id {} sent to Host!", notification.offset); - // } - fn write_to_hw(&self, espi: &mut espi::Espi<'hw>, packet: &[u8]) -> Result<(), embassy_imxrt::espi::Error> { // Send packet via your transport medium // SAFETY: Safe as the access to espi is protected by a mut reference. diff --git a/thermal-service/src/lib.rs b/thermal-service/src/lib.rs index dc8f70c9..021651f0 100644 --- a/thermal-service/src/lib.rs +++ b/thermal-service/src/lib.rs @@ -4,6 +4,7 @@ #![allow(clippy::unwrap_used)] use embedded_sensors_hal_async::temperature::DegreesCelsius; +use embedded_services::relay::notifications::Notifier; use thermal_service_messages::{ThermalRequest, ThermalResult}; mod context; @@ -36,6 +37,7 @@ pub enum Event { pub struct Service<'hw> { context: context::Context<'hw>, + notifier: Notifier<'hw>, } impl<'hw> Service<'hw> { @@ -43,14 +45,21 @@ impl<'hw> Service<'hw> { service_storage: &'hw embassy_sync::once_lock::OnceLock>, sensors: &'hw [&'hw sensor::Device], fans: &'hw [&'hw fan::Device], + notifier: Notifier<'hw>, ) -> &'hw Self { service_storage.get_or_init(|| Self { context: context::Context::new(sensors, fans), + notifier, }) } /// Send a thermal event pub async fn send_event(&self, event: Event) { + // If a threshold event is triggered, we want to notify host + if matches!(event, Event::ThresholdExceeded(_, _, _) | Event::ThresholdCleared(_, _)) { + self.notifier.notify().await; + } + self.context.send_event(event).await }