Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ where
receipts: block.receipts.clone(),
};
// Ignore send errors — no subscribers is fine.
let _ = self.chain.send_notification(notif);
let _ = self.chain.send_new_block(notif);
}

/// Update the status channel and block tags. This keeps the RPC node
Expand Down
42 changes: 33 additions & 9 deletions crates/rpc/src/config/chain_notifier.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! Shared chain state between the node and RPC layer.

use crate::{config::resolve::BlockTags, interest::NewBlockNotification};
use crate::{
config::resolve::BlockTags,
interest::{ChainEvent, NewBlockNotification, ReorgNotification},
};
use tokio::sync::broadcast;

/// Shared chain state between the node and RPC layer.
///
/// Combines block tag tracking and new-block notification into a single
/// Combines block tag tracking and chain event notification into a single
/// unit that both the node and RPC context hold. Cloning is cheap — all
/// fields are backed by `Arc`.
///
Expand All @@ -23,7 +26,7 @@ use tokio::sync::broadcast;
#[derive(Debug, Clone)]
pub struct ChainNotifier {
tags: BlockTags,
notif_tx: broadcast::Sender<NewBlockNotification>,
notif_tx: broadcast::Sender<ChainEvent>,
}

impl ChainNotifier {
Expand All @@ -45,22 +48,43 @@ impl ChainNotifier {
/// Returns `Ok(receiver_count)` or `Err` if there are no active
/// receivers (which is not usually an error condition).
#[allow(clippy::result_large_err)]
pub fn send_notification(
pub fn send_new_block(
&self,
notif: NewBlockNotification,
) -> Result<usize, broadcast::error::SendError<NewBlockNotification>> {
self.notif_tx.send(notif)
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
self.send_event(ChainEvent::NewBlock(Box::new(notif)))
}

/// Subscribe to new block notifications.
pub fn subscribe(&self) -> broadcast::Receiver<NewBlockNotification> {
/// Send a reorg notification.
///
/// Returns `Ok(receiver_count)` or `Err` if there are no active
/// receivers (which is not usually an error condition).
#[allow(clippy::result_large_err)]
pub fn send_reorg(
&self,
notif: ReorgNotification,
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
self.send_event(ChainEvent::Reorg(notif))
}

/// Send a chain event to subscribers.
#[allow(clippy::result_large_err)]
fn send_event(
&self,
event: ChainEvent,
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
self.notif_tx.send(event)
}

/// Subscribe to chain events.
pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
self.notif_tx.subscribe()
}

/// Get a clone of the broadcast sender.
///
/// Used by the subscription manager to create its own receiver.
pub fn notif_sender(&self) -> broadcast::Sender<NewBlockNotification> {
pub fn notif_sender(&self) -> broadcast::Sender<ChainEvent> {
self.notif_tx.clone()
}
}
23 changes: 23 additions & 0 deletions crates/rpc/src/interest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,26 @@ pub struct NewBlockNotification {
/// Receipts for the block.
pub receipts: Vec<signet_storage_types::Receipt>,
}

/// A chain event broadcast to subscribers.
///
/// Wraps block notifications and reorg notifications into a single
/// enum so consumers can handle both through one channel.
#[derive(Debug, Clone)]
pub enum ChainEvent {
/// A new block has been added to the chain.
NewBlock(Box<NewBlockNotification>),
/// A chain reorganization has occurred.
Reorg(ReorgNotification),
}

/// Notification sent when a chain reorganization is detected.
#[derive(Debug, Clone)]
pub struct ReorgNotification {
/// The block number of the common ancestor (last block still valid).
pub common_ancestor: u64,
/// Hashes of the removed blocks.
pub removed_hashes: Vec<alloy::primitives::B256>,
/// Logs from the removed blocks (needed for `removed: true` emission).
pub removed_logs: Vec<alloy::primitives::Log>,
}
19 changes: 12 additions & 7 deletions crates/rpc/src/interest/subs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Subscription management for `eth_subscribe` / `eth_unsubscribe`.

use crate::interest::{
InterestKind, NewBlockNotification,
ChainEvent, InterestKind,
buffer::{EventBuffer, EventItem},
};
use ajj::HandlerCtx;
Expand Down Expand Up @@ -71,7 +71,7 @@ impl SubscriptionManager {
/// Instantiate a new subscription manager, start a task to clean up
/// subscriptions cancelled by user disconnection.
pub(crate) fn new(
notif_sender: broadcast::Sender<NewBlockNotification>,
notif_sender: broadcast::Sender<ChainEvent>,
clean_interval: Duration,
) -> Self {
let inner = Arc::new(SubscriptionManagerInner::new(notif_sender));
Expand Down Expand Up @@ -100,12 +100,12 @@ impl core::fmt::Debug for SubscriptionManager {
pub(crate) struct SubscriptionManagerInner {
next_id: AtomicU64,
tasks: DashMap<U64, CancellationToken>,
notif_sender: broadcast::Sender<NewBlockNotification>,
notif_sender: broadcast::Sender<ChainEvent>,
}

impl SubscriptionManagerInner {
/// Create a new subscription manager.
fn new(notif_sender: broadcast::Sender<NewBlockNotification>) -> Self {
fn new(notif_sender: broadcast::Sender<ChainEvent>) -> Self {
Self { next_id: AtomicU64::new(1), tasks: DashMap::new(), notif_sender }
}

Expand Down Expand Up @@ -154,7 +154,7 @@ struct SubscriptionTask {
id: U64,
filter: InterestKind,
token: CancellationToken,
notifs: broadcast::Receiver<NewBlockNotification>,
notifs: broadcast::Receiver<ChainEvent>,
}

impl SubscriptionTask {
Expand Down Expand Up @@ -219,8 +219,8 @@ impl SubscriptionTask {
}
}
notif_res = notifs.recv() => {
let notif = match notif_res {
Ok(notif) => notif,
let event = match notif_res {
Ok(event) => event,
Err(RecvError::Lagged(skipped)) => {
trace!(skipped, "missed notifications");
continue;
Expand All @@ -230,6 +230,11 @@ impl SubscriptionTask {
break;
}
};
let notif = match event {
ChainEvent::NewBlock(notif) => *notif,
// Reorg handling will be addressed in future PRs (ENG-1968 et al.)
ChainEvent::Reorg(_) => continue,
};

let output = filter.filter_notification_for_sub(&notif);
trace!(count = output.len(), "Filter applied to notification");
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod eth;
pub use eth::EthError;

mod interest;
pub use interest::NewBlockNotification;
pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification};

mod debug;
pub use debug::DebugError;
Expand Down