
feat: handle reorgs in get_filter_changes with reorg watermark#98

Open
prestwich wants to merge 7 commits into develop from james/eng-1971

@prestwich prestwich commented Mar 10, 2026

Summary

  • Replaces per-filter reorg storage with a global ring buffer on FilterManagerInner that retains recent ReorgNotifications
  • Spawns a FilterReorgTask that subscribes to ChainEvent::Reorg broadcasts and eagerly populates the ring buffer
  • On poll, get_filter_changes scans the ring buffer for notifications received since the filter's last poll, lazily computes removed logs, and rewinds next_start_block for the subsequent forward scan
  • Adds an implicit reorg detection fallback (latest + 1 < start) for cases where the broadcast was missed
  • Restructures RemovedBlock to flat number/hash/timestamp fields (replaces full Header)
  • Preserves block_timestamp on removed logs for Ethereum JSON-RPC spec compliance
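The poll-time step in the summary (scan recent reorgs, compute removed logs lazily, rewind `next_start_block`) can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the types are reduced stand-ins, the `&'static str` log payload is a placeholder, and the rule that only blocks below the current cursor count as "already delivered" is an assumption about how the snapshots are derived.

```rust
/// Reduced stand-in for the PR's `RemovedBlock` (flat fields, no full header).
#[derive(Debug, Clone)]
struct RemovedBlock {
    number: u64,
    timestamp: u64,
    logs: Vec<&'static str>, // placeholder for real log data
}

/// Reduced stand-in for `ReorgNotification`.
#[derive(Debug, Clone)]
struct ReorgNotification {
    common_ancestor: u64,
    removed_blocks: Vec<RemovedBlock>,
}

/// A removed log as it might be reported to the client (`removed: true`).
#[derive(Debug, PartialEq)]
struct RemovedLog {
    block_number: u64,
    block_timestamp: u64,
    data: &'static str,
}

/// Walks the reorgs oldest-first. For each one, emits logs from removed
/// blocks below the current cursor (assumed to be the blocks the client has
/// already seen), then rewinds the cursor to just above the common ancestor.
/// Returns the removed logs and the rewound `next_start_block`.
fn compute_removed_logs(
    next_start_block: u64,
    reorgs: &[ReorgNotification],
) -> (Vec<RemovedLog>, u64) {
    let mut start = next_start_block;
    let mut removed = Vec::new();
    for reorg in reorgs {
        for block in reorg.removed_blocks.iter().filter(|b| b.number < start) {
            for &data in &block.logs {
                removed.push(RemovedLog {
                    block_number: block.number,
                    block_timestamp: block.timestamp,
                    data,
                });
            }
        }
        // The cursor only ever moves backward here, never forward.
        start = start.min(reorg.common_ancestor + 1);
    }
    (removed, start)
}
```

In this sketch a filter whose cursor is at block 12 and which sees a reorg back to ancestor 9 reports the logs of blocks 10 and 11 as removed and resumes scanning at block 10. A second reorg of the replacement block 10 produces nothing, because the filter never delivered it.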

Closes ENG-1971

Stack

This PR includes commits from #96 and #97. Review only the top commits.

  1. feat: update BlockTags during reorgs to prevent stale tag window #96 (BlockTags::rewind_to for reorg tag updates)
  2. feat: handle ChainEvent::Reorg in SubscriptionTask #97 (SubscriptionTask reorg handling)
  3. feat: handle reorgs in get_filter_changes with reorg watermark #98 ← this PR (get_filter_changes reorg handling with global ring buffer)
  4. test: integration tests for reorg tracking in RPC subscriptions and filters #99 (integration tests; includes #96, #97, #98)

Test plan

  • cargo clippy -p signet-rpc --all-features --all-targets — clean
  • cargo clippy -p signet-rpc --no-default-features --all-targets — clean
  • cargo +nightly fmt — clean
  • cargo t -p signet-rpc — 35 tests + 4 doc-tests pass
  • Review that FilterReorgTask self-terminates when FilterManager is dropped (uses Weak)
  • Review implicit reorg detection edge cases
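For the `Weak` review item, the self-termination pattern can be illustrated with a small sketch. It assumes a worker that holds only a `Weak` handle and stops on the first event it cannot upgrade. The PR's task is a tokio worker subscribed to a broadcast; this version uses a std channel so it stays dependency-free, and `ManagerInner`, `handle_reorg` and `run_reorg_task` are hypothetical names.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Mutex, Weak};

/// Hypothetical stand-in for `FilterManagerInner`: just shared reorg storage.
#[derive(Default)]
struct ManagerInner {
    reorgs: Mutex<Vec<u64>>, // common-ancestor numbers, for illustration only
}

/// Records one reorg. Returns `false` once the manager has been dropped,
/// i.e. when the `Weak` handle can no longer be upgraded.
fn handle_reorg(manager: &Weak<ManagerInner>, common_ancestor: u64) -> bool {
    match manager.upgrade() {
        Some(inner) => {
            inner.reorgs.lock().unwrap().push(common_ancestor);
            true
        }
        None => false,
    }
}

/// Worker loop: exits when the channel closes or the manager is gone.
/// Returns the number of events it recorded.
fn run_reorg_task(manager: Weak<ManagerInner>, events: Receiver<u64>) -> usize {
    let mut handled = 0;
    for ancestor in events {
        if !handle_reorg(&manager, ancestor) {
            break;
        }
        handled += 1;
    }
    handled
}
```

One property worth checking in review: a loop shaped like this only notices the drop when the next event arrives, so an idle task outlives the manager until then (or until the channel closes).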

🤖 Generated with Claude Code

@prestwich prestwich requested a review from a team as a code owner March 10, 2026 12:03

@Evalir Evalir left a comment

I think this is fine. It requires thinking quite a bit through the possible reorg cases but the flow is clear to me. I'm good with filter changes returning the empty output instead of querying an impossible range

Comment on lines +133 to +137
pub fn rewind_to(&self, ancestor: u64) {
    self.latest.fetch_min(ancestor, Ordering::Release);
    self.safe.fetch_min(ancestor, Ordering::Release);
    self.finalized.fetch_min(ancestor, Ordering::Release);
}

this order should be fine, since finalized should actually not be possible to move (else things went very, very wrong (or you are unichain))
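To make the behaviour of the snippet under review concrete, here is a reduced sketch of the same `fetch_min` pattern. `Tags`, `new` and `snapshot` are hypothetical helpers added for illustration; only `rewind_to` mirrors the reviewed lines. `AtomicU64::fetch_min` stores the smaller of the current value and the argument, so a tag at or below the ancestor is left alone.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical, reduced version of `BlockTags`: three block-number tags.
struct Tags {
    latest: AtomicU64,
    safe: AtomicU64,
    finalized: AtomicU64,
}

impl Tags {
    fn new(latest: u64, safe: u64, finalized: u64) -> Self {
        Self {
            latest: AtomicU64::new(latest),
            safe: AtomicU64::new(safe),
            finalized: AtomicU64::new(finalized),
        }
    }

    /// Same shape as the reviewed snippet: clamp each tag down to `ancestor`.
    /// Tags that are already at or below `ancestor` do not move.
    fn rewind_to(&self, ancestor: u64) {
        self.latest.fetch_min(ancestor, Ordering::Release);
        self.safe.fetch_min(ancestor, Ordering::Release);
        self.finalized.fetch_min(ancestor, Ordering::Release);
    }

    /// Reads all three tags (not atomically as a group).
    fn snapshot(&self) -> (u64, u64, u64) {
        (
            self.latest.load(Ordering::Acquire),
            self.safe.load(Ordering::Acquire),
            self.finalized.load(Ordering::Acquire),
        )
    }
}
```

With tags at (100, 90, 80), `rewind_to(95)` lowers only `latest`, and a later `rewind_to(85)` lowers `latest` and `safe` while `finalized` stays at 80, which matches the expectation that finalized does not move in a normal reorg.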

@Evalir Evalir self-requested a review March 13, 2026 14:22
prestwich and others added 2 commits March 13, 2026 13:47
Add a `reorg_watermark` field to `ActiveFilter` that records the common
ancestor block when a chain reorganization occurs. `FilterManager` now
subscribes to `ChainEvent::Reorg` broadcasts and eagerly propagates
watermarks to all active filters. On the next poll, `get_filter_changes`
rewinds `next_start_block` so re-fetched data reflects the new chain.

An implicit reorg detection check (latest < start) provides a
belt-and-suspenders fallback when the explicit watermark is missed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the simple u64 watermark with a Vec of Arc<ReorgNotification>
paired with next_start_block snapshots. This fixes two issues:

1. Race condition: filters created after a reorg no longer receive
   false watermarks, guarded by a created_at timestamp comparison.

2. Missing removed logs: get_filter_changes now emits `removed: true`
   logs per the Ethereum JSON-RPC spec. Each pending reorg's snapshot
   determines which removed blocks the client already saw.

Restructure ReorgNotification to group logs per block (RemovedBlock)
so filters can determine relevance by block number. The Arc sharing
means no log data is cloned until poll time, and automatic drop
eliminates the need for cleanup passes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@prestwich prestwich changed the base branch from james/eng-1970 to develop March 13, 2026 17:50
@prestwich prestwich marked this pull request as ready for review March 13, 2026 17:50
@prestwich

@Evalir we have redesigned the watermarking to store pending notifications in each filter

@prestwich prestwich requested a review from Fraser999 March 13, 2026 17:50
@prestwich

[Claude Code]

Code review

Found 2 issues:

  1. Removed logs silently discarded on early-return paths in get_filter_changes.
    drain_reorgs() is called unconditionally at the top and populates removed, but two early-return paths discard those logs without sending them to the client:

    • The implicit reorg guard (if latest + 1 < start) returns entry.empty_output(). The comment says "Removed logs are unavailable in this degraded path," but they are available in removed when the explicit broadcast was received before the implicit check fires.
    • The existing if start > latest guard also returns entry.empty_output(). After a reorg rewinds next_start_block to common_ancestor + 1, if latest == common_ancestor this path fires and the drained removed logs are dropped.

    In both cases the client never receives the removed: true logs it needs to invalidate stale state.

    let latest = ctx.tags().latest();
    let start = entry.next_start_block();
    // Implicit reorg detection: if latest has moved backward past our
    // window, a reorg occurred that we missed (e.g. broadcast lagged).
    // Reset to avoid skipping. Removed logs are unavailable in this
    // degraded path.
    if latest + 1 < start {
        trace!(latest, start, "implicit reorg detected, resetting filter");
        entry.mark_polled(latest);
        return Ok(entry.empty_output());
    }
    if start > latest {
        entry.mark_polled(latest);
        return Ok(entry.empty_output());
    }
    let cold = ctx.cold();

  2. block_timestamp regression from PR feat: handle ChainEvent::Reorg in SubscriptionTask #97 review feedback.
    PR feat: handle ChainEvent::Reorg in SubscriptionTask #97 added full headers to RemovedBlock specifically to populate block_timestamp on removed logs, per Fraser999's review comment requesting Ethereum JSON-RPC spec compliance. This PR replaces header: Header with flat number/hash fields, dropping timestamp entirely. Both drain_reorgs and filter_reorg_for_sub now set block_timestamp: None. The test that previously asserted block_timestamp.unwrap() was also removed. Consider adding a timestamp: u64 field to RemovedBlock to preserve spec compliance.

    /// A block that was removed during a chain reorganization.
    #[derive(Debug, Clone)]
    pub struct RemovedBlock {
        /// The block number.
        pub number: u64,
        /// The block hash.
        pub hash: alloy::primitives::B256,
        /// Logs emitted in the removed block.
        pub logs: Vec<alloy::primitives::Log>,

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

prestwich and others added 2 commits March 14, 2026 08:56
Update module-level and struct-level documentation to reflect the new
FilterReorgTask tokio worker spawned alongside the existing OS cleanup
thread. Remove unnecessary `if !removed.is_empty()` guard around the
prepend logic in get_filter_changes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Thread timestamp from DrainedBlock.header through RemovedBlock so that
drain_reorgs and filter_reorg_for_sub populate block_timestamp on
removed logs, restoring Ethereum JSON-RPC spec compliance lost during
the reorg redesign.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@prestwich

block timestamp regression addressed in 74cad40

Per-filter `pending_reorgs` accumulated `Arc<ReorgNotification>`
eagerly via O(n) iteration on every reorg. Two early-return paths in
`get_filter_changes` then discarded the drained removed logs —
notably `start > latest` fires every post-reorg poll before new
blocks arrive.

Replace with a global `RwLock<VecDeque<(Instant, Arc)>>` ring buffer
(cap 25) on FilterManagerInner. Filters compute removed logs lazily
at poll time by scanning entries received since `last_poll_time`,
walking reorgs in order to derive snapshots. Both early-return paths
now return removed logs instead of `empty_output()`.

- Remove `pending_reorgs`, `created_at`, `push_reorg`, `drain_reorgs`
  from ActiveFilter
- Add `compute_removed_logs`, `last_poll_time` accessor
- Add `push_reorg` (ring buffer append) and `reorgs_since` to
  FilterManagerInner
- Simplify FilterReorgTask to just append (no per-filter iteration)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
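The ring buffer described in this commit message might look roughly like the sketch below. It is an illustration under assumptions, not the PR's implementation: `ReorgBuffer` is a hypothetical wrapper name, the notification is reduced to a single field, the receive time is passed in explicitly so the behaviour is deterministic, and the strict "received after `since`" comparison is a guess.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use std::time::Instant;

/// Capacity quoted in the commit message.
const REORG_BUFFER_CAP: usize = 25;

/// Reduced stand-in for `ReorgNotification`.
struct ReorgNotification {
    common_ancestor: u64,
}

/// Hypothetical wrapper around the global buffer kept on the manager.
struct ReorgBuffer {
    entries: RwLock<VecDeque<(Instant, Arc<ReorgNotification>)>>,
}

impl ReorgBuffer {
    fn new() -> Self {
        Self { entries: RwLock::new(VecDeque::with_capacity(REORG_BUFFER_CAP)) }
    }

    /// Appends a notification, evicting the oldest entry once the cap is hit.
    /// `received_at` is a parameter here for testability (an assumption).
    fn push_reorg(&self, received_at: Instant, reorg: Arc<ReorgNotification>) {
        let mut entries = self.entries.write().unwrap();
        if entries.len() == REORG_BUFFER_CAP {
            entries.pop_front();
        }
        entries.push_back((received_at, reorg));
    }

    /// Returns, oldest-first, the notifications received strictly after `since`.
    /// Only `Arc` handles are cloned; the notification data is shared.
    fn reorgs_since(&self, since: Instant) -> Vec<Arc<ReorgNotification>> {
        self.entries
            .read()
            .unwrap()
            .iter()
            .filter(|(at, _)| *at > since)
            .map(|(_, reorg)| Arc::clone(reorg))
            .collect()
    }
}
```

A consequence of the fixed cap is that a filter which polls less often than 25 reorgs apart would miss evicted notifications, which is presumably where the implicit detection fallback comes in.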
@prestwich

4c9f2f1 includes a further refactor that addresses the dropped logs

@prestwich

[Claude Code]

Code review

Found 2 issues:

  1. mark_polled(latest) overwrites the reorg rewind on the implicit-reorg path. compute_removed_logs rewinds next_start_block to common_ancestor + 1, then mark_polled(latest) unconditionally sets it to latest + 1. Since latest < common_ancestor on this branch, the rewind is lost and the next poll skips blocks between latest + 1 and common_ancestor.

// Return any removed logs we do have, then reset.
if latest + 1 < start {
    trace!(latest, start, "implicit reorg detected, resetting filter");
    entry.mark_polled(latest);
    return Ok(if removed.is_empty() {
        entry.empty_output()
    } else {
        FilterOutput::from(removed)
    });

  2. block_timestamp regression guard removed from filter_reorg_for_sub_matches_logs. PR feat: handle ChainEvent::Reorg in SubscriptionTask #97 added this assertion per Fraser999's review for spec compliance. Commit 74cad40 on this branch fixed the production code, but the test assertion was dropped in 4c9f2f1 — leaving the fix untested.

    assert_eq!(logs.len(), 1);
    assert!(logs[0].removed);
    assert_eq!(logs[0].inner.address, addr);
    assert_eq!(logs[0].block_hash, Some(block_hash));
    assert_eq!(logs[0].block_number, Some(11));
}

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

Replace mark_polled(latest) with touch_poll_time() on the implicit-reorg
and no-new-blocks early-return paths. mark_polled unconditionally
overwrites next_start_block, discarding the rewind applied by
compute_removed_logs. The new touch_poll_time method updates only
last_poll_time so the next forward scan starts from the correct position.

Also restore the block_timestamp assertion in filter_reorg_for_sub test
that was dropped during the ring buffer refactor.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
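The difference between the two methods named in this commit message can be shown with a small sketch. The struct is a reduced, hypothetical `ActiveFilter`, and `rewind_to` is an illustrative helper standing in for the rewind that `compute_removed_logs` applies in the PR.

```rust
use std::time::Instant;

/// Reduced, hypothetical version of `ActiveFilter`.
struct ActiveFilter {
    next_start_block: u64,
    last_poll_time: Instant,
}

impl ActiveFilter {
    /// Illustrative helper: move the scan cursor back to just above the
    /// common ancestor (never forward).
    fn rewind_to(&mut self, common_ancestor: u64) {
        self.next_start_block = self.next_start_block.min(common_ancestor + 1);
    }

    /// Unconditionally sets the cursor from `latest`, which overwrites any
    /// rewind applied earlier in the same poll.
    fn mark_polled(&mut self, latest: u64) {
        self.next_start_block = latest + 1;
        self.last_poll_time = Instant::now();
    }

    /// Updates only the poll timestamp and leaves the cursor untouched.
    fn touch_poll_time(&mut self) {
        self.last_poll_time = Instant::now();
    }
}
```

On the early-return paths no forward scan has happened, so only the timestamp needs refreshing; the cursor left by the rewind is what the next poll should start from.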
@prestwich

[Claude Code]

Ring buffer enthusiasm score for this PR: 11/10. We have graduated from "store everything per-filter" to "one ring buffer to rule them all." Next PR will presumably replace the database with a very large ring buffer.
