diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index f8e2f95..b2ddc9a 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -368,10 +368,12 @@ where let removed_blocks = drained .into_iter() .map(|d| { - let header = d.header.into_inner(); + let number = d.header.number(); + let hash = d.header.hash(); + let timestamp = d.header.timestamp(); let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { header, logs } + RemovedBlock { number, hash, timestamp, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/config/ctx.rs b/crates/rpc/src/config/ctx.rs index 5c933ef..166979b 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,7 +101,11 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let filter_manager = FilterManager::new( + &chain.notif_sender(), + config.stale_filter_ttl, + config.stale_filter_ttl, + ); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 75d9479..6e818e2 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector; use serde::Serialize; use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead}; -use tracing::{Instrument, debug, trace_span}; +use tracing::{Instrument, debug, trace, trace_span}; use trevm::{ EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker, }; @@ -1078,12 +1078,38 @@ where let fm = ctx.filter_manager(); let mut 
entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed logs + // and rewind `next_start_block`. + let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); + if !removed.is_empty() { + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); + } + let latest = ctx.tags().latest(); let start = entry.next_start_block(); + // Implicit reorg detection: if latest has moved backward past our + // window, a reorg occurred that we missed (e.g. broadcast lagged). + // Return any removed logs we do have, then reset. + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); + } + if start > latest { - entry.mark_polled(latest); - return Ok(entry.empty_output()); + entry.touch_poll_time(); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } let cold = ctx.cold(); @@ -1110,7 +1136,15 @@ where let stream = cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?; - let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + + // Prepend removed logs so the client sees removals before + // the replacement data. + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index ae09367..24fecea 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,18 +1,20 @@ //! 
Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{InterestKind, buffer::EventBuffer}; +use crate::interest::{ChainEvent, InterestKind, ReorgNotification, buffer::EventBuffer}; use alloy::{ primitives::{B256, U64}, - rpc::types::Filter, + rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ + collections::VecDeque, sync::{ - Arc, Weak, + Arc, RwLock, Weak, atomic::{AtomicU64, Ordering}, }, time::{Duration, Instant}, }; +use tokio::sync::broadcast; use tracing::trace; type FilterId = U64; @@ -20,11 +22,20 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; +/// Maximum number of reorg notifications retained in the global ring +/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter +/// TTL gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; + /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last /// polled, and the first block whose contents should be considered. -#[derive(Debug, Clone, PartialEq, Eq)] +/// +/// `last_poll_time` doubles as the cursor into the global reorg ring +/// buffer — at poll time the filter scans for reorgs received after +/// this instant. +#[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, @@ -38,7 +49,7 @@ impl core::fmt::Display for ActiveFilter { "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), - self.kind + self.kind, ) } } @@ -60,11 +71,25 @@ impl ActiveFilter { self.last_poll_time = Instant::now(); } + /// Update the poll timestamp without advancing `next_start_block`. 
+ /// + /// Used on early-return paths (implicit reorg, no new blocks) where + /// `compute_removed_logs` has already rewound `next_start_block` and + /// we must preserve that position for the next forward scan. + pub(crate) fn touch_poll_time(&mut self) { + self.last_poll_time = Instant::now(); + } + /// Get the next start block for the filter. pub(crate) const fn next_start_block(&self) -> u64 { self.next_start_block } + /// Get the instant at which the filter was last polled (or created). + pub(crate) const fn last_poll_time(&self) -> Instant { + self.last_poll_time + } + /// Get the duration since the filter was last polled. pub(crate) fn time_since_last_poll(&self) -> Duration { self.last_poll_time.elapsed() @@ -74,6 +99,56 @@ impl ActiveFilter { pub(crate) const fn empty_output(&self) -> FilterOutput { self.kind.empty_output() } + + /// Compute removed logs from a sequence of reorg notifications. + /// + /// Walks the reorgs in order, computing snapshots lazily from the + /// filter's current [`next_start_block`]. For each reorg, only logs + /// from blocks the filter had already delivered (block number below + /// the snapshot) are included. + /// + /// Updates `next_start_block` to the rewound value so the subsequent + /// forward scan starts from the correct position. + /// + /// Block filters return an empty vec — the Ethereum JSON-RPC spec + /// does not define `removed` semantics for block filters. 
+ /// + /// [`next_start_block`]: Self::next_start_block + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc<ReorgNotification>]) -> Vec<Log> { + let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } + return Vec::new(); + }; + + let mut removed = Vec::new(); + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); + for block in &notification.removed_blocks { + if block.number >= snapshot { + continue; + } + for log in &block.logs { + if !filter.matches(log) { + continue; + } + removed.push(Log { + inner: log.clone(), + block_hash: Some(block.hash), + block_number: Some(block.number), + block_timestamp: Some(block.timestamp), + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }); + } + } + } + removed + } } /// Inner logic for [`FilterManager`]. @@ -81,13 +156,18 @@ impl ActiveFilter { pub(crate) struct FilterManagerInner { current_id: AtomicU64, filters: DashMap<FilterId, ActiveFilter>, + reorgs: RwLock<VecDeque<(Instant, Arc<ReorgNotification>)>>, } impl FilterManagerInner { /// Create a new filter manager. fn new() -> Self { // Start from 1, as 0 is weird in quantity encoding. - Self { current_id: AtomicU64::new(1), filters: DashMap::new() } + Self { + current_id: AtomicU64::new(1), + filters: DashMap::new(), + reorgs: RwLock::new(VecDeque::new()), + } } /// Get the next filter ID. 
@@ -102,10 +182,14 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let next_start_block = current_block + 1; - let _ = self - .filters - .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + let _ = self.filters.insert( + id, + ActiveFilter { + next_start_block: current_block + 1, + last_poll_time: Instant::now(), + kind, + }, + ); id } @@ -124,6 +208,32 @@ impl FilterManagerInner { self.filters.remove(&id) } + /// Append a reorg notification to the global ring buffer. + /// + /// Evicts the oldest entry when the buffer is full. The + /// [`Arc`] is shared across all poll-time readers. + pub(crate) fn push_reorg(&self, reorg: ReorgNotification) { + let entry = (Instant::now(), Arc::new(reorg)); + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back(entry); + } + + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock and returns them. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec<Arc<ReorgNotification>> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() + } + /// Clean stale filters that have not been polled in a while. fn clean_stale(&self, older_than: Duration) { self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than); } } @@ -136,20 +246,39 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. -/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. -/// See [`DashMap`] documentation for more information. 
+/// Reorg notifications are stored in a global ring buffer (capped at +/// [`MAX_REORG_ENTRIES`]). Filters compute their removed logs lazily at +/// poll time by scanning the buffer for entries received since the last +/// poll. +/// +/// Calling [`Self::new`] spawns two background workers: +/// - An OS thread that periodically cleans stale filters (using a separate +/// thread to avoid [`DashMap::retain`] deadlock). +/// - A tokio task that listens for reorg broadcasts and appends them to +/// the global ring buffer. +/// +/// Both workers hold [`Weak`] references and self-terminate when the +/// manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc<FilterManagerInner>, } impl FilterManager { - /// Create a new filter manager. Spawn a task to clean stale filters. - pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters and a tokio task that + /// listens for [`ChainEvent::Reorg`] events and appends them to the + /// global ring buffer. + pub(crate) fn new( + chain_events: &broadcast::Sender<ChainEvent>, + clean_interval: Duration, + age_limit: Duration, + ) -> Self { let inner = Arc::new(FilterManagerInner::new()); let manager = Self { inner }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); + FilterReorgTask::new(Arc::downgrade(&manager.inner), chain_events.subscribe()).spawn(); manager } } @@ -195,6 +324,225 @@ impl FilterCleanTask { } } +/// Task that listens for reorg events and appends them to the global +/// ring buffer. +/// +/// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] +/// is dropped. +struct FilterReorgTask { + manager: Weak<FilterManagerInner>, + rx: broadcast::Receiver<ChainEvent>, +} + +impl FilterReorgTask { + const fn new(manager: Weak<FilterManagerInner>, rx: broadcast::Receiver<ChainEvent>) -> Self { + Self { manager, rx } + } + + /// Spawn the listener as a tokio task. 
+ fn spawn(self) { + tokio::spawn(self.run()); + } + + async fn run(mut self) { + loop { + let event = match self.rx.recv().await { + Ok(event) => event, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + trace!(skipped, "filter reorg listener missed notifications"); + continue; + } + Err(_) => break, + }; + + let ChainEvent::Reorg(reorg) = event else { continue }; + + let Some(manager) = self.manager.upgrade() else { break }; + manager.push_reorg(reorg); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, Bytes, LogData, address, b256}; + + fn block_filter(start: u64) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Block, + } + } + + fn log_filter(start: u64, addr: Address) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Log(Box::new(Filter::new().address(addr))), + } + } + + fn test_log(addr: Address) -> alloy::primitives::Log { + alloy::primitives::Log { address: addr, data: LogData::new_unchecked(vec![], Bytes::new()) } + } + + fn reorg_notification(ancestor: u64, removed: Vec<RemovedBlock>) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } + } + + fn removed_block(number: u64, logs: Vec<alloy::primitives::Log>) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, + logs, + } + } + + #[test] + fn compute_removed_logs_rewinds_start_block() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(7, vec![])); + + f.compute_removed_logs(&[reorg]); + + assert_eq!(f.next_start_block, 8); + } + + #[test] + fn compute_removed_logs_matches_removed() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + + let reorg = 
Arc::new(reorg_notification( + 8, + vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 2); + assert!(removed.iter().all(|l| l.removed)); + assert!(removed.iter().all(|l| l.inner.address == addr)); + } + + #[test] + fn compute_removed_logs_skips_undelivered_blocks() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has only delivered up to block 10 (next_start = 11). + let mut f = log_filter(11, addr); + + // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. + let reorg = Arc::new(reorg_notification( + 9, + vec![ + removed_block(10, vec![test_log(addr)]), + removed_block(11, vec![test_log(addr)]), + removed_block(12, vec![test_log(addr)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg]); + + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].block_number, Some(10)); + } + + #[test] + fn compute_removed_logs_cascading() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has delivered up to block 100 (next_start = 101). + let mut f = log_filter(101, addr); + + // Reorg A: rewinds to 98, removes 99-100. + let reorg_a = Arc::new(reorg_notification( + 98, + vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], + )); + + // Reorg B: rewinds to 95, removes 96-103. + let reorg_b = Arc::new(reorg_notification( + 95, + vec![ + removed_block(96, vec![test_log(addr)]), + removed_block(97, vec![test_log(addr)]), + removed_block(98, vec![test_log(addr)]), + removed_block(99, vec![test_log(addr)]), + removed_block(100, vec![test_log(addr)]), + removed_block(101, vec![test_log(addr)]), + removed_block(102, vec![test_log(addr)]), + removed_block(103, vec![test_log(addr)]), + ], + )); + + let removed = f.compute_removed_logs(&[reorg_a, reorg_b]); + + // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. 
+ // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. + // Blocks 99-103 from reorg B are >= 99 → skipped. + assert_eq!(removed.len(), 5); + assert_eq!(f.next_start_block, 96); + } + + #[test] + fn compute_removed_logs_block_filter_empty() { + let mut f = block_filter(10); + let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); + + let removed = f.compute_removed_logs(&[reorg]); + + assert!(removed.is_empty()); + // But the rewind still happened. + assert_eq!(f.next_start_block, 6); + } + + #[test] + fn push_reorg_evicts_oldest() { + let inner = FilterManagerInner::new(); + for i in 0..MAX_REORG_ENTRIES + 5 { + inner.push_reorg(reorg_notification(i as u64, vec![])); + } + let buf = inner.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + // Oldest surviving entry should be the 6th push (index 5). + assert_eq!(buf[0].1.common_ancestor, 5); + } + + #[test] + fn reorgs_since_filters_by_time() { + let inner = FilterManagerInner::new(); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(10, vec![])); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(8, vec![])); + + let all = inner.reorgs_since(before); + assert_eq!(all.len(), 2); + + let recent = inner.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); + } + + #[test] + fn reorgs_since_skips_pre_creation_reorgs() { + let inner = FilterManagerInner::new(); + inner.push_reorg(reorg_notification(5, vec![])); + std::thread::sleep(Duration::from_millis(5)); + + let id = inner.install_block_filter(20); + let filter = inner.filters.get(&id).unwrap(); + + let reorgs = inner.reorgs_since(filter.last_poll_time); + assert!(reorgs.is_empty()); + } +} + // Some code in this file has been copied and modified from reth // // The original license is included below: diff --git a/crates/rpc/src/interest/kind.rs 
b/crates/rpc/src/interest/kind.rs index f0ed907..72cd78f 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -109,21 +109,23 @@ impl InterestKind { let logs: VecDeque = reorg .removed_blocks - .into_iter() + .iter() .flat_map(|block| { - let block_hash = block.header.hash_slow(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { - inner: log, - block_hash: Some(block_hash), - block_number: Some(block_number), - block_timestamp: Some(block_timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }) + let hash = block.hash; + let number = block.number; + let timestamp = block.timestamp; + block.logs.iter().map(move |log| (hash, number, timestamp, log)) + }) + .filter(|(_, _, _, log)| filter.matches(log)) + .map(|(hash, number, timestamp, log)| Log { + inner: log.clone(), + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: Some(timestamp), + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, }) .collect(); @@ -134,7 +136,6 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; - use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { @@ -144,27 +145,29 @@ mod tests { } } - fn test_header(number: u64) -> alloy::consensus::Header { - alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } - } - fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } + } + #[test] fn filter_reorg_for_sub_matches_logs() { let addr = 
address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); - let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: header.clone(), - logs: vec![test_log(addr, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(addr, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -173,9 +176,9 @@ mod tests { assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); - assert_eq!(logs[0].block_number.unwrap(), 11); - assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + assert_eq!(logs[0].block_hash, Some(block_hash)); + assert_eq!(logs[0].block_number, Some(11)); + assert_eq!(logs[0].block_timestamp, Some(1_000_011)); } #[test] @@ -183,14 +186,13 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let other = address!("0x0000000000000000000000000000000000000002"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: test_header(11), - logs: vec![test_log(other, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(other, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -200,9 +202,12 @@ mod tests { #[test] fn filter_reorg_for_sub_block_returns_empty() { + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let 
reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![])], }; let buf = InterestKind::Block.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3ad2620..2403037 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -25,12 +25,17 @@ //! reference to the `Arc`, so they self-terminate once all //! strong references are dropped. //! -//! OS threads are used (rather than tokio tasks) because +//! OS threads are used for cleanup (rather than tokio tasks) because //! [`DashMap::retain`] can deadlock if called from an async context //! that also holds a `DashMap` read guard on the same shard. Running //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! +//! [`FilterManager`] additionally spawns a tokio task that listens +//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg +//! notifications to all active filters. This task does not call +//! `retain`, so it is safe to run in an async context. +//! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap //! [`DashMap::retain`]: dashmap::DashMap::retain @@ -69,12 +74,16 @@ pub enum ChainEvent { Reorg(ReorgNotification), } -/// Data from a single block removed during a chain reorganization. +/// A block that was removed during a chain reorganization. #[derive(Debug, Clone)] pub struct RemovedBlock { - /// The header of the removed block. - pub header: alloy::consensus::Header, - /// Logs emitted by the removed block. + /// The block number. + pub number: u64, + /// The block hash. + pub hash: alloy::primitives::B256, + /// The block timestamp. + pub timestamp: u64, + /// Logs emitted in the removed block. 
pub logs: Vec<alloy::primitives::Log>, } @@ -83,6 +92,6 @@ pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Blocks removed by the reorg, each carrying its header and logs. + /// The blocks that were removed, ordered by block number. pub removed_blocks: Vec<RemovedBlock>, }