Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,12 @@ where
let removed_blocks = drained
.into_iter()
.map(|d| {
let header = d.header.into_inner();
let number = d.header.number();
let hash = d.header.hash();
let timestamp = d.header.timestamp();
let logs =
d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect();
RemovedBlock { header, logs }
RemovedBlock { number, hash, timestamp, logs }
})
.collect();
let notif = ReorgNotification { common_ancestor, removed_blocks };
Expand Down
6 changes: 5 additions & 1 deletion crates/rpc/src/config/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ impl<H: HotKv> StorageRpcCtx<H> {
config: StorageRpcConfig,
) -> Self {
let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests));
let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl);
let filter_manager = FilterManager::new(
&chain.notif_sender(),
config.stale_filter_ttl,
config.stale_filter_ttl,
);
let sub_manager = SubscriptionManager::new(chain.notif_sender(), config.stale_filter_ttl);
let gas_cache = GasOracleCache::new();
Self {
Expand Down
42 changes: 38 additions & 4 deletions crates/rpc/src/eth/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use revm_inspectors::access_list::AccessListInspector;
use serde::Serialize;
use signet_cold::{HeaderSpecifier, ReceiptSpecifier};
use signet_hot::{HistoryRead, HotKv, db::HotDbRead, model::HotKvRead};
use tracing::{Instrument, debug, trace_span};
use tracing::{Instrument, debug, trace, trace_span};
use trevm::{
EstimationResult, revm::context::result::ExecutionResult, revm::database::DBErrorMarker,
};
Expand Down Expand Up @@ -1078,12 +1078,38 @@ where
let fm = ctx.filter_manager();
let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?;

// Scan the global reorg ring buffer for notifications received
// since this filter's last poll, then lazily compute removed logs
// and rewind `next_start_block`.
let reorgs = fm.reorgs_since(entry.last_poll_time());
let removed = entry.compute_removed_logs(&reorgs);
if !removed.is_empty() {
trace!(count = removed.len(), "computed removed logs from reorg ring buffer");
}

let latest = ctx.tags().latest();
let start = entry.next_start_block();

// Implicit reorg detection: if latest has moved backward past our
// window, a reorg occurred that we missed (e.g. broadcast lagged).
// Return any removed logs we do have, then reset.
if latest + 1 < start {
trace!(latest, start, "implicit reorg detected, resetting filter");
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
}

if start > latest {
entry.mark_polled(latest);
return Ok(entry.empty_output());
entry.touch_poll_time();
return Ok(if removed.is_empty() {
entry.empty_output()
} else {
FilterOutput::from(removed)
});
}

let cold = ctx.cold();
Expand All @@ -1110,7 +1136,15 @@ where
let stream =
cold.stream_logs(resolved, max_logs, deadline).await.map_err(|e| e.to_string())?;

let logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?;
let mut logs = collect_log_stream(stream).await.map_err(|e| e.to_string())?;

// Prepend removed logs so the client sees removals before
// the replacement data.
if !removed.is_empty() {
let mut combined = removed;
combined.append(&mut logs);
logs = combined;
}

entry.mark_polled(latest);
Ok(FilterOutput::from(logs))
Expand Down
Loading