From 20b235ac5704b9f64ff0c4f28775e879335272e6 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 26 Mar 2026 14:13:09 +0200 Subject: [PATCH 1/3] ethereum, node: Per-chain RPC settings via TOML config Introduce a ChainSettings struct that holds Ethereum RPC tuning parameters (timeouts, retries, batch sizes, block ranges, etc.) per chain instead of reading them from global ENV_VARS. Settings are parsed from [chains.<name>] TOML sections with serde defaults falling back to the existing environment variables, preserving full backwards compatibility. Thread ChainSettings through Chain, EthereumAdapter, and the free receipt-fetching functions so each chain can be independently tuned. --- chain/ethereum/src/adapter.rs | 11 +- chain/ethereum/src/chain.rs | 52 +++++++-- chain/ethereum/src/ethereum_adapter.rs | 153 ++++++++++++++++--------- chain/ethereum/src/network.rs | 11 +- gnd/src/commands/test/runner.rs | 7 +- node/resources/tests/full_config.toml | 10 ++ node/src/chain.rs | 52 +++++++-- node/src/config.rs | 143 +++++++++++++++++++++-- node/src/network_setup.rs | 9 +- tests/src/fixture/ethereum.rs | 4 +- 10 files changed, 356 insertions(+), 96 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 34b360d9674..f829f161b51 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -364,7 +364,7 @@ pub struct EthereumLogFilter { impl From for Vec { fn from(val: EthereumLogFilter) -> Self { - val.eth_get_logs_filters() + val.eth_get_logs_filters(ENV_VARS.get_logs_max_contracts) .map( |EthGetLogsFilter { contracts, @@ -546,7 +546,10 @@ impl EthereumLogFilter { /// Filters for `eth_getLogs` calls. The filters will not return false positives. This attempts /// to balance between having granular filters but too many calls and having few calls but too /// broad filters causing the Ethereum endpoint to timeout. 
- pub fn eth_get_logs_filters(self) -> impl Iterator { + pub fn eth_get_logs_filters( + self, + get_logs_max_contracts: usize, + ) -> impl Iterator { let mut filters = Vec::new(); // Start with the wildcard event filters. @@ -596,7 +599,7 @@ impl EthereumLogFilter { for neighbor in g.neighbors(max_vertex) { match neighbor { LogFilterNode::Contract(address) => { - if filter.contracts.len() == ENV_VARS.get_logs_max_contracts { + if filter.contracts.len() == get_logs_max_contracts { // The batch size was reached, register the filter and start a new one. let event = filter.event_signatures[0]; push_filter(filter); @@ -1766,7 +1769,7 @@ fn complete_log_filter() { wildcard_events: HashMap::new(), events_with_topic_filters: HashMap::new(), } - .eth_get_logs_filters() + .eth_get_logs_filters(ENV_VARS.get_logs_max_contracts) .collect(); // Assert that a contract or event is filtered on iff it was present in the graph. diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 14119ddc8fa..e04c5371b0f 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -68,6 +68,44 @@ use graph::blockchain::block_stream::{ /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320]; +/// Resolved per-chain settings. Populated at chain initialisation from the config file (with +/// ENV_VAR fallbacks) and stored on [`Chain`] and [`crate::EthereumAdapter`]. 
+#[derive(Clone, Debug)] +pub struct ChainSettings { + pub polling_interval: Duration, + pub json_rpc_timeout: Duration, + pub request_retries: usize, + pub max_block_range_size: BlockNumber, + pub block_batch_size: usize, + pub block_ptr_batch_size: usize, + pub max_event_only_range: BlockNumber, + pub target_triggers_per_block_range: u64, + pub get_logs_max_contracts: usize, + pub block_ingestor_max_concurrent_json_rpc_calls: usize, + pub genesis_block_number: u64, +} + +impl ChainSettings { + /// Constructs a [`ChainSettings`] from environment variable defaults. + /// Used in tests and for firehose-only chains that have no RPC config. + pub fn from_env_defaults() -> Self { + ChainSettings { + polling_interval: graph::env::ENV_VARS.ingestor_polling_interval, + json_rpc_timeout: ENV_VARS.json_rpc_timeout, + request_retries: ENV_VARS.request_retries, + max_block_range_size: ENV_VARS.max_block_range_size, + block_batch_size: ENV_VARS.block_batch_size, + block_ptr_batch_size: ENV_VARS.block_ptr_batch_size, + max_event_only_range: ENV_VARS.max_event_only_range, + target_triggers_per_block_range: ENV_VARS.target_triggers_per_block_range, + get_logs_max_contracts: ENV_VARS.get_logs_max_contracts, + block_ingestor_max_concurrent_json_rpc_calls: ENV_VARS + .block_ingestor_max_concurrent_json_rpc_calls, + genesis_block_number: ENV_VARS.genesis_block_number, + } + } +} + pub struct EthereumStreamBuilder {} #[async_trait] @@ -192,9 +230,9 @@ impl BlockStreamBuilder for EthereumStreamBuilder { }; let max_block_range_size = if is_using_subgraph_composition { - ENV_VARS.max_block_range_size * 10 + chain.settings.max_block_range_size * 10 } else { - ENV_VARS.max_block_range_size + chain.settings.max_block_range_size }; Ok(Box::new(PollingBlockStream::new( @@ -206,7 +244,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { reorg_threshold, logger, max_block_range_size, - ENV_VARS.target_triggers_per_block_range, + chain.settings.target_triggers_per_block_range, 
unified_api_version, subgraph_current_block, ))) @@ -325,13 +363,13 @@ pub struct Chain { call_cache: Arc, chain_head_update_listener: Arc, reorg_threshold: BlockNumber, - polling_ingestor_interval: Duration, pub is_ingestible: bool, block_stream_builder: Arc>, block_refetcher: Arc>, adapter_selector: Arc>, runtime_adapter_builder: Arc, eth_adapters: Arc, + pub settings: Arc, } impl std::fmt::Debug for Chain { @@ -388,8 +426,8 @@ impl Chain { runtime_adapter_builder: Arc, eth_adapters: Arc, reorg_threshold: BlockNumber, - polling_ingestor_interval: Duration, is_ingestible: bool, + settings: Arc, ) -> Self { Chain { logger_factory, @@ -406,7 +444,7 @@ impl Chain { eth_adapters, reorg_threshold, is_ingestible, - polling_ingestor_interval, + settings, } } @@ -637,7 +675,7 @@ impl Blockchain for Chain { graph::env::ENV_VARS.reorg_threshold(), self.chain_client(), self.chain_store.cheap_clone(), - self.polling_ingestor_interval, + self.settings.polling_interval, self.name.clone(), )?) } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index a5ab31ae73e..5373255cf0f 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -57,7 +57,7 @@ use std::convert::TryFrom; use std::iter::FromIterator; use std::pin::Pin; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::sync::RwLock; use tokio::time::timeout; @@ -66,6 +66,7 @@ use crate::adapter::EthereumRpcError; use crate::adapter::ProviderStatus; use crate::call_helper::interpret_eth_call_error; use crate::chain::BlockFinality; +use crate::chain::ChainSettings; use crate::trigger::{LogPosition, LogRef}; use crate::Chain; use crate::NodeCapabilities; @@ -98,6 +99,7 @@ pub struct EthereumAdapter { supports_eip_1898: bool, call_only: bool, supports_block_receipts: Arc>>, + pub(crate) settings: Arc, } impl std::fmt::Debug for EthereumAdapter { @@ -110,6 +112,7 @@ impl std::fmt::Debug for EthereumAdapter { 
.field("supports_eip_1898", &self.supports_eip_1898) .field("call_only", &self.call_only) .field("supports_block_receipts", &self.supports_block_receipts) + .field("settings", &self.settings) .finish() } } @@ -124,6 +127,7 @@ impl CheapClone for EthereumAdapter { supports_eip_1898: self.supports_eip_1898, call_only: self.call_only, supports_block_receipts: self.supports_block_receipts.cheap_clone(), + settings: self.settings.clone(), } } } @@ -163,6 +167,7 @@ impl EthereumAdapter { provider_metrics: Arc, supports_eip_1898: bool, call_only: bool, + settings: Arc, ) -> Self { let alloy = match &transport { Transport::RPC(client) => Arc::new( @@ -197,6 +202,7 @@ impl EthereumAdapter { supports_eip_1898, call_only, supports_block_receipts: Arc::new(RwLock::new(None)), + settings, } } @@ -216,8 +222,8 @@ impl EthereumAdapter { retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let eth = eth.clone(); let logger = logger.clone(); @@ -410,8 +416,8 @@ impl EthereumAdapter { .any(|f| e.to_string().contains(f)), }, ) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let eth_adapter = eth_adapter.cheap_clone(); let subgraph_metrics = subgraph_metrics.clone(); @@ -470,6 +476,7 @@ impl EthereumAdapter { ranges }; + let block_batch_size = self.settings.block_batch_size; let eth = self; futures03::stream::iter(ranges.into_iter().map(move |(start, end)| { @@ -489,7 +496,7 @@ impl EthereumAdapter { .await } })) - .buffered(ENV_VARS.block_batch_size) + .buffered(block_batch_size) .map_ok(|traces| futures03::stream::iter(traces.into_iter().map(Ok))) .try_flatten() } @@ -525,7 +532,7 @@ impl EthereumAdapter { let step = match 
filter.contracts.is_empty() { // `to - from + 1` blocks will be scanned. false => to - from, - true => (to - from).min(ENV_VARS.max_event_only_range - 1), + true => (to - from).min(self.settings.max_event_only_range - 1), }; // Typically this will loop only once and fetch the entire range in one request. But if the @@ -610,8 +617,8 @@ impl EthereumAdapter { retry(retry_log_message, &logger) .redact_log_urls(true) .when(|result| result.is_err()) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -640,8 +647,8 @@ impl EthereumAdapter { retry(retry_log_message, &logger) .redact_log_urls(true) .when(|result| result.is_err()) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -669,8 +676,8 @@ impl EthereumAdapter { let retry_log_message = format!("eth_call RPC call for block {}", block_ptr); retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let call_data = call_data.clone(); let alloy = alloy.cheap_clone(); @@ -736,6 +743,8 @@ impl EthereumAdapter { ids: Vec, ) -> impl futures03::Stream, Error>> + Send { let alloy = self.alloy.clone(); + let request_retries = self.settings.request_retries; + let json_rpc_timeout_secs = self.settings.json_rpc_timeout.as_secs(); futures03::stream::iter(ids.into_iter().map(move |hash| { let alloy = alloy.clone(); @@ -744,8 +753,8 @@ impl EthereumAdapter { async move { retry(format!("load block {}", hash), &logger) .redact_log_urls(true) - 
.limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(request_retries) + .timeout_secs(json_rpc_timeout_secs) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -774,7 +783,7 @@ impl EthereumAdapter { }) } })) - .buffered(ENV_VARS.block_batch_size) + .buffered(self.settings.block_batch_size) } /// Request blocks by number through JSON-RPC. @@ -784,6 +793,8 @@ impl EthereumAdapter { numbers: Vec, ) -> impl futures03::Stream, Error>> + Send { let alloy = self.alloy.clone(); + let request_retries = self.settings.request_retries; + let json_rpc_timeout_secs = self.settings.json_rpc_timeout.as_secs(); futures03::stream::iter(numbers.into_iter().map(move |number| { let alloy = alloy.clone(); @@ -792,8 +803,8 @@ impl EthereumAdapter { async move { retry(format!("load block {}", number), &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(request_retries) + .timeout_secs(json_rpc_timeout_secs) .run(move || { let alloy = alloy.cheap_clone(); @@ -834,7 +845,7 @@ impl EthereumAdapter { }) } })) - .buffered(ENV_VARS.block_ptr_batch_size) + .buffered(self.settings.block_ptr_batch_size) } /// Request blocks ptrs for numbers through JSON-RPC. 
@@ -846,6 +857,7 @@ impl EthereumAdapter { block_nums: Vec, ) -> impl Stream + Send { let alloy = self.alloy.clone(); + let json_rpc_timeout_secs = self.settings.json_rpc_timeout.as_secs(); stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| { let alloy = alloy.clone(); @@ -853,7 +865,7 @@ impl EthereumAdapter { .redact_log_urls(true) .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(json_rpc_timeout_secs) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -879,7 +891,7 @@ impl EthereumAdapter { } }) })) - .buffered(ENV_VARS.block_batch_size) + .buffered(self.settings.block_batch_size) .filter_map(|b| b) .map(|b| BlockPtr::from((b.header.hash, b.header.number))) } @@ -918,17 +930,22 @@ impl EthereumAdapter { let eth: Self = self.cheap_clone(); let logger = self.provider_logger(logger); - futures03::stream::iter(log_filter.eth_get_logs_filters().map(move |filter| { - eth.cheap_clone().log_stream( - logger.cheap_clone(), - subgraph_metrics.cheap_clone(), - from, - to, - filter, - ) - })) + let max_contracts = eth.settings.get_logs_max_contracts; + futures03::stream::iter( + log_filter + .eth_get_logs_filters(max_contracts) + .map(move |filter| { + eth.cheap_clone().log_stream( + logger.cheap_clone(), + subgraph_metrics.cheap_clone(), + from, + to, + filter, + ) + }), + ) // Real limits on the number of parallel requests are imposed within the adapter. 
- .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls) + .buffered(self.settings.block_ingestor_max_concurrent_json_rpc_calls) .try_concat() .boxed() } @@ -1136,7 +1153,7 @@ impl EthereumAdapter { retry("chain_id RPC call", &logger) .redact_log_urls(true) .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { alloy.get_chain_id().await.map_err(Error::from) } @@ -1193,9 +1210,10 @@ impl EthereumAdapterTrait for EthereumAdapter { let alloy_provider = self.alloy.clone(); let metrics = self.metrics.clone(); let provider = self.provider().to_string(); + let genesis_block_number = self.settings.genesis_block_number; let retry_log_message = format!( "eth_getBlockByNumber({}, false) RPC call", - ENV_VARS.genesis_block_number + genesis_block_number ); let gen_block_hash_future = retry(retry_log_message, &logger) .redact_log_urls(true) @@ -1208,7 +1226,7 @@ impl EthereumAdapterTrait for EthereumAdapter { async move { alloy_genesis .get_block_by_number(alloy::rpc::types::BlockNumberOrTag::Number( - ENV_VARS.genesis_block_number, + genesis_block_number, )) .await .inspect_err(|_| { @@ -1247,7 +1265,7 @@ impl EthereumAdapterTrait for EthereumAdapter { retry("eth_getBlockByNumber(latest) no txs RPC call", logger) .redact_log_urls(true) .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -1284,8 +1302,8 @@ impl EthereumAdapterTrait for EthereumAdapter { retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(self.settings.request_retries) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -1318,7 +1336,7 @@ impl EthereumAdapterTrait for 
EthereumAdapter { retry(retry_log_message, &logger) .redact_log_urls(true) .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.clone(); async move { @@ -1372,12 +1390,19 @@ impl EthereumAdapterTrait for EthereumAdapter { ) .await; - fetch_receipts_with_retry(alloy, hashes, block_hash, logger, supports_block_receipts) - .await - .map(|transaction_receipts| EthereumBlock { - block: Arc::new(LightEthereumBlock::new(block)), - transaction_receipts, - }) + fetch_receipts_with_retry( + alloy, + hashes, + block_hash, + logger, + supports_block_receipts, + &self.settings, + ) + .await + .map(|transaction_receipts| EthereumBlock { + block: Arc::new(LightEthereumBlock::new(block)), + transaction_receipts, + }) } async fn get_balance( @@ -1427,7 +1452,7 @@ impl EthereumAdapterTrait for EthereumAdapter { .redact_log_urls(true) .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(self.settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); async move { @@ -2203,6 +2228,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry( hashes: Vec, block_hash: B256, logger: ProviderLogger, + settings: &ChainSettings, ) -> Result>, IngestorError> { let retry_log_message = format!( "batch eth_getTransactionReceipt RPC call for block {:?}", @@ -2210,9 +2236,9 @@ async fn fetch_transaction_receipts_in_batch_with_retry( ); retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) + .limit(settings.request_retries) .no_logging() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .timeout_secs(settings.json_rpc_timeout.as_secs()) .run(move || { let alloy = alloy.cheap_clone(); let hashes = hashes.clone(); @@ -2319,11 +2345,12 @@ async fn fetch_receipts_with_retry( block_hash: B256, logger: ProviderLogger, supports_block_receipts: bool, + 
settings: &ChainSettings, ) -> Result>, IngestorError> { if supports_block_receipts { - return fetch_block_receipts_with_retry(alloy, hashes, block_hash, logger).await; + return fetch_block_receipts_with_retry(alloy, hashes, block_hash, logger, settings).await; } - fetch_individual_receipts_with_retry(alloy, hashes, block_hash, logger).await + fetch_individual_receipts_with_retry(alloy, hashes, block_hash, logger, settings).await } // Fetches receipts for each transaction in the block individually. @@ -2332,12 +2359,19 @@ async fn fetch_individual_receipts_with_retry( hashes: Vec, block_hash: B256, logger: ProviderLogger, + settings: &ChainSettings, ) -> Result>, IngestorError> { if ENV_VARS.fetch_receipts_in_batches { - return fetch_transaction_receipts_in_batch_with_retry(alloy, hashes, block_hash, logger) - .await; + return fetch_transaction_receipts_in_batch_with_retry( + alloy, hashes, block_hash, logger, settings, + ) + .await; } + let request_retries = settings.request_retries; + let json_rpc_timeout = settings.json_rpc_timeout; + let concurrent_requests = settings.block_ingestor_max_concurrent_json_rpc_calls; + // Use a stream to fetch receipts individually let hash_stream = tokio_stream::iter(hashes); let receipt_stream = hash_stream @@ -2347,9 +2381,11 @@ async fn fetch_individual_receipts_with_retry( tx_hash, block_hash, logger.cheap_clone(), + request_retries, + json_rpc_timeout, ) }) - .buffered(ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls); + .buffered(concurrent_requests); tokio_stream::StreamExt::collect::>, IngestorError>>( receipt_stream, @@ -2363,6 +2399,7 @@ async fn fetch_block_receipts_with_retry( hashes: Vec, block_hash: B256, logger: ProviderLogger, + settings: &ChainSettings, ) -> Result>, IngestorError> { use graph::prelude::alloy::rpc::types::BlockId; let retry_log_message = format!("eth_getBlockReceipts RPC call for block {:?}", block_hash); @@ -2370,8 +2407,8 @@ async fn fetch_block_receipts_with_retry( // Perform the retry 
operation let receipts_option = retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(settings.request_retries) + .timeout_secs(settings.json_rpc_timeout.as_secs()) .run(move || alloy.get_block_receipts(BlockId::from(block_hash)).boxed()) .await .map_err(|_timeout| -> IngestorError { anyhow!(block_hash).into() })?; @@ -2407,6 +2444,8 @@ async fn fetch_transaction_receipt_with_retry( transaction_hash: B256, block_hash: B256, logger: ProviderLogger, + request_retries: usize, + json_rpc_timeout: Duration, ) -> Result, IngestorError> { let retry_log_message = format!( "eth_getTransactionReceipt RPC call for transaction {:?}", @@ -2415,8 +2454,8 @@ async fn fetch_transaction_receipt_with_retry( retry(retry_log_message, &logger) .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .limit(request_retries) + .timeout_secs(json_rpc_timeout.as_secs()) .run(move || { let alloy_clone = alloy.clone(); async move { alloy_clone.get_transaction_receipt(transaction_hash).await }.boxed() @@ -2593,6 +2632,8 @@ async fn get_transaction_receipts_for_transaction_hashes( *transaction_hash, *block_hash, logger.cheap_clone(), + adapter.settings.request_retries, + adapter.settings.json_rpc_timeout, ); receipt_futures.push(receipt_future) } diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 98240ddea07..9f3d09c21cb 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -326,7 +326,8 @@ mod tests { use std::sync::Arc; use crate::{ - Compression, EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport, + chain::ChainSettings, Compression, EthereumAdapter, EthereumAdapterTrait, + ProviderEthRpcMetrics, Transport, }; use super::{EthereumNetworkAdapter, EthereumNetworkAdapters, NodeCapabilities}; @@ -410,6 +411,7 @@ mod tests { provider_metrics.clone(), true, true, + 
Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -422,6 +424,7 @@ mod tests { provider_metrics.clone(), true, false, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -515,6 +518,7 @@ mod tests { provider_metrics.clone(), true, true, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -527,6 +531,7 @@ mod tests { provider_metrics.clone(), true, false, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -588,6 +593,7 @@ mod tests { provider_metrics.clone(), true, true, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -600,6 +606,7 @@ mod tests { provider_metrics.clone(), true, false, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -654,6 +661,7 @@ mod tests { provider_metrics.clone(), true, false, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -942,6 +950,7 @@ mod tests { provider_metrics.clone(), true, call_only, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ) diff --git a/gnd/src/commands/test/runner.rs b/gnd/src/commands/test/runner.rs index a4694198844..a02502e1f16 100644 --- a/gnd/src/commands/test/runner.rs +++ b/gnd/src/commands/test/runner.rs @@ -44,7 +44,8 @@ use graph::slog::{info, o, Logger}; use graph_chain_ethereum::chain::EthereumRuntimeAdapterBuilder; use graph_chain_ethereum::network::{EthereumNetworkAdapter, EthereumNetworkAdapters}; use graph_chain_ethereum::{ - Chain, EthereumAdapter, NodeCapabilities, ProviderEthRpcMetrics, Transport, + chain::ChainSettings, Chain, EthereumAdapter, NodeCapabilities, ProviderEthRpcMetrics, + Transport, }; use graph_core::polling_monitor::{arweave_service, ipfs_service}; use graph_graphql::prelude::GraphQlRunner; @@ -521,6 +522,7 @@ async fn setup_chain( provider_metrics, true, false, + Arc::new(ChainSettings::from_env_defaults()), ) .await, ); @@ -545,6 +547,7 @@ async fn setup_chain( None, )); + let chain_settings = Arc::new(ChainSettings::from_env_defaults()); let chain = Chain::new( logger_factory, 
stores.network_name.clone(), @@ -563,8 +566,8 @@ async fn setup_chain( Arc::new(EthereumRuntimeAdapterBuilder {}), eth_adapters, graph::prelude::ENV_VARS.reorg_threshold(), - graph::prelude::ENV_VARS.ingestor_polling_interval, true, + chain_settings, ); Ok(Arc::new(chain)) diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 88ba990c1e6..4624af467c7 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -55,6 +55,16 @@ provider = [ [chains.ropsten] shard = "primary" +json_rpc_timeout = 60 +request_retries = 5 +max_block_range_size = 500 +block_batch_size = 5 +block_ptr_batch_size = 10 +max_event_only_range = 200 +target_triggers_per_block_range = 50 +get_logs_max_contracts = 1000 +block_ingestor_max_concurrent_json_rpc_calls = 500 +genesis_block_number = 0 provider = [ { label = "ropsten-0", url = "http://rpc.ropsten.io", transport = "rpc", features = ["archive", "traces"] } ] diff --git a/node/src/chain.rs b/node/src/chain.rs index 7d851918c70..2e52d966311 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -1,7 +1,8 @@ -use crate::config::{Config, ProviderDetails}; +use crate::config::{ChainSettings as ConfigChainSettings, Config, ProviderDetails}; use crate::network_setup::{ AdapterConfiguration, EthAdapterConfig, FirehoseAdapterConfig, Networks, }; +use ethereum::chain::ChainSettings; use ethereum::chain::{ EthereumAdapterSelector, EthereumBlockRefetcher, EthereumRuntimeAdapterBuilder, EthereumStreamBuilder, @@ -146,6 +147,37 @@ pub fn create_firehose_networks( .collect() } +impl From<ConfigChainSettings> for ChainSettings { + fn from(c: ConfigChainSettings) -> Self { + let ConfigChainSettings { + polling_interval, + json_rpc_timeout, + request_retries, + max_block_range_size, + block_batch_size, + block_ptr_batch_size, + max_event_only_range, + target_triggers_per_block_range, + get_logs_max_contracts, + block_ingestor_max_concurrent_json_rpc_calls, + genesis_block_number, + } = c; + 
ChainSettings { + polling_interval, + json_rpc_timeout, + request_retries, + max_block_range_size, + block_batch_size, + block_ptr_batch_size, + max_event_only_range, + target_triggers_per_block_range, + get_logs_max_contracts, + block_ingestor_max_concurrent_json_rpc_calls, + genesis_block_number, + } + } +} + /// Parses all Ethereum connection strings and returns their network names and /// `EthereumAdapter`. pub async fn create_ethereum_networks( @@ -188,6 +220,7 @@ pub async fn create_ethereum_networks_for_chain( .chains .get(network_name) .ok_or_else(|| anyhow!("unknown network {}", network_name))?; + let settings = Arc::new(ChainSettings::from(chain.settings.clone())); let mut adapters = vec![]; let mut call_only_adapters = vec![]; @@ -243,6 +276,7 @@ pub async fn create_ethereum_networks_for_chain( eth_rpc_metrics.clone(), supports_eip_1898, call_only, + settings.clone(), ) .await, ), @@ -269,7 +303,7 @@ pub async fn create_ethereum_networks_for_chain( chain_id: network_name.into(), adapters, call_only: call_only_adapters, - polling_interval: Some(chain.polling_interval), + settings, })) } @@ -335,12 +369,12 @@ pub async fn networks_as_chains( match kind { BlockchainKind::Ethereum => { - // polling interval is set per chain so if set all adapter configuration will have - // the same value. - let polling_interval = adapters - .first() - .and_then(|a| a.as_rpc().and_then(|a| a.polling_interval)) - .unwrap_or(config.ingestor_polling_interval); + // settings come from the first RPC adapter config; falls back to ENV_VAR + // defaults for firehose-only chains. 
+ let settings = adapters + .iter() + .find_map(|a| a.as_rpc().map(|r| r.settings.clone())) + .unwrap_or_else(|| Arc::new(ChainSettings::from_env_defaults())); let firehose_endpoints = networks.firehose_endpoints(chain_id.clone()); let eth_adapters = networks.ethereum_rpcs(chain_id.clone()); @@ -377,8 +411,8 @@ pub async fn networks_as_chains( Arc::new(EthereumRuntimeAdapterBuilder {}), eth_adapters, ENV_VARS.reorg_threshold(), - polling_interval, true, + settings, ); blockchain_map diff --git a/node/src/config.rs b/node/src/config.rs index 69c174e5a76..8443ce2780c 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -573,9 +573,9 @@ impl ChainSection { let entry = chains.entry(name.to_string()).or_insert_with(|| Chain { shard: PRIMARY_SHARD.to_string(), protocol: BlockchainKind::Ethereum, - polling_interval: default_polling_interval(), providers: vec![], amp: None, + settings: ChainSettings::default(), }); entry.providers.push(provider); } @@ -584,22 +584,86 @@ impl ChainSection { } } +/// Per-chain settings. Flattened into `Chain` so all fields appear as top-level keys in +/// `[chains.X]` TOML sections. Absent fields fall back to the corresponding `GRAPH_ETHEREUM_*` / +/// `ETHEREUM_*` environment variable defaults via serde default functions — the same pattern as +/// `polling_interval`. #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct Chain { - pub shard: String, - #[serde(default = "default_blockchain_kind")] - pub protocol: BlockchainKind, +pub struct ChainSettings { + /// Set by `polling_interval` (milliseconds). Defaults to `GRAPH_ETHEREUM_POLLING_INTERVAL`. #[serde( default = "default_polling_interval", deserialize_with = "deserialize_duration_millis" )] pub polling_interval: Duration, + /// Set by `json_rpc_timeout` (seconds). Defaults to `GRAPH_ETHEREUM_JSON_RPC_TIMEOUT`. 
+ #[serde( + default = "default_json_rpc_timeout", + deserialize_with = "deserialize_duration_secs" + )] + pub json_rpc_timeout: Duration, + /// Defaults to `GRAPH_ETHEREUM_REQUEST_RETRIES`. + #[serde(default = "default_request_retries")] + pub request_retries: usize, + /// Defaults to `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`. + #[serde(default = "default_max_block_range_size")] + pub max_block_range_size: i32, + /// Defaults to `ETHEREUM_BLOCK_BATCH_SIZE`. + #[serde(default = "default_block_batch_size")] + pub block_batch_size: usize, + /// Defaults to `ETHEREUM_BLOCK_PTR_BATCH_SIZE`. + #[serde(default = "default_block_ptr_batch_size")] + pub block_ptr_batch_size: usize, + /// Defaults to `GRAPH_ETHEREUM_MAX_EVENT_ONLY_RANGE`. + #[serde(default = "default_max_event_only_range")] + pub max_event_only_range: i32, + /// Defaults to `GRAPH_ETHEREUM_TARGET_TRIGGERS_PER_BLOCK_RANGE`. + #[serde(default = "default_target_triggers_per_block_range")] + pub target_triggers_per_block_range: u64, + /// Defaults to `GRAPH_ETH_GET_LOGS_MAX_CONTRACTS`. + #[serde(default = "default_get_logs_max_contracts")] + pub get_logs_max_contracts: usize, + /// Defaults to `GRAPH_ETHEREUM_BLOCK_INGESTOR_MAX_CONCURRENT_JSON_RPC_CALLS_FOR_TXN_RECEIPTS`. + #[serde(default = "default_block_ingestor_max_concurrent_json_rpc_calls")] + pub block_ingestor_max_concurrent_json_rpc_calls: usize, + /// Defaults to `GRAPH_ETHEREUM_GENESIS_BLOCK_NUMBER`. 
+ #[serde(default = "default_genesis_block_number")] + pub genesis_block_number: u64, +} + +impl Default for ChainSettings { + fn default() -> Self { + ChainSettings { + polling_interval: default_polling_interval(), + json_rpc_timeout: default_json_rpc_timeout(), + request_retries: default_request_retries(), + max_block_range_size: default_max_block_range_size(), + block_batch_size: default_block_batch_size(), + block_ptr_batch_size: default_block_ptr_batch_size(), + max_event_only_range: default_max_event_only_range(), + target_triggers_per_block_range: default_target_triggers_per_block_range(), + get_logs_max_contracts: default_get_logs_max_contracts(), + block_ingestor_max_concurrent_json_rpc_calls: + default_block_ingestor_max_concurrent_json_rpc_calls(), + genesis_block_number: default_genesis_block_number(), + } + } +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct Chain { + pub shard: String, + #[serde(default = "default_blockchain_kind")] + pub protocol: BlockchainKind, #[serde(rename = "provider")] pub providers: Vec, /// AMP network name alias. When set, AMP manifests using this name will /// resolve to this chain. Defaults to the chain name. #[serde(default)] pub amp: Option, + /// Per-chain settings (flat fields). Absent fields fall back to ENV_VAR defaults. 
+ #[serde(flatten)] + pub settings: ChainSettings, } fn default_blockchain_kind() -> BlockchainKind { @@ -1243,6 +1307,46 @@ fn default_polling_interval() -> Duration { ENV_VARS.ingestor_polling_interval } +fn default_json_rpc_timeout() -> Duration { + ethereum::ENV_VARS.json_rpc_timeout +} + +fn default_request_retries() -> usize { + ethereum::ENV_VARS.request_retries +} + +fn default_max_block_range_size() -> i32 { + ethereum::ENV_VARS.max_block_range_size +} + +fn default_block_batch_size() -> usize { + ethereum::ENV_VARS.block_batch_size +} + +fn default_block_ptr_batch_size() -> usize { + ethereum::ENV_VARS.block_ptr_batch_size +} + +fn default_max_event_only_range() -> i32 { + ethereum::ENV_VARS.max_event_only_range +} + +fn default_target_triggers_per_block_range() -> u64 { + ethereum::ENV_VARS.target_triggers_per_block_range +} + +fn default_get_logs_max_contracts() -> usize { + ethereum::ENV_VARS.get_logs_max_contracts +} + +fn default_block_ingestor_max_concurrent_json_rpc_calls() -> usize { + ethereum::ENV_VARS.block_ingestor_max_concurrent_json_rpc_calls +} + +fn default_genesis_block_number() -> u64 { + ethereum::ENV_VARS.genesis_block_number +} + fn deserialize_duration_millis<'de, D>(data: D) -> Result where D: Deserializer<'de>, @@ -1251,6 +1355,14 @@ where Ok(Duration::from_millis(millis)) } +fn deserialize_duration_secs<'de, D>(data: D) -> Result +where + D: Deserializer<'de>, +{ + let secs = u64::deserialize(data)?; + Ok(Duration::from_secs(secs)) +} + // From https://github.com/serde-rs/serde/issues/889#issuecomment-295988865 fn string_or_vec<'de, D>(deserializer: D) -> Result, D::Error> where @@ -1289,7 +1401,8 @@ mod tests { use crate::config::{default_polling_interval, ChainSection, Web3Rule}; use super::{ - Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, + Chain, ChainSettings, Config, FirehoseProvider, Provider, ProviderDetails, Shard, + Transport, Web3Provider, }; use 
graph::blockchain::BlockchainKind; use graph::firehose::SubgraphLimit; @@ -1331,9 +1444,9 @@ mod tests { Chain { shard: "primary".to_string(), protocol: BlockchainKind::Ethereum, - polling_interval: default_polling_interval(), providers: vec![], amp: None, + settings: ChainSettings::default(), }, actual ); @@ -1354,9 +1467,9 @@ mod tests { Chain { shard: "primary".to_string(), protocol: BlockchainKind::Near, - polling_interval: default_polling_interval(), providers: vec![], amp: None, + settings: ChainSettings::default(), }, actual ); @@ -1891,7 +2004,12 @@ mod tests { assert_eq!( default, - actual.chains.get("mainnet").unwrap().polling_interval + actual + .chains + .get("mainnet") + .unwrap() + .settings + .polling_interval ); // Polling interval set explicitly, use that @@ -1911,7 +2029,12 @@ mod tests { assert_eq!( different, - actual.chains.get("mainnet").unwrap().polling_interval + actual + .chains + .get("mainnet") + .unwrap() + .settings + .polling_interval ); } diff --git a/node/src/network_setup.rs b/node/src/network_setup.rs index 8de5532e8ff..2191aaea0d9 100644 --- a/node/src/network_setup.rs +++ b/node/src/network_setup.rs @@ -1,4 +1,5 @@ use ethereum::{ + chain::ChainSettings, network::{EthereumNetworkAdapter, EthereumNetworkAdapters}, BlockIngestor, }; @@ -27,7 +28,7 @@ use graph::{ use graph_chain_ethereum as ethereum; use graph_store_postgres::{BlockStore, ChainHeadUpdateListener}; -use std::{any::Any, cmp::Ordering, sync::Arc, time::Duration}; +use std::{any::Any, cmp::Ordering, sync::Arc}; use crate::chain::{ create_ethereum_networks, create_firehose_networks, networks_as_chains, AnyChainFilter, @@ -39,9 +40,7 @@ pub struct EthAdapterConfig { pub chain_id: ChainName, pub adapters: Vec, pub call_only: Vec, - // polling interval is set per chain so if set all adapter configuration will have - // the same value. 
- pub polling_interval: Option, + pub settings: Arc, } #[derive(Debug, Clone)] @@ -236,7 +235,7 @@ impl Networks { chain_id, mut adapters, call_only: _, - polling_interval: _, + settings: _, }| { adapters.sort_by(|a, b| { a.capabilities diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index fa8468d164d..40bf54467cc 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -22,11 +22,11 @@ use graph::prelude::{ use graph::schema::EntityType; use graph_chain_ethereum::network::EthereumNetworkAdapters; use graph_chain_ethereum::trigger::LogRef; -use graph_chain_ethereum::Chain; use graph_chain_ethereum::{ chain::BlockFinality, trigger::{EthereumBlockTriggerType, EthereumTrigger}, }; +use graph_chain_ethereum::{chain::ChainSettings, Chain}; pub async fn chain( test_name: &str, @@ -67,9 +67,9 @@ pub async fn chain( Arc::new(NoopRuntimeAdapterBuilder {}), eth_adapters, ENV_VARS.reorg_threshold(), - ENV_VARS.ingestor_polling_interval, // We assume the tested chain is always ingestible for now true, + Arc::new(ChainSettings::from_env_defaults()), ); TestChain { From 5f2c14f58c3d7155fa84a907276c931f666d279d Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 26 Mar 2026 14:13:24 +0200 Subject: [PATCH 2/3] docs: Document per-chain RPC settings in config and env vars Add the full list of per-chain Ethereum RPC tuning settings to docs/config.md with env var fallback defaults, and add a cross-reference note in docs/environment-variables.md. 
--- docs/config.md | 35 ++++++++++++++++++++++++++++++++++- docs/environment-variables.md | 5 +++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/docs/config.md b/docs/config.md index bcf15fc2c56..d6280e1bc53 100644 --- a/docs/config.md +++ b/docs/config.md @@ -113,12 +113,41 @@ The configuration for a chain `name` is specified in the section - `shard`: where chain data is stored - `protocol`: the protocol type being indexed, default `ethereum` (alternatively `near`, `cosmos`,`arweave`,`starknet`) -- `polling_interval`: the polling interval for the block ingestor (default 500ms) - `amp`: the network name used by AMP for this chain; defaults to the chain name. Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"` on a chain named `mainnet`). - `provider`: a list of providers for that chain +Additionally, Ethereum chains support per-chain RPC tuning settings. When +omitted, each setting falls back to its corresponding environment variable +default (see [environment-variables.md](environment-variables.md) for +details): + +- `polling_interval`: block ingestor polling interval in milliseconds. + Default: `ETHEREUM_POLLING_INTERVAL` (500ms). +- `json_rpc_timeout`: timeout for JSON-RPC requests in seconds. + Default: `GRAPH_ETHEREUM_JSON_RPC_TIMEOUT` (180s). +- `request_retries`: number of times to retry failed JSON-RPC requests. + Default: `GRAPH_ETHEREUM_REQUEST_RETRIES` (10). +- `max_block_range_size`: maximum number of blocks to scan for triggers per + request. Default: `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE` (1000). +- `block_batch_size`: number of blocks to request in parallel. + Default: `ETHEREUM_BLOCK_BATCH_SIZE` (10). +- `block_ptr_batch_size`: number of block pointers to request in parallel. + Default: `ETHEREUM_BLOCK_PTR_BATCH_SIZE` (100). +- `max_event_only_range`: maximum range for `eth_getLogs` requests that + don't filter on contract address. + Default: `GRAPH_ETHEREUM_MAX_EVENT_ONLY_RANGE` (500). 
+- `target_triggers_per_block_range`: ideal number of triggers per batch. + Default: `GRAPH_ETHEREUM_TARGET_TRIGGERS_PER_BLOCK_RANGE` (100). +- `get_logs_max_contracts`: maximum contracts per `eth_getLogs` call. + Default: `GRAPH_ETH_GET_LOGS_MAX_CONTRACTS` (2000). +- `block_ingestor_max_concurrent_json_rpc_calls`: maximum concurrent + JSON-RPC calls for transaction receipts during block ingestion. + Default: `GRAPH_ETHEREUM_BLOCK_INGESTOR_MAX_CONCURRENT_JSON_RPC_CALLS_FOR_TXN_RECEIPTS` (1000). +- `genesis_block_number`: genesis block number for this chain. + Default: `GRAPH_ETHEREUM_GENESIS_BLOCK_NUMBER` (0). + A `provider` is an object with the following characteristics: - `label`: the name of the provider, which will appear in logs @@ -168,6 +197,10 @@ ingestor = "block_ingestor_node" [chains.mainnet] shard = "vip" amp = "ethereum-mainnet" +# Per-chain RPC tuning (all optional — omitted fields use env var defaults) +json_rpc_timeout = 300 +request_retries = 15 +max_block_range_size = 2000 provider = [ { label = "mainnet1", url = "http://..", features = [], headers = { Authorization = "Bearer foo" } }, { label = "mainnet2", url = "http://..", features = [ "archive", "traces" ] } diff --git a/docs/environment-variables.md b/docs/environment-variables.md index b78c4ada314..a11534bbd00 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -10,6 +10,11 @@ those. ## JSON-RPC configuration for EVM chains +> **Note**: Many of these settings can now be overridden per-chain in the TOML +> configuration file under `[chains.<name>]`. When a per-chain value is set in +> the config file it takes precedence over the environment variable. See +> [config.md](config.md#configuring-chains) for details. + - `ETHEREUM_REORG_THRESHOLD`: Maximum expected reorg size, if a larger reorg happens, subgraphs might process inconsistent data. Defaults to 250.
- `ETHEREUM_POLLING_INTERVAL`: how often to poll Ethereum for new blocks (in ms, From 728bef55274470dc1b0cb665b15fca23efe510d6 Mon Sep 17 00:00:00 2001 From: Maksim Dimitrov Date: Thu, 26 Mar 2026 15:55:19 +0200 Subject: [PATCH 3/3] node: Validate per-chain settings on config load Reject invalid ChainSettings values at startup: max_block_range_size and max_event_only_range must be positive (i32), and block_batch_size, block_ptr_batch_size, block_ingestor_max_concurrent_json_rpc_calls, and get_logs_max_contracts must be non-zero (used as .buffered() args or filter batch limits where 0 would panic or silently break). --- node/src/config.rs | 282 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 277 insertions(+), 5 deletions(-) diff --git a/node/src/config.rs b/node/src/config.rs index 8443ce2780c..2e988736678 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -445,8 +445,8 @@ impl ChainSection { fn validate(&mut self) -> Result<()> { NodeId::new(&self.ingestor) .map_err(|node| anyhow!("invalid node id for ingestor {}", node))?; - for (_, chain) in self.chains.iter_mut() { - chain.validate()? + for (name, chain) in self.chains.iter_mut() { + chain.validate(name)? 
} // Validate that effective AMP names are unique and don't collide @@ -650,6 +650,33 @@ impl Default for ChainSettings { } } +impl ChainSettings { + fn validate(&self) -> Result<()> { + anyhow::ensure!( + self.max_block_range_size > 0, + "max_block_range_size must be > 0" + ); + anyhow::ensure!( + self.max_event_only_range > 0, + "max_event_only_range must be > 0" + ); + anyhow::ensure!(self.block_batch_size > 0, "block_batch_size must be > 0"); + anyhow::ensure!( + self.block_ptr_batch_size > 0, + "block_ptr_batch_size must be > 0" + ); + anyhow::ensure!( + self.block_ingestor_max_concurrent_json_rpc_calls > 0, + "block_ingestor_max_concurrent_json_rpc_calls must be > 0" + ); + anyhow::ensure!( + self.get_logs_max_contracts > 0, + "get_logs_max_contracts must be > 0" + ); + Ok(()) + } +} + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct Chain { pub shard: String, @@ -671,12 +698,12 @@ fn default_blockchain_kind() -> BlockchainKind { } impl Chain { - fn validate(&mut self) -> Result<()> { + fn validate(&mut self, name: &str) -> Result<()> { let mut labels = self.providers.iter().map(|p| &p.label).collect_vec(); labels.sort(); labels.dedup(); if labels.len() != self.providers.len() { - return Err(anyhow!("Provider labels must be unique")); + return Err(anyhow!("chain {}: provider labels must be unique", name)); } // `Config` validates that `self.shard` references a configured shard @@ -684,6 +711,10 @@ impl Chain { provider.validate()? 
} + self.settings + .validate() + .map_err(|e| anyhow!("chain {}: {}", name, e))?; + Ok(()) } } @@ -1398,7 +1429,15 @@ where #[cfg(test)] mod tests { - use crate::config::{default_polling_interval, ChainSection, Web3Rule}; + use std::time::Duration; + + use crate::config::{ + default_block_batch_size, default_block_ingestor_max_concurrent_json_rpc_calls, + default_block_ptr_batch_size, default_genesis_block_number, default_get_logs_max_contracts, + default_json_rpc_timeout, default_max_block_range_size, default_max_event_only_range, + default_polling_interval, default_request_retries, default_target_triggers_per_block_range, + ChainSection, Web3Rule, + }; use super::{ Chain, ChainSettings, Config, FirehoseProvider, Provider, ProviderDetails, Shard, @@ -2213,4 +2252,237 @@ fdw_pool_size = [ graph::components::network_provider::ChainName::from("unknown") ); } + + #[test] + fn chain_settings_valid_overrides() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + json_rpc_timeout = 300 + request_retries = 15 + max_block_range_size = 2000 + block_batch_size = 20 + block_ptr_batch_size = 50 + max_event_only_range = 1000 + target_triggers_per_block_range = 200 + get_logs_max_contracts = 5000 + block_ingestor_max_concurrent_json_rpc_calls = 500 + genesis_block_number = 1 + provider = [] + "#, + ) + .unwrap(); + + assert!(section.validate().is_ok()); + let settings = &section.chains.get("mainnet").unwrap().settings; + assert_eq!(settings.json_rpc_timeout, Duration::from_secs(300)); + assert_eq!(settings.request_retries, 15); + assert_eq!(settings.max_block_range_size, 2000); + assert_eq!(settings.block_batch_size, 20); + assert_eq!(settings.block_ptr_batch_size, 50); + assert_eq!(settings.max_event_only_range, 1000); + assert_eq!(settings.target_triggers_per_block_range, 200); + assert_eq!(settings.get_logs_max_contracts, 5000); + assert_eq!(settings.block_ingestor_max_concurrent_json_rpc_calls, 500); +
assert_eq!(settings.genesis_block_number, 1); + } + + #[test] + fn chain_settings_defaults_match_env_vars() { + let settings = ChainSettings::default(); + assert_eq!(settings.polling_interval, default_polling_interval()); + assert_eq!(settings.json_rpc_timeout, default_json_rpc_timeout()); + assert_eq!(settings.request_retries, default_request_retries()); + assert_eq!( + settings.max_block_range_size, + default_max_block_range_size() + ); + assert_eq!(settings.block_batch_size, default_block_batch_size()); + assert_eq!( + settings.block_ptr_batch_size, + default_block_ptr_batch_size() + ); + assert_eq!( + settings.max_event_only_range, + default_max_event_only_range() + ); + assert_eq!( + settings.target_triggers_per_block_range, + default_target_triggers_per_block_range() + ); + assert_eq!( + settings.get_logs_max_contracts, + default_get_logs_max_contracts() + ); + assert_eq!( + settings.block_ingestor_max_concurrent_json_rpc_calls, + default_block_ingestor_max_concurrent_json_rpc_calls() + ); + assert_eq!( + settings.genesis_block_number, + default_genesis_block_number() + ); + } + + #[test] + fn chain_settings_rejects_zero_max_block_range_size() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + max_block_range_size = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("max_block_range_size must be > 0"), + "expected max_block_range_size error, got: {err}" + ); + } + + #[test] + fn chain_settings_rejects_negative_max_block_range_size() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + max_block_range_size = -1 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("max_block_range_size must be > 0"), + "expected max_block_range_size error, got: {err}" + ); + } + + #[test] + fn
chain_settings_rejects_zero_max_event_only_range() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + max_event_only_range = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("max_event_only_range must be > 0"), + "expected max_event_only_range error, got: {err}" + ); + } + + #[test] + fn chain_settings_rejects_zero_block_batch_size() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + block_batch_size = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("block_batch_size must be > 0"), + "expected block_batch_size error, got: {err}" + ); + } + + #[test] + fn chain_settings_rejects_zero_block_ptr_batch_size() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + block_ptr_batch_size = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("block_ptr_batch_size must be > 0"), + "expected block_ptr_batch_size error, got: {err}" + ); + } + + #[test] + fn chain_settings_rejects_zero_concurrent_json_rpc_calls() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + block_ingestor_max_concurrent_json_rpc_calls = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("block_ingestor_max_concurrent_json_rpc_calls must be > 0"), + "expected block_ingestor_max_concurrent_json_rpc_calls error, got: {err}" + ); + } + + #[test] + fn chain_settings_rejects_zero_get_logs_max_contracts() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + get_logs_max_contracts = 0 + provider = [] + "#, +
) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("get_logs_max_contracts must be > 0"), + "expected get_logs_max_contracts error, got: {err}" + ); + } + + #[test] + fn chain_settings_validation_error_includes_chain_name() { + let mut section = toml::from_str::<ChainSection>( + r#" + ingestor = "block_ingestor_node" + [my_chain] + shard = "primary" + block_batch_size = 0 + provider = [] + "#, + ) + .unwrap(); + + let err = section.validate().unwrap_err().to_string(); + assert!( + err.contains("my_chain"), + "expected chain name in error, got: {err}" + ); + } }