diff --git a/Cargo.toml b/Cargo.toml index 8df690a..37398d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,12 +53,12 @@ signet-tx-cache = "0.16.0-rc.11" signet-types = "0.16.0-rc.11" signet-zenith = "0.16.0-rc.11" signet-journal = "0.16.0-rc.11" -signet-storage = "0.6.4" -signet-cold = "0.6.4" -signet-hot = "0.6.4" -signet-hot-mdbx = "0.6.4" -signet-cold-mdbx = "0.6.4" -signet-storage-types = "0.6.4" +signet-storage = "0.6.5" +signet-cold = "0.6.5" +signet-hot = "0.6.5" +signet-hot-mdbx = "0.6.5" +signet-cold-mdbx = "0.6.5" +signet-storage-types = "0.6.5" # ajj ajj = { version = "0.6.0" } @@ -114,11 +114,6 @@ url = "2.5.4" tempfile = "3.17.0" [patch.crates-io] -signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-hot = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } -signet-storage-types = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" } - # signet-bundle = { path = "../sdk/crates/bundle"} # signet-constants = { path = "../sdk/crates/constants"} # signet-evm = { path = "../sdk/crates/evm"} diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index aabf803..f8e2f95 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -15,7 +15,9 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1}; use signet_evm::EthereumHardfork; use signet_extract::Extractor; use signet_node_config::SignetNodeConfig; -use signet_rpc::{ChainNotifier, NewBlockNotification, ReorgNotification, RpcServerGuard}; +use signet_rpc::{ + ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, +}; use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{fmt, sync::Arc}; @@ -363,14 +365,16 @@ where /// Send a reorg notification on the broadcast channel. fn notify_reorg(&self, drained: Vec, common_ancestor: u64) { - let removed_hashes = drained.iter().map(|d| d.header.hash()).collect(); - let removed_logs = drained + let removed_blocks = drained .into_iter() - .flat_map(|d| d.receipts) - .flat_map(|r| r.receipt.logs) - .map(|l| l.inner) + .map(|d| { + let header = d.header.into_inner(); + let logs = + d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); + RemovedBlock { header, logs } + }) .collect(); - let notif = ReorgNotification { common_ancestor, removed_hashes, removed_logs }; + let notif = ReorgNotification { common_ancestor, removed_blocks }; // Ignore send errors — no subscribers is fine. let _ = self.chain.send_reorg(notif); } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f883e5a..f0ed907 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -1,6 +1,8 @@ //! Filter kinds for subscriptions and polling filters. -use crate::interest::{NewBlockNotification, filters::FilterOutput, subs::SubscriptionBuffer}; +use crate::interest::{ + NewBlockNotification, ReorgNotification, filters::FilterOutput, subs::SubscriptionBuffer, +}; use alloy::rpc::types::{Filter, Header, Log}; use std::collections::VecDeque; @@ -94,4 +96,116 @@ impl InterestKind { Self::Block => SubscriptionBuffer::Block(VecDeque::new()), } } + + /// Filter a reorg notification for a subscription, producing a buffer of + /// removed logs (with `removed: true`) that match this filter. + /// + /// Block subscriptions return an empty buffer — the Ethereum JSON-RPC + /// spec does not push removed headers for `newHeads` subscriptions. + pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer { + let Some(filter) = self.as_filter() else { + return self.empty_sub_buffer(); + }; + + let logs: VecDeque = reorg + .removed_blocks + .into_iter() + .flat_map(|block| { + let block_hash = block.header.hash_slow(); + let block_number = block.header.number; + let block_timestamp = block.header.timestamp; + block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { + inner: log, + block_hash: Some(block_hash), + block_number: Some(block_number), + block_timestamp: Some(block_timestamp), + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }) + }) + .collect(); + + SubscriptionBuffer::Log(logs) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::RemovedBlock; + use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; + + fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { + alloy::primitives::Log { + address: addr, + data: LogData::new_unchecked(vec![topic], Bytes::new()), + } + } + + fn test_header(number: u64) -> alloy::consensus::Header { + alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } + } + + fn test_filter(addr: Address) -> Filter { + Filter::new().address(addr) + } + + #[test] + fn filter_reorg_for_sub_matches_logs() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + + let header = test_header(11); + let kind = InterestKind::Log(Box::new(test_filter(addr))); + let reorg = ReorgNotification { + common_ancestor: 10, + removed_blocks: vec![RemovedBlock { + header: header.clone(), + logs: vec![test_log(addr, topic)], + }], + }; + + let buf = kind.filter_reorg_for_sub(reorg); + let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; + + assert_eq!(logs.len(), 1); + assert!(logs[0].removed); + assert_eq!(logs[0].inner.address, addr); + assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); + assert_eq!(logs[0].block_number.unwrap(), 11); + assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + } + + #[test] + fn filter_reorg_for_sub_filters_non_matching() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let other = address!("0x0000000000000000000000000000000000000002"); + let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + + let kind = InterestKind::Log(Box::new(test_filter(addr))); + let reorg = ReorgNotification { + common_ancestor: 10, + removed_blocks: vec![RemovedBlock { + header: test_header(11), + logs: vec![test_log(other, topic)], + }], + }; + + let buf = kind.filter_reorg_for_sub(reorg); + let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") }; + assert!(logs.is_empty()); + } + + #[test] + fn filter_reorg_for_sub_block_returns_empty() { + let reorg = ReorgNotification { + common_ancestor: 10, + removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + }; + + let buf = InterestKind::Block.filter_reorg_for_sub(reorg); + assert!(buf.is_empty()); + } } diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 27a1d53..3ad2620 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -69,13 +69,20 @@ pub enum ChainEvent { Reorg(ReorgNotification), } +/// Data from a single block removed during a chain reorganization. +#[derive(Debug, Clone)] +pub struct RemovedBlock { + /// The header of the removed block. + pub header: alloy::consensus::Header, + /// Logs emitted by the removed block. + pub logs: Vec, +} + /// Notification sent when a chain reorganization is detected. #[derive(Debug, Clone)] pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Hashes of the removed blocks. - pub removed_hashes: Vec, - /// Logs from the removed blocks (needed for `removed: true` emission). - pub removed_logs: Vec, + /// Blocks removed by the reorg, each carrying its header and logs. + pub removed_blocks: Vec, } diff --git a/crates/rpc/src/interest/subs.rs b/crates/rpc/src/interest/subs.rs index 83edde1..0e0511e 100644 --- a/crates/rpc/src/interest/subs.rs +++ b/crates/rpc/src/interest/subs.rs @@ -232,8 +232,14 @@ impl SubscriptionTask { }; let notif = match event { ChainEvent::NewBlock(notif) => *notif, - // Reorg handling will be addressed in future PRs (ENG-1968 et al.) - ChainEvent::Reorg(_) => continue, + ChainEvent::Reorg(reorg) => { + let output = filter.filter_reorg_for_sub(reorg); + trace!(count = output.len(), "Reorg filter applied"); + if !output.is_empty() { + notif_buffer.extend(output); + } + continue; + } }; let output = filter.filter_notification_for_sub(¬if); diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 3b4e040..2aaf164 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -18,7 +18,7 @@ mod eth; pub use eth::EthError; mod interest; -pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification}; +pub use interest::{ChainEvent, NewBlockNotification, RemovedBlock, ReorgNotification}; mod debug; pub use debug::DebugError;