Skip to content

Commit e6c8b7e

Browse files
prestwichclaude
andauthored
feat: introduce ChainEvent enum and update ChainNotifier broadcast type (#94)
* feat: introduce ChainEvent enum and update ChainNotifier broadcast type Wrap NewBlockNotification in a ChainEvent enum so the broadcast channel can carry both new-block and reorg notifications. Subscribers destructure the enum and skip Reorg events for now (handling comes in later tickets). Refs: ENG-1967 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: encapsulate ChainEvent construction in ChainNotifier Address PR review feedback: - Add public `send_new_block` that wraps internally - Add public `send_reorg` for reorg notifications - Make `send_event` private — callers use typed convenience methods - Add comment on Reorg skip noting future PR coverage (ENG-1968) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0b9609c commit e6c8b7e

5 files changed

Lines changed: 70 additions & 18 deletions

File tree

crates/node/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ where
357357
receipts: block.receipts.clone(),
358358
};
359359
// Ignore send errors — no subscribers is fine.
360-
let _ = self.chain.send_notification(notif);
360+
let _ = self.chain.send_new_block(notif);
361361
}
362362

363363
/// Update the status channel and block tags. This keeps the RPC node

crates/rpc/src/config/chain_notifier.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
//! Shared chain state between the node and RPC layer.
22
3-
use crate::{config::resolve::BlockTags, interest::NewBlockNotification};
3+
use crate::{
4+
config::resolve::BlockTags,
5+
interest::{ChainEvent, NewBlockNotification, ReorgNotification},
6+
};
47
use tokio::sync::broadcast;
58

69
/// Shared chain state between the node and RPC layer.
710
///
8-
/// Combines block tag tracking and new-block notification into a single
11+
/// Combines block tag tracking and chain event notification into a single
912
/// unit that both the node and RPC context hold. Cloning is cheap — all
1013
/// fields are backed by `Arc`.
1114
///
@@ -23,7 +26,7 @@ use tokio::sync::broadcast;
2326
#[derive(Debug, Clone)]
2427
pub struct ChainNotifier {
2528
tags: BlockTags,
26-
notif_tx: broadcast::Sender<NewBlockNotification>,
29+
notif_tx: broadcast::Sender<ChainEvent>,
2730
}
2831

2932
impl ChainNotifier {
@@ -45,22 +48,43 @@ impl ChainNotifier {
4548
/// Returns `Ok(receiver_count)` or `Err` if there are no active
4649
/// receivers (which is not usually an error condition).
4750
#[allow(clippy::result_large_err)]
48-
pub fn send_notification(
51+
pub fn send_new_block(
4952
&self,
5053
notif: NewBlockNotification,
51-
) -> Result<usize, broadcast::error::SendError<NewBlockNotification>> {
52-
self.notif_tx.send(notif)
54+
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
55+
self.send_event(ChainEvent::NewBlock(Box::new(notif)))
5356
}
5457

55-
/// Subscribe to new block notifications.
56-
pub fn subscribe(&self) -> broadcast::Receiver<NewBlockNotification> {
58+
/// Send a reorg notification.
59+
///
60+
/// Returns `Ok(receiver_count)` or `Err` if there are no active
61+
/// receivers (which is not usually an error condition).
62+
#[allow(clippy::result_large_err)]
63+
pub fn send_reorg(
64+
&self,
65+
notif: ReorgNotification,
66+
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
67+
self.send_event(ChainEvent::Reorg(notif))
68+
}
69+
70+
/// Send a chain event to subscribers.
71+
#[allow(clippy::result_large_err)]
72+
fn send_event(
73+
&self,
74+
event: ChainEvent,
75+
) -> Result<usize, broadcast::error::SendError<ChainEvent>> {
76+
self.notif_tx.send(event)
77+
}
78+
79+
/// Subscribe to chain events.
80+
pub fn subscribe(&self) -> broadcast::Receiver<ChainEvent> {
5781
self.notif_tx.subscribe()
5882
}
5983

6084
/// Get a clone of the broadcast sender.
6185
///
6286
/// Used by the subscription manager to create its own receiver.
63-
pub fn notif_sender(&self) -> broadcast::Sender<NewBlockNotification> {
87+
pub fn notif_sender(&self) -> broadcast::Sender<ChainEvent> {
6488
self.notif_tx.clone()
6589
}
6690
}

crates/rpc/src/interest/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,26 @@ pub struct NewBlockNotification {
5656
/// Receipts for the block.
5757
pub receipts: Vec<signet_storage_types::Receipt>,
5858
}
59+
60+
/// A chain event broadcast to subscribers.
61+
///
62+
/// Wraps block notifications and reorg notifications into a single
63+
/// enum so consumers can handle both through one channel.
64+
#[derive(Debug, Clone)]
65+
pub enum ChainEvent {
66+
/// A new block has been added to the chain.
67+
NewBlock(Box<NewBlockNotification>),
68+
/// A chain reorganization has occurred.
69+
Reorg(ReorgNotification),
70+
}
71+
72+
/// Notification sent when a chain reorganization is detected.
73+
#[derive(Debug, Clone)]
74+
pub struct ReorgNotification {
75+
/// The block number of the common ancestor (last block still valid).
76+
pub common_ancestor: u64,
77+
/// Hashes of the removed blocks.
78+
pub removed_hashes: Vec<alloy::primitives::B256>,
79+
/// Logs from the removed blocks (needed for `removed: true` emission).
80+
pub removed_logs: Vec<alloy::primitives::Log>,
81+
}

crates/rpc/src/interest/subs.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Subscription management for `eth_subscribe` / `eth_unsubscribe`.
22
33
use crate::interest::{
4-
InterestKind, NewBlockNotification,
4+
ChainEvent, InterestKind,
55
buffer::{EventBuffer, EventItem},
66
};
77
use ajj::HandlerCtx;
@@ -71,7 +71,7 @@ impl SubscriptionManager {
7171
/// Instantiate a new subscription manager, start a task to clean up
7272
/// subscriptions cancelled by user disconnection.
7373
pub(crate) fn new(
74-
notif_sender: broadcast::Sender<NewBlockNotification>,
74+
notif_sender: broadcast::Sender<ChainEvent>,
7575
clean_interval: Duration,
7676
) -> Self {
7777
let inner = Arc::new(SubscriptionManagerInner::new(notif_sender));
@@ -100,12 +100,12 @@ impl core::fmt::Debug for SubscriptionManager {
100100
pub(crate) struct SubscriptionManagerInner {
101101
next_id: AtomicU64,
102102
tasks: DashMap<U64, CancellationToken>,
103-
notif_sender: broadcast::Sender<NewBlockNotification>,
103+
notif_sender: broadcast::Sender<ChainEvent>,
104104
}
105105

106106
impl SubscriptionManagerInner {
107107
/// Create a new subscription manager.
108-
fn new(notif_sender: broadcast::Sender<NewBlockNotification>) -> Self {
108+
fn new(notif_sender: broadcast::Sender<ChainEvent>) -> Self {
109109
Self { next_id: AtomicU64::new(1), tasks: DashMap::new(), notif_sender }
110110
}
111111

@@ -154,7 +154,7 @@ struct SubscriptionTask {
154154
id: U64,
155155
filter: InterestKind,
156156
token: CancellationToken,
157-
notifs: broadcast::Receiver<NewBlockNotification>,
157+
notifs: broadcast::Receiver<ChainEvent>,
158158
}
159159

160160
impl SubscriptionTask {
@@ -219,8 +219,8 @@ impl SubscriptionTask {
219219
}
220220
}
221221
notif_res = notifs.recv() => {
222-
let notif = match notif_res {
223-
Ok(notif) => notif,
222+
let event = match notif_res {
223+
Ok(event) => event,
224224
Err(RecvError::Lagged(skipped)) => {
225225
trace!(skipped, "missed notifications");
226226
continue;
@@ -230,6 +230,11 @@ impl SubscriptionTask {
230230
break;
231231
}
232232
};
233+
let notif = match event {
234+
ChainEvent::NewBlock(notif) => *notif,
235+
// Reorg handling will be addressed in future PRs (ENG-1968 et al.)
236+
ChainEvent::Reorg(_) => continue,
237+
};
233238

234239
let output = filter.filter_notification_for_sub(&notif);
235240
trace!(count = output.len(), "Filter applied to notification");

crates/rpc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ mod eth;
1818
pub use eth::EthError;
1919

2020
mod interest;
21-
pub use interest::NewBlockNotification;
21+
pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification};
2222

2323
mod debug;
2424
pub use debug::DebugError;

0 commit comments

Comments
 (0)