Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions crates/bridges/evm/erigon-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,25 @@ impl ErigonFlightBridge {
// Initialize metrics
let metrics = BridgeMetrics::new("erigon_bridge", chain_id, "erigon");

// Create global semaphore for ExecuteBlocks calls and add it to config
let mut final_segment_config = segment_config.unwrap_or_default();
let execute_blocks_semaphore = Arc::new(tokio::sync::Semaphore::new(
final_segment_config.global_max_execute_blocks,
));
final_segment_config.execute_blocks_semaphore = Some(execute_blocks_semaphore);
info!(
"Created global ExecuteBlocks semaphore with {} permits",
final_segment_config.global_max_execute_blocks
);

Ok(Self {
client: Arc::new(tokio::sync::Mutex::new(client)),
blockdata_pool: Arc::new(blockdata_pool),
trie_client,
chain_id,
streaming_service,
validator,
segment_config: segment_config.unwrap_or_default(),
segment_config: final_segment_config,
metrics,
})
}
Expand Down Expand Up @@ -225,8 +236,9 @@ impl ErigonFlightBridge {
start: u64,
end: u64,
validate: bool,
) -> impl Stream<Item = Result<phaser_bridge::BatchWithRange, arrow_flight::error::FlightError>> + Send
{
) -> impl Stream<Item = Result<phaser_bridge::BatchWithRange, arrow_flight::error::FlightError>>
+ Send
+ 'static {
let max_concurrent = config.max_concurrent_segments;
let should_validate = validate && validator.is_some();
let metrics = self.metrics.clone();
Expand Down Expand Up @@ -356,8 +368,9 @@ impl ErigonFlightBridge {
start: u64,
end: u64,
validate: bool,
) -> impl Stream<Item = Result<phaser_bridge::BatchWithRange, arrow_flight::error::FlightError>> + Send
{
) -> impl Stream<Item = Result<phaser_bridge::BatchWithRange, arrow_flight::error::FlightError>>
+ Send
+ 'static {
let max_concurrent = config.max_concurrent_segments;
let should_validate = validate && validator.is_some();
let metrics = self.metrics.clone();
Expand Down
25 changes: 24 additions & 1 deletion crates/bridges/evm/erigon-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ struct Args {
#[arg(long, env = "VALIDATION_BATCH_SIZE", default_value_t = 100)]
validation_batch_size: usize,

/// Maximum number of concurrent ExecuteBlocks calls within a segment (default: num_cpus)
/// Controls how many block ranges can be executed in parallel when fetching logs/receipts.
/// Higher values utilize more CPU cores but increase memory usage and connection pressure.
#[arg(long, env = "MAX_CONCURRENT_EXECUTIONS")]
max_concurrent_executions: Option<usize>,

/// Global maximum number of concurrent ExecuteBlocks calls across ALL segments (default: 64)
/// This prevents overwhelming Erigon with too many concurrent gRPC streams.
/// Set based on your Erigon server capacity (e.g., 64-256 for production).
#[arg(long, env = "GLOBAL_MAX_EXECUTE_BLOCKS", default_value_t = 64)]
global_max_execute_blocks: usize,

/// Prometheus metrics port (default: 9091)
#[arg(long, env = "METRICS_PORT", default_value_t = 9091)]
metrics_port: u16,
Expand Down Expand Up @@ -124,14 +136,17 @@ async fn main() -> Result<()> {
info!("Validation enabled with executor: {:?}", config);
}

// Build segment config
// Build segment config (semaphore will be added by bridge)
let segment_config = SegmentConfig {
segment_size: args.segment_size,
max_concurrent_segments: args
.max_concurrent_segments
.unwrap_or_else(|| (num_cpus::get() / 4).max(1)),
connection_pool_size: args.connection_pool_size,
validation_batch_size: args.validation_batch_size,
max_concurrent_executions: args.max_concurrent_executions.unwrap_or_else(num_cpus::get),
global_max_execute_blocks: args.global_max_execute_blocks,
execute_blocks_semaphore: None, // Will be set by bridge
};

info!("Segment configuration:");
Expand All @@ -148,6 +163,14 @@ async fn main() -> Result<()> {
" Validation batch size: {} blocks",
segment_config.validation_batch_size
);
info!(
" Max concurrent executions per segment: {}",
segment_config.max_concurrent_executions
);
info!(
" Global max ExecuteBlocks: {}",
segment_config.global_max_execute_blocks
);

// Start Prometheus metrics server
let metrics_port = args.metrics_port;
Expand Down
Loading
Loading