Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ thiserror = "2.0.12"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
reqwest = "0.12.9"
eventsource-stream = "0.2.3"
chrono = "0.4.38"
uuid = "1.16.0"

Expand Down
7 changes: 7 additions & 0 deletions crates/tx-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ license.workspace = true
homepage.workspace = true
repository.workspace = true

[features]
default = []
sse = ["dep:eventsource-stream", "dep:serde_json", "reqwest/stream"]

[dependencies]
signet-bundle.workspace = true
signet-constants.workspace = true
Expand All @@ -18,8 +22,10 @@ signet-types.workspace = true
alloy.workspace = true
futures-util.workspace = true

eventsource-stream = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
tracing.workspace = true
uuid = { workspace = true, features = ["serde"] }
thiserror.workspace = true
Expand All @@ -29,3 +35,4 @@ url = "2.5.7"
serde_urlencoded = "0.7.1"
uuid = { workspace = true, features = ["serde", "v4"] }
serde_json.workspace = true
tokio = { workspace = true, features = ["macros", "rt"] }
163 changes: 163 additions & 0 deletions crates/tx-cache/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,166 @@ impl TxCache {
self.put_inner(&path, order).await
}
}

#[cfg(feature = "sse")]
use eventsource_stream::{Event, EventStreamError, Eventsource};
#[cfg(feature = "sse")]
use tracing::debug;

#[cfg(feature = "sse")]
impl TxCache {
const TRANSACTIONS_FEED: &str = "transactions/feed";
const ORDERS_FEED: &str = "orders/feed";

fn decode_sse_events<T, S>(events: S) -> impl Stream<Item = Result<T>> + Send
where
T: DeserializeOwned + Send + 'static,
S: Stream<Item = std::result::Result<Event, EventStreamError<reqwest::Error>>> + Send,
{
events
.map(|result| match result {
Ok(event) => serde_json::from_str::<T>(&event.data).map_err(Into::into),
Err(e) => Err(e.into()),
})
.scan(false, |errored, result| {
if *errored {
return std::future::ready(None);
}
*errored = result.is_err();
std::future::ready(Some(result))
})
}

/// Connect to an SSE feed endpoint, returning a stream that
/// deserializes each event's JSON data into `T`. The stream
/// terminates on the first error, which is yielded as the final
/// item.
async fn subscribe_inner<T: DeserializeOwned + Send + 'static>(
&self,
feed: &'static str,
) -> Result<impl Stream<Item = Result<T>> + Send> {
let url = self
.url
.join(feed)
.inspect_err(|e| warn!(%e, "Failed to join URL for SSE subscription"))?;

let es =
self.client.get(url).send().await?.error_for_status()?.bytes_stream().eventsource();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worthwhile adding a log line here like debug!(%url, "SSE subscription established");.

debug!(feed, "SSE subscription established");

Ok(Self::decode_sse_events(es))
}

/// Subscribe to real-time transaction events via SSE.
///
/// Connects to the `/transactions/feed` endpoint and returns a
/// [`Stream`] that yields each [`TxEnvelope`] as it arrives from
/// the server. Unlike [`stream_transactions`], which paginates
/// over existing data, this receives new transactions in
/// real-time.
///
/// The stream terminates on the first error, which is yielded as
/// the final item.
///
/// [`stream_transactions`]: TxCache::stream_transactions
#[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
#[instrument(skip_all)]
pub async fn subscribe_transactions(
&self,
) -> Result<impl Stream<Item = Result<TxEnvelope>> + Send> {
self.subscribe_inner(Self::TRANSACTIONS_FEED).await
}

/// Subscribe to real-time order events via SSE.
///
/// Connects to the `/orders/feed` endpoint and returns a
/// [`Stream`] that yields each [`SignedOrder`] as it arrives from
/// the server. Unlike [`stream_orders`], which paginates over
/// existing data, this receives new orders in real-time.
///
/// The stream terminates on the first error, which is yielded as
/// the final item.
///
/// [`stream_orders`]: TxCache::stream_orders
#[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
#[instrument(skip_all)]
pub async fn subscribe_orders(&self) -> Result<impl Stream<Item = Result<SignedOrder>> + Send> {
self.subscribe_inner(Self::ORDERS_FEED).await
}
}

#[cfg(all(test, feature = "sse"))]
mod tests {
use super::TxCache;
use crate::error::TxCacheError;
use futures_util::{stream, StreamExt};

type SseError = eventsource_stream::EventStreamError<reqwest::Error>;

fn event(data: &str) -> eventsource_stream::Event {
eventsource_stream::Event { data: data.to_owned(), ..Default::default() }
}

fn utf8_sse_error() -> SseError {
eventsource_stream::EventStreamError::Utf8(
String::from_utf8(vec![0xff]).expect_err("invalid UTF-8 should error"),
)
}

#[tokio::test]
async fn decode_sse_events_deserializes_json_events() {
let events = stream::iter([Ok::<_, SseError>(event(r#"{"ok":true}"#))]);

let decoded: Vec<_> =
TxCache::decode_sse_events::<serde_json::Value, _>(events).collect().await;

assert_eq!(decoded.len(), 1);
assert_eq!(decoded[0].as_ref().unwrap()["ok"], true);
}

#[tokio::test]
async fn decode_sse_events_maps_invalid_json_to_deserialization_error() {
let events = stream::iter([Ok::<_, SseError>(event("not-json"))]);

let mut decoded = TxCache::decode_sse_events::<serde_json::Value, _>(events);

match decoded.next().await.expect("stream should yield an error") {
Err(TxCacheError::Deserialization(_)) => {}
other => panic!("expected deserialization error, got {other:?}"),
}
assert!(decoded.next().await.is_none(), "stream should terminate after the error");
}

#[tokio::test]
async fn decode_sse_events_maps_sse_errors() {
let events = stream::iter([Err::<eventsource_stream::Event, _>(utf8_sse_error())]);

let mut decoded = TxCache::decode_sse_events::<serde_json::Value, _>(events);

match decoded.next().await.expect("stream should yield an error") {
Err(TxCacheError::Sse(eventsource_stream::EventStreamError::Utf8(_))) => {}
other => panic!("expected SSE error, got {other:?}"),
}
assert!(decoded.next().await.is_none(), "stream should terminate after the error");
}

#[tokio::test]
async fn decode_sse_events_stops_after_first_error() {
let events = stream::iter([
Ok::<_, SseError>(event(r#"{"idx":1}"#)),
Err(utf8_sse_error()),
Ok(event(r#"{"idx":2}"#)),
]);

let decoded: Vec<_> =
TxCache::decode_sse_events::<serde_json::Value, _>(events).collect().await;

assert_eq!(decoded.len(), 2);
assert_eq!(decoded[0].as_ref().unwrap()["idx"], 1);
match &decoded[1] {
Err(TxCacheError::Sse(eventsource_stream::EventStreamError::Utf8(_))) => {}
other => panic!("expected final SSE error, got {other:?}"),
}
}
}
26 changes: 26 additions & 0 deletions crates/tx-cache/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ pub enum TxCacheError {
/// An error occurred while contacting the TxCache API.
#[error("Error contacting TxCache API: {0}")]
Reqwest(reqwest::Error),

/// An error occurred while parsing SSE events.
#[cfg(feature = "sse")]
#[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
#[error("SSE stream error: {0}")]
Sse(eventsource_stream::EventStreamError<reqwest::Error>),

/// Failed to deserialize an SSE event payload.
#[cfg(feature = "sse")]
#[cfg_attr(docsrs, doc(cfg(feature = "sse")))]
#[error("Failed to deserialize SSE event: {0}")]
Deserialization(serde_json::Error),
}

impl From<reqwest::Error> for TxCacheError {
Expand All @@ -32,3 +44,17 @@ impl From<reqwest::Error> for TxCacheError {
}
}
}

#[cfg(feature = "sse")]
impl From<eventsource_stream::EventStreamError<reqwest::Error>> for TxCacheError {
fn from(err: eventsource_stream::EventStreamError<reqwest::Error>) -> Self {
Self::Sse(err)
}
}

#[cfg(feature = "sse")]
impl From<serde_json::Error> for TxCacheError {
fn from(err: serde_json::Error) -> Self {
Self::Deserialization(err)
}
}