diff --git a/Cargo.lock b/Cargo.lock index d5cb2fa1..c9f7f185 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2417,7 +2417,7 @@ dependencies = [ [[package]] name = "builder" -version = "1.0.0-rc.5" +version = "1.0.0-rc.6" dependencies = [ "alloy", "alloy-chains", @@ -2430,7 +2430,6 @@ dependencies = [ "openssl", "reqwest", "reth-chainspec", - "serde", "signet-block-processor", "signet-constants", "signet-genesis", diff --git a/Cargo.toml b/Cargo.toml index 1ecced31..ac37b498 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "builder" -version = "1.0.0-rc.5" +version = "1.0.0-rc.6" description = "signet builder example" edition = "2024" @@ -52,7 +52,6 @@ eyre = "0.6.12" futures-util = "0.3.31" openssl = { version = "0.10", features = ["vendored"] } reqwest = { version = "0.12.22", features = ["blocking", "json"] } -serde = { version = "1.0.197", features = ["derive"] } thiserror = "2.0.17" tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } tokio-stream = "0.1.17" diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index e1d61726..38c56ff1 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,13 +1,16 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; -use signet_tx_cache::{TxCacheError, types::CachedBundle}; +use signet_tx_cache::{ + TxCacheError, + types::{BundleKey, CachedBundle}, +}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, error, trace, trace_span}; +use tracing::{Instrument, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -50,25 +53,33 @@ impl BundlePoller { Duration::from_millis(self.poll_interval_ms) } - /// Checks the bundle cache for new bundles. + /// Fetches all bundles from the tx-cache, paginating through all available pages. pub async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { - let res = self.tx_cache.get_bundles(None).await; + let mut all_bundles = Vec::new(); + let mut cursor: Option = None; - match res { - Ok(resp) => { - let bundles = resp.into_inner(); - trace!(count = ?bundles.bundles.len(), "found bundles"); - Ok(bundles.bundles) - } - Err(err) => { - if matches!(&err, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { - trace!("Not our slot to fetch bundles"); - } else { - error!(?err, "Failed to fetch bundles from tx-cache"); + loop { + let resp = match self.tx_cache.get_bundles(cursor).await { + Ok(resp) => resp, + Err(error) => { + if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { + trace!("Not our slot to fetch bundles"); + } else { + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + return Err(error); } - Err(err) - } + }; + + let (bundle_list, next_cursor) = resp.into_parts(); + all_bundles.extend(bundle_list.bundles); + + let Some(next) = next_cursor else { break }; + cursor = Some(next); } + + trace!(count = all_bundles.len(), "fetched all bundles from tx-cache"); + Ok(all_bundles) } async fn task_future(self, outbound: UnboundedSender) { diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index bc1efe29..29a792ac 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,12 +1,11 @@ -//! Transaction service responsible for fetching and sending trasnsactions to the simulator. +//! Transaction service responsible for fetching and sending transactions to the simulator. use crate::config::BuilderConfig; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use eyre::Error; -use reqwest::{Client, Url}; -use serde::{Deserialize, Serialize}; +use futures_util::TryStreamExt; +use signet_tx_cache::{TxCache, TxCacheError}; use std::time::Duration; use tokio::{sync::mpsc, task::JoinHandle, time}; use tracing::{Instrument, debug, debug_span, trace, trace_span}; @@ -14,21 +13,14 @@ use tracing::{Instrument, debug, debug_span, trace, trace_span}; /// Poll interval for the transaction poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; -/// Models a response from the transaction pool. -#[derive(Debug, Clone, Serialize, Deserialize)] -struct TxPoolResponse { - /// Holds the transactions property as a list on the response. - transactions: Vec, -} - /// Implements a poller for the block builder to pull transactions from the /// transaction pool. #[derive(Debug, Clone)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, - /// Reqwest Client for fetching transactions from the cache. - client: Client, + /// Client for the tx cache. + tx_cache: TxCache, /// Defines the interval at which the service should poll the cache. poll_interval_ms: u64, } @@ -51,7 +43,8 @@ impl TxPoller { /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { let config = crate::config(); - Self { config, client: Client::new(), poll_interval_ms } + let tx_cache = TxCache::new(config.tx_pool_url.clone()); + Self { config, tx_cache, poll_interval_ms } } /// Returns the poll duration as a [`Duration`]. @@ -98,21 +91,12 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions. - pub async fn check_tx_cache(&mut self) -> Result, Error> { - let url: Url = self.config.tx_pool_url.join("transactions")?; - self.client - .get(url) - .send() - .await? - .error_for_status()? - .json() - .await - .map(|resp: TxPoolResponse| resp.transactions) - .map_err(Into::into) + /// Polls the transaction cache for transactions, paginating through all available pages. + pub async fn check_tx_cache(&self) -> Result, TxCacheError> { + self.tx_cache.stream_transactions().try_collect().await } - async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + async fn task_future(self, outbound: mpsc::UnboundedSender) { loop { let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); @@ -124,12 +108,12 @@ impl TxPoller { } if let Ok(transactions) = - self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| { - debug!(%err, "Error fetching transactions"); + self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| { + debug!(%error, "Error fetching transactions"); }) { let _guard = span.entered(); - trace!(count = ?transactions.len(), "found transactions"); + trace!(count = transactions.len(), "found transactions"); for tx in transactions.into_iter() { self.spawn_check_nonce(tx, outbound.clone()); } diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index 075433ae..bca37e3c 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -3,7 +3,6 @@ use builder::{ tasks::cache::TxPoller, test_utils::{new_signed_tx, setup_logging, setup_test_config}, }; -// Import the refactored function use eyre::{Ok, Result}; #[ignore = "integration test"] @@ -16,9 +15,9 @@ async fn test_tx_roundtrip() -> Result<()> { post_tx().await?; // Create a new poller - let mut poller = TxPoller::new(); + let poller = TxPoller::new(); - // Fetch transactions the pool + // Fetch transactions from the pool let transactions = poller.check_tx_cache().await?; // Ensure at least one transaction exists