diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs
index e1d61726..61197aa0 100644
--- a/src/tasks/cache/bundle.rs
+++ b/src/tasks/cache/bundle.rs
@@ -1,5 +1,7 @@
 //! Bundler service responsible for fetching bundles and sending them to the simulator.
 use crate::config::BuilderConfig;
+use alloy::providers::Provider;
+use futures_util::{StreamExt, TryStreamExt, stream};
 use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
 use signet_tx_cache::{TxCacheError, types::CachedBundle};
 use tokio::{
@@ -7,7 +9,7 @@ use tokio::{
     task::JoinHandle,
     time::{self, Duration},
 };
-use tracing::{Instrument, error, trace, trace_span};
+use tracing::{Instrument, debug, debug_span, error, trace, trace_span};
 
 /// Poll interval for the bundle poller in milliseconds.
 const POLL_INTERVAL_MS: u64 = 1000;
@@ -71,6 +73,92 @@ impl BundlePoller {
         }
     }
 
+    /// Spawns a tokio task to check the nonces of all host transactions in a bundle
+    /// before sending it to the cache task via the outbound channel.
+    ///
+    /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements
+    /// (reusing the existing validity check pattern from `signet-sim`), then checks
+    /// all host tx nonces concurrently via [`try_for_each_concurrent`], cancelling early
+    /// on the first stale or failed nonce.
+    ///
+    /// [`try_for_each_concurrent`]: futures_util::TryStreamExt::try_for_each_concurrent
+    fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) {
+        tokio::spawn(async move {
+            let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
+
+            // Recover the bundle to get typed host tx requirements instead of
+            // manually decoding and recovering signers.
+            let recovered = match bundle.bundle.try_to_recovered() {
+                Ok(r) => r,
+                Err(e) => {
+                    span_debug!(span, ?e, "Failed to recover bundle, dropping");
+                    return;
+                }
+            };
+
+            // If no host transactions, forward directly
+            if recovered.host_txs().is_empty() {
+                if outbound.send(bundle).is_err() {
+                    span_debug!(span, "Outbound channel closed, stopping nonce check task");
+                }
+                return;
+            }
+
+            let Ok(host_provider) =
+                crate::config().connect_host_provider().instrument(span.clone()).await
+            else {
+                span_debug!(span, "Failed to connect to host provider, stopping nonce check task");
+                return;
+            };
+
+            // Collect host tx requirements (signer + nonce) from the recovered bundle
+            let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect();
+
+            // Check all host tx nonces concurrently, cancelling on first failure.
+            let result = stream::iter(reqs)
+                .map(Ok)
+                .try_for_each_concurrent(None, |(idx, req)| {
+                    let host_provider = &host_provider;
+                    let span = &span;
+                    async move {
+                        let tx_count = host_provider
+                            .get_transaction_count(req.signer)
+                            .await
+                            .map_err(|_| {
+                                span_debug!(
+                                    span,
+                                    idx,
+                                    sender = %req.signer,
+                                    "Failed to fetch nonce for sender, dropping bundle"
+                                );
+                            })?;
+
+                        if req.nonce < tx_count {
+                            debug!(
+                                parent: span,
+                                sender = %req.signer,
+                                tx_nonce = %req.nonce,
+                                host_nonce = %tx_count,
+                                idx,
+                                "Dropping bundle with stale host tx nonce"
+                            );
+                            return Err(());
+                        }
+
+                        Ok(())
+                    }
+                })
+                .await;
+
+            // All host txs have valid nonces, forward the bundle
+            if result.is_ok() {
+                if outbound.send(bundle).is_err() {
+                    span_debug!(span, "Outbound channel closed, stopping nonce check task");
+                }
+            }
+        });
+    }
+
     async fn task_future(self, outbound: UnboundedSender) {
         loop {
             let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
@@ -89,10 +177,7 @@ impl BundlePoller {
             if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
                 for bundle in bundles.into_iter() {
-                    if let Err(err) = outbound.send(bundle) {
-                        span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
-                        break;
-                    }
+                    Self::spawn_check_bundle_nonces(bundle, outbound.clone());
                 }
             }