From 5f815ad58c7230f2aa1c58178a47f36d28672c59 Mon Sep 17 00:00:00 2001 From: init4samwise Date: Tue, 3 Mar 2026 04:48:07 +0000 Subject: [PATCH 1/3] feat(bundle): filter bundles with stale host tx nonces before SimCache Adds nonce checking for host transactions in BundlePoller, similar to the existing TxPoller pattern. Bundles with stale host tx nonces are dropped before entering SimCache to prevent: - Wasted simulation cycles on bundles that will fail - ERROR log spam from nonce-too-low failures - Re-ingestion churn (~1s poll cycle) Each host transaction's nonce is compared against the sender's current nonce from the host provider. If any host tx has a stale nonce, the entire bundle is dropped with DEBUG-level logging. Closes ENG-1937 --- src/tasks/cache/bundle.rs | 89 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index e1d61726..d520dbf5 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,12 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::{ + consensus::{Transaction, transaction::SignerRecoverable}, + eips::Decodable2718, + primitives::Bytes, + providers::Provider, + rlp::Buf, +}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ @@ -7,7 +14,7 @@ use tokio::{ task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, error, trace, trace_span}; +use tracing::{Instrument, debug, debug_span, error, trace, trace_span}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -71,6 +78,76 @@ impl BundlePoller { } } + /// Spawns a tokio task to check the nonces of all host transactions in a bundle + /// before sending it to the cache task via the outbound channel. + /// + /// Bundles with stale host transaction nonces are dropped to prevent them from + /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + tokio::spawn(async move { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + + // If no host transactions, forward directly + if bundle.bundle.host_txs.is_empty() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + return; + } + + let Ok(host_provider) = + crate::config().connect_host_provider().instrument(span.clone()).await + else { + span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + return; + }; + + // Check each host transaction's nonce + for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { + let host_tx = match decode_tx(host_tx_bytes) { + Some(tx) => tx, + None => { + span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + return; + } + }; + + let sender = match host_tx.recover_signer() { + Ok(s) => s, + Err(_) => { + span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + return; + } + }; + + let tx_count = match host_provider.get_transaction_count(sender).await { + Ok(count) => count, + Err(_) => { + span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); + return; + } + }; + + if host_tx.nonce() < tx_count { + debug!( + parent: &span, + %sender, + tx_nonce = %host_tx.nonce(), + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return; + } + } + + // All host txs have valid nonces, forward the bundle + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + }); + } + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -89,10 +166,7 @@ impl BundlePoller { if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; - } + Self::spawn_check_bundle_nonces(bundle, outbound.clone()); } } @@ -109,3 +183,8 @@ impl BundlePoller { (inbound, jh) } } + +/// Decodes a transaction from RLP-encoded bytes. +fn decode_tx(bytes: &Bytes) -> Option { + alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() +} From ece4913e18dfa34766c164a28c4173c3fe34b68c Mon Sep 17 00:00:00 2001 From: init4samwise Date: Thu, 5 Mar 2026 00:12:52 +0000 Subject: [PATCH 2/3] style: run cargo fmt --- src/tasks/cache/bundle.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index d520dbf5..631861ca 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -107,7 +107,11 @@ impl BundlePoller { let host_tx = match decode_tx(host_tx_bytes) { Some(tx) => tx, None => { - span_debug!(span, idx, "Failed to decode host transaction, dropping bundle"); + span_debug!( + span, + idx, + "Failed to decode host transaction, dropping bundle" + ); return; } }; @@ -115,7 +119,11 @@ impl BundlePoller { let sender = match host_tx.recover_signer() { Ok(s) => s, Err(_) => { - span_debug!(span, idx, "Failed to recover sender from host tx, dropping bundle"); + span_debug!( + span, + idx, + "Failed to recover sender from host tx, dropping bundle" + ); return; } }; From 1aa7cdc2f7efff920e95366ff12988ae8bb3084e Mon Sep 17 00:00:00 2001 From: init4samwise Date: Mon, 9 Mar 2026 23:55:31 +0000 Subject: [PATCH 3/3] refactor: use FuturesUnordered and reuse validity checks - Refactored bundle processing to use FuturesUnordered for concurrent execution - Added cancellation on first failure - Reused validity checks from crates/sim/src/cache/item.rs Addresses PR review feedback from prestwich --- src/tasks/cache/bundle.rs | 122 +++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 62 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 631861ca..61197aa0 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,12 +1,7 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use alloy::{ - consensus::{Transaction, transaction::SignerRecoverable}, - eips::Decodable2718, - primitives::Bytes, - providers::Provider, - rlp::Buf, -}; +use alloy::providers::Provider; +use futures_util::{TryStreamExt, stream}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ @@ -81,14 +76,28 @@ impl BundlePoller { /// Spawns a tokio task to check the nonces of all host transactions in a bundle /// before sending it to the cache task via the outbound channel. /// - /// Bundles with stale host transaction nonces are dropped to prevent them from - /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements + /// (reusing the existing validity check pattern from `signet-sim`), then checks + /// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early + /// on the first stale or failed nonce. + /// + /// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + // Recover the bundle to get typed host tx requirements instead of + // manually decoding and recovering signers. + let recovered = match bundle.bundle.try_to_recovered() { + Ok(r) => r, + Err(e) => { + span_debug!(span, ?e, "Failed to recover bundle, dropping"); + return; + } + }; + // If no host transactions, forward directly - if bundle.bundle.host_txs.is_empty() { + if recovered.host_txs().is_empty() { if outbound.send(bundle).is_err() { span_debug!(span, "Outbound channel closed, stopping nonce check task"); } @@ -102,56 +111,50 @@ impl BundlePoller { return; }; - // Check each host transaction's nonce - for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { - let host_tx = match decode_tx(host_tx_bytes) { - Some(tx) => tx, - None => { - span_debug!( - span, - idx, - "Failed to decode host transaction, dropping bundle" - ); - return; - } - }; - - let sender = match host_tx.recover_signer() { - Ok(s) => s, - Err(_) => { - span_debug!( - span, - idx, - "Failed to recover sender from host tx, dropping bundle" - ); - return; + // Collect host tx requirements (signer + nonce) from the recovered bundle + let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect(); + + // Check all host tx nonces concurrently, cancelling on first failure. + let result = stream::iter(reqs) + .map(Ok) + .try_for_each_concurrent(None, |(idx, req)| { + let host_provider = &host_provider; + let span = &span; + async move { + let tx_count = host_provider + .get_transaction_count(req.signer) + .await + .map_err(|_| { + span_debug!( + span, + idx, + sender = %req.signer, + "Failed to fetch nonce for sender, dropping bundle" + ); + })?; + + if req.nonce < tx_count { + debug!( + parent: span, + sender = %req.signer, + tx_nonce = %req.nonce, + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return Err(()); + } + + Ok(()) } - }; - - let tx_count = match host_provider.get_transaction_count(sender).await { - Ok(count) => count, - Err(_) => { - span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); - return; - } - }; - - if host_tx.nonce() < tx_count { - debug!( - parent: &span, - %sender, - tx_nonce = %host_tx.nonce(), - host_nonce = %tx_count, - idx, - "Dropping bundle with stale host tx nonce" - ); - return; - } - } + }) + .await; // All host txs have valid nonces, forward the bundle - if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); + if result.is_ok() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } } }); } @@ -191,8 +194,3 @@ impl BundlePoller { (inbound, jh) } } - -/// Decodes a transaction from RLP-encoded bytes. -fn decode_tx(bytes: &Bytes) -> Option { - alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() -}