Skip to content

Commit 31e6420

Browse files
prestwichclaude
andcommitted
feat: handle reorgs in get_filter_changes with reorg watermark
Add a `reorg_watermark` field to `ActiveFilter` that records the common ancestor block when a chain reorganization occurs. `FilterManager` now subscribes to `ChainEvent::Reorg` broadcasts and eagerly propagates watermarks to all active filters. On the next poll, `get_filter_changes` rewinds `next_start_block` so re-fetched data reflects the new chain. An implicit reorg detection check (latest < start) provides a belt-and-suspenders fallback when the explicit watermark is missed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 62016a2 commit 31e6420

3 files changed

Lines changed: 195 additions & 10 deletions

File tree

crates/rpc/src/config/ctx.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ impl<H: HotKv> StorageRpcCtx<H> {
101101
config: StorageRpcConfig,
102102
) -> Self {
103103
let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests));
104-
let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl);
104+
let filter_manager = FilterManager::new(
105+
&chain.notif_sender(),
106+
config.stale_filter_ttl,
107+
config.stale_filter_ttl,
108+
);
105109
let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl);
106110
let gas_cache = GasOracleCache::new();
107111
Self {

crates/rpc/src/eth/endpoints.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector;
3232
use serde::Serialize;
3333
use signet_cold::{HeaderSpecifier, ReceiptSpecifier};
3434
use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead};
35-
use tracing::{Instrument, debug, trace_span};
35+
use tracing::{Instrument, debug, trace, trace_span};
3636
use trevm::{
3737
EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker,
3838
};
@@ -1078,9 +1078,22 @@ where
10781078
let fm = ctx.filter_manager();
10791079
let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?;
10801080

1081+
// Handle any pending reorg watermark.
1082+
if let Some(watermark) = entry.handle_reorg() {
1083+
trace!(watermark, "filter reset due to reorg");
1084+
}
1085+
10811086
let latest = ctx.tags().latest();
10821087
let start = entry.next_start_block();
10831088

1089+
// Implicit reorg detection: if latest has moved backward past our
1090+
// window, a reorg occurred that we missed. Reset to avoid skipping.
1091+
if latest + 1 < start {
1092+
trace!(latest, start, "implicit reorg detected, resetting filter");
1093+
entry.mark_polled(latest);
1094+
return Ok(entry.empty_output());
1095+
}
1096+
10841097
if start > latest {
10851098
entry.mark_polled(latest);
10861099
return Ok(entry.empty_output());

crates/rpc/src/interest/filters.rs

Lines changed: 176 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Filter management for `eth_newFilter` / `eth_getFilterChanges`.
22
3-
use crate::interest::{InterestKind, buffer::EventBuffer};
3+
use crate::interest::{ChainEvent, InterestKind, buffer::EventBuffer};
44
use alloy::{
55
primitives::{B256, U64},
66
rpc::types::Filter,
@@ -13,6 +13,7 @@ use std::{
1313
},
1414
time::{Duration, Instant},
1515
};
16+
use tokio::sync::broadcast;
1617
use tracing::trace;
1718

1819
type FilterId = U64;
@@ -29,17 +30,22 @@ pub(crate) struct ActiveFilter {
2930
next_start_block: u64,
3031
last_poll_time: Instant,
3132
kind: InterestKind,
33+
reorg_watermark: Option<u64>,
3234
}
3335

3436
impl core::fmt::Display for ActiveFilter {
3537
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
3638
write!(
3739
f,
38-
"ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}",
40+
"ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?}",
3941
self.next_start_block,
4042
self.last_poll_time.elapsed().as_millis(),
4143
self.kind
42-
)
44+
)?;
45+
if let Some(w) = self.reorg_watermark {
46+
write!(f, ", reorg_watermark: {w}")?;
47+
}
48+
write!(f, " }}")
4349
}
4450
}
4551

@@ -74,6 +80,31 @@ impl ActiveFilter {
7480
pub(crate) const fn empty_output(&self) -> FilterOutput {
7581
self.kind.empty_output()
7682
}
83+
84+
/// Record that a reorg occurred back to this ancestor block.
85+
///
86+
/// If multiple reorgs arrive before the filter is polled, the lowest
87+
/// (most conservative) watermark is kept.
88+
pub(crate) fn set_reorg_watermark(&mut self, common_ancestor: u64) {
89+
self.reorg_watermark =
90+
Some(self.reorg_watermark.map_or(common_ancestor, |w| w.min(common_ancestor)));
91+
}
92+
93+
/// Reset filter state if a pending reorg affected this filter's window.
94+
///
95+
/// Takes and clears the watermark. If the watermark is below
96+
/// `next_start_block`, rewinds the start block so the next poll
97+
/// re-fetches from just after the common ancestor. Returns the
98+
/// watermark value when a reset occurred, `None` otherwise.
99+
pub(crate) fn handle_reorg(&mut self) -> Option<u64> {
100+
let watermark = self.reorg_watermark.take()?;
101+
if watermark < self.next_start_block {
102+
self.next_start_block = watermark + 1;
103+
Some(watermark)
104+
} else {
105+
None
106+
}
107+
}
77108
}
78109

79110
/// Inner logic for [`FilterManager`].
@@ -103,9 +134,15 @@ impl FilterManagerInner {
103134
fn install(&self, current_block: u64, kind: InterestKind) -> FilterId {
104135
let id = self.next_id();
105136
let next_start_block = current_block + 1;
106-
let _ = self
107-
.filters
108-
.insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind });
137+
let _ = self.filters.insert(
138+
id,
139+
ActiveFilter {
140+
next_start_block,
141+
last_poll_time: Instant::now(),
142+
kind,
143+
reorg_watermark: None,
144+
},
145+
);
109146
id
110147
}
111148

@@ -124,6 +161,13 @@ impl FilterManagerInner {
124161
self.filters.remove(&id)
125162
}
126163

164+
/// Set a reorg watermark on all active filters.
165+
pub(crate) fn set_reorg_watermark_all(&self, common_ancestor: u64) {
166+
self.filters
167+
.iter_mut()
168+
.for_each(|mut entry| entry.value_mut().set_reorg_watermark(common_ancestor));
169+
}
170+
127171
/// Clean stale filters that have not been polled in a while.
128172
fn clean_stale(&self, older_than: Duration) {
129173
self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than);
@@ -145,11 +189,20 @@ pub(crate) struct FilterManager {
145189
}
146190

147191
impl FilterManager {
148-
/// Create a new filter manager. Spawn a task to clean stale filters.
149-
pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self {
192+
/// Create a new filter manager.
193+
///
194+
/// Spawns a cleanup thread for stale filters and a tokio task that
195+
/// listens for [`ChainEvent::Reorg`] events and propagates watermarks
196+
/// to all active filters.
197+
pub(crate) fn new(
198+
chain_events: &broadcast::Sender<ChainEvent>,
199+
clean_interval: Duration,
200+
age_limit: Duration,
201+
) -> Self {
150202
let inner = Arc::new(FilterManagerInner::new());
151203
let manager = Self { inner };
152204
FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn();
205+
FilterReorgTask::new(Arc::downgrade(&manager.inner), chain_events.subscribe()).spawn();
153206
manager
154207
}
155208
}
@@ -195,6 +248,121 @@ impl FilterCleanTask {
195248
}
196249
}
197250

251+
/// Task that listens for reorg events and propagates watermarks to all
252+
/// active filters.
253+
///
254+
/// Uses a [`Weak`] reference to self-terminate when the [`FilterManager`]
255+
/// is dropped.
256+
struct FilterReorgTask {
257+
manager: Weak<FilterManagerInner>,
258+
rx: broadcast::Receiver<ChainEvent>,
259+
}
260+
261+
impl FilterReorgTask {
262+
const fn new(manager: Weak<FilterManagerInner>, rx: broadcast::Receiver<ChainEvent>) -> Self {
263+
Self { manager, rx }
264+
}
265+
266+
/// Spawn the listener as a tokio task.
267+
fn spawn(self) {
268+
tokio::spawn(self.run());
269+
}
270+
271+
async fn run(mut self) {
272+
loop {
273+
let event = match self.rx.recv().await {
274+
Ok(event) => event,
275+
Err(broadcast::error::RecvError::Lagged(skipped)) => {
276+
trace!(skipped, "filter reorg listener missed notifications");
277+
continue;
278+
}
279+
Err(_) => break,
280+
};
281+
282+
let ChainEvent::Reorg(reorg) = event else { continue };
283+
284+
let Some(manager) = self.manager.upgrade() else { break };
285+
manager.set_reorg_watermark_all(reorg.common_ancestor);
286+
}
287+
}
288+
}
289+
290+
#[cfg(test)]
291+
mod tests {
292+
use super::*;
293+
use crate::interest::InterestKind;
294+
295+
fn block_filter(start: u64) -> ActiveFilter {
296+
ActiveFilter {
297+
next_start_block: start,
298+
last_poll_time: Instant::now(),
299+
kind: InterestKind::Block,
300+
reorg_watermark: None,
301+
}
302+
}
303+
304+
#[test]
305+
fn set_reorg_watermark_keeps_minimum() {
306+
let mut f = block_filter(10);
307+
f.set_reorg_watermark(8);
308+
assert_eq!(f.reorg_watermark, Some(8));
309+
310+
// A higher watermark does not overwrite the lower one.
311+
f.set_reorg_watermark(9);
312+
assert_eq!(f.reorg_watermark, Some(8));
313+
314+
// A lower watermark replaces the current one.
315+
f.set_reorg_watermark(5);
316+
assert_eq!(f.reorg_watermark, Some(5));
317+
}
318+
319+
#[test]
320+
fn handle_reorg_resets_start_block() {
321+
let mut f = block_filter(10);
322+
f.set_reorg_watermark(7);
323+
324+
let result = f.handle_reorg();
325+
assert_eq!(result, Some(7));
326+
assert_eq!(f.next_start_block, 8);
327+
assert!(f.reorg_watermark.is_none());
328+
}
329+
330+
#[test]
331+
fn handle_reorg_noop_when_watermark_at_or_above_start() {
332+
let mut f = block_filter(10);
333+
f.set_reorg_watermark(10);
334+
335+
let result = f.handle_reorg();
336+
assert!(result.is_none());
337+
// next_start_block unchanged.
338+
assert_eq!(f.next_start_block, 10);
339+
assert!(f.reorg_watermark.is_none());
340+
}
341+
342+
#[test]
343+
fn handle_reorg_clears_watermark() {
344+
let mut f = block_filter(10);
345+
f.set_reorg_watermark(5);
346+
f.handle_reorg();
347+
348+
// Second call returns None — watermark already consumed.
349+
assert!(f.handle_reorg().is_none());
350+
}
351+
352+
#[test]
353+
fn set_reorg_watermark_all_propagates() {
354+
let inner = FilterManagerInner::new();
355+
inner.install_block_filter(20);
356+
inner.install_block_filter(30);
357+
358+
inner.set_reorg_watermark_all(15);
359+
360+
inner.filters.iter().for_each(|entry| {
361+
assert_eq!(entry.value().reorg_watermark, Some(15));
362+
});
363+
}
364+
}
365+
198366
// Some code in this file has been copied and modified from reth
199367
// <https://github.com/paradigmxyz/reth>
200368
// The original license is included below:

0 commit comments

Comments
 (0)