From 2d410c5bda1c42e9ecca07dff90538e128e92e4f Mon Sep 17 00:00:00 2001 From: James Date: Mon, 9 Mar 2026 11:37:53 -0400 Subject: [PATCH 1/2] feat: introduce ChainEvent enum and update ChainNotifier broadcast type Wrap NewBlockNotification in a ChainEvent enum so the broadcast channel can carry both new-block and reorg notifications. Subscribers destructure the enum and skip Reorg events for now (handling comes in later tickets). Refs: ENG-1967 Co-Authored-By: Claude Opus 4.6 --- crates/node/src/node.rs | 4 ++-- crates/rpc/src/config/chain_notifier.rs | 24 ++++++++++++------------ crates/rpc/src/interest/mod.rs | 23 +++++++++++++++++++++++ crates/rpc/src/interest/subs.rs | 18 +++++++++++------- crates/rpc/src/lib.rs | 2 +- 5 files changed, 49 insertions(+), 22 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 078b4b8..4971be6 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -15,7 +15,7 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1}; use signet_evm::EthereumHardfork; use signet_extract::Extractor; use signet_node_config::SignetNodeConfig; -use signet_rpc::{ChainNotifier, NewBlockNotification, RpcServerGuard}; +use signet_rpc::{ChainEvent, ChainNotifier, NewBlockNotification, RpcServerGuard}; use signet_storage::{HistoryRead, HotKv, HotKvRead, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{fmt, sync::Arc}; @@ -357,7 +357,7 @@ where receipts: block.receipts.clone(), }; // Ignore send errors — no subscribers is fine. - let _ = self.chain.send_notification(notif); + let _ = self.chain.send_event(ChainEvent::NewBlock(Box::new(notif))); } /// Update the status channel and block tags. This keeps the RPC node diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs index 3ae8507..94fd2d0 100644 --- a/crates/rpc/src/config/chain_notifier.rs +++ b/crates/rpc/src/config/chain_notifier.rs @@ -1,18 +1,18 @@ //! Shared chain state between the node and RPC layer. -use crate::{config::resolve::BlockTags, interest::NewBlockNotification}; +use crate::{config::resolve::BlockTags, interest::ChainEvent}; use tokio::sync::broadcast; /// Shared chain state between the node and RPC layer. /// -/// Combines block tag tracking and new-block notification into a single +/// Combines block tag tracking and chain event notification into a single /// unit that both the node and RPC context hold. Cloning is cheap — all /// fields are backed by `Arc`. /// /// # Construction /// /// ``` -/// use signet_rpc::ChainNotifier; +/// use signet_rpc::{ChainNotifier, ChainEvent, NewBlockNotification}; /// /// let notifier = ChainNotifier::new(128); /// assert_eq!(notifier.tags().latest(), 0); @@ -23,7 +23,7 @@ use tokio::sync::broadcast; #[derive(Debug, Clone)] pub struct ChainNotifier { tags: BlockTags, - notif_tx: broadcast::Sender, + notif_tx: broadcast::Sender, } impl ChainNotifier { @@ -40,27 +40,27 @@ impl ChainNotifier { &self.tags } - /// Send a new block notification. + /// Send a chain event to subscribers. /// /// Returns `Ok(receiver_count)` or `Err` if there are no active /// receivers (which is not usually an error condition). #[allow(clippy::result_large_err)] - pub fn send_notification( + pub fn send_event( &self, - notif: NewBlockNotification, - ) -> Result> { - self.notif_tx.send(notif) + event: ChainEvent, + ) -> Result> { + self.notif_tx.send(event) } - /// Subscribe to new block notifications. - pub fn subscribe(&self) -> broadcast::Receiver { + /// Subscribe to chain events. + pub fn subscribe(&self) -> broadcast::Receiver { self.notif_tx.subscribe() } /// Get a clone of the broadcast sender. /// /// Used by the subscription manager to create its own receiver. - pub fn notif_sender(&self) -> broadcast::Sender { + pub fn notif_sender(&self) -> broadcast::Sender { self.notif_tx.clone() } } diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 644d64b..27a1d53 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -56,3 +56,26 @@ pub struct NewBlockNotification { /// Receipts for the block. pub receipts: Vec, } + +/// A chain event broadcast to subscribers. +/// +/// Wraps block notifications and reorg notifications into a single +/// enum so consumers can handle both through one channel. +#[derive(Debug, Clone)] +pub enum ChainEvent { + /// A new block has been added to the chain. + NewBlock(Box), + /// A chain reorganization has occurred. + Reorg(ReorgNotification), +} + +/// Notification sent when a chain reorganization is detected. +#[derive(Debug, Clone)] +pub struct ReorgNotification { + /// The block number of the common ancestor (last block still valid). + pub common_ancestor: u64, + /// Hashes of the removed blocks. + pub removed_hashes: Vec, + /// Logs from the removed blocks (needed for `removed: true` emission). + pub removed_logs: Vec, +} diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 9c1bada..1a59881 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -1,7 +1,7 @@ //! Subscription management for `eth_subscribe` / `eth_unsubscribe`. use crate::interest::{ - InterestKind, NewBlockNotification, + ChainEvent, InterestKind, buffer::{EventBuffer, EventItem}, }; use ajj::HandlerCtx; @@ -71,7 +71,7 @@ impl SubscriptionManager { /// Instantiate a new subscription manager, start a task to clean up /// subscriptions cancelled by user disconnection. pub(crate) fn new( - notif_sender: broadcast::Sender, + notif_sender: broadcast::Sender, clean_interval: Duration, ) -> Self { let inner = Arc::new(SubscriptionManagerInner::new(notif_sender)); @@ -100,12 +100,12 @@ impl core::fmt::Debug for SubscriptionManager { pub(crate) struct SubscriptionManagerInner { next_id: AtomicU64, tasks: DashMap, - notif_sender: broadcast::Sender, + notif_sender: broadcast::Sender, } impl SubscriptionManagerInner { /// Create a new subscription manager. - fn new(notif_sender: broadcast::Sender) -> Self { + fn new(notif_sender: broadcast::Sender) -> Self { Self { next_id: AtomicU64::new(1), tasks: DashMap::new(), notif_sender } } @@ -154,7 +154,7 @@ struct SubscriptionTask { id: U64, filter: InterestKind, token: CancellationToken, - notifs: broadcast::Receiver, + notifs: broadcast::Receiver, } impl SubscriptionTask { @@ -219,8 +219,8 @@ impl SubscriptionTask { } } notif_res = notifs.recv() => { - let notif = match notif_res { - Ok(notif) => notif, + let event = match notif_res { + Ok(event) => event, Err(RecvError::Lagged(skipped)) => { trace!(skipped, "missed notifications"); continue; @@ -230,6 +230,10 @@ impl SubscriptionTask { break; } }; + let notif = match event { + ChainEvent::NewBlock(notif) => *notif, + ChainEvent::Reorg(_) => continue, + }; let output = filter.filter_notification_for_sub(¬if); trace!(count = output.len(), "Filter applied to notification"); diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 6b0dffe..3b4e040 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -18,7 +18,7 @@ mod eth; pub use eth::EthError; mod interest; -pub use interest::NewBlockNotification; +pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification}; mod debug; pub use debug::DebugError; From 2048d52ad91aec347dabba2151a68011c0bc72c7 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 9 Mar 2026 11:51:01 -0400 Subject: [PATCH 2/2] fix: encapsulate ChainEvent construction in ChainNotifier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR review feedback: - Add public `send_new_block` that wraps internally - Add public `send_reorg` for reorg notifications - Make `send_event` private — callers use typed convenience methods - Add comment on Reorg skip noting future PR coverage (ENG-1968) Co-Authored-By: Claude Opus 4.6 --- crates/node/src/node.rs | 4 ++-- crates/rpc/src/config/chain_notifier.rs | 32 +++++++++++++++++++++---- crates/rpc/src/interest/subs.rs | 1 + 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 4971be6..f27a61b 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -15,7 +15,7 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1}; use signet_evm::EthereumHardfork; use signet_extract::Extractor; use signet_node_config::SignetNodeConfig; -use signet_rpc::{ChainEvent, ChainNotifier, NewBlockNotification, RpcServerGuard}; +use signet_rpc::{ChainNotifier, NewBlockNotification, RpcServerGuard}; use signet_storage::{HistoryRead, HotKv, HotKvRead, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{fmt, sync::Arc}; @@ -357,7 +357,7 @@ where receipts: block.receipts.clone(), }; // Ignore send errors — no subscribers is fine. - let _ = self.chain.send_event(ChainEvent::NewBlock(Box::new(notif))); + let _ = self.chain.send_new_block(notif); } /// Update the status channel and block tags. This keeps the RPC node diff --git a/crates/rpc/src/config/chain_notifier.rs b/crates/rpc/src/config/chain_notifier.rs index 94fd2d0..9d719ab 100644 --- a/crates/rpc/src/config/chain_notifier.rs +++ b/crates/rpc/src/config/chain_notifier.rs @@ -1,6 +1,9 @@ //! Shared chain state between the node and RPC layer. -use crate::{config::resolve::BlockTags, interest::ChainEvent}; +use crate::{ + config::resolve::BlockTags, + interest::{ChainEvent, NewBlockNotification, ReorgNotification}, +}; use tokio::sync::broadcast; /// Shared chain state between the node and RPC layer. @@ -12,7 +15,7 @@ use tokio::sync::broadcast; /// # Construction /// /// ``` -/// use signet_rpc::{ChainNotifier, ChainEvent, NewBlockNotification}; +/// use signet_rpc::ChainNotifier; /// /// let notifier = ChainNotifier::new(128); /// assert_eq!(notifier.tags().latest(), 0); @@ -40,12 +43,33 @@ impl ChainNotifier { &self.tags } - /// Send a chain event to subscribers. + /// Send a new block notification. /// /// Returns `Ok(receiver_count)` or `Err` if there are no active /// receivers (which is not usually an error condition). #[allow(clippy::result_large_err)] - pub fn send_event( + pub fn send_new_block( + &self, + notif: NewBlockNotification, + ) -> Result> { + self.send_event(ChainEvent::NewBlock(Box::new(notif))) + } + + /// Send a reorg notification. + /// + /// Returns `Ok(receiver_count)` or `Err` if there are no active + /// receivers (which is not usually an error condition). + #[allow(clippy::result_large_err)] + pub fn send_reorg( + &self, + notif: ReorgNotification, + ) -> Result> { + self.send_event(ChainEvent::Reorg(notif)) + } + + /// Send a chain event to subscribers. + #[allow(clippy::result_large_err)] + fn send_event( &self, event: ChainEvent, ) -> Result> { diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 1a59881..83edde1 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -232,6 +232,7 @@ impl SubscriptionTask { }; let notif = match event { ChainEvent::NewBlock(notif) => *notif, + // Reorg handling will be addressed in future PRs (ENG-1968 et al.) ChainEvent::Reorg(_) => continue, };