From 45097682e366180a7c2e66f1f06d894138844746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 30 Jan 2026 05:02:30 +0000 Subject: [PATCH 1/5] feat!: Unify PendingRequest via closure-based type erasure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eliminate the duplicated AsyncPendingRequest/BlockingPendingRequest 21-variant enums by replacing them with a single concrete PendingRequest struct that uses a Box handler to type-erase the response dispatch logic. This makes State non-generic, collapses AsyncBatchRequest/BlockingBatchRequest into a single BatchRequest, and removes all async/blocking-specific type aliases for response channels. BatchRequest gains request_async() and request_blocking() convenience methods that create channels internally and return receivers, so callers no longer need to wire up channels manually. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- README.md | 8 +- src/batch_request.rs | 196 ++++++++----------------- src/client.rs | 103 ++++++-------- src/lib.rs | 51 ++----- src/pending_request.rs | 315 ++++++++++++++--------------------------- src/state.rs | 26 ++-- 6 files changed, 228 insertions(+), 471 deletions(-) diff --git a/README.md b/README.md index 10c6d2a..d17f5aa 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ models. ## Example (async with Tokio) ```rust,no_run -use electrum_streaming_client::{AsyncClient, AsyncBatchRequest, Event}; +use electrum_streaming_client::{AsyncClient, Event}; use tokio::net::TcpStream; use futures::StreamExt; @@ -28,11 +28,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(worker); // spawn the client worker task - let mut batch = AsyncBatchRequest::new(); - let fut = batch.request(electrum_streaming_client::request::RelayFee); - client.send_batch(batch)?; - let relay_fee = fut.await?; - + let relay_fee = client.send_request(electrum_streaming_client::request::RelayFee).await?; println!("Relay fee: {relay_fee:?}"); while let Some(event) = events.next().await { diff --git a/src/batch_request.rs b/src/batch_request.rs index e476f7c..f280034 100644 --- a/src/batch_request.rs +++ b/src/batch_request.rs @@ -1,35 +1,30 @@ -use crate::*; +use crate::pending_request::{PendingRequest, RequestExt}; +use crate::{MaybeBatch, ResponseResult}; -/// A builder for batching multiple asynchronous requests to the Electrum server. +/// A builder for batching multiple requests to the Electrum server. /// /// This type allows queuing both: -/// - tracked requests via [`request`] (which return a [`Future`] that resolves to a response), and -/// - event-style requests via [`event_request`] (which emit [`Event`]s through the -/// [`AsyncEventReceiver`] instead of a future). +/// - tracked requests via [`request`] (which take a callback to receive the typed response), and +/// - event-style requests via [`event_request`] (which emit [`Event`]s through the event receiver +/// instead of a callback). /// -/// After building the batch, submit it using [`AsyncClient::send_batch`]. The batch will be -/// converted into a raw JSON-RPC message and sent to the server. -/// -/// **Important:** Do not `.await` any futures returned by [`request`] until *after* the batch has -/// been sent. Doing so will cause the future to block indefinitely, as the request ID is not yet -/// assigned and the response cannot be matched. -/// -/// This type is useful for reducing round-trips and issuing dependent or related requests together. +/// After building the batch, submit it using [`AsyncClient::send_batch`] or +/// [`BlockingClient::send_batch`]. The batch will be converted into a raw JSON-RPC message and +/// sent to the server. /// /// [`request`]: Self::request /// [`event_request`]: Self::event_request -/// [`Future`]: core::future::Future /// [`AsyncClient::send_batch`]: crate::AsyncClient::send_batch -/// [`AsyncEventReceiver`]: crate::AsyncEventReceiver +/// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch /// [`Event`]: crate::Event #[must_use] #[derive(Debug, Default)] -pub struct AsyncBatchRequest { - inner: Option>, +pub struct BatchRequest { + inner: Option>, } -impl AsyncBatchRequest { - /// Creates a new empty async batch request builder. +impl BatchRequest { + /// Creates a new empty batch request builder. pub fn new() -> Self { Self::default() } @@ -37,150 +32,79 @@ impl AsyncBatchRequest { /// Consumes the batch and returns its raw contents, if any requests were added. /// /// Returns `Some` if the batch is non-empty, or `None` if it was empty. - /// - /// This is used internally by [`AsyncClient::send_batch`] to extract the batched request set. - /// - /// [`AsyncClient::send_batch`]: crate::AsyncClient::send_batch - pub fn into_inner(self) -> Option> { + pub fn into_inner(self) -> Option> { self.inner } - /// Adds a tracked request to the batch and returns a [`Future`] that resolves to the response. - /// - /// This request will be tracked internally. The returned future must only be `.await`ed - /// *after* the batch has been submitted with [`AsyncClient::send_batch`]. Awaiting too early - /// will block forever. - /// - /// # Errors - /// Returns an error if the request could not be added (e.g., duplicate or overflow). + /// Adds a tracked request to the batch with a typed callback. /// - /// [`Future`]: futures::Future - /// [`AsyncClient::send_batch`]: crate::AsyncClient::send_batch - pub fn request( - &mut self, - req: Req, - ) -> impl std::future::Future> - + Send - + Sync - + 'static + /// The callback will be invoked with the deserialized response (or error) once the server + /// replies. The callback is type-erased internally, so it works for both async and blocking + /// clients. + pub fn request(&mut self, req: Req, callback: F) where - Req: Request, - AsyncPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, + F: FnOnce(ResponseResult) + Send + Sync + 'static, { - let (resp_tx, resp_rx) = futures::channel::oneshot::channel(); - MaybeBatch::push_opt(&mut self.inner, (req, Some(resp_tx)).into()); - async move { - resp_rx - .await - .map_err(|_| BatchRequestError::Canceled)? - .map_err(BatchRequestError::Response) - } + MaybeBatch::push_opt(&mut self.inner, PendingRequest::new(req, Some(callback))); } - /// Adds an event-style request to the batch. - /// - /// These requests do not return a future and will not be tracked internally. Any server - /// response (including the initial result and any future notifications) will be delivered as - /// [`Event`]s through the [`AsyncEventReceiver`] stream. + /// Adds a tracked request and returns an async receiver for the response. /// - /// Use this for subscription-style RPCs where responses should be handled uniformly as events. - /// - /// [`Event`]: crate::Event - /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver - pub fn event_request(&mut self, request: Req) + /// This is a convenience wrapper around [`request`](Self::request) that creates a + /// [`futures::channel::oneshot`] channel internally. The returned receiver can be + /// `.await`ed after the batch is sent. + pub fn request_async( + &mut self, + req: Req, + ) -> futures::channel::oneshot::Receiver> where - Req: Request, - AsyncPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, { - MaybeBatch::push_opt(&mut self.inner, (request, None).into()); + let (tx, rx) = futures::channel::oneshot::channel(); + self.request(req, move |result| { + let _ = tx.send(result); + }); + rx } -} -/// A builder for batching multiple blocking requests to the Electrum server. -/// -/// This type allows queuing both: -/// - tracked requests via [`request`] (which return blocking receivers for the responses), and -/// - event-style requests via [`event_request`] (which emit [`Event`]s through the -/// [`BlockingEventReceiver`] instead of a response handle). -/// -/// After building the batch, submit it using [`BlockingClient::send_batch`]. The batch will be -/// serialized and sent to the server in a single write. -/// -/// **Important:** Do not call `.recv()` on any response receivers returned by [`request`] until -/// *after* the batch has been sent. Receiving early will block forever, as the request has not yet -/// been transmitted and the ID not assigned. -/// -/// [`request`]: Self::request -/// [`event_request`]: Self::event_request -/// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch -/// [`BlockingEventReceiver`]: crate::BlockingEventReceiver -/// [`Event`]: crate::Event -#[must_use] -#[derive(Debug, Default)] -pub struct BlockingBatchRequest { - inner: Option>, -} - -impl BlockingBatchRequest { - /// Creates a new empty blocking batch request builder. - pub fn new() -> Self { - Self::default() - } - - /// Consumes the batch and returns its raw contents, if any requests were added. - /// - /// Returns `Some` if the batch is non-empty, or `None` if it was empty. - /// - /// This is used internally by [`BlockingClient::send_batch`] to extract the batched request set. - /// - /// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch - pub fn into_inner(self) -> Option> { - self.inner - } - - /// Adds a tracked request to the batch and returns a receiver for the response. + /// Adds a tracked request and returns a blocking receiver for the response. /// - /// This request will be tracked internally. The returned receiver must only be used - /// *after* the batch has been submitted with [`BlockingClient::send_batch`]. - /// Calling `.recv()` or `.wait()` too early will block indefinitely. - /// - /// # Errors - /// Returns an error if the request could not be added (e.g., duplicate or overflow). - /// - /// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch - pub fn request(&mut self, req: Req) -> BlockingResponseReceiver + /// This is a convenience wrapper around [`request`](Self::request) that creates a + /// [`std::sync::mpsc::sync_channel`] internally. The returned receiver can be used + /// with [`recv`](std::sync::mpsc::Receiver::recv) after the batch is sent. + pub fn request_blocking( + &mut self, + req: Req, + ) -> std::sync::mpsc::Receiver> where - Req: Request, - BlockingPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, { - let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1); - MaybeBatch::push_opt(&mut self.inner, (req, Some(resp_tx)).into()); - resp_rx + let (tx, rx) = std::sync::mpsc::sync_channel(1); + self.request(req, move |result| { + let _ = tx.send(result); + }); + rx } /// Adds an event-style request to the batch. /// - /// These requests do not return a receiver and will not be tracked internally. Any server - /// response (including the initial result and any future notifications) will be delivered as - /// [`Event`]s through the [`BlockingEventReceiver`] stream. + /// These requests do not take a callback. Any server response (including the initial result + /// and any future notifications) will be delivered as [`Event`]s through the event receiver. /// /// Use this for subscription-style RPCs where responses should be handled uniformly as events. /// /// [`Event`]: crate::Event - /// [`BlockingEventReceiver`]: crate::BlockingEventReceiver - pub fn event_request(&mut self, request: Req) - where - Req: Request, - BlockingPendingRequestTuple: Into, - { - MaybeBatch::push_opt(&mut self.inner, (request, None).into()); + pub fn event_request(&mut self, req: Req) { + MaybeBatch::push_opt(&mut self.inner, PendingRequest::event(req)); } } -/// An error that can occur when adding a request to a batch or polling its result. +/// An error that can occur when sending a request or polling its result. /// -/// This error is returned by [`AsyncBatchRequest::request`] or [`BlockingBatchRequest::request`] -/// when the future or receiver representing the response cannot complete. +/// This error is returned by client `send_request` methods when the response cannot be obtained. /// /// It typically indicates that the batch was dropped, the client shut down, or the request /// failed to be processed internally. @@ -194,7 +118,7 @@ pub enum BatchRequestError { /// The server returned a response error. /// /// This indicates that the Electrum server replied with an error object, rather than a result. - Response(ResponseError), + Response(crate::ResponseError), } impl std::fmt::Display for BatchRequestError { diff --git a/src/client.rs b/src/client.rs index fb6ea29..0464cb8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,4 @@ +use crate::pending_request::{PendingRequest, RequestExt}; use crate::*; /// An asynchronous Electrum client built on the [`futures`] I/O ecosystem. @@ -63,11 +64,11 @@ impl AsyncClient { { use futures::{channel::mpsc, StreamExt}; let (event_tx, event_recv) = mpsc::unbounded::(); - let (req_tx, mut req_recv) = mpsc::unbounded::>(); + let (req_tx, mut req_recv) = mpsc::unbounded::>(); let mut incoming_stream = crate::io::ReadStreamer::new(futures::io::BufReader::new(reader)).fuse(); - let mut state = State::::new(); + let mut state = State::new(); let mut next_id = 0_u32; let fut = async move { @@ -151,18 +152,16 @@ impl AsyncClient { /// [`AsyncRequestError::Canceled`]. pub async fn send_request(&self, req: Req) -> Result where - Req: Request, - AsyncPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, { - use futures::TryFutureExt; - let mut batch = AsyncBatchRequest::new(); - let resp_fut = batch.request(req).map_err(|e| match e { - BatchRequestError::Canceled => AsyncRequestError::Canceled, - BatchRequestError::Response(e) => AsyncRequestError::Response(e), - }); + let mut batch = BatchRequest::new(); + let rx = batch.request_async(req); self.send_batch(batch) .map_err(AsyncRequestError::Dispatch)?; - resp_fut.await + rx.await + .map_err(|_| AsyncRequestError::Canceled)? + .map_err(AsyncRequestError::Response) } /// Sends a request that is expected to result in an event-based response (e.g., a @@ -184,10 +183,9 @@ impl AsyncClient { /// [`AsyncRequestSendError`]: crate::AsyncRequestSendError pub fn send_event_request(&self, request: Req) -> Result<(), AsyncRequestSendError> where - Req: Request, - AsyncPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, { - let mut batch = AsyncBatchRequest::new(); + let mut batch = BatchRequest::new(); batch.event_request(request); self.send_batch(batch)?; Ok(()) @@ -195,33 +193,24 @@ impl AsyncClient { /// Sends a batch of requests to the Electrum server. /// - /// The batch is constructed using [`AsyncBatchRequest`], which allows queuing both tracked - /// requests (via [`AsyncBatchRequest::request`]) and event-style requests (via - /// [`AsyncBatchRequest::event_request`]). - /// - /// Tracked requests return futures that resolve to the server’s response. Event-style requests - /// (e.g., subscriptions) do produce an initial server response, but it is delivered through the - /// [`AsyncEventReceiver`] and not through a dedicated future. + /// The batch is constructed using [`BatchRequest`], which allows queuing both tracked + /// requests (via [`BatchRequest::request`]) and event-style requests (via + /// [`BatchRequest::event_request`]). /// - /// **Important:** Do not `.await` any futures returned by [`AsyncBatchRequest::request`] until - /// *after* the batch has been submitted via `send_batch`. Awaiting too early will block - /// forever, as the requests haven’t been assigned IDs or sent yet. - /// - /// This method does not await any responses itself. Responses and notifications will be - /// delivered asynchronously via the [`AsyncEventReceiver`] or via the [`Future`]s returned by - /// [`AsyncBatchRequest::request`] — assuming they are awaited at the correct time. + /// Tracked requests use callbacks that are invoked when the server responds. Event-style + /// requests (e.g., subscriptions) produce an initial server response delivered through the + /// [`AsyncEventReceiver`]. /// /// # Returns /// - `Ok(true)` if the batch was non-empty and sent successfully. /// - `Ok(false)` if the batch was empty and nothing was sent. /// - `Err` if the batch could not be sent (e.g., if the client was shut down). /// - /// [`Future`]: futures::Future - /// [`AsyncBatchRequest`]: crate::AsyncBatchRequest - /// [`AsyncBatchRequest::request`]: crate::AsyncBatchRequest::request - /// [`AsyncBatchRequest::event_request`]: crate::AsyncBatchRequest::event_request + /// [`BatchRequest`]: crate::BatchRequest + /// [`BatchRequest::request`]: crate::BatchRequest::request + /// [`BatchRequest::event_request`]: crate::BatchRequest::event_request /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver - pub fn send_batch(&self, batch_req: AsyncBatchRequest) -> Result { + pub fn send_batch(&self, batch_req: BatchRequest) -> Result { match batch_req.into_inner() { Some(batch) => self.tx.unbounded_send(batch).map(|_| true), None => Ok(false), @@ -286,10 +275,9 @@ impl BlockingClient { { use std::sync::mpsc::*; let (event_tx, event_recv) = channel::(); - let (req_tx, req_recv) = channel::>(); + let (req_tx, req_recv) = channel::>(); let incoming_stream = crate::io::ReadStreamer::new(std::io::BufReader::new(reader)); - let read_state = - std::sync::Arc::new(std::sync::Mutex::new(State::::new())); + let read_state = std::sync::Arc::new(std::sync::Mutex::new(State::new())); let write_state = std::sync::Arc::clone(&read_state); let read_join = std::thread::spawn(move || -> std::io::Result<()> { @@ -332,15 +320,14 @@ impl BlockingClient { /// [`BlockingRequestError`]: crate::BlockingRequestError pub fn send_request(&self, req: Req) -> Result where - Req: Request, - BlockingPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, { - let mut batch = BlockingBatchRequest::new(); - let resp_rx = batch.request(req); + let mut batch = BatchRequest::new(); + let rx = batch.request_blocking(req); self.send_batch(batch) .map_err(BlockingRequestError::Dispatch)?; - resp_rx - .recv() + rx.recv() .map_err(|_| BlockingRequestError::Canceled)? .map_err(BlockingRequestError::Response) } @@ -364,10 +351,9 @@ impl BlockingClient { /// [`BlockingRequestSendError`]: crate::BlockingRequestSendError pub fn send_event_request(&self, request: Req) -> Result<(), BlockingRequestSendError> where - Req: Request, - BlockingPendingRequestTuple: Into, + Req: RequestExt + Send + Sync + 'static, { - let mut batch = BlockingBatchRequest::new(); + let mut batch = BatchRequest::new(); batch.event_request(request); self.send_batch(batch)?; Ok(()) @@ -375,31 +361,24 @@ impl BlockingClient { /// Sends a batch of requests to the Electrum server. /// - /// The batch is constructed using [`BlockingBatchRequest`], which allows queuing both tracked - /// requests (via [`BlockingBatchRequest::request`]) and event-style requests (via - /// [`BlockingBatchRequest::event_request`]). - /// - /// Tracked requests return blocking handles that can be used to wait for server responses. - /// Event-style requests (e.g., subscriptions) still result in a server response, but it is - /// emitted through the [`BlockingEventReceiver`] instead of through a blocking response handle. + /// The batch is constructed using [`BatchRequest`], which allows queuing both tracked + /// requests (via [`BatchRequest::request`]) and event-style requests (via + /// [`BatchRequest::event_request`]). /// - /// **Important:** Do not call `.recv()` or `.wait()` on any response handles returned by - /// [`BlockingBatchRequest::request`] until after the batch has been submitted using - /// `send_batch`. Doing so will block indefinitely, as the request has not yet been sent. + /// Tracked requests use callbacks that are invoked when the server responds. Event-style + /// requests (e.g., subscriptions) produce an initial server response delivered through the + /// [`BlockingEventReceiver`]. /// /// # Returns /// - `Ok(true)` if the batch was non-empty and sent successfully. /// - `Ok(false)` if the batch was empty and nothing was sent. /// - `Err` if the batch could not be sent (e.g., if the client was shut down). /// - /// [`BlockingBatchRequest`]: crate::BlockingBatchRequest - /// [`BlockingBatchRequest::request`]: crate::BlockingBatchRequest::request - /// [`BlockingBatchRequest::event_request`]: crate::BlockingBatchRequest::event_request + /// [`BatchRequest`]: crate::BatchRequest + /// [`BatchRequest::request`]: crate::BatchRequest::request + /// [`BatchRequest::event_request`]: crate::BatchRequest::event_request /// [`BlockingEventReceiver`]: crate::BlockingEventReceiver - pub fn send_batch( - &self, - batch_req: BlockingBatchRequest, - ) -> Result { + pub fn send_batch(&self, batch_req: BatchRequest) -> Result { match batch_req.into_inner() { Some(batch) => self.tx.send(batch).map(|_| true), None => Ok(false), diff --git a/src/lib.rs b/src/lib.rs index 75e2ea2..9389008 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,26 +39,18 @@ pub type ResponseResult = Result; /// Internal type aliases for asynchronous client components. mod async_aliases { use super::*; - use futures::channel::{ - mpsc::{TrySendError, UnboundedReceiver, UnboundedSender}, - oneshot::{Receiver, Sender}, - }; - use pending_request::AsyncPendingRequest; - - /// Internal [`State`] instance specialized for tracking asynchronous requests. - /// - /// Used by the async client to associate incoming responses with pending requests. - pub type AsyncState = State; + use futures::channel::mpsc::{TrySendError, UnboundedReceiver, UnboundedSender}; + use pending_request::PendingRequest; /// The sending half of the channel used to enqueue one or more requests from [`AsyncClient`]. /// /// These requests are processed and forwarded to [`State::track_request`] to be assigned an ID and serialized. - pub type AsyncRequestSender = UnboundedSender>; + pub type AsyncRequestSender = UnboundedSender>; /// The receiving half of the request channel used internally by the async client. /// /// Requests sent by [`AsyncClient`] are dequeued here and forwarded to [`State::track_request`]. - pub type AsyncRequestReceiver = UnboundedReceiver>; + pub type AsyncRequestReceiver = UnboundedReceiver>; /// The error returned by [`AsyncClient::send_request`] when a request fails. /// @@ -67,18 +59,8 @@ mod async_aliases { /// The error that occurs when a request cannot be sent into the async request channel. /// - /// This typically means the client’s background task has shut down or the queue is disconnected. - pub type AsyncRequestSendError = TrySendError>; - - /// A oneshot sender used to deliver the result of a tracked async request. - /// - /// Used internally by the client to fulfill the future returned by [`AsyncBatchRequest::request`]. - pub type AsyncResponseSender = Sender>; - - /// A oneshot receiver used to await the result of a tracked async request. - /// - /// Awaiting this will yield the final response or error once the server replies. - pub type AsyncResponseReceiver = Receiver>; + /// This typically means the client's background task has shut down or the queue is disconnected. + pub type AsyncRequestSendError = TrySendError>; /// The sending half of the internal event stream, used to emit [`Event`]s from the client worker loop. pub type AsyncEventSender = UnboundedSender; @@ -93,17 +75,14 @@ pub use async_aliases::*; /// Internal type aliases for blocking client components. mod blocking_aliases { use super::*; - use pending_request::BlockingPendingRequest; - use std::sync::mpsc::{Receiver, SendError, Sender, SyncSender}; - - /// Internal [`State`] specialized for tracking blocking requests. - pub type BlockingState = State; + use pending_request::PendingRequest; + use std::sync::mpsc::{Receiver, SendError, Sender}; /// Channel sender for sending blocking requests from [`BlockingClient`] to the write thread. - pub type BlockingRequestSender = Sender>; + pub type BlockingRequestSender = Sender>; /// Channel receiver used by the write thread to dequeue pending requests. - pub type BlockingRequestReceiver = Receiver>; + pub type BlockingRequestReceiver = Receiver>; /// Error returned by [`BlockingClient::send_request`] if the request fails or is canceled. pub type BlockingRequestError = request::Error; @@ -111,13 +90,7 @@ mod blocking_aliases { /// Error that occurs when a blocking request cannot be sent to the internal request channel. /// /// Typically indicates that the client has been shut down. - pub type BlockingRequestSendError = SendError>; - - /// One-shot sender used to deliver the result of a tracked blocking request. - pub type BlockingResponseSender = SyncSender>; - - /// One-shot receiver used to block and wait for a response to a tracked request. - pub type BlockingResponseReceiver = Receiver>; + pub type BlockingRequestSendError = SendError>; /// Channel sender used by the read thread to emit [`Event`]s. pub type BlockingEventSender = Sender; @@ -171,7 +144,7 @@ pub struct RawNotification { /// A raw JSON-RPC response from the Electrum server. /// -/// This is the server’s response to a client-issued request. It may contain either a `result` +/// This is the server's response to a client-issued request. It may contain either a `result` /// or an `error` (as per the JSON-RPC spec). #[derive(Debug, Clone, serde::Deserialize)] #[allow(clippy::manual_non_exhaustive)] diff --git a/src/pending_request.rs b/src/pending_request.rs index 40429de..aa62d0d 100644 --- a/src/pending_request.rs +++ b/src/pending_request.rs @@ -1,20 +1,20 @@ -use crate::{AsyncResponseSender, BlockingResponseSender, MethodAndParams, Request, ResponseError}; +use crate::{Event, MethodAndParams, Request, ResponseError, ResponseResult}; +use serde_json::Value; -/// A tracked or untracked asynchronous request, paired with an optional response sender. +/// Extension trait for request types that can construct [`SatisfiedRequest`] and [`ErroredRequest`]. /// -/// If `Some(sender)` is present, the response will be delivered through it. -/// If `None`, the response is expected to be emitted as an [`Event`] instead. +/// This trait is automatically implemented for all built-in request types via the +/// [`gen_pending_request_types!`] macro. It bridges a typed request to the enum variants used in +/// [`Event`]. /// /// [`Event`]: crate::Event -pub type AsyncPendingRequestTuple = (Req, Option>); +pub trait RequestExt: Request + Sized { + /// Wraps this request and its decoded response into a [`SatisfiedRequest`]. + fn into_satisfied(self, resp: Self::Response) -> SatisfiedRequest; -/// A tracked or untracked blocking request, paired with an optional response sender. -/// -/// If `Some(sender)` is present, the response will be sent through it. -/// If `None`, the response is expected to be emitted as an [`Event`] instead. -/// -/// [`Event`]: crate::Event -pub type BlockingPendingRequestTuple = (Req, Option>); + /// Wraps this request and an error into an [`ErroredRequest`]. + fn into_errored(self, error: ResponseError) -> ErroredRequest; +} macro_rules! gen_pending_request_types { ($($name:ident),*) => { @@ -27,7 +27,7 @@ macro_rules! gen_pending_request_types { /// `SatisfiedRequest` is used by the [`Event::Response`] variant to expose typed /// request-response pairs to the caller. /// - /// You typically don’t construct this manually — it is created internally by the client + /// You typically don't construct this manually — it is created internally by the client /// after decoding JSON-RPC responses. /// /// [`Event::Response`]: crate::Event::Response @@ -69,199 +69,16 @@ macro_rules! gen_pending_request_types { impl std::error::Error for ErroredRequest {} - /// A trait representing a request that has been sent to the Electrum server and is awaiting - /// a response. - /// - /// This trait is used internally to track the lifecycle of a request, including: - /// - extracting its method and parameters before sending, - /// - handling a successful server response, - /// - handling an error response. - /// - /// Both [`AsyncPendingRequest`] and [`BlockingPendingRequest`] implement this trait. - /// These are generated enums that hold the original request and, optionally, a response - /// channel. - /// - /// You should not implement this trait manually — it is only used inside the client engine - /// for matching raw Electrum responses to typed results. - /// - /// [`AsyncPendingRequest`]: crate::pending_request::AsyncPendingRequest - /// [`BlockingPendingRequest`]: crate::pending_request::BlockingPendingRequest - pub trait PendingRequest { - /// Returns the Electrum method name and parameters for this request. - /// - /// This is used to serialize the request into a JSON-RPC message before sending it to - /// the server. - /// - /// The method and parameters must match the format expected by the Electrum protocol. - fn to_method_and_params(&self) -> MethodAndParams; - - /// Attempts to decode a successful server response (`result`) into a typed value. - /// - /// This is called when a matching response arrives from the server. If the request was - /// tracked, this method deserializes the response and either: - /// - completes the associated response channel (if present), or - /// - returns a [`SatisfiedRequest`] directly, if untracked. - /// - /// Returns an error if deserialization fails. - /// - /// [`SatisfiedRequest`]: crate::SatisfiedRequest - fn satisfy(self, raw_resp: serde_json::Value) -> Result, serde_json::Error>; - - /// Handles a server-side error response (`error`) for this request. - /// - /// If the request was tracked, this sends the error through the associated response - /// channel. Otherwise, it returns a [`ErroredRequest`] containing the original request - /// and the error. - /// - /// [`ErroredRequest`]: crate::ErroredRequest - fn satisfy_error(self, raw_error: serde_json::Value) -> Option; - } - - /// An internal representation of a pending asynchronous Electrum request. - /// - /// Each variant corresponds to a specific request type. The enum holds: - /// - the original request (`req`), and - /// - an optional response channel (`resp_tx`) that will be completed once a server response - /// is received. - /// - /// This type is created when calling [`AsyncBatchRequest::request`] or - /// [`AsyncBatchRequest::event_request`], and is consumed by the client when processing - /// responses. - /// - /// If `resp_tx` is present, the request is tracked and its response will complete the - /// associated future. If `resp_tx` is `None`, the response will be delivered as an - /// [`Event`] instead. - /// - /// You typically don’t construct this type directly — it is produced by the batch builder - /// or macros. - /// - /// [`AsyncBatchRequest::request`]: crate::AsyncBatchRequest::request - /// [`AsyncBatchRequest::event_request`]: crate::AsyncBatchRequest::event_request - /// [`Event`]: crate::Event - #[derive(Debug)] - pub enum AsyncPendingRequest { - $($name { - req: crate::request::$name, - resp_tx: Option::Response>>, - }),*, - } - $( - impl From::Response>> for AsyncPendingRequest { - fn from((req, resp_tx): AsyncPendingRequestTuple::Response>) -> Self { - Self::$name{ req, resp_tx } - } - } - )* - - impl PendingRequest for AsyncPendingRequest { - fn to_method_and_params(&self) -> MethodAndParams { - match self { - $(AsyncPendingRequest::$name{ req, .. } => req.to_method_and_params()),* + impl RequestExt for crate::request::$name { + fn into_satisfied(self, resp: ::Response) -> SatisfiedRequest { + SatisfiedRequest::$name { req: self, resp } } - } - - fn satisfy(self, raw_resp: serde_json::Value) -> Result, serde_json::Error> { - use crate::request; - match self { - $(Self::$name{ req, resp_tx } => { - let resp = serde_json::from_value::<::Response>(raw_resp)?; - Ok(match resp_tx { - Some(tx) => { - let _ = tx.send(Ok(resp)); - None - } - None => Some(SatisfiedRequest::$name { req, resp }), - }) - }),* - } - } - - fn satisfy_error(self, raw_error: serde_json::Value) -> Option { - let error = ResponseError(raw_error); - match self { - $(Self::$name{ req, resp_tx } => { - match resp_tx { - Some(tx) => { let _ = tx.send(Err(error)); None } - None => Some(ErroredRequest::$name{ req, error }), - } - }),* - } - } - } - - /// An internal representation of a pending blocking Electrum request. - /// - /// Each variant corresponds to a specific request type. The enum holds: - /// - the original request (`req`), and - /// - an optional response channel (`resp_tx`) that will be fulfilled once a server response - /// is received. - /// - /// This type is created when calling [`BlockingBatchRequest::request`] or - /// [`BlockingBatchRequest::event_request`], and is consumed by the client when processing - /// server responses. - /// - /// If `resp_tx` is present, the request is tracked and the response will be sent through - /// the associated receiver. If `resp_tx` is `None`, the response will be delivered as an - /// [`Event`] instead. - /// - /// This type is used internally by the blocking client and is typically not constructed - /// directly. - /// - /// [`BlockingBatchRequest::request`]: crate::BlockingBatchRequest::request - /// [`BlockingBatchRequest::event_request`]: crate::BlockingBatchRequest::event_request - /// [`Event`]: crate::Event - #[derive(Debug)] - pub enum BlockingPendingRequest { - $($name { - req: crate::request::$name, - resp_tx: Option::Response>>, - }),*, - } - - $( - impl From::Response>> for BlockingPendingRequest { - fn from((req, resp_tx): BlockingPendingRequestTuple::Response>) -> Self { - Self::$name{ req, resp_tx } + fn into_errored(self, error: ResponseError) -> ErroredRequest { + ErroredRequest::$name { req: self, error } } } )* - - impl PendingRequest for BlockingPendingRequest { - fn to_method_and_params(&self) -> MethodAndParams { - match self { - $(BlockingPendingRequest::$name{ req, .. } => req.to_method_and_params()),* - } - } - - fn satisfy(self, raw_resp: serde_json::Value) -> Result, serde_json::Error> { - use crate::request; - match self { - $(Self::$name{ req, resp_tx } => { - let resp = serde_json::from_value::<::Response>(raw_resp)?; - Ok(match resp_tx { - Some(tx) => { - let _ = tx.send(Ok(resp)); - None - } - None => Some(SatisfiedRequest::$name { req, resp }), - }) - }),* - } - } - - fn satisfy_error(self, raw_error: serde_json::Value) -> Option { - let error = ResponseError(raw_error); - match self { - $(Self::$name{ req, resp_tx } => { - match resp_tx { - Some(tx) => { let _ = tx.send(Err(error)); None } - None => Some(ErroredRequest::$name{ req, error }), - } - }),* - } - } - } }; } @@ -289,19 +106,95 @@ gen_pending_request_types! { Custom } -impl PendingRequest for Box { - fn to_method_and_params(&self) -> MethodAndParams { - self.as_ref().to_method_and_params() +/// A pending request that has been sent to the Electrum server and is awaiting a response. +/// +/// This struct holds a type-erased handler closure that knows how to deserialize the server's +/// raw JSON response and either: +/// - dispatch it through a callback (for tracked requests), or +/// - construct an [`Event`] (for event-style requests). +/// +/// Construct via [`PendingRequest::new`] (with a callback) or [`PendingRequest::event`] (without). +/// +/// [`Event`]: crate::Event +pub struct PendingRequest { + method_and_params: MethodAndParams, + handler: Box< + dyn FnOnce(Result) -> Result, serde_json::Error> + Send + Sync, + >, +} + +impl PendingRequest { + /// Creates a new pending request with an optional typed callback. + /// + /// If `callback` is `Some`, the response will be deserialized and dispatched through it, + /// and [`State::process_incoming`] will return `Ok(None)` for this request. + /// + /// If `callback` is `None`, the response will be wrapped in an [`Event`] and returned from + /// [`State::process_incoming`]. + /// + /// [`State::process_incoming`]: crate::State::process_incoming + /// [`Event`]: crate::Event + pub fn new( + req: Req, + callback: Option) + Send + Sync + 'static>, + ) -> Self { + let method_and_params = req.to_method_and_params(); + Self { + method_and_params, + handler: Box::new(move |raw_result| match (raw_result, callback) { + (Ok(raw_resp), Some(cb)) => { + let resp = serde_json::from_value(raw_resp)?; + cb(Ok(resp)); + Ok(None) + } + (Ok(raw_resp), None) => { + let resp = serde_json::from_value(raw_resp)?; + Ok(Some(Event::Response(req.into_satisfied(resp)))) + } + (Err(raw_err), Some(cb)) => { + cb(Err(ResponseError(raw_err))); + Ok(None) + } + (Err(raw_err), None) => Ok(Some(Event::ResponseError( + req.into_errored(ResponseError(raw_err)), + ))), + }), + } } - fn satisfy( - self, - raw_resp: serde_json::Value, - ) -> Result, serde_json::Error> { - (*self).satisfy(raw_resp) + /// Creates a new pending request without a callback (event-style). + /// + /// The server's response will be returned as an [`Event`] from [`State::process_incoming`]. + /// + /// [`State::process_incoming`]: crate::State::process_incoming + /// [`Event`]: crate::Event + pub fn event(req: Req) -> Self { + Self::new(req, None::)>) } - fn satisfy_error(self, raw_error: serde_json::Value) -> Option { - (*self).satisfy_error(raw_error) + /// Returns the method name and parameters for this request. + /// + /// Used to serialize the request into a JSON-RPC message and to reconstruct + /// [`RawRequest`]s for pending requests. + /// + /// [`RawRequest`]: crate::RawRequest + pub fn to_method_and_params(&self) -> MethodAndParams { + self.method_and_params.clone() + } + + /// Handles the raw server result by invoking the type-erased handler closure. + /// + /// Returns `Ok(Some(Event))` for event-style requests, `Ok(None)` for callback-dispatched + /// requests, or `Err` if deserialization failed. + pub fn handle(self, result: Result) -> Result, serde_json::Error> { + (self.handler)(result) + } +} + +impl std::fmt::Debug for PendingRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PendingRequest") + .field("method_and_params", &self.method_and_params) + .finish_non_exhaustive() } } diff --git a/src/state.rs b/src/state.rs index 2849539..13513de 100644 --- a/src/state.rs +++ b/src/state.rs @@ -59,17 +59,17 @@ impl Event { /// state as needed and may return an [`Event`] representing a notification or a response to a /// previously tracked request. #[derive(Debug)] -pub struct State { - pending: HashMap, +pub struct State { + pending: HashMap, } -impl Default for State { +impl Default for State { fn default() -> Self { Self::new() } } -impl State { +impl State { /// Creates a new [`State`] instance. pub fn new() -> Self { Self { @@ -104,13 +104,9 @@ impl State { /// batch. pub fn track_request(&mut self, next_id: &mut u32, req: R) -> MaybeBatch where - R: Into>, + R: Into>, { - fn _add_request( - state: &mut State, - next_id: &mut u32, - req: PReq, - ) -> RawRequest { + fn _add_request(state: &mut State, next_id: &mut u32, req: PendingRequest) -> RawRequest { let id = *next_id; *next_id = id.wrapping_add(1); let (method, params) = req.to_method_and_params(); @@ -156,13 +152,9 @@ impl State { .pending .remove(&resp.id) .ok_or(ProcessError::MissingRequest(resp.id))?; - Ok(match resp.result { - Ok(raw_resp) => pending_req - .satisfy(raw_resp) - .map_err(|de_err| ProcessError::CannotDeserializeResponse(resp.id, de_err))? - .map(Event::Response), - Err(raw_err) => pending_req.satisfy_error(raw_err).map(Event::ResponseError), - }) + pending_req + .handle(resp.result) + .map_err(|de_err| ProcessError::CannotDeserializeResponse(resp.id, de_err)) } } } From 0f4c565fece0d0e65a34ef63f4948824196e2d1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 30 Jan 2026 10:19:29 +0000 Subject: [PATCH 2/5] refactor!: Clean up API naming, remove dead code, and restructure modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename types: SatisfiedRequest → CompletedRequest, ErroredRequest → FailedRequest, RawNotificationOrResponse → RawIncoming, MaybeBatch → RawOneOrMany, State → RequestTracker - Rename methods: process_incoming → handle_incoming, into_satisfied → into_completed, into_errored → into_failed - Add request_async/request_blocking convenience methods to BatchRequest - Remove dead BatchRequestError type - Merge batch_request.rs into pending_request.rs - Extract wire types from lib.rs into protocol.rs - Remove Mutex from PendingRequest in favor of Send + Sync bounds 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- README.md | 2 +- src/batch_request.rs | 133 ---------------- src/client.rs | 12 +- src/io.rs | 24 +-- src/lib.rs | 226 ++------------------------- src/pending_request.rs | 154 +++++++++++++++--- src/protocol.rs | 207 ++++++++++++++++++++++++ src/{state.rs => request_tracker.rs} | 58 +++---- tests/synopsis.rs | 6 +- 9 files changed, 401 insertions(+), 421 deletions(-) delete mode 100644 src/batch_request.rs create mode 100644 src/protocol.rs rename src/{state.rs => request_tracker.rs} (80%) diff --git a/README.md b/README.md index d17f5aa..77bac7c 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ models. - **Streaming protocol support**: Handles both server-initiated notifications and responses. - **Transport agnostic**: Works with any I/O type implementing the appropriate `Read`/`Write` traits. -- **Sans-IO core**: The [`State`] struct tracks pending requests and processes server messages. +- **Sans-IO core**: The [`RequestTracker`] struct tracks pending requests and handles incoming server messages. - **Typed request/response system**: Strongly typed Electrum method wrappers with minimal overhead. ## Example (async with Tokio) diff --git a/src/batch_request.rs b/src/batch_request.rs deleted file mode 100644 index f280034..0000000 --- a/src/batch_request.rs +++ /dev/null @@ -1,133 +0,0 @@ -use crate::pending_request::{PendingRequest, RequestExt}; -use crate::{MaybeBatch, ResponseResult}; - -/// A builder for batching multiple requests to the Electrum server. -/// -/// This type allows queuing both: -/// - tracked requests via [`request`] (which take a callback to receive the typed response), and -/// - event-style requests via [`event_request`] (which emit [`Event`]s through the event receiver -/// instead of a callback). -/// -/// After building the batch, submit it using [`AsyncClient::send_batch`] or -/// [`BlockingClient::send_batch`]. The batch will be converted into a raw JSON-RPC message and -/// sent to the server. -/// -/// [`request`]: Self::request -/// [`event_request`]: Self::event_request -/// [`AsyncClient::send_batch`]: crate::AsyncClient::send_batch -/// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch -/// [`Event`]: crate::Event -#[must_use] -#[derive(Debug, Default)] -pub struct BatchRequest { - inner: Option>, -} - -impl BatchRequest { - /// Creates a new empty batch request builder. - pub fn new() -> Self { - Self::default() - } - - /// Consumes the batch and returns its raw contents, if any requests were added. - /// - /// Returns `Some` if the batch is non-empty, or `None` if it was empty. - pub fn into_inner(self) -> Option> { - self.inner - } - - /// Adds a tracked request to the batch with a typed callback. - /// - /// The callback will be invoked with the deserialized response (or error) once the server - /// replies. The callback is type-erased internally, so it works for both async and blocking - /// clients. - pub fn request(&mut self, req: Req, callback: F) - where - Req: RequestExt + Send + Sync + 'static, - F: FnOnce(ResponseResult) + Send + Sync + 'static, - { - MaybeBatch::push_opt(&mut self.inner, PendingRequest::new(req, Some(callback))); - } - - /// Adds a tracked request and returns an async receiver for the response. - /// - /// This is a convenience wrapper around [`request`](Self::request) that creates a - /// [`futures::channel::oneshot`] channel internally. The returned receiver can be - /// `.await`ed after the batch is sent. - pub fn request_async( - &mut self, - req: Req, - ) -> futures::channel::oneshot::Receiver> - where - Req: RequestExt + Send + Sync + 'static, - Req::Response: Send, - { - let (tx, rx) = futures::channel::oneshot::channel(); - self.request(req, move |result| { - let _ = tx.send(result); - }); - rx - } - - /// Adds a tracked request and returns a blocking receiver for the response. - /// - /// This is a convenience wrapper around [`request`](Self::request) that creates a - /// [`std::sync::mpsc::sync_channel`] internally. The returned receiver can be used - /// with [`recv`](std::sync::mpsc::Receiver::recv) after the batch is sent. - pub fn request_blocking( - &mut self, - req: Req, - ) -> std::sync::mpsc::Receiver> - where - Req: RequestExt + Send + Sync + 'static, - Req::Response: Send, - { - let (tx, rx) = std::sync::mpsc::sync_channel(1); - self.request(req, move |result| { - let _ = tx.send(result); - }); - rx - } - - /// Adds an event-style request to the batch. - /// - /// These requests do not take a callback. Any server response (including the initial result - /// and any future notifications) will be delivered as [`Event`]s through the event receiver. - /// - /// Use this for subscription-style RPCs where responses should be handled uniformly as events. - /// - /// [`Event`]: crate::Event - pub fn event_request(&mut self, req: Req) { - MaybeBatch::push_opt(&mut self.inner, PendingRequest::event(req)); - } -} - -/// An error that can occur when sending a request or polling its result. -/// -/// This error is returned by client `send_request` methods when the response cannot be obtained. -/// -/// It typically indicates that the batch was dropped, the client shut down, or the request -/// failed to be processed internally. -#[derive(Debug)] -pub enum BatchRequestError { - /// The request was canceled before a response was received. - /// - /// This can occur if the client shuts down or if the request is dropped internally. - Canceled, - - /// The server returned a response error. - /// - /// This indicates that the Electrum server replied with an error object, rather than a result. - Response(crate::ResponseError), -} - -impl std::fmt::Display for BatchRequestError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Canceled => write!(f, "Request was canceled before being satisfied."), - Self::Response(e) => write!(f, "Request satisfied with error: {}", e), - } - } -} - -impl std::error::Error for BatchRequestError {} diff --git a/src/client.rs b/src/client.rs index 0464cb8..56a42f2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -64,11 +64,11 @@ impl AsyncClient { { use futures::{channel::mpsc, StreamExt}; let (event_tx, event_recv) = mpsc::unbounded::(); - let (req_tx, mut req_recv) = mpsc::unbounded::>(); + let (req_tx, mut req_recv) = mpsc::unbounded::>(); let mut incoming_stream = crate::io::ReadStreamer::new(futures::io::BufReader::new(reader)).fuse(); - let mut state = State::new(); + let mut state = RequestTracker::new(); let mut next_id = 0_u32; let fut = async move { @@ -84,7 +84,7 @@ impl AsyncClient { incoming_opt = incoming_stream.next() => match incoming_opt { Some(incoming_res) => { let event_opt = state - .process_incoming(incoming_res?) + .handle_incoming(incoming_res?) .map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error))?; if let Some(event) = event_opt { if let Err(_err) = event_tx.unbounded_send(event) { @@ -275,9 +275,9 @@ impl BlockingClient { { use std::sync::mpsc::*; let (event_tx, event_recv) = channel::(); - let (req_tx, req_recv) = channel::>(); + let (req_tx, req_recv) = channel::>(); let incoming_stream = crate::io::ReadStreamer::new(std::io::BufReader::new(reader)); - let read_state = std::sync::Arc::new(std::sync::Mutex::new(State::new())); + let read_state = std::sync::Arc::new(std::sync::Mutex::new(RequestTracker::new())); let write_state = std::sync::Arc::clone(&read_state); let read_join = std::thread::spawn(move || -> std::io::Result<()> { @@ -285,7 +285,7 @@ impl BlockingClient { let event_opt = read_state .lock() .unwrap() - .process_incoming(incoming_res?) + .handle_incoming(incoming_res?) .map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error))?; if let Some(event) = event_opt { if let Err(_err) = event_tx.send(event) { diff --git a/src/io.rs b/src/io.rs index c42f8af..58a83ec 100644 --- a/src/io.rs +++ b/src/io.rs @@ -9,18 +9,18 @@ use std::{ task::{Context, Poll}, }; -use crate::{MaybeBatch, RawNotificationOrResponse, RawRequest}; +use crate::{RawIncoming, RawOneOrMany, RawRequest}; /// A streaming parser for Electrum JSON-RPC messages from an input reader. /// /// `ReadStreamer` incrementally reads from a source implementing [`std::io::BufRead`] or /// [`futures::io::AsyncBufRead`] (depending on the API used), parses incoming JSON-RPC payloads, and -/// queues deserialized [`RawNotificationOrResponse`] items for consumption. +/// queues deserialized [`RawIncoming`] items for consumption. /// /// ### Behavior /// /// - For **blocking transports**, `ReadStreamer` implements [`Iterator`], yielding one -/// [`RawNotificationOrResponse`] at a time. +/// [`RawIncoming`] at a time. /// - For **async transports**, `ReadStreamer` implements [`futures::Stream`], with the same item /// type. /// @@ -63,7 +63,7 @@ use crate::{MaybeBatch, RawNotificationOrResponse, RawRequest}; pub struct ReadStreamer { reader: Option, buf: Vec, - queue: VecDeque, + queue: VecDeque, err: Option, } @@ -86,9 +86,9 @@ impl ReadStreamer { Some(b) => assert_eq!(b, b'\n'), None => return false, } - match serde_json::from_slice::>(&self.buf) { - Ok(MaybeBatch::Single(t)) => self.queue.push_back(t), - Ok(MaybeBatch::Batch(v)) => self.queue.extend(v), + match serde_json::from_slice::>(&self.buf) { + Ok(RawOneOrMany::Single(t)) => self.queue.push_back(t), + Ok(RawOneOrMany::Batch(v)) => self.queue.extend(v), Err(err) => { self.err = Some(err.into()); return false; @@ -100,7 +100,7 @@ impl ReadStreamer { } impl Iterator for ReadStreamer { - type Item = std::io::Result; + type Item = std::io::Result; fn next(&mut self) -> Option { loop { @@ -123,7 +123,7 @@ impl Iterator for ReadStreamer { } impl futures::Stream for ReadStreamer { - type Item = std::io::Result; + type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use futures::AsyncBufReadExt; @@ -173,7 +173,7 @@ impl futures::Stream for ReadStreamer { /// Returns a [`std::io::Error`] if the write operation fails. pub fn blocking_write(mut writer: W, msg: T) -> std::io::Result<()> where - T: Into>, + T: Into>, W: std::io::Write, { let mut b = serde_json::to_vec(&msg.into()).expect("must serialize"); @@ -194,7 +194,7 @@ where /// Returns a [`std::io::Error`] if the async write operation fails. pub async fn async_write(mut writer: W, msg: T) -> std::io::Result<()> where - T: Into>, + T: Into>, W: futures::AsyncWrite + Unpin, { use futures::AsyncWriteExt; @@ -209,7 +209,7 @@ where #[cfg(feature = "tokio")] pub async fn tokio_write(mut writer: W, msg: T) -> std::io::Result<()> where - T: Into>, + T: Into>, W: tokio::io::AsyncWrite + Unpin, { use tokio::io::AsyncWriteExt; diff --git a/src/lib.rs b/src/lib.rs index 9389008..b4817bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ #![doc = include_str!("../README.md")] -mod batch_request; mod client; pub use client::*; mod custom_serde; @@ -7,22 +6,16 @@ mod hash_types; pub mod io; pub mod notification; mod pending_request; +pub mod protocol; pub mod request; +mod request_tracker; pub mod response; -mod state; -pub use batch_request::*; pub use hash_types::*; pub use pending_request::*; +pub use protocol::*; pub use request::Request; +pub use request_tracker::*; pub use serde_json; -use serde_json::Value; -pub use state::*; -use std::fmt::Display; - -/// The JSON-RPC protocol version supported by this client. -/// -/// Always set to `"2.0"` per the Electrum protocol specification. -pub const JSONRPC_VERSION_2_0: &str = "2.0"; /// An owned or borrowed static string. pub type CowStr = std::borrow::Cow<'static, str>; @@ -31,7 +24,7 @@ pub type CowStr = std::borrow::Cow<'static, str>; pub type DoubleSHA = bitcoin::hashes::sha256d::Hash; /// A method name and its corresponding parameters, as used in a JSON-RPC request. -pub type MethodAndParams = (CowStr, Vec); +pub type MethodAndParams = (CowStr, Vec); /// A server response that is either a success (`Ok`) or a JSON-RPC error (`Err`). pub type ResponseResult = Result; @@ -44,13 +37,13 @@ mod async_aliases { /// The sending half of the channel used to enqueue one or more requests from [`AsyncClient`]. /// - /// These requests are processed and forwarded to [`State::track_request`] to be assigned an ID and serialized. - pub type AsyncRequestSender = UnboundedSender>; + /// These requests are processed and forwarded to [`RequestTracker::track_request`] to be assigned an ID and serialized. + pub type AsyncRequestSender = UnboundedSender>; /// The receiving half of the request channel used internally by the async client. /// - /// Requests sent by [`AsyncClient`] are dequeued here and forwarded to [`State::track_request`]. - pub type AsyncRequestReceiver = UnboundedReceiver>; + /// Requests sent by [`AsyncClient`] are dequeued here and forwarded to [`RequestTracker::track_request`]. + pub type AsyncRequestReceiver = UnboundedReceiver>; /// The error returned by [`AsyncClient::send_request`] when a request fails. /// @@ -60,7 +53,7 @@ mod async_aliases { /// The error that occurs when a request cannot be sent into the async request channel. /// /// This typically means the client's background task has shut down or the queue is disconnected. - pub type AsyncRequestSendError = TrySendError>; + pub type AsyncRequestSendError = TrySendError>; /// The sending half of the internal event stream, used to emit [`Event`]s from the client worker loop. pub type AsyncEventSender = UnboundedSender; @@ -79,10 +72,10 @@ mod blocking_aliases { use std::sync::mpsc::{Receiver, SendError, Sender}; /// Channel sender for sending blocking requests from [`BlockingClient`] to the write thread. - pub type BlockingRequestSender = Sender>; + pub type BlockingRequestSender = Sender>; /// Channel receiver used by the write thread to dequeue pending requests. - pub type BlockingRequestReceiver = Receiver>; + pub type BlockingRequestReceiver = Receiver>; /// Error returned by [`BlockingClient::send_request`] if the request fails or is canceled. pub type BlockingRequestError = request::Error; @@ -90,7 +83,7 @@ mod blocking_aliases { /// Error that occurs when a blocking request cannot be sent to the internal request channel. /// /// Typically indicates that the client has been shut down. - pub type BlockingRequestSendError = SendError>; + pub type BlockingRequestSendError = SendError>; /// Channel sender used by the read thread to emit [`Event`]s. pub type BlockingEventSender = Sender; @@ -99,196 +92,3 @@ mod blocking_aliases { pub type BlockingEventReceiver = Receiver; } pub use blocking_aliases::*; - -/// Represents the `jsonrpc` version field in JSON-RPC messages. -/// -/// In Electrum, this is always the string `"2.0"`, as required by the JSON-RPC 2.0 specification. -/// It appears in all standard requests, responses, and notifications. -/// -/// This type ensures consistent serialization and deserialization of the version field. -#[derive(Debug, Clone, Copy)] -pub struct Version; - -impl Display for Version { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(JSONRPC_VERSION_2_0) - } -} - -impl AsRef for Version { - fn as_ref(&self) -> &str { - JSONRPC_VERSION_2_0 - } -} - -/// A raw server-initiated JSON-RPC notification. -/// -/// These are Electrum messages that have a `"method"` and `"params"`, but no `"id"` field. -/// Typically emitted for subscriptions like `blockchain.headers.subscribe`. -#[derive(Debug, Clone, serde::Deserialize)] -#[allow(clippy::manual_non_exhaustive)] -pub struct RawNotification { - /// The JSON-RPC protocol version (should always be `"2.0"`). - #[serde( - rename(deserialize = "jsonrpc"), - deserialize_with = "crate::custom_serde::version" - )] - pub version: Version, - - /// The method name of the notification (e.g., `"blockchain.headers.subscribe"`). - pub method: CowStr, - - /// The raw parameters associated with the notification. - pub params: Value, -} - -/// A raw JSON-RPC response from the Electrum server. -/// -/// This is the server's response to a client-issued request. It may contain either a `result` -/// or an `error` (as per the JSON-RPC spec). -#[derive(Debug, Clone, serde::Deserialize)] -#[allow(clippy::manual_non_exhaustive)] -pub struct RawResponse { - /// The JSON-RPC protocol version (should always be `"2.0"`). - #[serde( - rename(deserialize = "jsonrpc"), - deserialize_with = "crate::custom_serde::version" - )] - pub version: Version, - - /// The ID that matches the request this response is answering. - pub id: u32, - - /// The result if the request succeeded, or the error object if it failed. - #[serde(flatten, deserialize_with = "crate::custom_serde::result")] - pub result: Result, -} - -/// A raw incoming message from the Electrum server. -/// -/// This type represents either a JSON-RPC notification (e.g., for a subscription) -/// or a response to a previously issued request. -#[derive(Debug, Clone, serde::Deserialize)] -#[serde(untagged)] -pub enum RawNotificationOrResponse { - /// A server-initiated notification (e.g., from a subscription). - Notification(RawNotification), - - /// A response to a previously sent request. - Response(RawResponse), -} - -/// A raw JSON-RPC request to be sent to the Electrum server. -/// -/// This struct is constructed before serialization and sending. It includes all required -/// JSON-RPC fields for method calls. -#[derive(Debug, Clone, serde::Serialize)] -pub struct RawRequest { - /// The JSON-RPC version string (usually `"2.0"`). - pub jsonrpc: CowStr, - - /// The client-assigned request ID (used to correlate with responses). - pub id: u32, - - /// The method to be invoked (e.g., `"blockchain.headers.subscribe"`). - pub method: CowStr, - - /// The parameters passed to the method. - pub params: Vec, -} - -impl RawRequest { - /// Constructs a new JSON-RPC request with the given ID, method, and parameters. - /// - /// This sets the JSON-RPC version to `"2.0"`. - pub fn new(id: u32, method: CowStr, params: Vec) -> Self { - Self { - jsonrpc: JSONRPC_VERSION_2_0.into(), - id, - method, - params, - } - } - - pub fn from_request(id: u32, req: Req) -> Self { - (id, req).into() - } -} - -/// Represents either a single item or a batch of items. -/// -/// This enum is used to generalize over sending one or many requests in the same operation. I.e. -/// to the Electrum server. -/// -/// Use `From` implementations to easily convert from `T` or `Vec`. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[serde(untagged)] -pub enum MaybeBatch { - Single(T), - Batch(Vec), -} - -impl MaybeBatch { - /// Converts this `MaybeBatch` into a `Vec`. - /// - /// If it is a `Single`, returns a one-element vector. If it is a `Batch`, returns the inner vector. - pub fn into_vec(self) -> Vec { - match self { - MaybeBatch::Single(item) => vec![item], - MaybeBatch::Batch(batch) => batch, - } - } - - /// Pushes a new item into the given `Option>`, creating or extending the batch. - /// - /// If the option is `None`, it becomes `Some(Single(item))`. If it already contains a value, - /// it is converted into a `Batch` and the item is appended. - pub fn push_opt(opt: &mut Option, item: T) { - *opt = match opt.take() { - None => Some(Self::Single(item)), - Some(maybe_batch) => { - let mut items = maybe_batch.into_vec(); - items.push(item); - Some(MaybeBatch::Batch(items)) - } - } - } - - pub fn map(self, f: impl Fn(T) -> T2) -> MaybeBatch { - match self { - MaybeBatch::Single(t) => MaybeBatch::Single(f(t)), - MaybeBatch::Batch(items) => MaybeBatch::Batch(items.into_iter().map(f).collect()), - } - } - - pub fn map_into(self) -> MaybeBatch - where - T: Into, - { - self.map(Into::into) - } -} - -impl From for MaybeBatch { - fn from(value: T) -> Self { - Self::Single(value) - } -} - -impl From> for MaybeBatch { - fn from(value: Vec) -> Self { - Self::Batch(value) - } -} - -/// Electrum server responds with an error. -#[derive(Debug, Clone, serde::Deserialize)] -pub struct ResponseError(pub(crate) Value); - -impl std::fmt::Display for ResponseError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Response.error: {}", self.0) - } -} - -impl std::error::Error for ResponseError {} diff --git a/src/pending_request.rs b/src/pending_request.rs index aa62d0d..4686b9c 100644 --- a/src/pending_request.rs +++ b/src/pending_request.rs @@ -1,19 +1,19 @@ use crate::{Event, MethodAndParams, Request, ResponseError, ResponseResult}; use serde_json::Value; -/// Extension trait for request types that can construct [`SatisfiedRequest`] and [`ErroredRequest`]. +/// Extension trait for request types that can construct [`CompletedRequest`] and [`FailedRequest`]. /// /// This trait is automatically implemented for all built-in request types via the -/// [`gen_pending_request_types!`] macro. It bridges a typed request to the enum variants used in +/// `gen_pending_request_types!` macro. It bridges a typed request to the enum variants used in /// [`Event`]. /// /// [`Event`]: crate::Event pub trait RequestExt: Request + Sized { - /// Wraps this request and its decoded response into a [`SatisfiedRequest`]. - fn into_satisfied(self, resp: Self::Response) -> SatisfiedRequest; + /// Wraps this request and its decoded response into a [`CompletedRequest`]. + fn into_completed(self, resp: Self::Response) -> CompletedRequest; - /// Wraps this request and an error into an [`ErroredRequest`]. - fn into_errored(self, error: ResponseError) -> ErroredRequest; + /// Wraps this request and an error into an [`FailedRequest`]. + fn into_failed(self, error: ResponseError) -> FailedRequest; } macro_rules! gen_pending_request_types { @@ -24,7 +24,7 @@ macro_rules! gen_pending_request_types { /// with a valid `result`. It contains both the original request and the corresponding /// response. /// - /// `SatisfiedRequest` is used by the [`Event::Response`] variant to expose typed + /// `CompletedRequest` is used by the [`Event::Response`] variant to expose typed /// request-response pairs to the caller. /// /// You typically don't construct this manually — it is created internally by the client @@ -32,7 +32,7 @@ macro_rules! gen_pending_request_types { /// /// [`Event::Response`]: crate::Event::Response #[derive(Debug, Clone)] - pub enum SatisfiedRequest { + pub enum CompletedRequest { $($name { req: crate::request::$name, resp: ::Response, @@ -47,19 +47,19 @@ macro_rules! gen_pending_request_types { /// This is used by the [`Event::ResponseError`] variant to expose server-side failures /// in a typed manner. /// - /// Like [`SatisfiedRequest`], this is created internally by the client during response + /// Like [`CompletedRequest`], this is created internally by the client during response /// processing. /// /// [`Event::ResponseError`]: crate::Event::ResponseError #[derive(Debug, Clone)] - pub enum ErroredRequest { + pub enum FailedRequest { $($name { req: crate::request::$name, error: ResponseError, }),*, } - impl core::fmt::Display for ErroredRequest { + impl core::fmt::Display for FailedRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { $(Self::$name { req, error } => write!(f, "Server responsed to {:?} with error: {}", req, error)),*, @@ -67,15 +67,15 @@ macro_rules! gen_pending_request_types { } } - impl std::error::Error for ErroredRequest {} + impl std::error::Error for FailedRequest {} $( impl RequestExt for crate::request::$name { - fn into_satisfied(self, resp: ::Response) -> SatisfiedRequest { - SatisfiedRequest::$name { req: self, resp } + fn into_completed(self, resp: ::Response) -> CompletedRequest { + CompletedRequest::$name { req: self, resp } } - fn into_errored(self, error: ResponseError) -> ErroredRequest { - ErroredRequest::$name { req: self, error } + fn into_failed(self, error: ResponseError) -> FailedRequest { + FailedRequest::$name { req: self, error } } } )* @@ -106,6 +106,9 @@ gen_pending_request_types! { Custom } +type Handler = + Box) -> Result, serde_json::Error> + Send + Sync>; + /// A pending request that has been sent to the Electrum server and is awaiting a response. /// /// This struct holds a type-erased handler closure that knows how to deserialize the server's @@ -118,21 +121,19 @@ gen_pending_request_types! { /// [`Event`]: crate::Event pub struct PendingRequest { method_and_params: MethodAndParams, - handler: Box< - dyn FnOnce(Result) -> Result, serde_json::Error> + Send + Sync, - >, + handler: Handler, } impl PendingRequest { /// Creates a new pending request with an optional typed callback. /// /// If `callback` is `Some`, the response will be deserialized and dispatched through it, - /// and [`State::process_incoming`] will return `Ok(None)` for this request. + /// and [`RequestTracker::handle_incoming`] will return `Ok(None)` for this request. /// /// If `callback` is `None`, the response will be wrapped in an [`Event`] and returned from - /// [`State::process_incoming`]. + /// [`RequestTracker::handle_incoming`]. /// - /// [`State::process_incoming`]: crate::State::process_incoming + /// [`RequestTracker::handle_incoming`]: crate::RequestTracker::handle_incoming /// [`Event`]: crate::Event pub fn new( req: Req, @@ -149,14 +150,14 @@ impl PendingRequest { } (Ok(raw_resp), None) => { let resp = serde_json::from_value(raw_resp)?; - Ok(Some(Event::Response(req.into_satisfied(resp)))) + Ok(Some(Event::Response(req.into_completed(resp)))) } (Err(raw_err), Some(cb)) => { cb(Err(ResponseError(raw_err))); Ok(None) } (Err(raw_err), None) => Ok(Some(Event::ResponseError( - req.into_errored(ResponseError(raw_err)), + req.into_failed(ResponseError(raw_err)), ))), }), } @@ -164,9 +165,9 @@ impl PendingRequest { /// Creates a new pending request without a callback (event-style). /// - /// The server's response will be returned as an [`Event`] from [`State::process_incoming`]. + /// The server's response will be returned as an [`Event`] from [`RequestTracker::handle_incoming`]. /// - /// [`State::process_incoming`]: crate::State::process_incoming + /// [`RequestTracker::handle_incoming`]: crate::RequestTracker::handle_incoming /// [`Event`]: crate::Event pub fn event(req: Req) -> Self { Self::new(req, None::)>) @@ -198,3 +199,104 @@ impl std::fmt::Debug for PendingRequest { .finish_non_exhaustive() } } + +/// A builder for batching multiple requests to the Electrum server. +/// +/// This type allows queuing both: +/// - tracked requests via [`request`] (which take a callback to receive the typed response), and +/// - event-style requests via [`event_request`] (which emit [`Event`]s through the event receiver +/// instead of a callback). +/// +/// After building the batch, submit it using [`AsyncClient::send_batch`] or +/// [`BlockingClient::send_batch`]. The batch will be converted into a raw JSON-RPC message and +/// sent to the server. +/// +/// [`request`]: Self::request +/// [`event_request`]: Self::event_request +/// [`AsyncClient::send_batch`]: crate::AsyncClient::send_batch +/// [`BlockingClient::send_batch`]: crate::BlockingClient::send_batch +/// [`Event`]: crate::Event +#[must_use] +#[derive(Debug, Default)] +pub struct BatchRequest { + inner: Option>, +} + +impl BatchRequest { + /// Creates a new empty batch request builder. + pub fn new() -> Self { + Self::default() + } + + /// Consumes the batch and returns its raw contents, if any requests were added. + /// + /// Returns `Some` if the batch is non-empty, or `None` if it was empty. + pub fn into_inner(self) -> Option> { + self.inner + } + + /// Adds a tracked request to the batch with a typed callback. + /// + /// The callback will be invoked with the deserialized response (or error) once the server + /// replies. The callback is type-erased internally, so it works for both async and blocking + /// clients. + pub fn request(&mut self, req: Req, callback: F) + where + Req: RequestExt + Send + Sync + 'static, + F: FnOnce(ResponseResult) + Send + Sync + 'static, + { + crate::RawOneOrMany::push_opt(&mut self.inner, PendingRequest::new(req, Some(callback))); + } + + /// Adds a tracked request and returns an async receiver for the response. + /// + /// This is a convenience wrapper around [`request`](Self::request) that creates a + /// [`futures::channel::oneshot`] channel internally. The returned receiver can be + /// `.await`ed after the batch is sent. + pub fn request_async( + &mut self, + req: Req, + ) -> futures::channel::oneshot::Receiver> + where + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, + { + let (tx, rx) = futures::channel::oneshot::channel(); + self.request(req, move |result| { + let _ = tx.send(result); + }); + rx + } + + /// Adds a tracked request and returns a blocking receiver for the response. + /// + /// This is a convenience wrapper around [`request`](Self::request) that creates a + /// [`std::sync::mpsc::sync_channel`] internally. The returned receiver can be used + /// with [`recv`](std::sync::mpsc::Receiver::recv) after the batch is sent. + pub fn request_blocking( + &mut self, + req: Req, + ) -> std::sync::mpsc::Receiver> + where + Req: RequestExt + Send + Sync + 'static, + Req::Response: Send, + { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + self.request(req, move |result| { + let _ = tx.send(result); + }); + rx + } + + /// Adds an event-style request to the batch. + /// + /// These requests do not take a callback. Any server response (including the initial result + /// and any future notifications) will be delivered as [`Event`]s through the event receiver. + /// + /// Use this for subscription-style RPCs where responses should be handled uniformly as events. + /// + /// [`Event`]: crate::Event + pub fn event_request(&mut self, req: Req) { + crate::RawOneOrMany::push_opt(&mut self.inner, PendingRequest::event(req)); + } +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..a9a3101 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,207 @@ +//! Low-level JSON-RPC protocol types for the Electrum wire format. +//! +//! This module defines the raw message types exchanged between client and server, +//! including requests, responses, notifications, and error wrappers. + +use serde_json::Value; +use std::fmt::Display; + +use crate::{CowStr, Request}; + +/// The JSON-RPC protocol version supported by this client. +/// +/// Always set to `"2.0"` per the Electrum protocol specification. +pub const JSONRPC_VERSION_2_0: &str = "2.0"; + +/// Represents the `jsonrpc` version field in JSON-RPC messages. +/// +/// In Electrum, this is always the string `"2.0"`, as required by the JSON-RPC 2.0 specification. +/// It appears in all standard requests, responses, and notifications. +/// +/// This type ensures consistent serialization and deserialization of the version field. +#[derive(Debug, Clone, Copy)] +pub struct Version; + +impl Display for Version { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(JSONRPC_VERSION_2_0) + } +} + +impl AsRef for Version { + fn as_ref(&self) -> &str { + JSONRPC_VERSION_2_0 + } +} + +/// A raw server-initiated JSON-RPC notification. +/// +/// These are Electrum messages that have a `"method"` and `"params"`, but no `"id"` field. +/// Typically emitted for subscriptions like `blockchain.headers.subscribe`. +#[derive(Debug, Clone, serde::Deserialize)] +#[allow(clippy::manual_non_exhaustive)] +pub struct RawNotification { + /// The JSON-RPC protocol version (should always be `"2.0"`). + #[serde( + rename(deserialize = "jsonrpc"), + deserialize_with = "crate::custom_serde::version" + )] + pub version: Version, + + /// The method name of the notification (e.g., `"blockchain.headers.subscribe"`). + pub method: CowStr, + + /// The raw parameters associated with the notification. + pub params: Value, +} + +/// A raw JSON-RPC response from the Electrum server. +/// +/// This is the server's response to a client-issued request. It may contain either a `result` +/// or an `error` (as per the JSON-RPC spec). +#[derive(Debug, Clone, serde::Deserialize)] +#[allow(clippy::manual_non_exhaustive)] +pub struct RawResponse { + /// The JSON-RPC protocol version (should always be `"2.0"`). + #[serde( + rename(deserialize = "jsonrpc"), + deserialize_with = "crate::custom_serde::version" + )] + pub version: Version, + + /// The ID that matches the request this response is answering. + pub id: u32, + + /// The result if the request succeeded, or the error object if it failed. + #[serde(flatten, deserialize_with = "crate::custom_serde::result")] + pub result: Result, +} + +/// A raw incoming message from the Electrum server. +/// +/// This type represents either a JSON-RPC notification (e.g., for a subscription) +/// or a response to a previously issued request. +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(untagged)] +pub enum RawIncoming { + /// A server-initiated notification (e.g., from a subscription). + Notification(RawNotification), + + /// A response to a previously sent request. + Response(RawResponse), +} + +/// A raw JSON-RPC request to be sent to the Electrum server. +/// +/// This struct is constructed before serialization and sending. It includes all required +/// JSON-RPC fields for method calls. +#[derive(Debug, Clone, serde::Serialize)] +pub struct RawRequest { + /// The JSON-RPC version string (usually `"2.0"`). + pub jsonrpc: CowStr, + + /// The client-assigned request ID (used to correlate with responses). + pub id: u32, + + /// The method to be invoked (e.g., `"blockchain.headers.subscribe"`). + pub method: CowStr, + + /// The parameters passed to the method. + pub params: Vec, +} + +impl RawRequest { + /// Constructs a new JSON-RPC request with the given ID, method, and parameters. + /// + /// This sets the JSON-RPC version to `"2.0"`. + pub fn new(id: u32, method: CowStr, params: Vec) -> Self { + Self { + jsonrpc: JSONRPC_VERSION_2_0.into(), + id, + method, + params, + } + } + + pub fn from_request(id: u32, req: Req) -> Self { + (id, req).into() + } +} + +/// Represents either a single item or a batch of items. +/// +/// This enum is used to generalize over sending one or many requests in the same operation. I.e. +/// to the Electrum server. +/// +/// Use `From` implementations to easily convert from `T` or `Vec`. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +pub enum RawOneOrMany { + Single(T), + Batch(Vec), +} + +impl RawOneOrMany { + /// Converts this `RawOneOrMany` into a `Vec`. + /// + /// If it is a `Single`, returns a one-element vector. If it is a `Batch`, returns the inner vector. + pub fn into_vec(self) -> Vec { + match self { + RawOneOrMany::Single(item) => vec![item], + RawOneOrMany::Batch(batch) => batch, + } + } + + /// Pushes a new item into the given `Option>`, creating or extending the batch. + /// + /// If the option is `None`, it becomes `Some(Single(item))`. If it already contains a value, + /// it is converted into a `Batch` and the item is appended. + pub fn push_opt(opt: &mut Option, item: T) { + *opt = match opt.take() { + None => Some(Self::Single(item)), + Some(maybe_batch) => { + let mut items = maybe_batch.into_vec(); + items.push(item); + Some(RawOneOrMany::Batch(items)) + } + } + } + + pub fn map(self, f: impl Fn(T) -> T2) -> RawOneOrMany { + match self { + RawOneOrMany::Single(t) => RawOneOrMany::Single(f(t)), + RawOneOrMany::Batch(items) => RawOneOrMany::Batch(items.into_iter().map(f).collect()), + } + } + + pub fn map_into(self) -> RawOneOrMany + where + T: Into, + { + self.map(Into::into) + } +} + +impl From for RawOneOrMany { + fn from(value: T) -> Self { + Self::Single(value) + } +} + +impl From> for RawOneOrMany { + fn from(value: Vec) -> Self { + Self::Batch(value) + } +} + +/// Electrum server responds with an error. +#[derive(Debug, Clone, serde::Deserialize)] +pub struct ResponseError(pub(crate) Value); + +impl std::fmt::Display for ResponseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Response.error: {}", self.0) + } +} + +impl std::error::Error for ResponseError {} diff --git a/src/state.rs b/src/request_tracker.rs similarity index 80% rename from src/state.rs rename to src/request_tracker.rs index 13513de..c6cc2e7 100644 --- a/src/state.rs +++ b/src/request_tracker.rs @@ -1,7 +1,7 @@ use crate::*; use bitcoin::block::Header; use notification::Notification; -use pending_request::{ErroredRequest, PendingRequest, SatisfiedRequest}; +use pending_request::{CompletedRequest, FailedRequest, PendingRequest}; use std::collections::HashMap; /// Represents a high-level event produced after processing a server notification or response. @@ -10,12 +10,12 @@ pub enum Event { /// A successfully satisfied response to a previously tracked request. /// /// Contains the original request and the parsed result. - Response(SatisfiedRequest), + Response(CompletedRequest), /// A failed response to a previously tracked request. /// /// Contains the original request and the error returned by the server. - ResponseError(ErroredRequest), + ResponseError(FailedRequest), /// A server-initiated notification that was not in response to any tracked request. /// @@ -33,13 +33,13 @@ impl Event { /// Returns `None` if the event does not include any header information. pub fn try_to_headers(&self) -> Option> { match self { - Event::Response(SatisfiedRequest::Header { req, resp }) => { + Event::Response(CompletedRequest::Header { req, resp }) => { Some(vec![(req.height, resp.header)]) } - Event::Response(SatisfiedRequest::Headers { req, resp }) => { + Event::Response(CompletedRequest::Headers { req, resp }) => { Some((req.start_height..).zip(resp.headers.clone()).collect()) } - Event::Response(SatisfiedRequest::HeadersWithCheckpoint { req, resp }) => { + Event::Response(CompletedRequest::HeadersWithCheckpoint { req, resp }) => { Some((req.start_height..).zip(resp.headers.clone()).collect()) } Event::Notification(Notification::Header(n)) => Some(vec![(n.height(), *n.header())]), @@ -50,27 +50,27 @@ impl Event { /// A sans-io structure that manages the state of an Electrum client. /// -/// The [`State`] tracks outgoing requests and handles incoming messages from the Electrum server. +/// The [`RequestTracker`] tracks outgoing requests and handles incoming messages from the Electrum server. /// -/// Use [`State::track_request`] to register a new request. This method stores the request +/// Use [`RequestTracker::track_request`] to register a new request. This method stores the request /// internally and returns a [`RawRequest`] that can be sent to the server. /// -/// Use [`State::process_incoming`] to handle messages received from the server. It updates internal +/// Use [`RequestTracker::handle_incoming`] to handle messages received from the server. It updates internal /// state as needed and may return an [`Event`] representing a notification or a response to a /// previously tracked request. #[derive(Debug)] -pub struct State { +pub struct RequestTracker { pending: HashMap, } -impl Default for State { +impl Default for RequestTracker { fn default() -> Self { Self::new() } } -impl State { - /// Creates a new [`State`] instance. +impl RequestTracker { + /// Creates a new [`RequestTracker`] instance. pub fn new() -> Self { Self { pending: HashMap::new(), @@ -83,7 +83,7 @@ impl State { } /// Returns an iterator over all pending requests that have been registered with - /// [`State::track_request`] but have not yet received a response. + /// [`RequestTracker::track_request`] but have not yet received a response. /// /// Each item in the iterator is a [`RawRequest`] containing the request ID, method name, /// and parameters, which can be serialized and sent to the Electrum server. @@ -98,24 +98,28 @@ impl State { /// or batch of [`RawRequest`]s to be sent to the Electrum server. /// /// Each request is assigned a unique ID (via `next_id`) and stored internally until a matching - /// response is received via [`State::process_incoming`]. + /// response is received via [`RequestTracker::handle_incoming`]. /// - /// Returns a [`MaybeBatch`], preserving whether the input was a single request or a + /// Returns a [`RawOneOrMany`], preserving whether the input was a single request or a /// batch. - pub fn track_request(&mut self, next_id: &mut u32, req: R) -> MaybeBatch + pub fn track_request(&mut self, next_id: &mut u32, req: R) -> RawOneOrMany where - R: Into>, + R: Into>, { - fn _add_request(state: &mut State, next_id: &mut u32, req: PendingRequest) -> RawRequest { + fn _add_request( + tracker: &mut RequestTracker, + next_id: &mut u32, + req: PendingRequest, + ) -> RawRequest { let id = *next_id; *next_id = id.wrapping_add(1); let (method, params) = req.to_method_and_params(); - state.pending.insert(id, req); + tracker.pending.insert(id, req); RawRequest::new(id, method, params) } match req.into() { - MaybeBatch::Single(req) => _add_request(self, next_id, req).into(), - MaybeBatch::Batch(v) => v + RawOneOrMany::Single(req) => _add_request(self, next_id, req).into(), + RawOneOrMany::Batch(v) => v .into_iter() .map(|req| _add_request(self, next_id, req)) .collect::>() @@ -132,12 +136,12 @@ impl State { /// /// Returns `Ok(Some(Event))` if an event was produced, `Ok(None)` if no event was needed, or /// `Err(ProcessError)` if the input could not be parsed or did not match any known request. - pub fn process_incoming( + pub fn handle_incoming( &mut self, - notification_or_response: RawNotificationOrResponse, + incoming: RawIncoming, ) -> Result, ProcessError> { - match notification_or_response { - RawNotificationOrResponse::Notification(raw) => { + match incoming { + RawIncoming::Notification(raw) => { let notification = Notification::new(&raw).map_err(|error| { ProcessError::CannotDeserializeNotification { method: raw.method, @@ -147,7 +151,7 @@ impl State { })?; Ok(Some(Event::Notification(notification))) } - RawNotificationOrResponse::Response(resp) => { + RawIncoming::Response(resp) => { let pending_req = self .pending .remove(&resp.id) diff --git a/tests/synopsis.rs b/tests/synopsis.rs index 48f2c8d..129a605 100644 --- a/tests/synopsis.rs +++ b/tests/synopsis.rs @@ -4,7 +4,7 @@ use async_std::{net::TcpStream, stream::StreamExt}; use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; use bitcoin::Amount; use electrum_streaming_client::{ - notification::Notification, request, AsyncClient, Event, SatisfiedRequest, + notification::Notification, request, AsyncClient, CompletedRequest, Event, }; use futures::{ executor::{block_on, ThreadPool}, @@ -36,12 +36,12 @@ fn synopsis() -> anyhow::Result<()> { ))?; assert!(matches!( event_rx.next().await, - Some(Event::Response(SatisfiedRequest::HeadersSubscribe { .. })) + Some(Event::Response(CompletedRequest::HeadersSubscribe { .. })) )); assert!(matches!( event_rx.next().await, Some(Event::Response( - SatisfiedRequest::ScriptHashSubscribe { .. } + CompletedRequest::ScriptHashSubscribe { .. } )) )); From d9521fe78ec28ff1698a71a49e8bd2d43303814e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 30 Jan 2026 11:23:18 +0000 Subject: [PATCH 3/5] refactor!: Move client type aliases into client module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move async/blocking type aliases from lib.rs into the client module. Delete 4 unused aliases (AsyncEventSender, AsyncRequestReceiver, BlockingRequestReceiver, BlockingEventSender). Re-export only the client structs and error types at the crate root. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/client.rs | 56 +++++++++++++++++++++++++++++++++------- src/lib.rs | 71 ++++----------------------------------------------- 2 files changed, 52 insertions(+), 75 deletions(-) diff --git a/src/client.rs b/src/client.rs index 56a42f2..2c55200 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,44 @@ use crate::pending_request::{PendingRequest, RequestExt}; use crate::*; +// --- Async client type aliases --- + +/// The sending half of the channel used to enqueue one or more requests from [`AsyncClient`]. +/// +/// These requests are processed and forwarded to [`RequestTracker::track_request`] to be assigned an ID and serialized. +pub type AsyncRequestSender = futures::channel::mpsc::UnboundedSender>; + +/// The error returned by [`AsyncClient::send_request`] when a request fails. +/// +/// This may occur if the server responds with an error, the request is canceled, or the client is shut down. +pub type AsyncRequestError = crate::request::Error; + +/// The error that occurs when a request cannot be sent into the async request channel. +/// +/// This typically means the client's background task has shut down or the queue is disconnected. +pub type AsyncRequestSendError = futures::channel::mpsc::TrySendError>; + +/// The receiving half of the internal event stream, returned to users of [`AsyncClient`]. +/// +/// This yields all incoming [`Event`]s from the Electrum server, including notifications and responses. +pub type AsyncEventReceiver = futures::channel::mpsc::UnboundedReceiver; + +// --- Blocking client type aliases --- + +/// Channel sender for sending blocking requests from [`BlockingClient`] to the write thread. +pub type BlockingRequestSender = std::sync::mpsc::Sender>; + +/// Error returned by [`BlockingClient::send_request`] if the request fails or is canceled. +pub type BlockingRequestError = crate::request::Error; + +/// Error that occurs when a blocking request cannot be sent to the internal request channel. +/// +/// Typically indicates that the client has been shut down. +pub type BlockingRequestSendError = std::sync::mpsc::SendError>; + +/// Channel receiver used to receive [`Event`]s from the Electrum server. +pub type BlockingEventReceiver = std::sync::mpsc::Receiver; + /// An asynchronous Electrum client built on the [`futures`] I/O ecosystem. /// /// This client allows sending JSON-RPC requests and receiving [`Event`]s from an Electrum server @@ -21,7 +59,7 @@ use crate::*; /// [`Event`]: crate::Event /// [`AsyncBufRead`]: futures::io::AsyncBufRead /// [`AsyncWrite`]: futures::io::AsyncWrite -/// [`AsyncEventReceiver`]: crate::AsyncEventReceiver +/// [`AsyncEventReceiver`]: crate::client::AsyncEventReceiver #[derive(Debug, Clone)] pub struct AsyncClient { tx: AsyncRequestSender, @@ -48,7 +86,7 @@ impl AsyncClient { /// - A `Future`: the client worker loop. This must be polled (e.g., via `tokio::spawn`) /// to drive the connection. /// - /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver + /// [`AsyncEventReceiver`]: crate::client::AsyncEventReceiver /// [`Event`]: crate::Event pub fn new( reader: R, @@ -116,7 +154,7 @@ impl AsyncClient { /// - A `Future`: the client worker loop. This must be spawned or polled to keep the client /// alive. /// - /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver + /// [`AsyncEventReceiver`]: crate::client::AsyncEventReceiver /// [`Event`]: crate::Event /// [`AsyncClient::new`]: crate::AsyncClient::new #[cfg(feature = "tokio")] @@ -179,7 +217,7 @@ impl AsyncClient { /// /// [`send_request`]: Self::send_request /// [`Event`]: crate::Event - /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver + /// [`AsyncEventReceiver`]: crate::client::AsyncEventReceiver /// [`AsyncRequestSendError`]: crate::AsyncRequestSendError pub fn send_event_request(&self, request: Req) -> Result<(), AsyncRequestSendError> where @@ -209,7 +247,7 @@ impl AsyncClient { /// [`BatchRequest`]: crate::BatchRequest /// [`BatchRequest::request`]: crate::BatchRequest::request /// [`BatchRequest::event_request`]: crate::BatchRequest::event_request - /// [`AsyncEventReceiver`]: crate::AsyncEventReceiver + /// [`AsyncEventReceiver`]: crate::client::AsyncEventReceiver pub fn send_batch(&self, batch_req: BatchRequest) -> Result { match batch_req.into_inner() { Some(batch) => self.tx.unbounded_send(batch).map(|_| true), @@ -229,7 +267,7 @@ impl AsyncClient { /// Use the associated [`BlockingEventReceiver`] to receive [`Event`]s emitted by the server. /// /// [`Event`]: crate::Event -/// [`BlockingEventReceiver`]: crate::BlockingEventReceiver +/// [`BlockingEventReceiver`]: crate::client::BlockingEventReceiver #[derive(Debug, Clone)] pub struct BlockingClient { tx: BlockingRequestSender, @@ -258,7 +296,7 @@ impl BlockingClient { /// used to monitor or explicitly join the background threads if desired. /// /// [`Event`]: crate::Event - /// [`BlockingEventReceiver`]: crate::BlockingEventReceiver + /// [`BlockingEventReceiver`]: crate::client::BlockingEventReceiver /// [`JoinHandle`]: std::thread::JoinHandle pub fn new( reader: R, @@ -347,7 +385,7 @@ impl BlockingClient { /// Returns [`BlockingRequestSendError`] if the request could not be queued for sending. /// /// [`Event`]: crate::Event - /// [`BlockingEventReceiver`]: crate::BlockingEventReceiver + /// [`BlockingEventReceiver`]: crate::client::BlockingEventReceiver /// [`BlockingRequestSendError`]: crate::BlockingRequestSendError pub fn send_event_request(&self, request: Req) -> Result<(), BlockingRequestSendError> where @@ -377,7 +415,7 @@ impl BlockingClient { /// [`BatchRequest`]: crate::BatchRequest /// [`BatchRequest::request`]: crate::BatchRequest::request /// [`BatchRequest::event_request`]: crate::BatchRequest::event_request - /// [`BlockingEventReceiver`]: crate::BlockingEventReceiver + /// [`BlockingEventReceiver`]: crate::client::BlockingEventReceiver pub fn send_batch(&self, batch_req: BatchRequest) -> Result { match batch_req.into_inner() { Some(batch) => self.tx.send(batch).map(|_| true), diff --git a/src/lib.rs b/src/lib.rs index b4817bd..c48897e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,9 @@ #![doc = include_str!("../README.md")] -mod client; -pub use client::*; +pub mod client; +pub use client::{ + AsyncClient, AsyncRequestError, AsyncRequestSendError, BlockingClient, BlockingRequestError, + BlockingRequestSendError, +}; mod custom_serde; mod hash_types; pub mod io; @@ -28,67 +31,3 @@ pub type MethodAndParams = (CowStr, Vec); /// A server response that is either a success (`Ok`) or a JSON-RPC error (`Err`). pub type ResponseResult = Result; - -/// Internal type aliases for asynchronous client components. -mod async_aliases { - use super::*; - use futures::channel::mpsc::{TrySendError, UnboundedReceiver, UnboundedSender}; - use pending_request::PendingRequest; - - /// The sending half of the channel used to enqueue one or more requests from [`AsyncClient`]. - /// - /// These requests are processed and forwarded to [`RequestTracker::track_request`] to be assigned an ID and serialized. - pub type AsyncRequestSender = UnboundedSender>; - - /// The receiving half of the request channel used internally by the async client. - /// - /// Requests sent by [`AsyncClient`] are dequeued here and forwarded to [`RequestTracker::track_request`]. - pub type AsyncRequestReceiver = UnboundedReceiver>; - - /// The error returned by [`AsyncClient::send_request`] when a request fails. - /// - /// This may occur if the server responds with an error, the request is canceled, or the client is shut down. - pub type AsyncRequestError = request::Error; - - /// The error that occurs when a request cannot be sent into the async request channel. - /// - /// This typically means the client's background task has shut down or the queue is disconnected. - pub type AsyncRequestSendError = TrySendError>; - - /// The sending half of the internal event stream, used to emit [`Event`]s from the client worker loop. - pub type AsyncEventSender = UnboundedSender; - - /// The receiving half of the internal event stream, returned to users of [`AsyncClient`]. - /// - /// This yields all incoming [`Event`]s from the Electrum server, including notifications and responses. - pub type AsyncEventReceiver = UnboundedReceiver; -} -pub use async_aliases::*; - -/// Internal type aliases for blocking client components. -mod blocking_aliases { - use super::*; - use pending_request::PendingRequest; - use std::sync::mpsc::{Receiver, SendError, Sender}; - - /// Channel sender for sending blocking requests from [`BlockingClient`] to the write thread. - pub type BlockingRequestSender = Sender>; - - /// Channel receiver used by the write thread to dequeue pending requests. - pub type BlockingRequestReceiver = Receiver>; - - /// Error returned by [`BlockingClient::send_request`] if the request fails or is canceled. - pub type BlockingRequestError = request::Error; - - /// Error that occurs when a blocking request cannot be sent to the internal request channel. - /// - /// Typically indicates that the client has been shut down. - pub type BlockingRequestSendError = SendError>; - - /// Channel sender used by the read thread to emit [`Event`]s. - pub type BlockingEventSender = Sender; - - /// Channel receiver used to receive [`Event`]s from the Electrum server. - pub type BlockingEventReceiver = Receiver; -} -pub use blocking_aliases::*; From aab555301c548d7cc5920cca40418cf313177b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 30 Jan 2026 11:51:59 +0000 Subject: [PATCH 4/5] feat!: Add Electrum protocol v1.6 method support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add `ServerVersion` request type (`server.version`) - Add optional `mode` parameter to `EstimateFee` (breaking: new field) - Support both pre-1.6 (concatenated hex) and v1.6 (list of hex strings) response formats for `blockchain.block.headers` - Add `BroadcastPackage` request type (`blockchain.transaction.broadcast_package`) - Add `GetMempoolInfo` request type (`mempool.get_info`) - Deprecate `RelayFee` in favor of `GetMempoolInfo` - Add missing `Features` to `gen_pending_request_types!` macro Closes #8 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/custom_serde.rs | 45 +++++++++++++----- src/pending_request.rs | 7 +++ src/request.rs | 102 ++++++++++++++++++++++++++++++++++++++++- src/response.rs | 63 ++++++++++++++++++++++--- 4 files changed, 197 insertions(+), 20 deletions(-) diff --git a/src/custom_serde.rs b/src/custom_serde.rs index 722fffb..7e307b6 100644 --- a/src/custom_serde.rs +++ b/src/custom_serde.rs @@ -19,23 +19,44 @@ where deserialize_hex(&hex_str).map_err(serde::de::Error::custom) } -pub fn from_cancat_consensus_hex<'de, T, D>(deserializer: D) -> Result, D::Error> +/// Deserializes headers from either: +/// - A single concatenated hex string (pre-1.6: `"hex"` field) +/// - An array of individual hex strings (v1.6+: `"headers"` field) +pub fn headers_from_hex_or_list<'de, T, D>(deserializer: D) -> Result, D::Error> where T: bitcoin::consensus::encode::Decodable, D: Deserializer<'de>, { - let hex_str = String::deserialize(deserializer)?; - let data = Vec::::from_hex(&hex_str).map_err(serde::de::Error::custom)?; - - let mut items = Vec::::new(); - let mut read_start = 0_usize; - while read_start < data.len() { - let (item, read_count) = - deserialize_partial::(&data[read_start..]).map_err(serde::de::Error::custom)?; - read_start += read_count; - items.push(item); + let value = Value::deserialize(deserializer)?; + match value { + Value::String(hex_str) => { + // Pre-1.6: single concatenated hex string + let data = Vec::::from_hex(&hex_str).map_err(serde::de::Error::custom)?; + let mut items = Vec::::new(); + let mut read_start = 0_usize; + while read_start < data.len() { + let (item, read_count) = + deserialize_partial::(&data[read_start..]).map_err(serde::de::Error::custom)?; + read_start += read_count; + items.push(item); + } + Ok(items) + } + Value::Array(arr) => { + // v1.6: array of hex strings + arr.into_iter() + .map(|v| { + let hex_str = v.as_str().ok_or_else(|| { + serde::de::Error::custom("expected hex string in headers array") + })?; + deserialize_hex(hex_str).map_err(serde::de::Error::custom) + }) + .collect() + } + _ => Err(serde::de::Error::custom( + "expected a hex string or array of hex strings for headers", + )), } - Ok(items) } pub fn feerate_opt_from_btc_per_kb<'de, D>( diff --git a/src/pending_request.rs b/src/pending_request.rs index 4686b9c..d7e9e33 100644 --- a/src/pending_request.rs +++ b/src/pending_request.rs @@ -32,6 +32,7 @@ macro_rules! gen_pending_request_types { /// /// [`Event::Response`]: crate::Event::Response #[derive(Debug, Clone)] + #[allow(deprecated)] pub enum CompletedRequest { $($name { req: crate::request::$name, @@ -52,6 +53,7 @@ macro_rules! gen_pending_request_types { /// /// [`Event::ResponseError`]: crate::Event::ResponseError #[derive(Debug, Clone)] + #[allow(deprecated)] pub enum FailedRequest { $($name { req: crate::request::$name, @@ -70,6 +72,7 @@ macro_rules! gen_pending_request_types { impl std::error::Error for FailedRequest {} $( + #[allow(deprecated)] impl RequestExt for crate::request::$name { fn into_completed(self, resp: ::Response) -> CompletedRequest { CompletedRequest::$name { req: self, resp } @@ -97,11 +100,15 @@ gen_pending_request_types! { ScriptHashSubscribe, ScriptHashUnsubscribe, BroadcastTx, + BroadcastPackage, GetTx, GetTxMerkle, GetTxidFromPos, GetFeeHistogram, + GetMempoolInfo, + ServerVersion, Banner, + Features, Ping, Custom } diff --git a/src/request.rs b/src/request.rs index 9973529..9d23602 100644 --- a/src/request.rs +++ b/src/request.rs @@ -235,17 +235,29 @@ impl Request for HeadersWithCheckpoint { /// fee rate (in BTC per kilobyte) required to be included within the specified number of blocks. /// /// See: -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct EstimateFee { /// The number of blocks to target for confirmation. pub number: usize, + + /// An optional estimation mode passed to the server's `estimatesmartfee` RPC. + /// + /// Common values include `"ECONOMICAL"` and `"CONSERVATIVE"`. + /// If `None`, the server uses its default mode. + /// + /// Added in Electrum protocol v1.6. + pub mode: Option, } impl Request for EstimateFee { type Response = response::EstimateFeeResp; fn to_method_and_params(&self) -> MethodAndParams { - ("blockchain.estimatefee".into(), vec![self.number.into()]) + let mut params: Vec = vec![self.number.into()]; + if let Some(mode) = &self.mode { + params.push(mode.as_ref().into()); + } + ("blockchain.estimatefee".into(), params) } } @@ -272,9 +284,14 @@ impl Request for HeadersSubscribe { /// fee rate (in BTC per kilobyte) that the server will accept for relaying transactions. /// /// See: +#[deprecated( + since = "0.5.0", + note = "Removed in Electrum protocol v1.6. Use GetMempoolInfo instead (mempool.get_info)." +)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct RelayFee; +#[allow(deprecated)] impl Request for RelayFee { type Response = response::RelayFeeResp; @@ -589,6 +606,32 @@ impl Request for GetTxidFromPos { } } +/// A request to broadcast a package of transactions to the network. +/// +/// This corresponds to the `"blockchain.transaction.broadcast_package"` Electrum RPC method, +/// which submits a package of related transactions (e.g., for CPFP or package relay). +/// +/// Added in Electrum protocol v1.6. +/// +/// See: +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BroadcastPackage { + /// The raw transactions to broadcast, each as a hex-encoded string. + pub raw_txs: Vec, +} + +impl Request for BroadcastPackage { + type Response = response::BroadcastPackageResp; + + fn to_method_and_params(&self) -> MethodAndParams { + let txs: Vec = self.raw_txs.iter().map(|s| s.as_str().into()).collect(); + ( + "blockchain.transaction.broadcast_package".into(), + vec![txs.into()], + ) + } +} + /// A request for the current mempool fee histogram. /// /// This corresponds to the `"mempool.get_fee_histogram"` Electrum RPC method. It returns a compact @@ -607,6 +650,61 @@ impl Request for GetFeeHistogram { } } +/// A request to negotiate the protocol version with the Electrum server. +/// +/// This corresponds to the `"server.version"` Electrum RPC method. It identifies the client and +/// negotiates a compatible protocol version with the server. According to the Electrum protocol +/// specification, this should be the first message sent after connecting. +/// +/// The server will select the highest protocol version that both client and server support. +/// +/// See: +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ServerVersion { + /// A string identifying the client software (e.g., `"electrum_streaming_client/0.5"`). + pub client_name: CowStr, + + /// The protocol version or version range the client supports. + /// + /// Can be a single version string (e.g., `"1.6"`) or an array-style string for a range. + pub protocol_version: CowStr, +} + +impl Request for ServerVersion { + type Response = response::ServerVersionResp; + + fn to_method_and_params(&self) -> MethodAndParams { + ( + "server.version".into(), + vec![ + self.client_name.as_ref().into(), + self.protocol_version.as_ref().into(), + ], + ) + } +} + +/// A request for general mempool information from the Electrum server. +/// +/// This corresponds to the `"mempool.get_info"` Electrum RPC method. It returns fee-related +/// parameters including `mempoolminfee`, `minrelaytxfee`, and `incrementalrelayfee`. +/// +/// This replaces the deprecated `blockchain.relayfee` method. +/// +/// Added in Electrum protocol v1.6. +/// +/// See: +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct GetMempoolInfo; + +impl Request for GetMempoolInfo { + type Response = response::MempoolInfoResp; + + fn to_method_and_params(&self) -> MethodAndParams { + ("mempool.get_info".into(), vec![]) + } +} + /// A request for the Electrum server's banner message. /// /// This corresponds to the `"server.banner"` Electrum RPC method, which returns a server-defined diff --git a/src/response.rs b/src/response.rs index 7510ec4..3731ed7 100644 --- a/src/response.rs +++ b/src/response.rs @@ -14,6 +14,20 @@ use bitcoin::{ use crate::DoubleSHA; +/// Response to the `"server.version"` method. +/// +/// Returns the server's software version and the negotiated protocol version. +/// +/// See: +#[derive(Debug, Clone, serde::Deserialize)] +pub struct ServerVersionResp { + /// The server's software version string. + pub server_software: String, + + /// The negotiated protocol version string. + pub protocol_version: String, +} + /// Response to the `"blockchain.block.header"` method (without checkpoint). #[derive(Debug, Clone, serde::Deserialize, PartialEq, Eq)] #[serde(transparent)] @@ -38,6 +52,9 @@ pub struct HeaderWithProofResp { } /// Response to the `"blockchain.block.headers"` method (without checkpoint). +/// +/// Supports both the pre-1.6 format (concatenated hex in `"hex"` field) and the v1.6 format +/// (array of hex strings in `"headers"` field). #[derive(Debug, Clone, serde::Deserialize)] pub struct HeadersResp { /// The number of headers returned. @@ -45,16 +62,20 @@ pub struct HeadersResp { /// The deserialized headers returned by the server. #[serde( - rename = "hex", - deserialize_with = "crate::custom_serde::from_cancat_consensus_hex" + alias = "hex", + alias = "headers", + deserialize_with = "crate::custom_serde::headers_from_hex_or_list" )] pub headers: Vec, - /// The server’s maximum allowed headers per request. + /// The server's maximum allowed headers per request. pub max: usize, } /// Response to the `"blockchain.block.headers"` method with a `cp_height` parameter. +/// +/// Supports both the pre-1.6 format (concatenated hex in `"hex"` field) and the v1.6 format +/// (array of hex strings in `"headers"` field). #[derive(Debug, Clone, serde::Deserialize)] pub struct HeadersWithCheckpointResp { /// The number of headers returned. @@ -62,12 +83,13 @@ pub struct HeadersWithCheckpointResp { /// The deserialized headers returned by the server. #[serde( - rename = "hex", - deserialize_with = "crate::custom_serde::from_cancat_consensus_hex" + alias = "hex", + alias = "headers", + deserialize_with = "crate::custom_serde::headers_from_hex_or_list" )] pub headers: Vec, - /// The server’s maximum allowed headers per request. + /// The server's maximum allowed headers per request. pub max: usize, /// The Merkle root of all headers up to the checkpoint height. @@ -281,6 +303,35 @@ pub struct FeePair { pub weight: bitcoin::Weight, } +/// Response to the `"blockchain.transaction.broadcast_package"` method (non-verbose mode). +/// +/// See: +#[derive(Debug, Clone, serde::Deserialize)] +pub struct BroadcastPackageResp { + /// Whether the package was accepted by the server. + pub success: bool, +} + +/// Response to the `"mempool.get_info"` method. +/// +/// Provides fee-related information about the server's mempool. All fee rates are in BTC/kvB. +/// +/// See: +#[derive(Debug, Clone, serde::Deserialize)] +pub struct MempoolInfoResp { + /// The minimum fee rate (BTC/kvB) for a transaction to be accepted into the mempool. + #[serde(deserialize_with = "crate::custom_serde::feerate_opt_from_btc_per_kb")] + pub mempoolminfee: Option, + + /// The minimum relay fee rate (BTC/kvB). + #[serde(deserialize_with = "crate::custom_serde::feerate_opt_from_btc_per_kb")] + pub minrelaytxfee: Option, + + /// The incremental relay fee rate (BTC/kvB). + #[serde(deserialize_with = "crate::custom_serde::feerate_opt_from_btc_per_kb")] + pub incrementalrelayfee: Option, +} + /// Response to the `"server.features"` method. #[derive(Debug, Clone, serde::Deserialize)] pub struct ServerFeatures { From 39ba0592a4fe09a300a58c1bcb644b92f79f8a89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 30 Jan 2026 12:01:05 +0000 Subject: [PATCH 5/5] fix: Address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace EstimateFee mode Option with EstimateFeeMode enum (Conservative, Economical) - Change BroadcastPackage to take Vec instead of Vec, matching BroadcastTx's pattern of consensus-encoding internally 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/request.rs | 44 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/src/request.rs b/src/request.rs index 9d23602..23db450 100644 --- a/src/request.rs +++ b/src/request.rs @@ -235,18 +235,39 @@ impl Request for HeadersWithCheckpoint { /// fee rate (in BTC per kilobyte) required to be included within the specified number of blocks. /// /// See: -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// The fee estimation mode passed to the server's `estimatesmartfee` RPC. +/// +/// Added in Electrum protocol v1.6. +/// +/// See: +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum EstimateFeeMode { + /// Conservative fee estimation (less likely to underestimate). + Conservative, + /// Economical fee estimation (may underestimate for faster inclusion). + Economical, +} + +impl EstimateFeeMode { + fn as_str(&self) -> &'static str { + match self { + Self::Conservative => "CONSERVATIVE", + Self::Economical => "ECONOMICAL", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct EstimateFee { /// The number of blocks to target for confirmation. pub number: usize, /// An optional estimation mode passed to the server's `estimatesmartfee` RPC. /// - /// Common values include `"ECONOMICAL"` and `"CONSERVATIVE"`. /// If `None`, the server uses its default mode. /// /// Added in Electrum protocol v1.6. - pub mode: Option, + pub mode: Option, } impl Request for EstimateFee { @@ -255,7 +276,7 @@ impl Request for EstimateFee { fn to_method_and_params(&self) -> MethodAndParams { let mut params: Vec = vec![self.number.into()]; if let Some(mode) = &self.mode { - params.push(mode.as_ref().into()); + params.push(mode.as_str().into()); } ("blockchain.estimatefee".into(), params) } @@ -615,16 +636,21 @@ impl Request for GetTxidFromPos { /// /// See: #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct BroadcastPackage { - /// The raw transactions to broadcast, each as a hex-encoded string. - pub raw_txs: Vec, -} +pub struct BroadcastPackage(pub Vec); impl Request for BroadcastPackage { type Response = response::BroadcastPackageResp; fn to_method_and_params(&self) -> MethodAndParams { - let txs: Vec = self.raw_txs.iter().map(|s| s.as_str().into()).collect(); + let txs: Vec = self + .0 + .iter() + .map(|tx| { + let mut tx_bytes = Vec::::new(); + tx.consensus_encode(&mut tx_bytes).expect("must encode"); + tx_bytes.to_lower_hex_string().into() + }) + .collect(); ( "blockchain.transaction.broadcast_package".into(), vec![txs.into()],