Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/bin/ampd/src/controller_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn run(
(datasets_registry, providers_registry)
};
let datasets_cache = DatasetsCache::new(datasets_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref());

let (addr, server) = controller::service::new(
build_info,
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/ampd/src/server_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run(
let isolate_pool = IsolatePool::new();

let datasets_cache = DatasetsCache::new(datasets_registry);
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry);
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry, meter.as_ref());

let server_config = config_from_common(&config);

Expand Down
2 changes: 1 addition & 1 deletion crates/bin/ampd/src/solo_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub async fn run(
let isolate_pool = IsolatePool::new();

let datasets_cache = DatasetsCache::new(datasets_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref());

// Spawn controller (Admin API) if enabled
let controller_fut: ControllerFuture = if admin_server {
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/ampd/src/worker_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run(
let isolate_pool = IsolatePool::new();

let datasets_cache = DatasetsCache::new(datasets_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone());
let ethcall_udfs_cache = EthCallUdfsCache::new(providers_registry.clone(), meter.as_ref());

// Convert common config to worker-specific config
let worker_config = config_from_common(&config);
Expand Down
2 changes: 2 additions & 0 deletions crates/core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ alloy.workspace = true
amp-data-store = { path = "../data-store" }
amp-datasets-registry = { path = "../datasets-registry" }
amp-parquet = { path = "../parquet" }
amp-providers-common = { path = "../providers-common" }
amp-providers-registry = { path = "../providers-registry" }
arrow.workspace = true
async-stream.workspace = true
Expand All @@ -27,6 +28,7 @@ futures.workspace = true
itertools.workspace = true
js-runtime = { path = "../js-runtime" }
metadata-db = { path = "../metadata-db" }
monitoring = { path = "../monitoring" }
object_store.workspace = true
parking_lot = "0.12.4"
rand.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions crates/core/common/src/udfs/eth_call.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
mod cache;
mod metrics;
mod udf;

pub use cache::{EthCallForNetworkError, EthCallUdfsCache};
pub use metrics::{EthCallErrorClass, EthCallMetrics};
pub use udf::EthCall;

#[cfg(test)]
mod tests;
36 changes: 28 additions & 8 deletions crates/core/common/src/udfs/eth_call/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use datafusion::{
logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF},
};
use datasets_common::network_id::NetworkId;
use monitoring::telemetry::metrics::Meter;
use parking_lot::RwLock;

use super::udf::EthCall;
use super::{metrics::EthCallMetrics, udf::EthCall};

/// Manages creation and caching of `eth_call` scalar UDFs keyed by network.
///
Expand All @@ -22,6 +23,7 @@ use super::udf::EthCall;
pub struct EthCallUdfsCache {
registry: ProvidersRegistry,
cache: Arc<RwLock<HashMap<NetworkId, ScalarUDF>>>,
metrics: Option<Arc<EthCallMetrics>>,
}

impl std::fmt::Debug for EthCallUdfsCache {
Expand All @@ -32,10 +34,14 @@ impl std::fmt::Debug for EthCallUdfsCache {

impl EthCallUdfsCache {
/// Creates a new EthCall UDFs cache.
pub fn new(registry: ProvidersRegistry) -> Self {
///
/// If a `meter` is provided, metrics will be recorded for every eth_call invocation.
pub fn new(registry: ProvidersRegistry, meter: Option<&Meter>) -> Self {
let metrics = meter.map(|m| Arc::new(EthCallMetrics::new(m)));
Self {
registry,
cache: Default::default(),
metrics,
}
}

Expand All @@ -61,7 +67,7 @@ impl EthCallUdfsCache {
// TODO: Always selects the first provider. Rotation across retries would
// require rethinking the cache key (currently NetworkId only) since the
// cached UDF holds a reference to a specific provider.
let Some((name, config)) = self
let Some((provider_name, config)) = self
.registry
.find_providers(EvmRpcProviderKind, network)
.await
Expand All @@ -72,16 +78,30 @@ impl EthCallUdfsCache {
provider_network = %network,
"no EVM RPC provider found for network"
);
if let Some(metrics) = &self.metrics {
metrics.record_error(
network.as_str(),
"unknown",
super::metrics::EthCallErrorClass::NetworkNotConfigured,
);
}
return Err(EthCallForNetworkError::ProviderNotFound {
network: network.clone(),
});
};
let provider = amp_providers_registry::create_evm_rpc_client(name.to_string(), config)
.await
.map_err(EthCallForNetworkError::ProviderCreation)?;
let provider =
amp_providers_registry::create_evm_rpc_client(provider_name.to_string(), config)
.await
.map_err(EthCallForNetworkError::ProviderCreation)?;

let udf = AsyncScalarUDF::new(Arc::new(EthCall::new(udf_name.to_string(), provider)))
.into_scalar_udf();
let udf = AsyncScalarUDF::new(Arc::new(EthCall::new(
udf_name.to_string(),
provider,
provider_name,
network.clone(),
self.metrics.clone(),
)))
.into_scalar_udf();

self.cache.write().insert(network.clone(), udf.clone());

Expand Down
111 changes: 111 additions & 0 deletions crates/core/common/src/udfs/eth_call/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//! Metrics for eth_call UDF observability.

use monitoring::telemetry::metrics::{Counter, Histogram, KeyValue, Meter};

/// Error classification for eth_call metrics.
#[derive(Debug, Clone, Copy)]
pub enum EthCallErrorClass {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this error enum in the metrics module? Shouldn't it be co-located with the logic reporting this error?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, why isn't this deriving thiserror::Error?

/// JSON-RPC error codes 3 or -32000 (non-retryable).
RpcErrorNonRetryable,
/// All retry attempts exhausted.
RpcErrorRetryExhausted,
/// Request timed out.
Timeout,
/// Invalid arguments (bad address, calldata, block).
InvalidRequest,
/// No provider configured for the requested network.
NetworkNotConfigured,
/// Unclassified error.
Other,
}

impl EthCallErrorClass {
/// Returns the string label used in metric attributes.
pub fn as_str(&self) -> &'static str {
match self {
Self::RpcErrorNonRetryable => "rpc_error_non_retryable",
Self::RpcErrorRetryExhausted => "rpc_error_retry_exhausted",
Self::Timeout => "timeout",
Self::InvalidRequest => "invalid_request",
Self::NetworkNotConfigured => "network_not_configured",
Self::Other => "other",
}
}
}

/// Metrics registry for eth_call UDF observability.
#[derive(Debug, Clone)]
pub struct EthCallMetrics {
/// Total eth_call UDF invocations.
requests_total: Counter,
/// Wall-clock latency including retries (milliseconds).
latency_ms: Histogram<f64>,
/// Failed invocations by error class.
errors_total: Counter,
/// Individual retry attempts.
retries_total: Counter,
}

impl EthCallMetrics {
/// Creates a new metrics registry from the given meter.
pub fn new(meter: &Meter) -> Self {
Self {
requests_total: Counter::new(
meter,
"eth_call_requests_total",
"Total number of eth_call UDF invocations",
),
latency_ms: Histogram::new_f64(
meter,
"eth_call_latency_ms",
"Wall-clock latency of eth_call including retries",
"milliseconds",
),
errors_total: Counter::new(
meter,
"eth_call_errors_total",
"Total number of failed eth_call invocations",
),
retries_total: Counter::new(
meter,
"eth_call_retries_total",
"Total number of eth_call retry attempts",
),
}
}

/// Record a single eth_call request.
pub(crate) fn record_request(&self, network: &str, provider: &str) {
self.requests_total
.inc_with_kvs(&self.base_kvs(network, provider));
}

/// Record wall-clock latency for a completed invocation.
pub(crate) fn record_latency(&self, latency_ms: f64, network: &str, provider: &str) {
self.latency_ms
.record_with_kvs(latency_ms, &self.base_kvs(network, provider));
}

/// Record a failed invocation with the given error class.
pub(crate) fn record_error(&self, network: &str, provider: &str, class: EthCallErrorClass) {
let kvs = [
KeyValue::new("network", network.to_string()),
KeyValue::new("provider", provider.to_string()),
KeyValue::new("class", class.as_str().to_string()),
];
self.errors_total.inc_with_kvs(&kvs);
}

/// Record a single retry attempt.
pub(crate) fn record_retry(&self, network: &str, provider: &str) {
self.retries_total
.inc_with_kvs(&self.base_kvs(network, provider));
}

fn base_kvs(&self, network: &str, provider: &str) -> [KeyValue; 2] {
[
KeyValue::new("network", network.to_string()),
KeyValue::new("provider", provider.to_string()),
]
}
}
Loading
Loading