Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions embedded-service/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ pub mod mctp {
type ResultEnumType: for<'buf> mctp_rs::MctpMessageTrait<'buf, Header = Self::HeaderType>
+ RelayResponse<Self::ServiceIdType, Self::HeaderType>;

/// Returns a reference to the notification `Listener`.
fn notification_listener(&self) -> &crate::relay::notifications::Listener<'_>;

/// Process the provided request and yield a result.
fn process_request<'a>(
&'a self,
Expand Down Expand Up @@ -449,18 +452,21 @@ pub mod mctp {


pub struct $relay_type_name<'hw> {
listener: $crate::relay::notifications::Listener<'hw>,
$(
[<$service_name:snake _handler>]: &'hw $service_handler_type,
)+
}

impl<'hw> $relay_type_name<'hw> {
pub fn new(
listener: $crate::relay::notifications::Listener<'hw>,
$(
[<$service_name:snake _handler>]: &'hw $service_handler_type,
)+
) -> Self {
Self {
listener,
$(
[<$service_name:snake _handler>],
)+
Expand All @@ -474,6 +480,10 @@ pub mod mctp {
type RequestEnumType = HostRequest;
type ResultEnumType = HostResult;

fn notification_listener(&self) -> & $crate::relay::notifications::Listener<'_> {
&self.listener
}

fn process_request<'a>(
&'a self,
message: HostRequest,
Expand Down Expand Up @@ -501,3 +511,129 @@ pub mod mctp {

pub use impl_odp_mctp_relay_handler;
}

/// Relay service notification support.
pub mod notifications {
use crate::GlobalRawMutex;
use core::sync::atomic::{AtomicBool, Ordering};
use embassy_sync::channel;

/// Notification error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
/// A notification is currently being processed.
Busy,
/// A [`Listener`] has already been instantiated.
ListenerInstantiated,
}

/// Notifier.
///
/// Used by services to send notifications to the relay service.
pub struct Notifier<'ch> {
id: u8,
sender: channel::Sender<'ch, GlobalRawMutex, u8, 1>,
}

impl<'ch> Notifier<'ch> {
/// Notify the relay service that the service holding this [`Notifier`] needs attention from the host.
///
/// This will wait if a different notification is currently being processed.
pub async fn notify(&self) {
self.sender.send(self.id).await
}

/// Try to notify the relay service that the service holding this [`Notifier`] needs attention from the host.
///
/// # Errors
///
/// Returns [`Error::Busy`] if a different notification is currently being processed.
pub fn try_notify(&self) -> Result<(), Error> {
self.sender.try_send(self.id).map_err(|_| Error::Busy)
}
}

/// Listener.
pub struct Listener<'ch> {
receiver: channel::Receiver<'ch, GlobalRawMutex, u8, 1>,
}

impl<'ch> Listener<'ch> {
/// Wait for a notification from any service, then returns the id associated with that notification.
pub async fn listen(&self) -> u8 {
self.receiver.receive().await
}

/// Try to listen for a notification from any service.
///
/// Returns [`None`] if no notification is currently available.
pub fn try_listen(&self) -> Option<u8> {
self.receiver.try_receive().ok()
}
}

/// Notification handler.
pub struct NotificationHandler {
// The channel size is fixed to 1 (and not exposed as a configurable generic) so that
// it does not need to be exposed to consumers of `Notifier` and `Listener`.
//
// This is reasonable because notifications are expected to be handled quickly,
// and it is unlikely that multiple services will need to notify the host simultaneously.
//
// In the event that they do, the channel still provides backpressure so no notifications
// will be lost and the expected latency is minimal.
channel: channel::Channel<GlobalRawMutex, u8, 1>,
listener_instantiated: AtomicBool,
}

impl Default for NotificationHandler {
fn default() -> Self {
Self::new()
}
}

impl NotificationHandler {
/// Create a new [`NotificationHandler`] instance.
///
/// This handler allows as many [`Notifier`]s to be created as needed,
/// but only one [`Listener`] may be created.
pub fn new() -> Self {
Self {
channel: channel::Channel::new(),
listener_instantiated: AtomicBool::new(false),
}
}

/// Create a new [`Notifier`] instance with given id.
///
/// This [`Notifier`] is then typically passed to services upon instantiation.
///
/// The meaning of the id is platform specific and is opaque to services,
/// but expectation is that the relay service understands how to interpret the id.
///
/// Thus, the caller should ensure the given id matches what the relay service expects.
pub fn new_notifier(&self, id: u8) -> Notifier<'_> {
Notifier {
id,
sender: self.channel.sender(),
}
}

/// Create a new [`Listener`] instance.
///
/// # Errors
///
/// Only one [`Listener`] may exist.
/// Attempting to create another will return [`Error::ListenerInstantiated`].
pub fn new_listener(&self) -> Result<Listener<'_>, Error> {
if !self.listener_instantiated.swap(true, Ordering::Relaxed) {
Ok(Listener {
receiver: self.channel.receiver(),
})
} else {
Err(Error::ListenerInstantiated)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks odd to me - it seems like this would prevent other services from also receiving these notifications, right? Why do we have this limitation?

Copy link
Contributor Author

@kurtjd kurtjd Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I talked about this briefly in the PR description/doc strings. I decided to limit this to only relay services because there is no payload and the id is only meaningful to the relay services. So since we only have one relay service at a time, we limit this to only one listener. My idea was if services also need to notify other services, that would be through a separate stream since it is difficult to handle all that here (such as needing to expose the number of listeners/publishers in the interface like you pointed out) and because they would likely want to send a payload along with the notification to the other service.

Copy link
Contributor Author

@kurtjd kurtjd Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in, if TAS needs to notify the host AND notify another service, it would use this interface to notify the host then rely on some mechanism established explicitly between the TAS and that other service.

If we are trying to make this generic enough to send any arbitrary payload to any other service it feels like we are just reimplementing a message passing system, so I'm not sure that's the route we are trying to go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted about this offline, but for the benefit of other reviewers - I think the plan is to look into doing something where notifications go over something like a PubSubChannel and then extend the RelayServiceHandlerTypes / RelayServiceHandler traits to describe how to translate notifications on that PubSubChannel into something that goes over the relay bus. This should allow individual services to use the same notifications for host communication and service-to-service communication without caring where they're going, and it isolates knowledge of what the host does and doesn't care about in the same place as all the serialization/deserialization logic

}
}
}
25 changes: 13 additions & 12 deletions espi-service/src/espi_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::slice;

use embassy_futures::select::select;
use embassy_futures::select::select3;
use embassy_imxrt::espi;
use embassy_sync::channel::Channel;
use embassy_sync::mutex::Mutex;
Expand Down Expand Up @@ -56,31 +56,32 @@ impl<'hw, RelayHandler: embedded_services::relay::mctp::RelayHandler> Service<'h
pub(crate) async fn run_service(&self) -> ! {
let mut espi = self.espi.lock().await;
loop {
let event = select(espi.wait_for_event(), self.host_tx_queue.receive()).await;
let event = select3(
espi.wait_for_event(),
self.host_tx_queue.receive(),
self.relay_handler.notification_listener().listen(),
)
.await;

match event {
embassy_futures::select::Either::First(controller_event) => {
embassy_futures::select::Either3::First(controller_event) => {
self.process_controller_event(&mut espi, controller_event)
.await
.unwrap_or_else(|e| {
error!("Critical error processing eSPI controller event: {:?}", e);
});
}
embassy_futures::select::Either::Second(host_msg) => {
embassy_futures::select::Either3::Second(host_msg) => {
self.process_response_to_host(&mut espi, host_msg).await
}
embassy_futures::select::Either3::Third(id) => {
espi.irq_push(id).await;
info!("espi: Notification id {} sent to Host!", id);
}
}
}
}

// TODO The notification system was not actually used, so this is currently dead code.
// We need to implement some interface for triggering notifications from other subsystems, and it may do something like this:
//
// async fn process_notification_to_host(&self, espi: &mut espi::Espi<'_>, notification: &NotificationMsg) {
// espi.irq_push(notification.offset).await;
// info!("espi: Notification id {} sent to Host!", notification.offset);
// }

fn write_to_hw(&self, espi: &mut espi::Espi<'hw>, packet: &[u8]) -> Result<(), embassy_imxrt::espi::Error> {
// Send packet via your transport medium
// SAFETY: Safe as the access to espi is protected by a mut reference.
Expand Down
9 changes: 9 additions & 0 deletions thermal-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#![allow(clippy::unwrap_used)]

use embedded_sensors_hal_async::temperature::DegreesCelsius;
use embedded_services::relay::notifications::Notifier;
use thermal_service_messages::{ThermalRequest, ThermalResult};

mod context;
Expand Down Expand Up @@ -36,21 +37,29 @@ pub enum Event {

pub struct Service<'hw> {
context: context::Context<'hw>,
notifier: Notifier<'hw>,
}

impl<'hw> Service<'hw> {
pub async fn init(
service_storage: &'hw embassy_sync::once_lock::OnceLock<Service<'hw>>,
sensors: &'hw [&'hw sensor::Device],
fans: &'hw [&'hw fan::Device],
notifier: Notifier<'hw>,
) -> &'hw Self {
service_storage.get_or_init(|| Self {
context: context::Context::new(sensors, fans),
notifier,
})
}

/// Send a thermal event
pub async fn send_event(&self, event: Event) {
// If a threshold event is triggered, we want to notify host
if matches!(event, Event::ThresholdExceeded(_, _, _) | Event::ThresholdCleared(_, _)) {
self.notifier.notify().await;
}

self.context.send_event(event).await
}

Expand Down
Loading