From 62016a2e860d8c7936a26eef553e88cfb3db71de Mon Sep 17 00:00:00 2001 From: James Date: Mon, 9 Mar 2026 14:40:21 -0400 Subject: [PATCH 1/3] feat: handle ChainEvent::Reorg in SubscriptionTask with removed log emission Replace the no-op reorg arm in SubscriptionTask::task_future with proper handling that filters removed logs against subscription criteria and emits them with `removed: true` per the Ethereum JSON-RPC spec. Co-Authored-By: Claude Opus 4.6 --- crates/rpc/src/interest/kind.rs | 104 +++++++++++++++++++++++++++++++- crates/rpc/src/interest/subs.rs | 10 ++- 2 files changed, 111 insertions(+), 3 deletions(-) diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f883e5a..c26a6a6 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -1,6 +1,8 @@ //! Filter kinds for subscriptions and polling filters. -use crate::interest::{NewBlockNotification, filters::FilterOutput, subs::SubscriptionBuffer}; +use crate::interest::{ + NewBlockNotification, ReorgNotification, filters::FilterOutput, subs::SubscriptionBuffer, +}; use alloy::rpc::types::{Filter, Header, Log}; use std::collections::VecDeque; @@ -94,4 +96,104 @@ impl InterestKind { Self::Block => SubscriptionBuffer::Block(VecDeque::new()), } } + + /// Filter a reorg notification for a subscription, producing a buffer of + /// removed logs (with `removed: true`) that match this filter. + /// + /// Block subscriptions return an empty buffer — removed block headers + /// are not available from the reorg notification. + pub(crate) fn filter_reorg_for_sub(&self, reorg: &ReorgNotification) -> SubscriptionBuffer { + let filter = match self.as_filter() { + Some(f) => f, + None => return self.empty_sub_buffer(), + }; + + let logs: VecDeque = reorg + .removed_logs + .iter() + .filter(|log| filter.matches(log)) + .map(|log| Log { + inner: log.clone(), + block_hash: None, + block_number: None, + block_timestamp: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }) + .collect(); + + SubscriptionBuffer::Log(logs) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; + + fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { + alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + } + } + + fn test_filter(addr: Address) -> Filter { + Filter::new().address(addr) + } + + #[test] + fn filter_reorg_for_sub_matches_logs() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + + let kind = InterestKind::Log(Box::new(test_filter(addr))); + let reorg = ReorgNotification { + common_ancestor: 10, + removed_hashes: vec![], + removed_logs: vec![test_log(addr, topic)], + }; + + let buf = kind.filter_reorg_for_sub(&reorg); + let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; + + assert_eq!(logs.len(), 1); + assert!(logs[0].removed); + assert_eq!(logs[0].inner.address, addr); + assert!(logs[0].block_hash.is_none()); + } + + #[test] + fn filter_reorg_for_sub_filters_non_matching() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let other = address!("0x0000000000000000000000000000000000000002"); + let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + + let kind = InterestKind::Log(Box::new(test_filter(addr))); + let reorg = ReorgNotification { + common_ancestor: 10, + removed_hashes: vec![], + removed_logs: vec![test_log(other, topic)], + }; + + let buf = kind.filter_reorg_for_sub(&reorg); + let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; + assert!(logs.is_empty()); + } + + #[test] + fn filter_reorg_for_sub_block_returns_empty() { + let reorg = ReorgNotification { + common_ancestor: 10, + removed_hashes: vec![b256!( + "0x0000000000000000000000000000000000000000000000000000000000000001" + )], + removed_logs: vec![], + }; + + let buf = InterestKind::Block.filter_reorg_for_sub(&reorg); + assert!(buf.is_empty()); + } } diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 83edde1..0e13755 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -232,8 +232,14 @@ impl SubscriptionTask { }; let notif = match event { ChainEvent::NewBlock(notif) => *notif, - // Reorg handling will be addressed in future PRs (ENG-1968 et al.) - ChainEvent::Reorg(_) => continue, + ChainEvent::Reorg(reorg) => { + let output = filter.filter_reorg_for_sub(&reorg); + trace!(count = output.len(), "Reorg filter applied"); + if !output.is_empty() { + notif_buffer.extend(output); + } + continue; + } }; let output = filter.filter_notification_for_sub(¬if); From 4a63062feb60a846af3bb93bee0868b1ae4cbb35 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 11 Mar 2026 09:12:08 -0400 Subject: [PATCH 2/3] chore: bump signet-storage deps to 0.6.5 and remove patch overrides Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8df690a..37398d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,12 +53,12 @@ signet-tx-cache = "0.16.0-rc.11" signet-types = "0.16.0-rc.11" signet-zenith = "0.16.0-rc.11" signet-journal = "0.16.0-rc.11" -signet-storage = "0.6.4" -signet-cold = "0.6.4" -signet-hot = "0.6.4" -signet-hot-mdbx = "0.6.4" -signet-cold-mdbx = "0.6.4" -signet-storage-types = "0.6.4" +signet-storage = "0.6.5" +signet-cold = "0.6.5" +signet-hot = "0.6.5" +signet-hot-mdbx = "0.6.5" +signet-cold-mdbx = "0.6.5" +signet-storage-types = "0.6.5" # ajj ajj = { version = "0.6.0" } @@ -114,11 +114,6 @@ url = "2.5.4" tempfile = "3.17.0" [patch.crates-io] -signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-hot = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-storage-types = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } - # signet-bundle = { path = "../sdk/crates/bundle"} # signet-constants = { path = "../sdk/crates/constants"} # signet-evm = { path = "../sdk/crates/evm"} From b79a7328720327b489ad4d93ff4dcdbb733ca04b Mon Sep 17 00:00:00 2001 From: James Date: Wed, 11 Mar 2026 11:22:38 -0400 Subject: [PATCH 3/3] refactor: store removed headers in ReorgNotification for block metadata Address review feedback: - Replace `removed_hashes`/`removed_logs` with per-block `RemovedBlock` structs carrying headers, so removed logs include `block_hash`, `block_number`, and `block_timestamp` per the Ethereum JSON-RPC spec. - Use `let else` instead of match for early return (Evalir). - Take `ReorgNotification` by value in `filter_reorg_for_sub` to avoid intermediate `collect` and per-log clones. - Use `into_inner()` on `SealedHeader` instead of `inner().clone()`. Co-Authored-By: Claude Opus 4.6 --- crates/node/src/node.rs | 18 +++++---- crates/rpc/src/interest/kind.rs | 72 +++++++++++++++++++-------------- crates/rpc/src/interest/mod.rs | 15 +++++-- crates/rpc/src/interest/subs.rs | 2 +- crates/rpc/src/lib.rs | 2 +- 5 files changed, 66 insertions(+), 43 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index aabf803..f8e2f95 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -15,7 +15,9 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1}; use signet_evm::EthereumHardfork; use signet_extract::Extractor; use signet_node_config::SignetNodeConfig; -use signet_rpc::{ChainNotifier, NewBlockNotification, ReorgNotification, RpcServerGuard}; +use signet_rpc::{ + ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, +}; use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{fmt, sync::Arc}; @@ -363,14 +365,16 @@ where /// Send a reorg notification on the broadcast channel. fn notify_reorg(&self, drained: Vec, common_ancestor: u64) { - let removed_hashes = drained.iter().map(|d| d.header.hash()).collect(); - let removed_logs = drained + let removed_blocks = drained .into_iter() - .flat_map(|d| d.receipts) - .flat_map(|r| r.receipt.logs) - .map(|l| l.inner) + .map(|d| { + let header = d.header.into_inner(); + let logs = + d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); + RemovedBlock { header, logs } + }) .collect(); - let notif = ReorgNotification { common_ancestor, removed_hashes, removed_logs }; + let notif = ReorgNotification { common_ancestor, removed_blocks }; // Ignore send errors — no subscribers is fine. let _ = self.chain.send_reorg(notif); } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index c26a6a6..f0ed907 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -100,27 +100,30 @@ impl InterestKind { /// Filter a reorg notification for a subscription, producing a buffer of /// removed logs (with `removed: true`) that match this filter. /// - /// Block subscriptions return an empty buffer — removed block headers - /// are not available from the reorg notification. - pub(crate) fn filter_reorg_for_sub(&self, reorg: &ReorgNotification) -> SubscriptionBuffer { - let filter = match self.as_filter() { - Some(f) => f, - None => return self.empty_sub_buffer(), + /// Block subscriptions return an empty buffer — the Ethereum JSON-RPC + /// spec does not push removed headers for `newHeads` subscriptions. + pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer { + let Some(filter) = self.as_filter() else { + return self.empty_sub_buffer(); }; let logs: VecDeque = reorg - .removed_logs - .iter() - .filter(|log| filter.matches(log)) - .map(|log| Log { - inner: log.clone(), - block_hash: None, - block_number: None, - block_timestamp: None, - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, + .removed_blocks + .into_iter() + .flat_map(|block| { + let block_hash = block.header.hash_slow(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { + inner: log, + block_hash: Some(block_hash), + block_number: Some(block_number), + block_timestamp: Some(block_timestamp), + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }) }) .collect(); @@ -131,6 +134,7 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; + use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { @@ -140,6 +144,10 @@ mod tests { } } + fn test_header(number: u64) -> alloy::consensus::Header { + alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } + } + fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } @@ -149,20 +157,25 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_hashes: vec![], - removed_logs: vec![test_log(addr, topic)], + removed_blocks: vec![RemovedBlock { + header: header.clone(), + logs: vec![test_log(addr, topic)], + }], }; - let buf = kind.filter_reorg_for_sub(&reorg); + let buf = kind.filter_reorg_for_sub(reorg); let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert!(logs[0].block_hash.is_none()); + assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); + assert_eq!(logs[0].block_number.unwrap(), 11); + assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); } #[test] @@ -174,11 +187,13 @@ mod tests { let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_hashes: vec![], - removed_logs: vec![test_log(other, topic)], + removed_blocks: vec![RemovedBlock { + header: test_header(11), + logs: vec![test_log(other, topic)], + }], }; - let buf = kind.filter_reorg_for_sub(&reorg); + let buf = kind.filter_reorg_for_sub(reorg); let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; assert!(logs.is_empty()); } @@ -187,13 +202,10 @@ mod tests { fn filter_reorg_for_sub_block_returns_empty() { let reorg = ReorgNotification { common_ancestor: 10, - removed_hashes: vec![b256!( - "0x0000000000000000000000000000000000000000000000000000000000000001" - )], - removed_logs: vec![], + removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], }; - let buf = InterestKind::Block.filter_reorg_for_sub(&reorg); + let buf = InterestKind::Block.filter_reorg_for_sub(reorg); assert!(buf.is_empty()); } } diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 27a1d53..3ad2620 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -69,13 +69,20 @@ pub enum ChainEvent { Reorg(ReorgNotification), } +/// Data from a single block removed during a chain reorganization. +#[derive(Debug, Clone)] +pub struct RemovedBlock { + /// The header of the removed block. + pub header: alloy::consensus::Header, + /// Logs emitted by the removed block. + pub logs: Vec, +} + /// Notification sent when a chain reorganization is detected. #[derive(Debug, Clone)] pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Hashes of the removed blocks. - pub removed_hashes: Vec, - /// Logs from the removed blocks (needed for `removed: true` emission). - pub removed_logs: Vec, + /// Blocks removed by the reorg, each carrying its header and logs. + pub removed_blocks: Vec, } diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 0e13755..0e0511e 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -233,7 +233,7 @@ impl SubscriptionTask { let notif = match event { ChainEvent::NewBlock(notif) => *notif, ChainEvent::Reorg(reorg) => { - let output = filter.filter_reorg_for_sub(&reorg); + let output = filter.filter_reorg_for_sub(reorg); trace!(count = output.len(), "Reorg filter applied"); if !output.is_empty() { notif_buffer.extend(output); diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 3b4e040..2aaf164 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -18,7 +18,7 @@ mod eth; pub use eth::EthError; mod interest; -pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification}; +pub use interest::{ChainEvent, NewBlockNotification, RemovedBlock, ReorgNotification}; mod debug; pub use debug::DebugError;