Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub struct EthereumLogFilter {

impl From<EthereumLogFilter> for Vec<LogFilter> {
fn from(val: EthereumLogFilter) -> Self {
val.eth_get_logs_filters()
val.eth_get_logs_filters(ENV_VARS.get_logs_max_contracts)
.map(
|EthGetLogsFilter {
contracts,
Expand Down Expand Up @@ -546,7 +546,10 @@ impl EthereumLogFilter {
/// Filters for `eth_getLogs` calls. The filters will not return false positives. This attempts
/// to balance between having granular filters but too many calls and having few calls but too
/// broad filters causing the Ethereum endpoint to timeout.
pub fn eth_get_logs_filters(self) -> impl Iterator<Item = EthGetLogsFilter> {
pub fn eth_get_logs_filters(
self,
get_logs_max_contracts: usize,
) -> impl Iterator<Item = EthGetLogsFilter> {
let mut filters = Vec::new();

// Start with the wildcard event filters.
Expand Down Expand Up @@ -596,7 +599,7 @@ impl EthereumLogFilter {
for neighbor in g.neighbors(max_vertex) {
match neighbor {
LogFilterNode::Contract(address) => {
if filter.contracts.len() == ENV_VARS.get_logs_max_contracts {
if filter.contracts.len() == get_logs_max_contracts {
// The batch size was reached, register the filter and start a new one.
let event = filter.event_signatures[0];
push_filter(filter);
Expand Down Expand Up @@ -1766,7 +1769,7 @@ fn complete_log_filter() {
wildcard_events: HashMap::new(),
events_with_topic_filters: HashMap::new(),
}
.eth_get_logs_filters()
.eth_get_logs_filters(ENV_VARS.get_logs_max_contracts)
.collect();

// Assert that a contract or event is filtered on iff it was present in the graph.
Expand Down
52 changes: 45 additions & 7 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,44 @@ use graph::blockchain::block_stream::{
/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];

/// Resolved per-chain settings. Populated at chain initialisation from the config file (with
/// ENV_VAR fallbacks) and stored on [`Chain`] and [`crate::EthereumAdapter`].
#[derive(Clone, Debug)]
pub struct ChainSettings {
pub polling_interval: Duration,
pub json_rpc_timeout: Duration,
pub request_retries: usize,
pub max_block_range_size: BlockNumber,
pub block_batch_size: usize,
pub block_ptr_batch_size: usize,
pub max_event_only_range: BlockNumber,
pub target_triggers_per_block_range: u64,
pub get_logs_max_contracts: usize,
pub block_ingestor_max_concurrent_json_rpc_calls: usize,
pub genesis_block_number: u64,
}

impl ChainSettings {
/// Constructs a [`ChainSettings`] from environment variable defaults.
/// Used in tests and for firehose-only chains that have no RPC config.
pub fn from_env_defaults() -> Self {
ChainSettings {
polling_interval: graph::env::ENV_VARS.ingestor_polling_interval,
json_rpc_timeout: ENV_VARS.json_rpc_timeout,
request_retries: ENV_VARS.request_retries,
max_block_range_size: ENV_VARS.max_block_range_size,
block_batch_size: ENV_VARS.block_batch_size,
block_ptr_batch_size: ENV_VARS.block_ptr_batch_size,
max_event_only_range: ENV_VARS.max_event_only_range,
target_triggers_per_block_range: ENV_VARS.target_triggers_per_block_range,
get_logs_max_contracts: ENV_VARS.get_logs_max_contracts,
block_ingestor_max_concurrent_json_rpc_calls: ENV_VARS
.block_ingestor_max_concurrent_json_rpc_calls,
genesis_block_number: ENV_VARS.genesis_block_number,
}
}
}

pub struct EthereumStreamBuilder {}

#[async_trait]
Expand Down Expand Up @@ -192,9 +230,9 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
};

let max_block_range_size = if is_using_subgraph_composition {
ENV_VARS.max_block_range_size * 10
chain.settings.max_block_range_size * 10
} else {
ENV_VARS.max_block_range_size
chain.settings.max_block_range_size
};

Ok(Box::new(PollingBlockStream::new(
Expand All @@ -206,7 +244,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
reorg_threshold,
logger,
max_block_range_size,
ENV_VARS.target_triggers_per_block_range,
chain.settings.target_triggers_per_block_range,
unified_api_version,
subgraph_current_block,
)))
Expand Down Expand Up @@ -325,13 +363,13 @@ pub struct Chain {
call_cache: Arc<dyn EthereumCallCache>,
chain_head_update_listener: Arc<dyn ChainHeadUpdateListener>,
reorg_threshold: BlockNumber,
polling_ingestor_interval: Duration,
pub is_ingestible: bool,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
block_refetcher: Arc<dyn BlockRefetcher<Self>>,
adapter_selector: Arc<dyn TriggersAdapterSelector<Self>>,
runtime_adapter_builder: Arc<dyn RuntimeAdapterBuilder>,
eth_adapters: Arc<EthereumNetworkAdapters>,
pub settings: Arc<ChainSettings>,
}

impl std::fmt::Debug for Chain {
Expand Down Expand Up @@ -388,8 +426,8 @@ impl Chain {
runtime_adapter_builder: Arc<dyn RuntimeAdapterBuilder>,
eth_adapters: Arc<EthereumNetworkAdapters>,
reorg_threshold: BlockNumber,
polling_ingestor_interval: Duration,
is_ingestible: bool,
settings: Arc<ChainSettings>,
) -> Self {
Chain {
logger_factory,
Expand All @@ -406,7 +444,7 @@ impl Chain {
eth_adapters,
reorg_threshold,
is_ingestible,
polling_ingestor_interval,
settings,
}
}

Expand Down Expand Up @@ -637,7 +675,7 @@ impl Blockchain for Chain {
graph::env::ENV_VARS.reorg_threshold(),
self.chain_client(),
self.chain_store.cheap_clone(),
self.polling_ingestor_interval,
self.settings.polling_interval,
self.name.clone(),
)?)
}
Expand Down
Loading