diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 078b4b8..f27a61b 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -357,7 +357,7 @@ where receipts: block.receipts.clone(), }; // Ignore send errors — no subscribers is fine. - let _ = self.chain.send_notification(notif); + let _ = self.chain.send_new_block(notif); } /// Update the status channel and block tags. This keeps the RPC node diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs index 3ae8507..9d719ab 100644 --- a/crates/rpc/src/config/chain_notifier.rs +++ b/crates/rpc/src/config/chain_notifier.rs @@ -1,11 +1,14 @@ //! Shared chain state between the node and RPC layer. -use crate::{config::resolve::BlockTags, interest::NewBlockNotification}; +use crate::{ + config::resolve::BlockTags, + interest::{ChainEvent, NewBlockNotification, ReorgNotification}, +}; use tokio::sync::broadcast; /// Shared chain state between the node and RPC layer. /// -/// Combines block tag tracking and new-block notification into a single +/// Combines block tag tracking and chain event notification into a single /// unit that both the node and RPC context hold. Cloning is cheap — all /// fields are backed by `Arc`. /// @@ -23,7 +26,7 @@ use tokio::sync::broadcast; #[derive(Debug, Clone)] pub struct ChainNotifier { tags: BlockTags, - notif_tx: broadcast::Sender, + notif_tx: broadcast::Sender, } impl ChainNotifier { @@ -45,22 +48,43 @@ impl ChainNotifier { /// Returns `Ok(receiver_count)` or `Err` if there are no active /// receivers (which is not usually an error condition). #[allow(clippy::result_large_err)] - pub fn send_notification( + pub fn send_new_block( &self, notif: NewBlockNotification, - ) -> Result> { - self.notif_tx.send(notif) + ) -> Result> { + self.send_event(ChainEvent::NewBlock(Box::new(notif))) } - /// Subscribe to new block notifications. - pub fn subscribe(&self) -> broadcast::Receiver { + /// Send a reorg notification. + /// + /// Returns `Ok(receiver_count)` or `Err` if there are no active + /// receivers (which is not usually an error condition). + #[allow(clippy::result_large_err)] + pub fn send_reorg( + &self, + notif: ReorgNotification, + ) -> Result> { + self.send_event(ChainEvent::Reorg(notif)) + } + + /// Send a chain event to subscribers. + #[allow(clippy::result_large_err)] + fn send_event( + &self, + event: ChainEvent, + ) -> Result> { + self.notif_tx.send(event) + } + + /// Subscribe to chain events. + pub fn subscribe(&self) -> broadcast::Receiver { self.notif_tx.subscribe() } /// Get a clone of the broadcast sender. /// /// Used by the subscription manager to create its own receiver. - pub fn notif_sender(&self) -> broadcast::Sender { + pub fn notif_sender(&self) -> broadcast::Sender { self.notif_tx.clone() } } diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 644d64b..27a1d53 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -56,3 +56,26 @@ pub struct NewBlockNotification { /// Receipts for the block. pub receipts: Vec, } + +/// A chain event broadcast to subscribers. +/// +/// Wraps block notifications and reorg notifications into a single +/// enum so consumers can handle both through one channel. +#[derive(Debug, Clone)] +pub enum ChainEvent { + /// A new block has been added to the chain. + NewBlock(Box), + /// A chain reorganization has occurred. + Reorg(ReorgNotification), +} + +/// Notification sent when a chain reorganization is detected. +#[derive(Debug, Clone)] +pub struct ReorgNotification { + /// The block number of the common ancestor (last block still valid). + pub common_ancestor: u64, + /// Hashes of the removed blocks. + pub removed_hashes: Vec, + /// Logs from the removed blocks (needed for `removed: true` emission). + pub removed_logs: Vec, +} diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 9c1bada..83edde1 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -1,7 +1,7 @@ //! Subscription management for `eth_subscribe` / `eth_unsubscribe`. use crate::interest::{ - InterestKind, NewBlockNotification, + ChainEvent, InterestKind, buffer::{EventBuffer, EventItem}, }; use ajj::HandlerCtx; @@ -71,7 +71,7 @@ impl SubscriptionManager { /// Instantiate a new subscription manager, start a task to clean up /// subscriptions cancelled by user disconnection. pub(crate) fn new( - notif_sender: broadcast::Sender, + notif_sender: broadcast::Sender, clean_interval: Duration, ) -> Self { let inner = Arc::new(SubscriptionManagerInner::new(notif_sender)); @@ -100,12 +100,12 @@ impl core::fmt::Debug for SubscriptionManager { pub(crate) struct SubscriptionManagerInner { next_id: AtomicU64, tasks: DashMap, - notif_sender: broadcast::Sender, + notif_sender: broadcast::Sender, } impl SubscriptionManagerInner { /// Create a new subscription manager. - fn new(notif_sender: broadcast::Sender) -> Self { + fn new(notif_sender: broadcast::Sender) -> Self { Self { next_id: AtomicU64::new(1), tasks: DashMap::new(), notif_sender } } @@ -154,7 +154,7 @@ struct SubscriptionTask { id: U64, filter: InterestKind, token: CancellationToken, - notifs: broadcast::Receiver, + notifs: broadcast::Receiver, } impl SubscriptionTask { @@ -219,8 +219,8 @@ impl SubscriptionTask { } } notif_res = notifs.recv() => { - let notif = match notif_res { - Ok(notif) => notif, + let event = match notif_res { + Ok(event) => event, Err(RecvError::Lagged(skipped)) => { trace!(skipped, "missed notifications"); continue; @@ -230,6 +230,11 @@ impl SubscriptionTask { break; } }; + let notif = match event { + ChainEvent::NewBlock(notif) => *notif, + // Reorg handling will be addressed in future PRs (ENG-1968 et al.) + ChainEvent::Reorg(_) => continue, + }; let output = filter.filter_notification_for_sub(¬if); trace!(count = output.len(), "Filter applied to notification"); diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 6b0dffe..3b4e040 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -18,7 +18,7 @@ mod eth; pub use eth::EthError; mod interest; -pub use interest::NewBlockNotification; +pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification}; mod debug; pub use debug::DebugError;