diff --git a/Cargo.lock b/Cargo.lock index 4aea798c1..153565d2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3175,6 +3175,7 @@ dependencies = [ "amp-datasets-registry", "amp-object-store", "amp-parquet", + "amp-providers-common", "amp-providers-registry", "arrow", "async-stream", @@ -3195,6 +3196,7 @@ dependencies = [ "itertools 0.14.0", "js-runtime", "metadata-db", + "monitoring", "object_store", "parking_lot", "pgtemp", diff --git a/crates/bin/ampd/src/controller_cmd.rs b/crates/bin/ampd/src/controller_cmd.rs index 8609b47b6..9600cb6d1 100644 --- a/crates/bin/ampd/src/controller_cmd.rs +++ b/crates/bin/ampd/src/controller_cmd.rs @@ -60,7 +60,7 @@ pub async fn run( (datasets_registry, providers_registry) }; let datasets_cache = DatasetsCache::new(datasets_registry.clone()); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone()); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref()); let (addr, server) = controller::service::new( build_info, diff --git a/crates/bin/ampd/src/server_cmd.rs b/crates/bin/ampd/src/server_cmd.rs index 80472f125..763c5282d 100644 --- a/crates/bin/ampd/src/server_cmd.rs +++ b/crates/bin/ampd/src/server_cmd.rs @@ -62,7 +62,7 @@ pub async fn run( let isolate_pool = IsolatePool::new(); let datasets_cache = DatasetsCache::new(datasets_registry); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry, meter.as_ref()); let server_config = config_from_common(&config); diff --git a/crates/bin/ampd/src/solo_cmd.rs b/crates/bin/ampd/src/solo_cmd.rs index 9ef623895..28c191ab1 100644 --- a/crates/bin/ampd/src/solo_cmd.rs +++ b/crates/bin/ampd/src/solo_cmd.rs @@ -122,7 +122,7 @@ pub async fn run( let isolate_pool = IsolatePool::new(); let datasets_cache = DatasetsCache::new(datasets_registry.clone()); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone()); + let ethcall_udfs_cache = 
EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref()); // Spawn controller (Admin API) if enabled let controller_fut: ControllerFuture = if admin_server { diff --git a/crates/bin/ampd/src/worker_cmd.rs b/crates/bin/ampd/src/worker_cmd.rs index 3f45e818e..d832cbbdd 100644 --- a/crates/bin/ampd/src/worker_cmd.rs +++ b/crates/bin/ampd/src/worker_cmd.rs @@ -62,7 +62,7 @@ pub async fn run( let isolate_pool = IsolatePool::new(); let datasets_cache = DatasetsCache::new(datasets_registry.clone()); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone()); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref()); // Convert common config to worker-specific config let worker_config = config_from_common(&config); diff --git a/crates/core/common/Cargo.toml b/crates/core/common/Cargo.toml index b602adf2e..dae6e8169 100644 --- a/crates/core/common/Cargo.toml +++ b/crates/core/common/Cargo.toml @@ -9,6 +9,7 @@ alloy.workspace = true amp-data-store = { path = "../data-store" } amp-datasets-registry = { path = "../datasets-registry" } amp-parquet = { path = "../parquet" } +amp-providers-common = { path = "../providers-common" } amp-providers-registry = { path = "../providers-registry" } arrow.workspace = true async-stream.workspace = true @@ -27,6 +28,7 @@ futures.workspace = true itertools.workspace = true js-runtime = { path = "../js-runtime" } metadata-db = { path = "../metadata-db" } +monitoring = { path = "../monitoring" } object_store.workspace = true parking_lot = "0.12.4" rand.workspace = true diff --git a/crates/core/common/src/udfs/eth_call.rs b/crates/core/common/src/udfs/eth_call.rs index 9c10ed7ad..4f4eadacd 100644 --- a/crates/core/common/src/udfs/eth_call.rs +++ b/crates/core/common/src/udfs/eth_call.rs @@ -1,5 +1,10 @@ mod cache; +mod metrics; mod udf; pub use cache::{EthCallForNetworkError, EthCallUdfsCache}; +pub use metrics::{EthCallErrorClass, EthCallMetrics}; pub use udf::EthCall; + 
+#[cfg(test)] +mod tests; diff --git a/crates/core/common/src/udfs/eth_call/cache.rs b/crates/core/common/src/udfs/eth_call/cache.rs index 7974fc8d9..d033c083c 100644 --- a/crates/core/common/src/udfs/eth_call/cache.rs +++ b/crates/core/common/src/udfs/eth_call/cache.rs @@ -11,9 +11,10 @@ use datafusion::{ logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}, }; use datasets_common::network_id::NetworkId; +use monitoring::telemetry::metrics::Meter; use parking_lot::RwLock; -use super::udf::EthCall; +use super::{metrics::EthCallMetrics, udf::EthCall}; /// Manages creation and caching of `eth_call` scalar UDFs keyed by network. /// @@ -22,6 +23,7 @@ use super::udf::EthCall; pub struct EthCallUdfsCache { registry: ProvidersRegistry, cache: Arc>>, + metrics: Option>, } impl std::fmt::Debug for EthCallUdfsCache { @@ -32,10 +34,14 @@ impl std::fmt::Debug for EthCallUdfsCache { impl EthCallUdfsCache { /// Creates a new EthCall UDFs cache. - pub fn new(registry: ProvidersRegistry) -> Self { + /// + /// If a `meter` is provided, metrics will be recorded for every eth_call invocation. + pub fn new(registry: ProvidersRegistry, meter: Option<&Meter>) -> Self { + let metrics = meter.map(|m| Arc::new(EthCallMetrics::new(m))); Self { registry, cache: Default::default(), + metrics, } } @@ -61,7 +67,7 @@ impl EthCallUdfsCache { // TODO: Always selects the first provider. Rotation across retries would // require rethinking the cache key (currently NetworkId only) since the // cached UDF holds a reference to a specific provider. 
- let Some((name, config)) = self + let Some((provider_name, config)) = self .registry .find_providers(EvmRpcProviderKind, network) .await @@ -72,16 +78,30 @@ impl EthCallUdfsCache { provider_network = %network, "no EVM RPC provider found for network" ); + if let Some(metrics) = &self.metrics { + metrics.record_error( + network.as_str(), + "unknown", + super::metrics::EthCallErrorClass::NetworkNotConfigured, + ); + } return Err(EthCallForNetworkError::ProviderNotFound { network: network.clone(), }); }; - let provider = amp_providers_registry::create_evm_rpc_client(name.to_string(), config) - .await - .map_err(EthCallForNetworkError::ProviderCreation)?; + let provider = + amp_providers_registry::create_evm_rpc_client(provider_name.to_string(), config) + .await + .map_err(EthCallForNetworkError::ProviderCreation)?; - let udf = AsyncScalarUDF::new(Arc::new(EthCall::new(udf_name.to_string(), provider))) - .into_scalar_udf(); + let udf = AsyncScalarUDF::new(Arc::new(EthCall::new( + udf_name.to_string(), + provider, + provider_name, + network.clone(), + self.metrics.clone(), + ))) + .into_scalar_udf(); self.cache.write().insert(network.clone(), udf.clone()); diff --git a/crates/core/common/src/udfs/eth_call/metrics.rs b/crates/core/common/src/udfs/eth_call/metrics.rs new file mode 100644 index 000000000..def91e261 --- /dev/null +++ b/crates/core/common/src/udfs/eth_call/metrics.rs @@ -0,0 +1,111 @@ +//! Metrics for eth_call UDF observability. + +use monitoring::telemetry::metrics::{Counter, Histogram, KeyValue, Meter}; + +/// Error classification for eth_call metrics. +#[derive(Debug, Clone, Copy)] +pub enum EthCallErrorClass { + /// JSON-RPC error codes 3 or -32000 (non-retryable). + RpcErrorNonRetryable, + /// All retry attempts exhausted. + RpcErrorRetryExhausted, + /// Request timed out. + Timeout, + /// Invalid arguments (bad address, calldata, block). + InvalidRequest, + /// No provider configured for the requested network. 
+ NetworkNotConfigured, + /// Unclassified error. + Other, +} + +impl EthCallErrorClass { + /// Returns the string label used in metric attributes. + pub fn as_str(&self) -> &'static str { + match self { + Self::RpcErrorNonRetryable => "rpc_error_non_retryable", + Self::RpcErrorRetryExhausted => "rpc_error_retry_exhausted", + Self::Timeout => "timeout", + Self::InvalidRequest => "invalid_request", + Self::NetworkNotConfigured => "network_not_configured", + Self::Other => "other", + } + } +} + +/// Metrics registry for eth_call UDF observability. +#[derive(Debug, Clone)] +pub struct EthCallMetrics { + /// Total eth_call UDF invocations. + requests_total: Counter, + /// Wall-clock latency including retries (milliseconds). + latency_ms: Histogram, + /// Failed invocations by error class. + errors_total: Counter, + /// Individual retry attempts. + retries_total: Counter, +} + +impl EthCallMetrics { + /// Creates a new metrics registry from the given meter. + pub fn new(meter: &Meter) -> Self { + Self { + requests_total: Counter::new( + meter, + "eth_call_requests_total", + "Total number of eth_call UDF invocations", + ), + latency_ms: Histogram::new_f64( + meter, + "eth_call_latency_ms", + "Wall-clock latency of eth_call including retries", + "milliseconds", + ), + errors_total: Counter::new( + meter, + "eth_call_errors_total", + "Total number of failed eth_call invocations", + ), + retries_total: Counter::new( + meter, + "eth_call_retries_total", + "Total number of eth_call retry attempts", + ), + } + } + + /// Record a single eth_call request. + pub(crate) fn record_request(&self, network: &str, provider: &str) { + self.requests_total + .inc_with_kvs(&self.base_kvs(network, provider)); + } + + /// Record wall-clock latency for a completed invocation. 
+ pub(crate) fn record_latency(&self, latency_ms: f64, network: &str, provider: &str) { + self.latency_ms + .record_with_kvs(latency_ms, &self.base_kvs(network, provider)); + } + + /// Record a failed invocation with the given error class. + pub(crate) fn record_error(&self, network: &str, provider: &str, class: EthCallErrorClass) { + let kvs = [ + KeyValue::new("network", network.to_string()), + KeyValue::new("provider", provider.to_string()), + KeyValue::new("class", class.as_str().to_string()), + ]; + self.errors_total.inc_with_kvs(&kvs); + } + + /// Record a single retry attempt. + pub(crate) fn record_retry(&self, network: &str, provider: &str) { + self.retries_total + .inc_with_kvs(&self.base_kvs(network, provider)); + } + + fn base_kvs(&self, network: &str, provider: &str) -> [KeyValue; 2] { + [ + KeyValue::new("network", network.to_string()), + KeyValue::new("provider", provider.to_string()), + ] + } +} diff --git a/crates/core/common/src/udfs/eth_call/tests.rs b/crates/core/common/src/udfs/eth_call/tests.rs new file mode 100644 index 000000000..d108c563f --- /dev/null +++ b/crates/core/common/src/udfs/eth_call/tests.rs @@ -0,0 +1,185 @@ +use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, +}; + +use alloy::{eips::BlockNumberOrTag, primitives::Bytes, rpc::json_rpc::ErrorPayload}; + +use super::{ + metrics::EthCallErrorClass, + udf::{EthCallRetryError, MAX_RETRY_ATTEMPTS, RpcCallOutcome, eth_call_with_retry}, +}; + +type TransactionRequest = ::TransactionRequest; + +fn default_req() -> TransactionRequest { + Default::default() +} + +#[tokio::test] +async fn eth_call_with_retry_on_first_attempt_succeeds() { + //* Given + let call_fn = |_req: TransactionRequest, _blk: BlockNumberOrTag| async { + RpcCallOutcome::Ok(Bytes::from_static(b"\x01\x02")) + }; + + //* When + let result = eth_call_with_retry( + call_fn, + BlockNumberOrTag::Latest, + default_req(), + "mainnet", + "test_provider", + None, + ) + .await; + + //* Then + assert!(result.result.is_ok(), 
"expected successful result"); + assert_eq!(result.attempts, 1, "expected exactly one attempt"); +} + +#[tokio::test] +async fn eth_call_with_retry_after_transient_errors_succeeds() { + //* Given + let call_count = Arc::new(AtomicU32::new(0)); + + //* When + let result = eth_call_with_retry( + { + let call_count = call_count.clone(); + move |_req, _blk| { + let count = call_count.fetch_add(1, Ordering::SeqCst); + async move { + if count < 2 { + RpcCallOutcome::RetryableError("transient error".into()) + } else { + RpcCallOutcome::Ok(Bytes::from_static(b"\xab")) + } + } + } + }, + BlockNumberOrTag::Latest, + default_req(), + "mainnet", + "test_provider", + None, + ) + .await; + + //* Then + assert!(result.result.is_ok(), "expected success after retries"); + assert_eq!( + result.attempts, 3, + "expected 3 attempts (2 retries + success)" + ); +} + +#[tokio::test] +async fn eth_call_with_retry_with_non_retryable_rpc_error_returns_immediately() { + //* Given + let call_count = Arc::new(AtomicU32::new(0)); + + //* When + let result = eth_call_with_retry( + { + let call_count = call_count.clone(); + move |_req, _blk| { + call_count.fetch_add(1, Ordering::SeqCst); + async { + RpcCallOutcome::NonRetryableError(ErrorPayload { + code: 3, + message: "execution reverted".into(), + data: None, + }) + } + } + }, + BlockNumberOrTag::Latest, + default_req(), + "mainnet", + "test_provider", + None, + ) + .await; + + //* Then + assert!( + matches!(result.result, Err(EthCallRetryError::RpcError(_))), + "expected non-retryable RPC error" + ); + assert_eq!( + result.attempts, 1, + "expected single attempt for non-retryable error" + ); + assert_eq!( + call_count.load(Ordering::SeqCst), + 1, + "expected call_fn invoked exactly once" + ); +} + +#[tokio::test] +async fn eth_call_with_retry_when_retries_exhausted_returns_retries_failed() { + //* Given + let call_count = Arc::new(AtomicU32::new(0)); + + //* When + let result = eth_call_with_retry( + { + let call_count = call_count.clone(); + 
move |_req, _blk| { + call_count.fetch_add(1, Ordering::SeqCst); + async { RpcCallOutcome::RetryableError("always fails".into()) } + } + }, + BlockNumberOrTag::Latest, + default_req(), + "mainnet", + "test_provider", + None, + ) + .await; + + //* Then + assert!( + matches!(result.result, Err(EthCallRetryError::RetriesFailed)), + "expected RetriesFailed after exhausting retries" + ); + assert_eq!( + result.attempts, MAX_RETRY_ATTEMPTS, + "expected {MAX_RETRY_ATTEMPTS} attempts" + ); + assert_eq!( + call_count.load(Ordering::SeqCst), + MAX_RETRY_ATTEMPTS, + "expected call_fn invoked {MAX_RETRY_ATTEMPTS} times" + ); +} + +#[test] +fn error_class_as_str_returns_expected_labels() { + //* When + let cases = [ + ( + EthCallErrorClass::RpcErrorNonRetryable, + "rpc_error_non_retryable", + ), + ( + EthCallErrorClass::RpcErrorRetryExhausted, + "rpc_error_retry_exhausted", + ), + (EthCallErrorClass::Timeout, "timeout"), + (EthCallErrorClass::InvalidRequest, "invalid_request"), + ( + EthCallErrorClass::NetworkNotConfigured, + "network_not_configured", + ), + (EthCallErrorClass::Other, "other"), + ]; + + //* Then + for (class, expected) in cases { + assert_eq!(class.as_str(), expected, "wrong label for {class:?}"); + } +} diff --git a/crates/core/common/src/udfs/eth_call/udf.rs b/crates/core/common/src/udfs/eth_call/udf.rs index 92edfe9d4..76a5541d4 100644 --- a/crates/core/common/src/udfs/eth_call/udf.rs +++ b/crates/core/common/src/udfs/eth_call/udf.rs @@ -1,4 +1,4 @@ -use std::{any::Any, str::FromStr}; +use std::{any::Any, str::FromStr, sync::Arc, time::Instant}; use alloy::{ eips::BlockNumberOrTag, @@ -9,6 +9,7 @@ use alloy::{ rpc::{json_rpc::ErrorPayload, types::TransactionInput}, transports::RpcError, }; +use amp_providers_common::provider_name::ProviderName; use async_trait::async_trait; use datafusion::{ arrow::{ @@ -25,10 +26,14 @@ use datafusion::{ async_udf::AsyncScalarUDFImpl, }, }; +use datasets_common::network_id::NetworkId; use itertools::izip; +use 
super::metrics::{EthCallErrorClass, EthCallMetrics}; use crate::plan; +pub(crate) const MAX_RETRY_ATTEMPTS: u32 = 3; + type TransactionRequest = ::TransactionRequest; /// DataFusion UDF that executes an `eth_call` against an Ethereum JSON-RPC endpoint. @@ -74,13 +79,20 @@ type TransactionRequest = ::TransactionRequ pub struct EthCall { name: String, client: alloy::providers::RootProvider, + provider_name: ProviderName, + network: NetworkId, + metrics: Option>, signature: Signature, fields: Fields, } impl PartialEq for EthCall { fn eq(&self, other: &Self) -> bool { - self.name == other.name && self.signature == other.signature && self.fields == other.fields + self.name == other.name + && self.provider_name == other.provider_name + && self.network == other.network + && self.signature == other.signature + && self.fields == other.fields } } @@ -89,20 +101,31 @@ impl Eq for EthCall {} impl std::hash::Hash for EthCall { fn hash(&self, state: &mut H) { self.name.hash(state); + self.provider_name.hash(state); + self.network.hash(state); self.signature.hash(state); self.fields.hash(state); } } impl EthCall { - /// Creates an `EthCall` UDF with the given name and RPC client. + /// Creates an `EthCall` UDF with the given name, RPC client, and observability context. /// /// The name must match the flat lookup key that DataFusion's planner constructs /// for the function reference, e.g., `rpc.mainnet.eth_call`. 
- pub fn new(name: String, client: alloy::providers::RootProvider) -> Self { + pub fn new( + name: String, + client: alloy::providers::RootProvider, + provider_name: ProviderName, + network: NetworkId, + metrics: Option>, + ) -> Self { EthCall { name, client, + provider_name, + network, + metrics, signature: Signature { type_signature: TypeSignature::Any(4), volatility: Volatility::Volatile, @@ -156,6 +179,9 @@ impl AsyncScalarUDFImpl for EthCall { let name = self.name().to_string(); let client = self.client.clone(); let fields = self.fields.clone(); + let network_label = self.network.as_str(); + let provider_label = self.provider_name.as_str(); + // Decode the arguments. let args: Vec<_> = ColumnarValue::values_to_arrays(&args.args)?; let [from, to, input_data, block] = args.as_slice() else { @@ -248,12 +274,39 @@ impl AsyncScalarUDFImpl for EthCall { let Some(to) = to else { return plan_err!("to address is NULL"); }; - let result = eth_call_retry( - &client, + + let block_selector = block.to_string(); + let calldata_len = input_data.map(|d| d.len()).unwrap_or(0); + let to_hex = hex::encode(to); + let from_hex = from.map(hex::encode); + + let span = tracing::info_span!( + "eth_call", + network = %network_label, + provider = %provider_label, + udf_name = %name, + block_selector_type = block_selector_type(&block), + block_selector_value = %block_selector, + to = %to_hex, + calldata_len, + attempts_total = tracing::field::Empty, + outcome = tracing::field::Empty, + error_class = tracing::field::Empty, + ); + + if let Some(metrics) = &self.metrics { + metrics.record_request(network_label, provider_label); + } + + let invocation_start = Instant::now(); + + let retry_result = eth_call_with_retry( + |req, blk| { + let c = client.clone(); + async move { classify_rpc_result(c.call(req.into()).block(blk.into()).await) } + }, block, TransactionRequest { - // `eth_call` only requires the following fields - // (https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_call) 
from: match from { Some(from) => Some(Address::new(from.try_into().map_err(|_| { DataFusionError::Execution(format!( @@ -276,13 +329,71 @@ impl AsyncScalarUDFImpl for EthCall { input: input_data.map(Bytes::copy_from_slice), data: None, }, - // `eth_call` does not require any other fields. ..Default::default() }, + network_label, + provider_label, + self.metrics.as_deref(), ) .await; + + let latency_ms = invocation_start.elapsed().as_secs_f64() * 1000.0; + + if let Some(metrics) = &self.metrics { + metrics.record_latency(latency_ms, network_label, provider_label); + } + + let (success, error_class, rpc_error_code, message): ( + bool, + Option<EthCallErrorClass>, + Option<i64>, + Option<String>, + ) = match &retry_result.result { + Ok(_) => (true, None, None, None), + Err(EthCallRetryError::RpcError(resp)) => ( + false, + Some(EthCallErrorClass::RpcErrorNonRetryable), + Some(resp.code), + Some(resp.message.to_string()), + ), + Err(EthCallRetryError::RetriesFailed) => ( + false, + Some(EthCallErrorClass::RpcErrorRetryExhausted), + None, + Some("unexpected rpc error".to_string()), + ), + }; + + if let (Some(metrics), Some(class)) = (&self.metrics, error_class) { + metrics.record_error(network_label, provider_label, class); + } + + span.record("attempts_total", retry_result.attempts); + span.record("outcome", if success { "ok" } else { "error" }); + if let Some(class) = error_class { + span.record("error_class", class.as_str()); + } + + let truncated_message = message.as_deref().map(|m| { let end = (0..=m.len().min(500)).rev().find(|&i| m.is_char_boundary(i)).unwrap_or(0); &m[..end] }); // back off to a char boundary; slicing mid code point panics + tracing::debug!( + network = %network_label, + provider = %provider_label, + udf_name = %name, + block_selector = %block_selector, + to = %to_hex, + from = ?from_hex, + calldata_len_bytes = calldata_len, + success, + attempts_total = retry_result.attempts, + latency_ms, + error_class = error_class.map(|c| c.as_str()), + rpc_error_code, + error = truncated_message, + "eth_call invocation completed" + ); + // Build the response row.
- match result { + match retry_result.result { Ok(bytes) => { result_builder .field_builder::(0) @@ -349,30 +460,127 @@ impl AsyncScalarUDFImpl for EthCall { } } -async fn eth_call_retry( - client: &alloy::providers::RootProvider, +// --------------------------------------------------------------------------- +// Retry logic +// --------------------------------------------------------------------------- + +pub(crate) struct EthCallRetryResult { + pub(crate) result: Result, + pub(crate) attempts: u32, +} + +pub(crate) enum EthCallRetryError { + RpcError(ErrorPayload), + RetriesFailed, +} + +/// Outcome of a single RPC call attempt, abstracting over transport details. +/// +/// Used to decouple retry logic from the concrete alloy transport so that +/// unit tests can supply scripted responses. +pub(crate) enum RpcCallOutcome { + Ok(Bytes), + NonRetryableError(ErrorPayload), + RetryableError(String), +} + +/// Execute an eth_call with retries, recording per-attempt metrics and logs. +/// +/// `call_fn` is invoked for each attempt. In production it wraps +/// `client.call(...)`. In tests it returns scripted [`RpcCallOutcome`]s. 
+pub(crate) async fn eth_call_with_retry<F, Fut>( + call_fn: F, block: BlockNumberOrTag, req: TransactionRequest, -) -> Result<Bytes, EthCallRetryError> { - for _ in 0..3 { - let result = client.call(req.clone().into()).block(block.into()).await; - match result { - Ok(bytes) => { - return Ok(bytes); + network: &str, + provider: &str, + metrics: Option<&EthCallMetrics>, +) -> EthCallRetryResult +where + F: Fn(TransactionRequest, BlockNumberOrTag) -> Fut, + Fut: std::future::Future<Output = RpcCallOutcome>, +{ + for attempt in 1..=MAX_RETRY_ATTEMPTS { + let attempt_start = Instant::now(); + let outcome = call_fn(req.clone(), block).await; + let attempt_latency_ms = attempt_start.elapsed().as_secs_f64() * 1000.0; + + match outcome { + RpcCallOutcome::Ok(bytes) => { + return EthCallRetryResult { + result: Ok(bytes), + attempts: attempt, + }; } - Err(RpcError::ErrorResp(resp)) if [3, -32000].contains(&resp.code) => { - return Err(EthCallRetryError::RpcError(resp)); + RpcCallOutcome::NonRetryableError(resp) => { + tracing::debug!( + attempt, + network, + provider, + latency_ms_attempt = attempt_latency_ms, + error_class_attempt = "rpc_error_non_retryable", + rpc_error_code_attempt = resp.code, + error = %resp.message, + will_retry = false, + "eth_call attempt returned non-retryable error" + ); + return EthCallRetryResult { + result: Err(EthCallRetryError::RpcError(resp)), + attempts: attempt, + }; } - other => { - tracing::info!("unexpected RPC error: {other:?}, retrying"); + RpcCallOutcome::RetryableError(err_msg) => { + let will_retry = attempt < MAX_RETRY_ATTEMPTS; + + tracing::debug!( + attempt, + network, + provider, + latency_ms_attempt = attempt_latency_ms, + error_class_attempt = "retryable", + will_retry, + error = %err_msg, + "eth_call attempt failed with retryable error" + ); + + if let Some(metrics) = metrics.filter(|_| will_retry) { // count only failures that are actually retried + metrics.record_retry(network, provider); + } + + if !will_retry { + tracing::warn!( + attempts = MAX_RETRY_ATTEMPTS, + network, + provider, + error = %err_msg, + "eth_call retries exhausted" + ); + } } } } -
tracing::info!("RPC error: retries failed for request {req:?}"); - Err(EthCallRetryError::RetriesFailed) + EthCallRetryResult { + result: Err(EthCallRetryError::RetriesFailed), + attempts: MAX_RETRY_ATTEMPTS, + } } -enum EthCallRetryError { - RpcError(ErrorPayload), - RetriesFailed, +/// Map an alloy `RpcError` to our transport-agnostic [`RpcCallOutcome`]. +fn classify_rpc_result( + result: Result>, +) -> RpcCallOutcome { + match result { + Ok(bytes) => RpcCallOutcome::Ok(bytes), + Err(RpcError::ErrorResp(resp)) if [3, -32000].contains(&resp.code) => { + RpcCallOutcome::NonRetryableError(resp) + } + Err(err) => RpcCallOutcome::RetryableError(format!("{err:?}")), + } +} + +fn block_selector_type(block: &BlockNumberOrTag) -> &'static str { + match block { + BlockNumberOrTag::Number(_) => "number", + _ => "tag", + } } diff --git a/crates/core/common/tests/it_session_async_resolution.rs b/crates/core/common/tests/it_session_async_resolution.rs index 29d236775..c3ab0c3d2 100644 --- a/crates/core/common/tests/it_session_async_resolution.rs +++ b/crates/core/common/tests/it_session_async_resolution.rs @@ -394,7 +394,7 @@ async fn exec_statement_to_plan_with_qualified_function_uses_async_pre_resolutio ProviderConfigsStore::new(Arc::new(InMemory::new()) as Arc); let providers_registry = ProvidersRegistry::new(provider_configs); let datasets_cache = DatasetsCache::new(datasets_registry); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry, None); let runtime_env: Arc = Default::default(); let exec_env = ExecEnv { @@ -496,7 +496,7 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe ProviderConfigsStore::new(Arc::new(InMemory::new()) as Arc); let providers_registry = ProvidersRegistry::new(provider_configs); let datasets_cache = DatasetsCache::new(datasets_registry); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry); + let ethcall_udfs_cache 
= EthCallUdfsCache::new(providers_registry, None); let runtime_env: Arc = Default::default(); let exec_env = ExecEnv { diff --git a/tests/src/main.rs b/tests/src/main.rs index c4f5d4a56..338bd2e56 100644 --- a/tests/src/main.rs +++ b/tests/src/main.rs @@ -195,7 +195,7 @@ async fn main() { let datasets_registry = DatasetsRegistry::new(sysdb.conn_pool().clone(), dataset_manifests_store); let datasets_cache = DatasetsCache::new(datasets_registry.clone()); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone()); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), None); ( datasets_cache, ethcall_udfs_cache, diff --git a/tests/src/testlib/ctx.rs b/tests/src/testlib/ctx.rs index 1d5b6789d..9b541c674 100644 --- a/tests/src/testlib/ctx.rs +++ b/tests/src/testlib/ctx.rs @@ -423,7 +423,7 @@ impl TestCtxBuilder { (datasets_registry, providers_registry) }; let datasets_cache = DatasetsCache::new(datasets_registry.clone()); - let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone()); + let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), None); // Create Anvil fixture (if enabled) and capture provider config for later registration let (anvil, anvil_provider_config) = match self.anvil_fixture {