From 379d1c6b4e54a89c37716e9656c59aa35c923b91 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 10 Mar 2026 08:02:53 -0400 Subject: [PATCH 1/7] feat: handle reorgs in `get_filter_changes` with reorg watermark Add a `reorg_watermark` field to `ActiveFilter` that records the common ancestor block when a chain reorganization occurs. `FilterManager` now subscribes to `ChainEvent::Reorg` broadcasts and eagerly propagates watermarks to all active filters. On the next poll, `get_filter_changes` rewinds `next_start_block` so re-fetched data reflects the new chain. An implicit reorg detection check (latest < start) provides a belt-and-suspenders fallback when the explicit watermark is missed. Co-Authored-By: Claude Opus 4.6 --- crates/rpc/src/config/ctx.rs | 6 +- crates/rpc/src/eth/endpoints.rs | 15 ++- crates/rpc/src/interest/filters.rs | 184 +++++++++++++++++++++++++++-- 3 files changed, 195 insertions(+), 10 deletions(-) diff --git a/crates/rpc/src/config/ctx.rs b/crates/rpc/src/config/ctx.rs index 5c933ef..166979b 100644 --- a/crates/rpc/src/config/ctx.rs +++ b/crates/rpc/src/config/ctx.rs @@ -101,7 +101,11 @@ impl StorageRpcCtx { config: StorageRpcConfig, ) -> Self { let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); - let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let filter_manager = FilterManager::new( + &chain.notif_sender(), + config.stale_filter_ttl, + config.stale_filter_ttl, + ); let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl); let gas_cache = GasOracleCache::new(); Self { diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 75d9479..bfa0a15 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector; use serde::Serialize; use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead}; -use tracing::{Instrument, debug, trace_span}; +use tracing::{Instrument, debug, trace, trace_span}; use trevm::{ EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker, }; @@ -1078,9 +1078,22 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + // Handle any pending reorg watermark. + if let Some(watermark) = entry.handle_reorg() { + trace!(watermark, "filter reset due to reorg"); + } + let latest = ctx.tags().latest(); let start = entry.next_start_block(); + // Implicit reorg detection: if latest has moved backward past our + // window, a reorg occurred that we missed. Reset to avoid skipping. + if latest + 1 < start { + trace!(latest, start, "implicit reorg detected, resetting filter"); + entry.mark_polled(latest); + return Ok(entry.empty_output()); + } + if start > latest { entry.mark_polled(latest); return Ok(entry.empty_output()); diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index ae09367..89f51ce 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,6 +1,6 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{InterestKind, buffer::EventBuffer}; +use crate::interest::{ChainEvent, InterestKind, buffer::EventBuffer}; use alloy::{ primitives::{B256, U64}, rpc::types::Filter, @@ -13,6 +13,7 @@ use std::{ }, time::{Duration, Instant}, }; +use tokio::sync::broadcast; use tracing::trace; type FilterId = U64; @@ -29,17 +30,22 @@ pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, kind: InterestKind, + reorg_watermark: Option, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), self.kind - ) + )?; + if let Some(w) = self.reorg_watermark { + write!(f, ", reorg_watermark: {w}")?; + } + write!(f, " }}") } } @@ -74,6 +80,31 @@ impl ActiveFilter { pub(crate) const fn empty_output(&self) -> FilterOutput { self.kind.empty_output() } + + /// Record that a reorg occurred back to this ancestor block. + /// + /// If multiple reorgs arrive before the filter is polled, the lowest + /// (most conservative) watermark is kept. + pub(crate) fn set_reorg_watermark(&mut self, common_ancestor: u64) { + self.reorg_watermark = + Some(self.reorg_watermark.map_or(common_ancestor, |w| w.min(common_ancestor))); + } + + /// Reset filter state if a pending reorg affected this filter's window. + /// + /// Takes and clears the watermark. If the watermark is below + /// `next_start_block`, rewinds the start block so the next poll + /// re-fetches from just after the common ancestor. Returns the + /// watermark value when a reset occurred, `None` otherwise. + pub(crate) fn handle_reorg(&mut self) -> Option { + let watermark = self.reorg_watermark.take()?; + if watermark < self.next_start_block { + self.next_start_block = watermark + 1; + Some(watermark) + } else { + None + } + } } /// Inner logic for [`FilterManager`]. @@ -103,9 +134,15 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); let next_start_block = current_block + 1; - let _ = self - .filters - .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + let _ = self.filters.insert( + id, + ActiveFilter { + next_start_block, + last_poll_time: Instant::now(), + kind, + reorg_watermark: None, + }, + ); id } @@ -124,6 +161,13 @@ impl FilterManagerInner { self.filters.remove(&id) } + /// Set a reorg watermark on all active filters. + pub(crate) fn set_reorg_watermark_all(&self, common_ancestor: u64) { + self.filters + .iter_mut() + .for_each(|mut entry| entry.value_mut().set_reorg_watermark(common_ancestor)); + } + /// Clean stale filters that have not been polled in a while. fn clean_stale(&self, older_than: Duration) { self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than); @@ -145,11 +189,20 @@ pub(crate) struct FilterManager { } impl FilterManager { - /// Create a new filter manager. Spawn a task to clean stale filters. - pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + /// Create a new filter manager. + /// + /// Spawns a cleanup thread for stale filters and a tokio task that + /// listens for [`ChainEvent::Reorg`] events and propagates watermarks + /// to all active filters. + pub(crate) fn new( + chain_events: &broadcast::Sender, + clean_interval: Duration, + age_limit: Duration, + ) -> Self { let inner = Arc::new(FilterManagerInner::new()); let manager = Self { inner }; FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); + FilterReorgTask::new(Arc::downgrade(&manager.inner), chain_events.subscribe()).spawn(); manager } } @@ -195,6 +248,121 @@ impl FilterCleanTask { } } +/// Task that listens for reorg events and propagates watermarks to all +/// active filters. +/// +/// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] +/// is dropped. +struct FilterReorgTask { + manager: Weak, + rx: broadcast::Receiver, +} + +impl FilterReorgTask { + const fn new(manager: Weak, rx: broadcast::Receiver) -> Self { + Self { manager, rx } + } + + /// Spawn the listener as a tokio task. + fn spawn(self) { + tokio::spawn(self.run()); + } + + async fn run(mut self) { + loop { + let event = match self.rx.recv().await { + Ok(event) => event, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + trace!(skipped, "filter reorg listener missed notifications"); + continue; + } + Err(_) => break, + }; + + let ChainEvent::Reorg(reorg) = event else { continue }; + + let Some(manager) = self.manager.upgrade() else { break }; + manager.set_reorg_watermark_all(reorg.common_ancestor); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::interest::InterestKind; + + fn block_filter(start: u64) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + kind: InterestKind::Block, + reorg_watermark: None, + } + } + + #[test] + fn set_reorg_watermark_keeps_minimum() { + let mut f = block_filter(10); + f.set_reorg_watermark(8); + assert_eq!(f.reorg_watermark, Some(8)); + + // A higher watermark does not overwrite the lower one. + f.set_reorg_watermark(9); + assert_eq!(f.reorg_watermark, Some(8)); + + // A lower watermark replaces the current one. + f.set_reorg_watermark(5); + assert_eq!(f.reorg_watermark, Some(5)); + } + + #[test] + fn handle_reorg_resets_start_block() { + let mut f = block_filter(10); + f.set_reorg_watermark(7); + + let result = f.handle_reorg(); + assert_eq!(result, Some(7)); + assert_eq!(f.next_start_block, 8); + assert!(f.reorg_watermark.is_none()); + } + + #[test] + fn handle_reorg_noop_when_watermark_at_or_above_start() { + let mut f = block_filter(10); + f.set_reorg_watermark(10); + + let result = f.handle_reorg(); + assert!(result.is_none()); + // next_start_block unchanged. + assert_eq!(f.next_start_block, 10); + assert!(f.reorg_watermark.is_none()); + } + + #[test] + fn handle_reorg_clears_watermark() { + let mut f = block_filter(10); + f.set_reorg_watermark(5); + f.handle_reorg(); + + // Second call returns None — watermark already consumed. + assert!(f.handle_reorg().is_none()); + } + + #[test] + fn set_reorg_watermark_all_propagates() { + let inner = FilterManagerInner::new(); + inner.install_block_filter(20); + inner.install_block_filter(30); + + inner.set_reorg_watermark_all(15); + + inner.filters.iter().for_each(|entry| { + assert_eq!(entry.value().reorg_watermark, Some(15)); + }); + } +} + // Some code in this file has been copied and modified from reth // // The original license is included below: From cb9d35cf10600dd6dcb37f4fb85e2f052bd91e07 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 13 Mar 2026 10:14:15 -0400 Subject: [PATCH 2/7] refactor: redesign filter reorg handling with Arc-shared notifications Replace the simple u64 watermark with a Vec of Arc paired with next_start_block snapshots. This fixes two issues: 1. Race condition: filters created after a reorg no longer receive false watermarks, guarded by a created_at timestamp comparison. 2. Missing removed logs: get_filter_changes now emits `removed: true` logs per the Ethereum JSON-RPC spec. Each pending reorg's snapshot determines which removed blocks the client already saw. Restructure ReorgNotification to group logs per block (RemovedBlock) so filters can determine relevance by block number. The Arc sharing means no log data is cloned until poll time, and automatic drop eliminates the need for cleanup passes. Co-Authored-By: Claude Opus 4.6 --- crates/node/src/node.rs | 5 +- crates/rpc/src/eth/endpoints.rs | 23 +- crates/rpc/src/interest/filters.rs | 331 ++++++++++++++++++++++------- crates/rpc/src/interest/kind.rs | 67 +++--- crates/rpc/src/interest/mod.rs | 12 +- 5 files changed, 321 insertions(+), 117 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index f8e2f95..22bd3b9 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -368,10 +368,11 @@ where let removed_blocks = drained .into_iter() .map(|d| { - let header = d.header.into_inner(); + let number = d.header.number(); + let hash = d.header.hash(); let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { header, logs } + RemovedBlock { number, hash, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index bfa0a15..d697492 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1078,16 +1078,21 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; - // Handle any pending reorg watermark. - if let Some(watermark) = entry.handle_reorg() { - trace!(watermark, "filter reset due to reorg"); + // Drain any pending reorg notifications, producing removed logs + // for log filters. `next_start_block` was already rewound eagerly + // when the reorg was received. + let removed = entry.drain_reorgs(); + if !removed.is_empty() { + trace!(count = removed.len(), "drained removed logs from pending reorgs"); } let latest = ctx.tags().latest(); let start = entry.next_start_block(); // Implicit reorg detection: if latest has moved backward past our - // window, a reorg occurred that we missed. Reset to avoid skipping. + // window, a reorg occurred that we missed (e.g. broadcast lagged). + // Reset to avoid skipping. Removed logs are unavailable in this + // degraded path. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); entry.mark_polled(latest); @@ -1123,7 +1128,15 @@ where let stream = cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?; - let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?; + + // Prepend removed logs so the client sees removals before + // the replacement data. + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 89f51ce..fd60b4c 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -1,9 +1,9 @@ //! Filter management for `eth_newFilter` / `eth_getFilterChanges`. -use crate::interest::{ChainEvent, InterestKind, buffer::EventBuffer}; +use crate::interest::{ChainEvent, InterestKind, ReorgNotification, buffer::EventBuffer}; use alloy::{ primitives::{B256, U64}, - rpc::types::Filter, + rpc::types::{Filter, Log}, }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ @@ -21,31 +21,38 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; +/// A pending reorg notification paired with the filter's +/// `next_start_block` at the time the reorg was received. +/// +/// The snapshot records which blocks the filter had already delivered, +/// so [`ActiveFilter::drain_reorgs`] can determine which removed logs +/// are relevant. +type PendingReorg = (Arc, u64); + /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last /// polled, and the first block whose contents should be considered. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, + created_at: Instant, kind: InterestKind, - reorg_watermark: Option, + pending_reorgs: Vec, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}, \ + pending_reorgs: {} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), - self.kind - )?; - if let Some(w) = self.reorg_watermark { - write!(f, ", reorg_watermark: {w}")?; - } - write!(f, " }}") + self.kind, + self.pending_reorgs.len(), + ) } } @@ -64,6 +71,7 @@ impl ActiveFilter { pub(crate) fn mark_polled(&mut self, current_block: u64) { self.next_start_block = current_block + 1; self.last_poll_time = Instant::now(); + self.pending_reorgs.clear(); } /// Get the next start block for the filter. @@ -81,29 +89,67 @@ impl ActiveFilter { self.kind.empty_output() } - /// Record that a reorg occurred back to this ancestor block. + /// Record a reorg notification, eagerly rewinding `next_start_block`. + /// + /// The notification is stored (behind an [`Arc`]) alongside a snapshot + /// of the filter's `next_start_block` at the time of the reorg, so + /// that [`drain_reorgs`] can later determine which removed logs the + /// client has already seen. /// - /// If multiple reorgs arrive before the filter is polled, the lowest - /// (most conservative) watermark is kept. - pub(crate) fn set_reorg_watermark(&mut self, common_ancestor: u64) { - self.reorg_watermark = - Some(self.reorg_watermark.map_or(common_ancestor, |w| w.min(common_ancestor))); + /// If `received_at` is before the filter's creation time, the reorg + /// is silently skipped — the filter was installed after the reorg + /// occurred and its `next_start_block` already reflects the post-reorg + /// chain state. + /// + /// [`drain_reorgs`]: Self::drain_reorgs + fn push_reorg(&mut self, reorg: Arc, received_at: Instant) { + if self.created_at > received_at { + return; + } + + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + self.pending_reorgs.push((reorg, snapshot)); } - /// Reset filter state if a pending reorg affected this filter's window. + /// Drain pending reorgs, returning matched removed logs with + /// `removed: true`. /// - /// Takes and clears the watermark. If the watermark is below - /// `next_start_block`, rewinds the start block so the next poll - /// re-fetches from just after the common ancestor. Returns the - /// watermark value when a reset occurred, `None` otherwise. - pub(crate) fn handle_reorg(&mut self) -> Option { - let watermark = self.reorg_watermark.take()?; - if watermark < self.next_start_block { - self.next_start_block = watermark + 1; - Some(watermark) - } else { - None + /// For each pending reorg, only logs from blocks the filter had + /// already delivered (block number below the snapshot) are included. + /// Block filters return an empty vec — the Ethereum JSON-RPC spec + /// does not define `removed` semantics for block filters. + pub(crate) fn drain_reorgs(&mut self) -> Vec { + let reorgs = std::mem::take(&mut self.pending_reorgs); + + let Some(filter) = self.kind.as_filter() else { + return Vec::new(); + }; + + let mut removed = Vec::new(); + for (notification, snapshot_start) in reorgs { + for block in ¬ification.removed_blocks { + if block.number >= snapshot_start { + continue; + } + for log in &block.logs { + if !filter.matches(log) { + continue; + } + removed.push(Log { + inner: log.clone(), + block_hash: Some(block.hash), + block_number: Some(block.number), + block_timestamp: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, + }); + } + } } + removed } } @@ -133,14 +179,15 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let next_start_block = current_block + 1; + let now = Instant::now(); let _ = self.filters.insert( id, ActiveFilter { - next_start_block, - last_poll_time: Instant::now(), + next_start_block: current_block + 1, + last_poll_time: now, + created_at: now, kind, - reorg_watermark: None, + pending_reorgs: Vec::new(), }, ); id @@ -161,11 +208,15 @@ impl FilterManagerInner { self.filters.remove(&id) } - /// Set a reorg watermark on all active filters. - pub(crate) fn set_reorg_watermark_all(&self, common_ancestor: u64) { + /// Apply a reorg notification to all active filters. + /// + /// Each filter records the shared `Arc` alongside + /// a snapshot of its current `next_start_block`, then eagerly rewinds. + /// Filters created after `received_at` are skipped (race guard). + pub(crate) fn apply_reorg(&self, reorg: Arc, received_at: Instant) { self.filters .iter_mut() - .for_each(|mut entry| entry.value_mut().set_reorg_watermark(common_ancestor)); + .for_each(|mut entry| entry.value_mut().push_reorg(Arc::clone(&reorg), received_at)); } /// Clean stale filters that have not been polled in a while. @@ -192,8 +243,8 @@ impl FilterManager { /// Create a new filter manager. /// /// Spawns a cleanup thread for stale filters and a tokio task that - /// listens for [`ChainEvent::Reorg`] events and propagates watermarks - /// to all active filters. + /// listens for [`ChainEvent::Reorg`] events and propagates reorg + /// notifications to all active filters. pub(crate) fn new( chain_events: &broadcast::Sender, clean_interval: Duration, @@ -248,8 +299,8 @@ impl FilterCleanTask { } } -/// Task that listens for reorg events and propagates watermarks to all -/// active filters. +/// Task that listens for reorg events and propagates them to all active +/// filters. /// /// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] /// is dropped. @@ -281,8 +332,10 @@ impl FilterReorgTask { let ChainEvent::Reorg(reorg) = event else { continue }; + let received_at = Instant::now(); + let reorg = Arc::new(reorg); let Some(manager) = self.manager.upgrade() else { break }; - manager.set_reorg_watermark_all(reorg.common_ancestor); + manager.apply_reorg(reorg, received_at); } } } @@ -290,76 +343,208 @@ impl FilterReorgTask { #[cfg(test)] mod tests { use super::*; - use crate::interest::InterestKind; + use crate::interest::{InterestKind, RemovedBlock}; + use alloy::primitives::{Address, Bytes, LogData, address, b256}; fn block_filter(start: u64) -> ActiveFilter { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), + created_at: Instant::now(), kind: InterestKind::Block, - reorg_watermark: None, + pending_reorgs: Vec::new(), + } + } + + fn log_filter(start: u64, addr: Address) -> ActiveFilter { + ActiveFilter { + next_start_block: start, + last_poll_time: Instant::now(), + created_at: Instant::now(), + kind: InterestKind::Log(Box::new(Filter::new().address(addr))), + pending_reorgs: Vec::new(), + } + } + + fn test_log(addr: Address) -> alloy::primitives::Log { + alloy::primitives::Log { address: addr, data: LogData::new_unchecked(vec![], Bytes::new()) } + } + + fn reorg_notification(ancestor: u64, removed: Vec) -> ReorgNotification { + ReorgNotification { common_ancestor: ancestor, removed_blocks: removed } + } + + fn removed_block(number: u64, logs: Vec) -> RemovedBlock { + RemovedBlock { + number, + hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + logs, } } #[test] - fn set_reorg_watermark_keeps_minimum() { + fn push_reorg_skips_future_filters() { let mut f = block_filter(10); - f.set_reorg_watermark(8); - assert_eq!(f.reorg_watermark, Some(8)); + // received_at is before the filter was created. + let received_at = f.created_at - Duration::from_secs(1); + let reorg = Arc::new(reorg_notification(5, vec![])); - // A higher watermark does not overwrite the lower one. - f.set_reorg_watermark(9); - assert_eq!(f.reorg_watermark, Some(8)); + f.push_reorg(reorg, received_at); - // A lower watermark replaces the current one. - f.set_reorg_watermark(5); - assert_eq!(f.reorg_watermark, Some(5)); + assert!(f.pending_reorgs.is_empty()); + assert_eq!(f.next_start_block, 10); } #[test] - fn handle_reorg_resets_start_block() { + fn push_reorg_rewinds_start_block() { let mut f = block_filter(10); - f.set_reorg_watermark(7); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(7, vec![])); + + f.push_reorg(reorg, received_at); - let result = f.handle_reorg(); - assert_eq!(result, Some(7)); assert_eq!(f.next_start_block, 8); - assert!(f.reorg_watermark.is_none()); + assert_eq!(f.pending_reorgs.len(), 1); } #[test] - fn handle_reorg_noop_when_watermark_at_or_above_start() { - let mut f = block_filter(10); - f.set_reorg_watermark(10); + fn drain_reorgs_matches_removed_logs() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + + let reorg = Arc::new(reorg_notification( + 8, + vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], + )); + + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert_eq!(removed.len(), 2); + assert!(removed.iter().all(|l| l.removed)); + assert!(removed.iter().all(|l| l.inner.address == addr)); + } - let result = f.handle_reorg(); - assert!(result.is_none()); - // next_start_block unchanged. - assert_eq!(f.next_start_block, 10); - assert!(f.reorg_watermark.is_none()); + #[test] + fn drain_reorgs_skips_undelivered_blocks() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has only delivered up to block 10 (next_start = 11). + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + + // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. + let reorg = Arc::new(reorg_notification( + 9, + vec![ + removed_block(10, vec![test_log(addr)]), + removed_block(11, vec![test_log(addr)]), + removed_block(12, vec![test_log(addr)]), + ], + )); + + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].block_number, Some(10)); + } + + #[test] + fn drain_reorgs_cascading() { + let addr = address!("0x0000000000000000000000000000000000000001"); + // Filter has delivered up to block 100 (next_start = 101). + let mut f = log_filter(101, addr); + let received_at = Instant::now(); + + // Reorg A: rewinds to 98, removes 99-100. + let reorg_a = Arc::new(reorg_notification( + 98, + vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], + )); + f.push_reorg(reorg_a, received_at); + assert_eq!(f.next_start_block, 99); + + // Reorg B: rewinds to 95, removes 96-103. + let reorg_b = Arc::new(reorg_notification( + 95, + vec![ + removed_block(96, vec![test_log(addr)]), + removed_block(97, vec![test_log(addr)]), + removed_block(98, vec![test_log(addr)]), + removed_block(99, vec![test_log(addr)]), + removed_block(100, vec![test_log(addr)]), + removed_block(101, vec![test_log(addr)]), + removed_block(102, vec![test_log(addr)]), + removed_block(103, vec![test_log(addr)]), + ], + )); + f.push_reorg(reorg_b, received_at); + assert_eq!(f.next_start_block, 96); + + let removed = f.drain_reorgs(); + + // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. + // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. + // Blocks 99-103 from reorg B are >= 99 → skipped. + assert_eq!(removed.len(), 5); } #[test] - fn handle_reorg_clears_watermark() { + fn drain_reorgs_block_filter_empty() { let mut f = block_filter(10); - f.set_reorg_watermark(5); - f.handle_reorg(); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); - // Second call returns None — watermark already consumed. - assert!(f.handle_reorg().is_none()); + f.push_reorg(reorg, received_at); + let removed = f.drain_reorgs(); + + assert!(removed.is_empty()); + // But the rewind still happened. + assert_eq!(f.next_start_block, 6); } #[test] - fn set_reorg_watermark_all_propagates() { + fn drain_reorgs_clears_pending() { + let addr = address!("0x0000000000000000000000000000000000000001"); + let mut f = log_filter(11, addr); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(8, vec![removed_block(9, vec![test_log(addr)])])); + + f.push_reorg(reorg, received_at); + let first = f.drain_reorgs(); + assert_eq!(first.len(), 1); + + let second = f.drain_reorgs(); + assert!(second.is_empty()); + } + + #[test] + fn apply_reorg_propagates_with_race_guard() { let inner = FilterManagerInner::new(); inner.install_block_filter(20); inner.install_block_filter(30); - inner.set_reorg_watermark_all(15); + let received_at = Instant::now(); + let reorg = Arc::new(reorg_notification(15, vec![])); + inner.apply_reorg(reorg, received_at); inner.filters.iter().for_each(|entry| { - assert_eq!(entry.value().reorg_watermark, Some(15)); + assert_eq!(entry.value().pending_reorgs.len(), 1); + assert_eq!(entry.value().next_start_block, 16); }); + + // A filter installed after the reorg should not have it. + let late_id = inner.install_block_filter(50); + // Re-apply same reorg with the old timestamp. + let reorg2 = Arc::new(reorg_notification(15, vec![])); + inner.apply_reorg(reorg2, received_at); + + let late = inner.filters.get(&late_id).unwrap(); + // The late filter was created after received_at, so it should + // have no pending reorgs from this event. + assert!(late.pending_reorgs.is_empty()); + assert_eq!(late.next_start_block, 51); } } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f0ed907..a9814b6 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -109,21 +109,22 @@ impl InterestKind { let logs: VecDeque = reorg .removed_blocks - .into_iter() + .iter() .flat_map(|block| { - let block_hash = block.header.hash_slow(); - let block_number = block.header.number; - let block_timestamp = block.header.timestamp; - block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log { - inner: log, - block_hash: Some(block_hash), - block_number: Some(block_number), - block_timestamp: Some(block_timestamp), - transaction_hash: None, - transaction_index: None, - log_index: None, - removed: true, - }) + let hash = block.hash; + let number = block.number; + block.logs.iter().map(move |log| (hash, number, log)) + }) + .filter(|(_, _, log)| filter.matches(log)) + .map(|(hash, number, log)| Log { + inner: log.clone(), + block_hash: Some(hash), + block_number: Some(number), + block_timestamp: None, + transaction_hash: None, + transaction_index: None, + log_index: None, + removed: true, }) .collect(); @@ -134,7 +135,6 @@ impl InterestKind { #[cfg(test)] mod tests { use super::*; - use crate::interest::RemovedBlock; use alloy::primitives::{Address, B256, Bytes, LogData, address, b256}; fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log { @@ -144,27 +144,29 @@ mod tests { } } - fn test_header(number: u64) -> alloy::consensus::Header { - alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() } - } - fn test_filter(addr: Address) -> Filter { Filter::new().address(addr) } + fn test_removed_block( + number: u64, + hash: B256, + logs: Vec, + ) -> crate::interest::RemovedBlock { + crate::interest::RemovedBlock { number, hash, logs } + } + #[test] fn filter_reorg_for_sub_matches_logs() { let addr = address!("0x0000000000000000000000000000000000000001"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); - let header = test_header(11); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: header.clone(), - logs: vec![test_log(addr, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(addr, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -173,9 +175,8 @@ mod tests { assert_eq!(logs.len(), 1); assert!(logs[0].removed); assert_eq!(logs[0].inner.address, addr); - assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow()); - assert_eq!(logs[0].block_number.unwrap(), 11); - assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011); + assert_eq!(logs[0].block_hash, Some(block_hash)); + assert_eq!(logs[0].block_number, Some(11)); } #[test] @@ -183,14 +184,13 @@ mod tests { let addr = address!("0x0000000000000000000000000000000000000001"); let other = address!("0x0000000000000000000000000000000000000002"); let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000099"); let kind = InterestKind::Log(Box::new(test_filter(addr))); let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { - header: test_header(11), - logs: vec![test_log(other, topic)], - }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![test_log(other, topic)])], }; let buf = kind.filter_reorg_for_sub(reorg); @@ -200,9 +200,12 @@ mod tests { #[test] fn filter_reorg_for_sub_block_returns_empty() { + let block_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000000001"); + let reorg = ReorgNotification { common_ancestor: 10, - removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }], + removed_blocks: vec![test_removed_block(11, block_hash, vec![])], }; let buf = InterestKind::Block.filter_reorg_for_sub(reorg); diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 3ad2620..c79ecc6 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -69,12 +69,14 @@ pub enum ChainEvent { Reorg(ReorgNotification), } -/// Data from a single block removed during a chain reorganization. +/// A block that was removed during a chain reorganization. #[derive(Debug, Clone)] pub struct RemovedBlock { - /// The header of the removed block. - pub header: alloy::consensus::Header, - /// Logs emitted by the removed block. + /// The block number. + pub number: u64, + /// The block hash. + pub hash: alloy::primitives::B256, + /// Logs emitted in the removed block. pub logs: Vec, } @@ -83,6 +85,6 @@ pub struct RemovedBlock { pub struct ReorgNotification { /// The block number of the common ancestor (last block still valid). pub common_ancestor: u64, - /// Blocks removed by the reorg, each carrying its header and logs. + /// The blocks that were removed, ordered by block number. pub removed_blocks: Vec, } From cfa280cbbb50087a7a9bfd58654114ea92440829 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 13 Mar 2026 15:32:30 -0400 Subject: [PATCH 3/7] ci: trigger CI run From 993d5500517e87c83dd919afb9aca7d2f87dcf28 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 08:56:16 -0400 Subject: [PATCH 4/7] docs: update FilterManager docs for reorg task, remove unnecessary guard Update module-level and struct-level documentation to reflect the new FilterReorgTask tokio worker spawned alongside the existing OS cleanup thread. Remove unnecessary `if !removed.is_empty()` guard around the prepend logic in get_filter_changes. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 8 +++----- crates/rpc/src/interest/filters.rs | 14 ++++++++++---- crates/rpc/src/interest/mod.rs | 7 ++++++- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index d697492..060d647 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1132,11 +1132,9 @@ where // Prepend removed logs so the client sees removals before // the replacement data. - if !removed.is_empty() { - let mut combined = removed; - combined.append(&mut logs); - logs = combined; - } + let mut combined = removed; + combined.append(&mut logs); + logs = combined; entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index fd60b4c..6f81f47 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -32,7 +32,8 @@ type PendingReorg = (Arc, u64); /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last -/// polled, and the first block whose contents should be considered. +/// polled, and the first block whose contents should be considered. Tracks +/// a `created_at` timestamp used to guard against stale reorg notifications. #[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, @@ -231,9 +232,14 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// -/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. -/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. -/// See [`DashMap`] documentation for more information. +/// Calling [`Self::new`] spawns two background workers: +/// - An OS thread that periodically cleans stale filters (using a separate +/// thread to avoid [`DashMap::retain`] deadlock). +/// - A tokio task that listens for reorg broadcasts and eagerly propagates +/// them to all active filters. +/// +/// Both workers hold [`Weak`] references and self-terminate when the +/// manager is dropped. #[derive(Debug, Clone)] pub(crate) struct FilterManager { inner: Arc, diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index c79ecc6..8b20187 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -25,12 +25,17 @@ //! reference to the `Arc`, so they self-terminate once all //! strong references are dropped. //! -//! OS threads are used (rather than tokio tasks) because +//! OS threads are used for cleanup (rather than tokio tasks) because //! [`DashMap::retain`] can deadlock if called from an async context //! that also holds a `DashMap` read guard on the same shard. Running //! cleanup on a dedicated OS thread ensures the retain lock is never //! contended with an in-flight async handler. //! +//! [`FilterManager`] additionally spawns a tokio task that listens +//! for [`ChainEvent::Reorg`] broadcasts and eagerly propagates reorg +//! notifications to all active filters. This task does not call +//! `retain`, so it is safe to run in an async context. +//! //! [`Weak`]: std::sync::Weak //! [`DashMap`]: dashmap::DashMap //! [`DashMap::retain`]: dashmap::DashMap::retain From 74cad4062c2b0817e5e5cb1dd510ed90d9230680 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 09:03:05 -0400 Subject: [PATCH 5/7] fix: restore block_timestamp on removed logs for spec compliance Thread timestamp from DrainedBlock.header through RemovedBlock so that drain_reorgs and filter_reorg_for_sub populate block_timestamp on removed logs, restoring Ethereum JSON-RPC spec compliance lost during the reorg redesign. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/node/src/node.rs | 3 ++- crates/rpc/src/interest/filters.rs | 3 ++- crates/rpc/src/interest/kind.rs | 11 ++++++----- crates/rpc/src/interest/mod.rs | 2 ++ 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 22bd3b9..b2ddc9a 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -370,9 +370,10 @@ where .map(|d| { let number = d.header.number(); let hash = d.header.hash(); + let timestamp = d.header.timestamp(); let logs = d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect(); - RemovedBlock { number, hash, logs } + RemovedBlock { number, hash, timestamp, logs } }) .collect(); let notif = ReorgNotification { common_ancestor, removed_blocks }; diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 6f81f47..0a036be 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -141,7 +141,7 @@ impl ActiveFilter { inner: log.clone(), block_hash: Some(block.hash), block_number: Some(block.number), - block_timestamp: None, + block_timestamp: Some(block.timestamp), transaction_hash: None, transaction_index: None, log_index: None, @@ -384,6 +384,7 @@ mod tests { RemovedBlock { number, hash: b256!("0x0000000000000000000000000000000000000000000000000000000000000001"), + timestamp: 1_000_000 + number, logs, } } diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index a9814b6..f9f7bf7 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -113,14 +113,15 @@ impl InterestKind { .flat_map(|block| { let hash = block.hash; let number = block.number; - block.logs.iter().map(move |log| (hash, number, log)) + let timestamp = block.timestamp; + block.logs.iter().map(move |log| (hash, number, timestamp, log)) }) - .filter(|(_, _, log)| filter.matches(log)) - .map(|(hash, number, log)| Log { + .filter(|(_, _, _, log)| filter.matches(log)) + .map(|(hash, number, timestamp, log)| Log { inner: log.clone(), block_hash: Some(hash), block_number: Some(number), - block_timestamp: None, + block_timestamp: Some(timestamp), transaction_hash: None, transaction_index: None, log_index: None, @@ -153,7 +154,7 @@ mod tests { hash: B256, logs: Vec, ) -> crate::interest::RemovedBlock { - crate::interest::RemovedBlock { number, hash, logs } + crate::interest::RemovedBlock { number, hash, timestamp: 1_000_000 + number, logs } } #[test] diff --git a/crates/rpc/src/interest/mod.rs b/crates/rpc/src/interest/mod.rs index 8b20187..2403037 100644 --- a/crates/rpc/src/interest/mod.rs +++ b/crates/rpc/src/interest/mod.rs @@ -81,6 +81,8 @@ pub struct RemovedBlock { pub number: u64, /// The block hash. pub hash: alloy::primitives::B256, + /// The block timestamp. + pub timestamp: u64, /// Logs emitted in the removed block. pub logs: Vec, } From 4c9f2f1cecf9a8028e25cc0911a4148555909f8c Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 09:51:59 -0400 Subject: [PATCH 6/7] refactor: replace per-filter reorg storage with global ring buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-filter `pending_reorgs` accumulated `Arc` eagerly via O(n) iteration on every reorg. Two early-return paths in `get_filter_changes` then discarded the drained removed logs — notably `start > latest` fires every post-reorg poll before new blocks arrive. Replace with a global `RwLock>` ring buffer (cap 25) on FilterManagerInner. Filters compute removed logs lazily at poll time by scanning entries received since `last_poll_time`, walking reorgs in order to derive snapshots. Both early-return paths now return removed logs instead of `empty_output()`. - Remove `pending_reorgs`, `created_at`, `push_reorg`, `drain_reorgs` from ActiveFilter - Add `compute_removed_logs`, `last_poll_time` accessor - Add `push_reorg` (ring buffer append) and `reorgs_since` to FilterManagerInner - Simplify FilterReorgTask to just append (no per-filter iteration) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 34 ++-- crates/rpc/src/interest/filters.rs | 259 +++++++++++++---------------- 2 files changed, 141 insertions(+), 152 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 060d647..765556a 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1078,12 +1078,13 @@ where let fm = ctx.filter_manager(); let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; - // Drain any pending reorg notifications, producing removed logs - // for log filters. `next_start_block` was already rewound eagerly - // when the reorg was received. - let removed = entry.drain_reorgs(); + // Scan the global reorg ring buffer for notifications received + // since this filter's last poll, then lazily compute removed logs + // and rewind `next_start_block`. + let reorgs = fm.reorgs_since(entry.last_poll_time()); + let removed = entry.compute_removed_logs(&reorgs); if !removed.is_empty() { - trace!(count = removed.len(), "drained removed logs from pending reorgs"); + trace!(count = removed.len(), "computed removed logs from reorg ring buffer"); } let latest = ctx.tags().latest(); @@ -1091,17 +1092,24 @@ where // Implicit reorg detection: if latest has moved backward past our // window, a reorg occurred that we missed (e.g. broadcast lagged). - // Reset to avoid skipping. Removed logs are unavailable in this - // degraded path. + // Return any removed logs we do have, then reset. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); entry.mark_polled(latest); - return Ok(entry.empty_output()); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } if start > latest { entry.mark_polled(latest); - return Ok(entry.empty_output()); + return Ok(if removed.is_empty() { + entry.empty_output() + } else { + FilterOutput::from(removed) + }); } let cold = ctx.cold(); @@ -1132,9 +1140,11 @@ where // Prepend removed logs so the client sees removals before // the replacement data. - let mut combined = removed; - combined.append(&mut logs); - logs = combined; + if !removed.is_empty() { + let mut combined = removed; + combined.append(&mut logs); + logs = combined; + } entry.mark_polled(latest); Ok(FilterOutput::from(logs)) diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 0a036be..59297b9 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -7,8 +7,9 @@ use alloy::{ }; use dashmap::{DashMap, mapref::one::RefMut}; use std::{ + collections::VecDeque, sync::{ - Arc, Weak, + Arc, RwLock, Weak, atomic::{AtomicU64, Ordering}, }, time::{Duration, Instant}, @@ -21,38 +22,34 @@ type FilterId = U64; /// Output of a polled filter: log entries or block hashes. pub(crate) type FilterOutput = EventBuffer; -/// A pending reorg notification paired with the filter's -/// `next_start_block` at the time the reorg was received. -/// -/// The snapshot records which blocks the filter had already delivered, -/// so [`ActiveFilter::drain_reorgs`] can determine which removed logs -/// are relevant. -type PendingReorg = (Arc, u64); +/// Maximum number of reorg notifications retained in the global ring +/// buffer. At most one reorg per 12 seconds and a 5-minute stale filter +/// TTL gives a worst case of 25 entries. +const MAX_REORG_ENTRIES: usize = 25; /// An active filter. /// /// Records the filter details, the [`Instant`] at which the filter was last -/// polled, and the first block whose contents should be considered. Tracks -/// a `created_at` timestamp used to guard against stale reorg notifications. +/// polled, and the first block whose contents should be considered. +/// +/// `last_poll_time` doubles as the cursor into the global reorg ring +/// buffer — at poll time the filter scans for reorgs received after +/// this instant. #[derive(Debug, Clone)] pub(crate) struct ActiveFilter { next_start_block: u64, last_poll_time: Instant, - created_at: Instant, kind: InterestKind, - pending_reorgs: Vec, } impl core::fmt::Display for ActiveFilter { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!( f, - "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}, \ - pending_reorgs: {} }}", + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", self.next_start_block, self.last_poll_time.elapsed().as_millis(), self.kind, - self.pending_reorgs.len(), ) } } @@ -72,7 +69,6 @@ impl ActiveFilter { pub(crate) fn mark_polled(&mut self, current_block: u64) { self.next_start_block = current_block + 1; self.last_poll_time = Instant::now(); - self.pending_reorgs.clear(); } /// Get the next start block for the filter. @@ -80,6 +76,11 @@ impl ActiveFilter { self.next_start_block } + /// Get the instant at which the filter was last polled (or created). + pub(crate) const fn last_poll_time(&self) -> Instant { + self.last_poll_time + } + /// Get the duration since the filter was last polled. pub(crate) fn time_since_last_poll(&self) -> Duration { self.last_poll_time.elapsed() @@ -90,47 +91,34 @@ impl ActiveFilter { self.kind.empty_output() } - /// Record a reorg notification, eagerly rewinding `next_start_block`. - /// - /// The notification is stored (behind an [`Arc`]) alongside a snapshot - /// of the filter's `next_start_block` at the time of the reorg, so - /// that [`drain_reorgs`] can later determine which removed logs the - /// client has already seen. + /// Compute removed logs from a sequence of reorg notifications. /// - /// If `received_at` is before the filter's creation time, the reorg - /// is silently skipped — the filter was installed after the reorg - /// occurred and its `next_start_block` already reflects the post-reorg - /// chain state. + /// Walks the reorgs in order, computing snapshots lazily from the + /// filter's current [`next_start_block`]. For each reorg, only logs + /// from blocks the filter had already delivered (block number below + /// the snapshot) are included. /// - /// [`drain_reorgs`]: Self::drain_reorgs - fn push_reorg(&mut self, reorg: Arc, received_at: Instant) { - if self.created_at > received_at { - return; - } - - let snapshot = self.next_start_block; - self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); - self.pending_reorgs.push((reorg, snapshot)); - } - - /// Drain pending reorgs, returning matched removed logs with - /// `removed: true`. + /// Updates `next_start_block` to the rewound value so the subsequent + /// forward scan starts from the correct position. /// - /// For each pending reorg, only logs from blocks the filter had - /// already delivered (block number below the snapshot) are included. /// Block filters return an empty vec — the Ethereum JSON-RPC spec /// does not define `removed` semantics for block filters. - pub(crate) fn drain_reorgs(&mut self) -> Vec { - let reorgs = std::mem::take(&mut self.pending_reorgs); - + /// + /// [`next_start_block`]: Self::next_start_block + pub(crate) fn compute_removed_logs(&mut self, reorgs: &[Arc]) -> Vec { let Some(filter) = self.kind.as_filter() else { + for reorg in reorgs { + self.next_start_block = self.next_start_block.min(reorg.common_ancestor + 1); + } return Vec::new(); }; let mut removed = Vec::new(); - for (notification, snapshot_start) in reorgs { + for notification in reorgs { + let snapshot = self.next_start_block; + self.next_start_block = self.next_start_block.min(notification.common_ancestor + 1); for block in ¬ification.removed_blocks { - if block.number >= snapshot_start { + if block.number >= snapshot { continue; } for log in &block.logs { @@ -159,13 +147,18 @@ impl ActiveFilter { pub(crate) struct FilterManagerInner { current_id: AtomicU64, filters: DashMap, + reorgs: RwLock)>>, } impl FilterManagerInner { /// Create a new filter manager. fn new() -> Self { // Start from 1, as 0 is weird in quantity encoding. - Self { current_id: AtomicU64::new(1), filters: DashMap::new() } + Self { + current_id: AtomicU64::new(1), + filters: DashMap::new(), + reorgs: RwLock::new(VecDeque::new()), + } } /// Get the next filter ID. @@ -180,15 +173,12 @@ impl FilterManagerInner { fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { let id = self.next_id(); - let now = Instant::now(); let _ = self.filters.insert( id, ActiveFilter { next_start_block: current_block + 1, - last_poll_time: now, - created_at: now, + last_poll_time: Instant::now(), kind, - pending_reorgs: Vec::new(), }, ); id @@ -209,15 +199,30 @@ impl FilterManagerInner { self.filters.remove(&id) } - /// Apply a reorg notification to all active filters. + /// Append a reorg notification to the global ring buffer. /// - /// Each filter records the shared `Arc` alongside - /// a snapshot of its current `next_start_block`, then eagerly rewinds. - /// Filters created after `received_at` are skipped (race guard). - pub(crate) fn apply_reorg(&self, reorg: Arc, received_at: Instant) { - self.filters - .iter_mut() - .for_each(|mut entry| entry.value_mut().push_reorg(Arc::clone(&reorg), received_at)); + /// Evicts the oldest entry when the buffer is full. The + /// [`Arc`] is shared across all poll-time readers. + pub(crate) fn push_reorg(&self, reorg: ReorgNotification) { + let entry = (Instant::now(), Arc::new(reorg)); + let mut buf = self.reorgs.write().unwrap(); + if buf.len() >= MAX_REORG_ENTRIES { + buf.pop_front(); + } + buf.push_back(entry); + } + + /// Return all reorg notifications received after `since`. + /// + /// Clones the [`Arc`]s under a brief read lock and returns them. + pub(crate) fn reorgs_since(&self, since: Instant) -> Vec> { + self.reorgs + .read() + .unwrap() + .iter() + .filter(|(received_at, _)| *received_at > since) + .map(|(_, reorg)| Arc::clone(reorg)) + .collect() } /// Clean stale filters that have not been polled in a while. @@ -232,11 +237,16 @@ impl FilterManagerInner { /// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. /// Filter IDs are assigned sequentially, starting from 1. /// +/// Reorg notifications are stored in a global ring buffer (capped at +/// [`MAX_REORG_ENTRIES`]). Filters compute their removed logs lazily at +/// poll time by scanning the buffer for entries received since the last +/// poll. +/// /// Calling [`Self::new`] spawns two background workers: /// - An OS thread that periodically cleans stale filters (using a separate /// thread to avoid [`DashMap::retain`] deadlock). -/// - A tokio task that listens for reorg broadcasts and eagerly propagates -/// them to all active filters. +/// - A tokio task that listens for reorg broadcasts and appends them to +/// the global ring buffer. /// /// Both workers hold [`Weak`] references and self-terminate when the /// manager is dropped. @@ -249,8 +259,8 @@ impl FilterManager { /// Create a new filter manager. /// /// Spawns a cleanup thread for stale filters and a tokio task that - /// listens for [`ChainEvent::Reorg`] events and propagates reorg - /// notifications to all active filters. + /// listens for [`ChainEvent::Reorg`] events and appends them to the + /// global ring buffer. pub(crate) fn new( chain_events: &broadcast::Sender, clean_interval: Duration, @@ -305,8 +315,8 @@ impl FilterCleanTask { } } -/// Task that listens for reorg events and propagates them to all active -/// filters. +/// Task that listens for reorg events and appends them to the global +/// ring buffer. /// /// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`] /// is dropped. @@ -338,10 +348,8 @@ impl FilterReorgTask { let ChainEvent::Reorg(reorg) = event else { continue }; - let received_at = Instant::now(); - let reorg = Arc::new(reorg); let Some(manager) = self.manager.upgrade() else { break }; - manager.apply_reorg(reorg, received_at); + manager.push_reorg(reorg); } } } @@ -356,9 +364,7 @@ mod tests { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), - created_at: Instant::now(), kind: InterestKind::Block, - pending_reorgs: Vec::new(), } } @@ -366,9 +372,7 @@ mod tests { ActiveFilter { next_start_block: start, last_poll_time: Instant::now(), - created_at: Instant::now(), kind: InterestKind::Log(Box::new(Filter::new().address(addr))), - pending_reorgs: Vec::new(), } } @@ -390,43 +394,26 @@ mod tests { } #[test] - fn push_reorg_skips_future_filters() { + fn compute_removed_logs_rewinds_start_block() { let mut f = block_filter(10); - // received_at is before the filter was created. - let received_at = f.created_at - Duration::from_secs(1); - let reorg = Arc::new(reorg_notification(5, vec![])); - - f.push_reorg(reorg, received_at); - - assert!(f.pending_reorgs.is_empty()); - assert_eq!(f.next_start_block, 10); - } - - #[test] - fn push_reorg_rewinds_start_block() { - let mut f = block_filter(10); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification(7, vec![])); - f.push_reorg(reorg, received_at); + f.compute_removed_logs(&[reorg]); assert_eq!(f.next_start_block, 8); - assert_eq!(f.pending_reorgs.len(), 1); } #[test] - fn drain_reorgs_matches_removed_logs() { + fn compute_removed_logs_matches_removed() { let addr = address!("0x0000000000000000000000000000000000000001"); let mut f = log_filter(11, addr); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification( 8, vec![removed_block(9, vec![test_log(addr)]), removed_block(10, vec![test_log(addr)])], )); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert_eq!(removed.len(), 2); assert!(removed.iter().all(|l| l.removed)); @@ -434,11 +421,10 @@ mod tests { } #[test] - fn drain_reorgs_skips_undelivered_blocks() { + fn compute_removed_logs_skips_undelivered_blocks() { let addr = address!("0x0000000000000000000000000000000000000001"); // Filter has only delivered up to block 10 (next_start = 11). let mut f = log_filter(11, addr); - let received_at = Instant::now(); // Reorg removes blocks 10, 11, 12. Only block 10 was delivered. let reorg = Arc::new(reorg_notification( @@ -450,27 +436,23 @@ mod tests { ], )); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert_eq!(removed.len(), 1); assert_eq!(removed[0].block_number, Some(10)); } #[test] - fn drain_reorgs_cascading() { + fn compute_removed_logs_cascading() { let addr = address!("0x0000000000000000000000000000000000000001"); // Filter has delivered up to block 100 (next_start = 101). let mut f = log_filter(101, addr); - let received_at = Instant::now(); // Reorg A: rewinds to 98, removes 99-100. let reorg_a = Arc::new(reorg_notification( 98, vec![removed_block(99, vec![test_log(addr)]), removed_block(100, vec![test_log(addr)])], )); - f.push_reorg(reorg_a, received_at); - assert_eq!(f.next_start_block, 99); // Reorg B: rewinds to 95, removes 96-103. let reorg_b = Arc::new(reorg_notification( @@ -486,25 +468,22 @@ mod tests { removed_block(103, vec![test_log(addr)]), ], )); - f.push_reorg(reorg_b, received_at); - assert_eq!(f.next_start_block, 96); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg_a, reorg_b]); // Reorg A: snapshot=101, blocks 99-100 < 101 → 2 logs. // Reorg B: snapshot=99, blocks 96-98 < 99 → 3 logs. // Blocks 99-103 from reorg B are >= 99 → skipped. assert_eq!(removed.len(), 5); + assert_eq!(f.next_start_block, 96); } #[test] - fn drain_reorgs_block_filter_empty() { + fn compute_removed_logs_block_filter_empty() { let mut f = block_filter(10); - let received_at = Instant::now(); let reorg = Arc::new(reorg_notification(5, vec![removed_block(6, vec![])])); - f.push_reorg(reorg, received_at); - let removed = f.drain_reorgs(); + let removed = f.compute_removed_logs(&[reorg]); assert!(removed.is_empty()); // But the rewind still happened. @@ -512,46 +491,46 @@ mod tests { } #[test] - fn drain_reorgs_clears_pending() { - let addr = address!("0x0000000000000000000000000000000000000001"); - let mut f = log_filter(11, addr); - let received_at = Instant::now(); - let reorg = Arc::new(reorg_notification(8, vec![removed_block(9, vec![test_log(addr)])])); + fn push_reorg_evicts_oldest() { + let inner = FilterManagerInner::new(); + for i in 0..MAX_REORG_ENTRIES + 5 { + inner.push_reorg(reorg_notification(i as u64, vec![])); + } + let buf = inner.reorgs.read().unwrap(); + assert_eq!(buf.len(), MAX_REORG_ENTRIES); + // Oldest surviving entry should be the 6th push (index 5). + assert_eq!(buf[0].1.common_ancestor, 5); + } - f.push_reorg(reorg, received_at); - let first = f.drain_reorgs(); - assert_eq!(first.len(), 1); + #[test] + fn reorgs_since_filters_by_time() { + let inner = FilterManagerInner::new(); + let before = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(10, vec![])); + let mid = Instant::now(); + std::thread::sleep(Duration::from_millis(5)); + inner.push_reorg(reorg_notification(8, vec![])); + + let all = inner.reorgs_since(before); + assert_eq!(all.len(), 2); - let second = f.drain_reorgs(); - assert!(second.is_empty()); + let recent = inner.reorgs_since(mid); + assert_eq!(recent.len(), 1); + assert_eq!(recent[0].common_ancestor, 8); } #[test] - fn apply_reorg_propagates_with_race_guard() { + fn reorgs_since_skips_pre_creation_reorgs() { let inner = FilterManagerInner::new(); - inner.install_block_filter(20); - inner.install_block_filter(30); + inner.push_reorg(reorg_notification(5, vec![])); + std::thread::sleep(Duration::from_millis(5)); - let received_at = Instant::now(); - let reorg = Arc::new(reorg_notification(15, vec![])); - inner.apply_reorg(reorg, received_at); - - inner.filters.iter().for_each(|entry| { - assert_eq!(entry.value().pending_reorgs.len(), 1); - assert_eq!(entry.value().next_start_block, 16); - }); + let id = inner.install_block_filter(20); + let filter = inner.filters.get(&id).unwrap(); - // A filter installed after the reorg should not have it. - let late_id = inner.install_block_filter(50); - // Re-apply same reorg with the old timestamp. - let reorg2 = Arc::new(reorg_notification(15, vec![])); - inner.apply_reorg(reorg2, received_at); - - let late = inner.filters.get(&late_id).unwrap(); - // The late filter was created after received_at, so it should - // have no pending reorgs from this event. - assert!(late.pending_reorgs.is_empty()); - assert_eq!(late.next_start_block, 51); + let reorgs = inner.reorgs_since(filter.last_poll_time); + assert!(reorgs.is_empty()); } } From 10bb2fba2aee00aee7f6d62d23147f64a3d65f3e Mon Sep 17 00:00:00 2001 From: James Date: Sat, 14 Mar 2026 10:22:28 -0400 Subject: [PATCH 7/7] fix: preserve rewound next_start_block on early-return paths Replace mark_polled(latest) with touch_poll_time() on the implicit-reorg and no-new-blocks early-return paths. mark_polled unconditionally overwrites next_start_block, discarding the rewind applied by compute_removed_logs. The new touch_poll_time method updates only last_poll_time so the next forward scan starts from the correct position. Also restore the block_timestamp assertion in filter_reorg_for_sub test that was dropped during the ring buffer refactor. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/rpc/src/eth/endpoints.rs | 4 ++-- crates/rpc/src/interest/filters.rs | 9 +++++++++ crates/rpc/src/interest/kind.rs | 1 + 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/rpc/src/eth/endpoints.rs b/crates/rpc/src/eth/endpoints.rs index 765556a..6e818e2 100644 --- a/crates/rpc/src/eth/endpoints.rs +++ b/crates/rpc/src/eth/endpoints.rs @@ -1095,7 +1095,7 @@ where // Return any removed logs we do have, then reset. if latest + 1 < start { trace!(latest, start, "implicit reorg detected, resetting filter"); - entry.mark_polled(latest); + entry.touch_poll_time(); return Ok(if removed.is_empty() { entry.empty_output() } else { @@ -1104,7 +1104,7 @@ where } if start > latest { - entry.mark_polled(latest); + entry.touch_poll_time(); return Ok(if removed.is_empty() { entry.empty_output() } else { diff --git a/crates/rpc/src/interest/filters.rs b/crates/rpc/src/interest/filters.rs index 59297b9..24fecea 100644 --- a/crates/rpc/src/interest/filters.rs +++ b/crates/rpc/src/interest/filters.rs @@ -71,6 +71,15 @@ impl ActiveFilter { self.last_poll_time = Instant::now(); } + /// Update the poll timestamp without advancing `next_start_block`. + /// + /// Used on early-return paths (implicit reorg, no new blocks) where + /// `compute_removed_logs` has already rewound `next_start_block` and + /// we must preserve that position for the next forward scan. + pub(crate) fn touch_poll_time(&mut self) { + self.last_poll_time = Instant::now(); + } + /// Get the next start block for the filter. pub(crate) const fn next_start_block(&self) -> u64 { self.next_start_block diff --git a/crates/rpc/src/interest/kind.rs b/crates/rpc/src/interest/kind.rs index f9f7bf7..72cd78f 100644 --- a/crates/rpc/src/interest/kind.rs +++ b/crates/rpc/src/interest/kind.rs @@ -178,6 +178,7 @@ mod tests { assert_eq!(logs[0].inner.address, addr); assert_eq!(logs[0].block_hash, Some(block_hash)); assert_eq!(logs[0].block_number, Some(11)); + assert_eq!(logs[0].block_timestamp, Some(1_000_011)); } #[test]