Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "builder"
version = "1.0.0-rc.5"
version = "1.0.0-rc.6"
description = "signet builder example"

edition = "2024"
Expand Down Expand Up @@ -52,7 +52,6 @@ eyre = "0.6.12"
futures-util = "0.3.31"
openssl = { version = "0.10", features = ["vendored"] }
reqwest = { version = "0.12.22", features = ["blocking", "json"] }
serde = { version = "1.0.197", features = ["derive"] }
thiserror = "2.0.17"
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
tokio-stream = "0.1.17"
Expand Down
45 changes: 28 additions & 17 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use signet_tx_cache::{
TxCacheError,
types::{BundleKey, CachedBundle},
};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, error, trace, trace_span};
use tracing::{Instrument, trace, trace_span, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -50,25 +53,33 @@ impl BundlePoller {
Duration::from_millis(self.poll_interval_ms)
}

/// Checks the bundle cache for new bundles.
/// Fetches all bundles from the tx-cache, paginating through all available pages.
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
let res = self.tx_cache.get_bundles(None).await;
let mut all_bundles = Vec::new();
let mut cursor: Option<BundleKey> = None;

match res {
Ok(resp) => {
let bundles = resp.into_inner();
trace!(count = ?bundles.bundles.len(), "found bundles");
Ok(bundles.bundles)
}
Err(err) => {
if matches!(&err, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
trace!("Not our slot to fetch bundles");
} else {
error!(?err, "Failed to fetch bundles from tx-cache");
loop {
let resp = match self.tx_cache.get_bundles(cursor).await {
Ok(resp) => resp,
Err(error) => {
if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
trace!("Not our slot to fetch bundles");
} else {
warn!(%error, "Failed to fetch bundles from tx-cache");
}
return Err(error);
}
Err(err)
}
};

let (bundle_list, next_cursor) = resp.into_parts();
all_bundles.extend(bundle_list.bundles);

let Some(next) = next_cursor else { break };
cursor = Some(next);
}

trace!(count = all_bundles.len(), "fetched all bundles from tx-cache");
Ok(all_bundles)
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
Expand Down
44 changes: 14 additions & 30 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
//! Transaction service responsible for fetching and sending transactions to the simulator.
use crate::config::BuilderConfig;
use alloy::{
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
providers::Provider,
};
use eyre::Error;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use futures_util::TryStreamExt;
use signet_tx_cache::{TxCache, TxCacheError};
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle, time};
use tracing::{Instrument, debug, debug_span, trace, trace_span};

/// Poll interval for the transaction poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;

/// Models a response from the transaction pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TxPoolResponse {
/// Holds the transactions property as a list on the response.
transactions: Vec<TxEnvelope>,
}

/// Implements a poller for the block builder to pull transactions from the
/// transaction pool.
#[derive(Debug, Clone)]
pub struct TxPoller {
/// Config values from the Builder.
config: &'static BuilderConfig,
/// Reqwest Client for fetching transactions from the cache.
client: Client,
/// Client for the tx cache.
tx_cache: TxCache,
/// Defines the interval at which the service should poll the cache.
poll_interval_ms: u64,
}
Expand All @@ -51,7 +43,8 @@ impl TxPoller {
/// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
let config = crate::config();
Self { config, client: Client::new(), poll_interval_ms }
let tx_cache = TxCache::new(config.tx_pool_url.clone());
Self { config, tx_cache, poll_interval_ms }
}

/// Returns the poll duration as a [`Duration`].
Expand Down Expand Up @@ -98,21 +91,12 @@ impl TxPoller {
});
}

/// Polls the transaction cache for transactions.
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
let url: Url = self.config.tx_pool_url.join("transactions")?;
self.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await
.map(|resp: TxPoolResponse| resp.transactions)
.map_err(Into::into)
/// Polls the transaction cache for transactions, paginating through all available pages.
pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> {
self.tx_cache.stream_transactions().try_collect().await
}

async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
async fn task_future(self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
loop {
let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url);

Expand All @@ -124,12 +108,12 @@ impl TxPoller {
}

if let Ok(transactions) =
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| {
debug!(%err, "Error fetching transactions");
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| {
debug!(%error, "Error fetching transactions");
})
{
let _guard = span.entered();
trace!(count = ?transactions.len(), "found transactions");
trace!(count = transactions.len(), "found transactions");
for tx in transactions.into_iter() {
self.spawn_check_nonce(tx, outbound.clone());
}
Expand Down
5 changes: 2 additions & 3 deletions tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use builder::{
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
// Import the refactored function
use eyre::{Ok, Result};

#[ignore = "integration test"]
Expand All @@ -16,9 +15,9 @@ async fn test_tx_roundtrip() -> Result<()> {
post_tx().await?;

// Create a new poller
let mut poller = TxPoller::new();
let poller = TxPoller::new();

// Fetch transactions the pool
// Fetch transactions from the pool
let transactions = poller.check_tx_cache().await?;

// Ensure at least one transaction exists
Expand Down