Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ signet-tx-cache = "0.16.0-rc.11"
signet-types = "0.16.0-rc.11"
signet-zenith = "0.16.0-rc.11"
signet-journal = "0.16.0-rc.11"
signet-storage = "0.6.4"
signet-cold = "0.6.4"
signet-hot = "0.6.4"
signet-hot-mdbx = "0.6.4"
signet-cold-mdbx = "0.6.4"
signet-storage-types = "0.6.4"
signet-storage = "0.6.5"
signet-cold = "0.6.5"
signet-hot = "0.6.5"
signet-hot-mdbx = "0.6.5"
signet-cold-mdbx = "0.6.5"
signet-storage-types = "0.6.5"

# ajj
ajj = { version = "0.6.0" }
Expand Down Expand Up @@ -114,11 +114,6 @@ url = "2.5.4"
tempfile = "3.17.0"

[patch.crates-io]
signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
signet-hot = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
signet-storage-types = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }

# signet-bundle = { path = "../sdk/crates/bundle"}
# signet-constants = { path = "../sdk/crates/constants"}
# signet-evm = { path = "../sdk/crates/evm"}
Expand Down
18 changes: 11 additions & 7 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1};
use signet_evm::EthereumHardfork;
use signet_extract::Extractor;
use signet_node_config::SignetNodeConfig;
use signet_rpc::{ChainNotifier, NewBlockNotification, ReorgNotification, RpcServerGuard};
use signet_rpc::{
ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard,
};
use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage};
use signet_types::{PairedHeights, constants::SignetSystemConstants};
use std::{fmt, sync::Arc};
Expand Down Expand Up @@ -363,14 +365,16 @@ where

/// Send a reorg notification on the broadcast channel.
fn notify_reorg(&self, drained: Vec<DrainedBlock>, common_ancestor: u64) {
let removed_hashes = drained.iter().map(|d| d.header.hash()).collect();
let removed_logs = drained
let removed_blocks = drained
.into_iter()
.flat_map(|d| d.receipts)
.flat_map(|r| r.receipt.logs)
.map(|l| l.inner)
.map(|d| {
let header = d.header.into_inner();
let logs =
d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect();
RemovedBlock { header, logs }
})
.collect();
let notif = ReorgNotification { common_ancestor, removed_hashes, removed_logs };
let notif = ReorgNotification { common_ancestor, removed_blocks };
// Ignore send errors — no subscribers is fine.
let _ = self.chain.send_reorg(notif);
}
Expand Down
116 changes: 115 additions & 1 deletion crates/rpc/src/interest/kind.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Filter kinds for subscriptions and polling filters.

use crate::interest::{NewBlockNotification, filters::FilterOutput, subs::SubscriptionBuffer};
use crate::interest::{
NewBlockNotification, ReorgNotification, filters::FilterOutput, subs::SubscriptionBuffer,
};
use alloy::rpc::types::{Filter, Header, Log};
use std::collections::VecDeque;

Expand Down Expand Up @@ -94,4 +96,116 @@ impl InterestKind {
Self::Block => SubscriptionBuffer::Block(VecDeque::new()),
}
}

/// Filter a reorg notification for a subscription, producing a buffer of
/// removed logs (with `removed: true`) that match this filter.
///
/// Block subscriptions return an empty buffer — the Ethereum JSON-RPC
/// spec does not push removed headers for `newHeads` subscriptions.
pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer {
let Some(filter) = self.as_filter() else {
return self.empty_sub_buffer();
};

let logs: VecDeque<Log> = reorg
.removed_blocks
.into_iter()
.flat_map(|block| {
let block_hash = block.header.hash_slow();
let block_number = block.header.number;
let block_timestamp = block.header.timestamp;
block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log {
inner: log,
block_hash: Some(block_hash),
block_number: Some(block_number),
block_timestamp: Some(block_timestamp),
transaction_hash: None,
transaction_index: None,
log_index: None,
removed: true,
})
})
.collect();

SubscriptionBuffer::Log(logs)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::interest::RemovedBlock;
use alloy::primitives::{Address, B256, Bytes, LogData, address, b256};

fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log {
alloy::primitives::Log {
address: addr,
data: LogData::new_unchecked(vec![topic], Bytes::new()),
}
}

fn test_header(number: u64) -> alloy::consensus::Header {
alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() }
}

fn test_filter(addr: Address) -> Filter {
Filter::new().address(addr)
}

#[test]
fn filter_reorg_for_sub_matches_logs() {
let addr = address!("0x0000000000000000000000000000000000000001");
let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001");

let header = test_header(11);
let kind = InterestKind::Log(Box::new(test_filter(addr)));
let reorg = ReorgNotification {
common_ancestor: 10,
removed_blocks: vec![RemovedBlock {
header: header.clone(),
logs: vec![test_log(addr, topic)],
}],
};

let buf = kind.filter_reorg_for_sub(reorg);
let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") };

assert_eq!(logs.len(), 1);
assert!(logs[0].removed);
assert_eq!(logs[0].inner.address, addr);
assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow());
assert_eq!(logs[0].block_number.unwrap(), 11);
assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011);
}

#[test]
fn filter_reorg_for_sub_filters_non_matching() {
let addr = address!("0x0000000000000000000000000000000000000001");
let other = address!("0x0000000000000000000000000000000000000002");
let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001");

let kind = InterestKind::Log(Box::new(test_filter(addr)));
let reorg = ReorgNotification {
common_ancestor: 10,
removed_blocks: vec![RemovedBlock {
header: test_header(11),
logs: vec![test_log(other, topic)],
}],
};

let buf = kind.filter_reorg_for_sub(reorg);
let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") };
assert!(logs.is_empty());
}

#[test]
fn filter_reorg_for_sub_block_returns_empty() {
let reorg = ReorgNotification {
common_ancestor: 10,
removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }],
};

let buf = InterestKind::Block.filter_reorg_for_sub(reorg);
assert!(buf.is_empty());
}
}
15 changes: 11 additions & 4 deletions crates/rpc/src/interest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,20 @@ pub enum ChainEvent {
Reorg(ReorgNotification),
}

/// Data from a single block removed during a chain reorganization.
#[derive(Debug, Clone)]
pub struct RemovedBlock {
/// The header of the removed block.
pub header: alloy::consensus::Header,
/// Logs emitted by the removed block.
pub logs: Vec<alloy::primitives::Log>,
}

/// Notification sent when a chain reorganization is detected.
#[derive(Debug, Clone)]
pub struct ReorgNotification {
/// The block number of the common ancestor (last block still valid).
pub common_ancestor: u64,
/// Hashes of the removed blocks.
pub removed_hashes: Vec<alloy::primitives::B256>,
/// Logs from the removed blocks (needed for `removed: true` emission).
pub removed_logs: Vec<alloy::primitives::Log>,
/// Blocks removed by the reorg, each carrying its header and logs.
pub removed_blocks: Vec<RemovedBlock>,
}
10 changes: 8 additions & 2 deletions crates/rpc/src/interest/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,14 @@ impl SubscriptionTask {
};
let notif = match event {
ChainEvent::NewBlock(notif) => *notif,
// Reorg handling will be addressed in future PRs (ENG-1968 et al.)
ChainEvent::Reorg(_) => continue,
ChainEvent::Reorg(reorg) => {
let output = filter.filter_reorg_for_sub(reorg);
trace!(count = output.len(), "Reorg filter applied");
if !output.is_empty() {
notif_buffer.extend(output);
}
continue;
}
};

let output = filter.filter_notification_for_sub(&notif);
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod eth;
pub use eth::EthError;

mod interest;
pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification};
pub use interest::{ChainEvent, NewBlockNotification, RemovedBlock, ReorgNotification};

mod debug;
pub use debug::DebugError;
Expand Down
Loading