From 1ea5ea5420bde622dc02f981117d221b09a7a3a4 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 10 Nov 2025 19:50:10 -0800 Subject: [PATCH 1/5] Rename max_logs_segments_sync to max_concurrent_log_segments - Rename config field for clarity (better describes what it does) - Change default from 1 to 16 segments - Rationale: Bridge already limits concurrent Erigon requests via buffering - Streaming architecture means data flows through without accumulation - 16 concurrent segments provides good parallelism while preventing the 48-segment explosion that caused memory/stream issues The semaphore limits how many segments can process logs concurrently, but the bridge's max_concurrent_executions prevents too much in-flight work at the Erigon level. This allows better throughput without risking resource exhaustion. --- .../bridges/evm/erigon-bridge/src/bridge.rs | 23 +- crates/bridges/evm/erigon-bridge/src/main.rs | 25 +- .../evm/erigon-bridge/src/segment_worker.rs | 298 +++++++++------- crates/phaser-query/proto/admin/sync.proto | 31 +- crates/phaser-query/src/bin/phaser-cli.rs | 33 -- .../src/generated/phaser.admin.rs | 32 +- crates/phaser-query/src/lib.rs | 6 + crates/phaser-query/src/sync/error.rs | 53 +++ crates/phaser-query/src/sync/mod.rs | 2 +- crates/phaser-query/src/sync/service.rs | 103 +----- crates/phaser-query/src/sync/worker.rs | 323 ++++++++++-------- 11 files changed, 482 insertions(+), 447 deletions(-) diff --git a/crates/bridges/evm/erigon-bridge/src/bridge.rs b/crates/bridges/evm/erigon-bridge/src/bridge.rs index e1d3829..3f42bb9 100644 --- a/crates/bridges/evm/erigon-bridge/src/bridge.rs +++ b/crates/bridges/evm/erigon-bridge/src/bridge.rs @@ -111,6 +111,17 @@ impl ErigonFlightBridge { // Initialize metrics let metrics = BridgeMetrics::new("erigon_bridge", chain_id, "erigon"); + // Create global semaphore for ExecuteBlocks calls and add it to config + let mut final_segment_config = segment_config.unwrap_or_default(); + let execute_blocks_semaphore = Arc::new(tokio::sync::Semaphore::new( + final_segment_config.global_max_execute_blocks, + )); + final_segment_config.execute_blocks_semaphore = Some(execute_blocks_semaphore); + info!( + "Created global ExecuteBlocks semaphore with {} permits", + final_segment_config.global_max_execute_blocks + ); + Ok(Self { client: Arc::new(tokio::sync::Mutex::new(client)), blockdata_pool: Arc::new(blockdata_pool), @@ -118,7 +129,7 @@ impl ErigonFlightBridge { chain_id, streaming_service, validator, - segment_config: segment_config.unwrap_or_default(), + segment_config: final_segment_config, metrics, }) } @@ -225,8 +236,9 @@ impl ErigonFlightBridge { start: u64, end: u64, validate: bool, - ) -> impl Stream> + Send - { + ) -> impl Stream> + + Send + + 'static { let max_concurrent = config.max_concurrent_segments; let should_validate = validate && validator.is_some(); let metrics = self.metrics.clone(); @@ -356,8 +368,9 @@ impl ErigonFlightBridge { start: u64, end: u64, validate: bool, - ) -> impl Stream> + Send - { + ) -> impl Stream> + + Send + + 'static { let max_concurrent = config.max_concurrent_segments; let should_validate = validate && validator.is_some(); let metrics = self.metrics.clone(); diff --git a/crates/bridges/evm/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs index 708fdb1..8728d6d 100644 --- a/crates/bridges/evm/erigon-bridge/src/main.rs +++ b/crates/bridges/evm/erigon-bridge/src/main.rs @@ -72,6 +72,18 @@ struct Args { #[arg(long, env = "VALIDATION_BATCH_SIZE", default_value_t = 100)] 
validation_batch_size: usize, + /// Maximum number of concurrent ExecuteBlocks calls within a segment (default: num_cpus) + /// Controls how many block ranges can be executed in parallel when fetching logs/receipts. + /// Higher values utilize more CPU cores but increase memory usage and connection pressure. + #[arg(long, env = "MAX_CONCURRENT_EXECUTIONS")] + max_concurrent_executions: Option, + + /// Global maximum number of concurrent ExecuteBlocks calls across ALL segments (default: 64) + /// This prevents overwhelming Erigon with too many concurrent gRPC streams. + /// Set based on your Erigon server capacity (e.g., 64-256 for production). + #[arg(long, env = "GLOBAL_MAX_EXECUTE_BLOCKS", default_value_t = 64)] + global_max_execute_blocks: usize, + /// Prometheus metrics port (default: 9091) #[arg(long, env = "METRICS_PORT", default_value_t = 9091)] metrics_port: u16, @@ -124,7 +136,7 @@ async fn main() -> Result<()> { info!("Validation enabled with executor: {:?}", config); } - // Build segment config + // Build segment config (semaphore will be added by bridge) let segment_config = SegmentConfig { segment_size: args.segment_size, max_concurrent_segments: args @@ -132,6 +144,9 @@ async fn main() -> Result<()> { .unwrap_or_else(|| (num_cpus::get() / 4).max(1)), connection_pool_size: args.connection_pool_size, validation_batch_size: args.validation_batch_size, + max_concurrent_executions: args.max_concurrent_executions.unwrap_or_else(num_cpus::get), + global_max_execute_blocks: args.global_max_execute_blocks, + execute_blocks_semaphore: None, // Will be set by bridge }; info!("Segment configuration:"); @@ -148,6 +163,14 @@ async fn main() -> Result<()> { " Validation batch size: {} blocks", segment_config.validation_batch_size ); + info!( + " Max concurrent executions per segment: {}", + segment_config.max_concurrent_executions + ); + info!( + " Global max ExecuteBlocks: {}", + segment_config.global_max_execute_blocks + ); // Start Prometheus metrics server let metrics_port = args.metrics_port; diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index 2ad977c..5df9394 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -22,7 +22,7 @@ use tracing::{debug, error, info, warn}; use validators_evm::ValidationExecutor; /// Configuration for segment-based processing and validation -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SegmentConfig { /// Size of each segment in blocks (default: 500_000, aligned with Erigon snapshots) pub segment_size: u64, @@ -36,6 +36,32 @@ pub struct SegmentConfig { /// Validation batch size within a segment (default: 100 blocks) /// How many blocks to collect before executing validations and converting to Arrow pub validation_batch_size: usize, + + /// Maximum number of concurrent ExecuteBlocks calls within a segment (default: num_cpus) + /// Controls parallelism for block execution when fetching logs/receipts + pub max_concurrent_executions: usize, + + /// Global maximum number of concurrent ExecuteBlocks calls across ALL segments (default: 64) + /// This prevents overwhelming Erigon with too many concurrent gRPC streams + pub global_max_execute_blocks: usize, + + /// Global semaphore for limiting ExecuteBlocks calls across all workers + /// Not included in Default implementation - must be set by bridge + pub execute_blocks_semaphore: Option>, +} + +impl std::fmt::Debug for SegmentConfig { + fn fmt(&self, f: 
&mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SegmentConfig") + .field("segment_size", &self.segment_size) + .field("max_concurrent_segments", &self.max_concurrent_segments) + .field("connection_pool_size", &self.connection_pool_size) + .field("validation_batch_size", &self.validation_batch_size) + .field("max_concurrent_executions", &self.max_concurrent_executions) + .field("global_max_execute_blocks", &self.global_max_execute_blocks) + .field("execute_blocks_semaphore", &self.execute_blocks_semaphore.is_some()) + .finish() + } } impl Default for SegmentConfig { @@ -45,6 +71,9 @@ impl Default for SegmentConfig { max_concurrent_segments: (num_cpus::get() / 4).max(1), connection_pool_size: 8, validation_batch_size: 100, + max_concurrent_executions: num_cpus::get(), + global_max_execute_blocks: 64, + execute_blocks_semaphore: None, } } } @@ -417,7 +446,7 @@ impl SegmentWorker { ) -> impl futures::Stream> + Send { async_stream::stream! { let worker_id = self.worker_id; - let segment_id = self.segment_start / 500_000; // Calculate segment ID + let segment_id = self.segment_start / 500_000; let total_blocks = self.segment_end - self.segment_start + 1; let phase_start = Instant::now(); let mut yielded_batches = 0u64; @@ -437,148 +466,148 @@ impl SegmentWorker { .set_worker_stage(worker_id, segment_id, WorkerStage::Logs); self.metrics.active_workers_inc("logs"); - // Clone the client at the start (shares underlying HTTP/2 connection) - let mut client = self.blockdata_client.clone(); - - // Process blocks in chunks, fetching headers and executing blocks concurrently + // Split entire segment into work units upfront + let mut work_units = Vec::new(); let mut current_block = self.segment_start; while current_block <= self.segment_end { - // Fetch a chunk of headers (batch_size blocks at a time) - let chunk_end = (current_block + self.config.validation_batch_size as u64 - 1).min(self.segment_end); + let chunk_end = (current_block + self.config.validation_batch_size as u64 - 1) + .min(self.segment_end); + work_units.push((current_block, chunk_end)); + current_block = chunk_end + 1; + } - let chunk_headers = match Self::fetch_headers(current_block, chunk_end, &mut client, self.config.validation_batch_size as u32, &self.metrics).await { - Ok(h) => { - if h.is_empty() { - warn!("Process logs: Received EMPTY header response for blocks {}-{}", - current_block, chunk_end); - } - h - }, - Err(e) => { - error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", - worker_id, segment_id, current_block, chunk_end, e); + info!( + "Worker {} segment {}: Created {} work units for parallel execution", + worker_id, segment_id, work_units.len() + ); - // Track error type for monitoring - let error_type = Self::categorize_error(&e); - self.metrics.error(&error_type, "logs"); + // Execute work units with bounded concurrency, yielding results in order + // Use buffered() to automatically handle concurrency and ordering + use futures::stream::{self, StreamExt}; - self.metrics.active_workers_dec("logs"); - yield Err(e); - return; - } - }; + let max_concurrent = self.config.max_concurrent_executions; - debug!( - "Segment {}-{}: Fetched {} headers for chunk {}-{}, spawning {} concurrent ExecuteBlocks calls", - self.segment_start, - self.segment_end, - chunk_headers.len(), - current_block, - chunk_end, - chunk_headers.len() - ); + // Extract needed fields to avoid borrowing self in closure + let blockdata_client = self.blockdata_client.clone(); + let metrics = self.metrics.clone(); + 
let validation_batch_size = self.config.validation_batch_size; + let execute_blocks_semaphore = self.config.execute_blocks_semaphore.clone(); - // Sort headers by block number - let mut sorted_chunk: Vec<_> = chunk_headers.into_iter().collect(); - sorted_chunk.sort_by_key(|(block_num, _)| *block_num); - - const RECEIPT_BATCH_SIZE: usize = 10; - - let receipt_futures: Vec<_> = sorted_chunk - .chunks(RECEIPT_BATCH_SIZE) - .map(|block_chunk| { - let mut client_clone = client.clone(); - let block_chunk: Vec<_> = block_chunk.to_vec(); - let metrics = self.metrics.clone(); - async move { - let first_block = block_chunk[0].0; - let last_block = block_chunk[block_chunk.len() - 1].0; - - debug!( - "Fetching receipts for {} blocks (range {}-{})", - block_chunk.len(), - first_block, - last_block - ); - - Self::collect_receipts_for_range(first_block, last_block, block_chunk, &mut client_clone, &metrics).await - } - }) - .collect(); - - // Await all batched ExecuteBlocks calls together - let results = futures::future::join_all(receipt_futures).await; - - // Collect successful results and flatten batched results - let mut current_batch = Vec::new(); - for result in results { - match result { - Ok(batch_data) => { - // Each result is a Vec of (block_num, receipts, header) + // Create a stream that processes work units in order with bounded concurrency + let mut work_stream = stream::iter(work_units.into_iter().enumerate()) + .map(move |(work_idx, (start, end))| { + let mut client = blockdata_client.clone(); + let metrics = metrics.clone(); + let semaphore = execute_blocks_semaphore.clone(); + + async move { + // Fetch headers + let headers_result = Self::fetch_headers( + start, + end, + &mut client, + validation_batch_size as u32, + &metrics, + ).await; + + let headers = match headers_result { + Ok(h) => { + if h.is_empty() { + warn!( + "Work unit {}: Received EMPTY header response for blocks {}-{}", + work_idx, start, end + ); + } + h + } + Err(e) => { + error!( + "Work unit {}: Failed to fetch headers for blocks {}-{}: {}", + work_idx, start, end, e + ); + return (work_idx, start, end, Err(e)); + } + }; + + // Sort headers by block number + let mut sorted_headers: Vec<_> = headers.into_iter().collect(); + sorted_headers.sort_by_key(|(block_num, _)| *block_num); + + // Execute blocks for this range + let result = Self::collect_receipts_for_range( + start, + end, + sorted_headers, + &mut client, + &metrics, + semaphore, + ).await; + + (work_idx, start, end, result) + } + }) + .buffered(max_concurrent); + + // Process results as they arrive in order + while let Some((work_idx, range_start, range_end, work_result)) = work_stream.next().await { + match work_result { + Ok(receipts_data) => { // Count receipts for metrics - for (_, receipts, _) in &batch_data { + for (_, receipts, _) in &receipts_data { total_receipts_processed += receipts.len() as u64; } - current_batch.extend(batch_data); - } - Err(e) => { - // Track error type for monitoring - let error_type = Self::categorize_error(&e); - self.metrics.error(&error_type, "logs"); - - self.metrics.active_workers_dec("logs"); - yield Err(e); - return; - } - } - } - // Validate and convert this batch - if !current_batch.is_empty() { - debug!( - "Segment {}-{}: Validating and converting batch of {} blocks", - self.segment_start, - self.segment_end, - current_batch.len() - ); + if !receipts_data.is_empty() { + + debug!( + "Segment {}-{}: Validating and converting work unit {} ({} blocks)", + self.segment_start, + self.segment_end, + work_idx, + 
receipts_data.len() + ); + + // Validate and convert + let batches = match Self::validate_and_convert_receipts( + self.segment_start, + self.segment_end, + self.config.clone(), + self.validator.clone(), + receipts_data, + ) + .await { + Ok(b) => b, + Err(e) => { + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, "logs"); + self.metrics.active_workers_dec("logs"); + yield Err(e); + return; + } + }; + + // Yield batches in order + for batch in batches { + yielded_batches += 1; + yield Ok(BatchWithRange::new(batch, range_start, range_end)); + } + } - let batches = match Self::validate_and_convert_receipts( - self.segment_start, - self.segment_end, - self.config.clone(), - self.validator.clone(), - current_batch, - ) - .await { - Ok(b) => b, + // Update progress + let blocks_processed = range_end - self.segment_start + 1; + let progress_pct = (blocks_processed as f64 / total_blocks as f64) * 100.0; + self.metrics + .set_worker_progress(worker_id, segment_id, "logs", progress_pct); + } Err(e) => { - // Track error type for monitoring let error_type = Self::categorize_error(&e); self.metrics.error(&error_type, "logs"); - self.metrics.active_workers_dec("logs"); yield Err(e); return; } - }; - - // Yield each batch immediately - // Wrap with responsibility range metadata - for batch in batches { - yielded_batches += 1; - yield Ok(BatchWithRange::new(batch, current_block, chunk_end)); } - } - - // Update progress after processing chunk - let blocks_processed = chunk_end - self.segment_start + 1; - let progress_pct = (blocks_processed as f64 / total_blocks as f64) * 100.0; - self.metrics - .set_worker_progress(worker_id, segment_id, "logs", progress_pct); - - // Move to next chunk - current_block = chunk_end + 1; } // Record total receipts processed @@ -807,10 +836,33 @@ impl SegmentWorker { block_headers: Vec<(u64, (Header, i64))>, client: &mut BlockDataClient, metrics: &BridgeMetrics, + execute_blocks_semaphore: Option>, ) -> Result, Header)>, ErigonBridgeError> { let call_start = std::time::Instant::now(); let segment_id = from_block / 500_000; + // Acquire semaphore permit before making ExecuteBlocks call (if configured) + // This limits total concurrent ExecuteBlocks calls across ALL workers + let _permit = if let Some(sem) = execute_blocks_semaphore.as_ref() { + let permit = sem.acquire().await.map_err(|e| { + ErigonBridgeError::Internal(anyhow::anyhow!( + "Failed to acquire ExecuteBlocks semaphore: {}", + e + )) + })?; + + debug!( + "Blocks {}-{}: Acquired ExecuteBlocks permit (available: {})", + from_block, + to_block, + sem.available_permits() + ); + + Some(permit) + } else { + None + }; + // Spawn a monitoring task that warns if the call takes too long const SLOW_CALL_WARNING_THRESHOLD_SECS: u64 = 300; // 5 minutes let monitor_handle = { diff --git a/crates/phaser-query/proto/admin/sync.proto b/crates/phaser-query/proto/admin/sync.proto index 144a994..55f8330 100644 --- a/crates/phaser-query/proto/admin/sync.proto +++ b/crates/phaser-query/proto/admin/sync.proto @@ -141,9 +141,6 @@ message SyncStatusResponse { // Detailed progress metrics DataProgress data_progress = 13; - - // Download rate (bytes per second) - double download_rate_bytes_per_sec = 14; } message ListSyncJobsRequest { @@ -195,37 +192,21 @@ message SyncProgressUpdate { // Detailed progress metrics DataProgress data_progress = 9; - - // Download rate (bytes per second) - double download_rate_bytes_per_sec = 10; } message WorkerProgress { // Worker ID uint32 worker_id = 1; - // Current stage: 
"blocks", "transactions", "logs" - string stage = 2; - // Block range for this worker - uint64 from_block = 3; - uint64 to_block = 4; - - // Current progress - uint64 current_block = 5; - uint64 blocks_processed = 6; - - // Rate (blocks per second) - double rate = 7; - - // Bytes written - uint64 bytes_written = 8; - - // Number of parquet files created - uint32 files_created = 9; + uint64 from_block = 2; + uint64 to_block = 3; // When this worker started (Unix timestamp in seconds) - int64 started_at = 10; + int64 started_at = 4; + + // Worker status: "running", "completed", "failed" + string status = 5; } message AnalyzeGapsRequest { diff --git a/crates/phaser-query/src/bin/phaser-cli.rs b/crates/phaser-query/src/bin/phaser-cli.rs index 46a51fa..38b3786 100644 --- a/crates/phaser-query/src/bin/phaser-cli.rs +++ b/crates/phaser-query/src/bin/phaser-cli.rs @@ -348,22 +348,6 @@ async fn main() -> Result<()> { } println!("Active workers: {}", job.active_workers); - if job.download_rate_bytes_per_sec > 0.0 { - let rate = if job.download_rate_bytes_per_sec >= 1_000_000_000.0 { - format!( - "{:.2} GB/s", - job.download_rate_bytes_per_sec / 1_000_000_000.0 - ) - } else if job.download_rate_bytes_per_sec >= 1_000_000.0 { - format!("{:.1} MB/s", job.download_rate_bytes_per_sec / 1_000_000.0) - } else if job.download_rate_bytes_per_sec >= 1_000.0 { - format!("{:.1} KB/s", job.download_rate_bytes_per_sec / 1_000.0) - } else { - format!("{:.0} B/s", job.download_rate_bytes_per_sec) - }; - println!("Download rate: {}", rate); - } - if !job.error.is_empty() { println!("Error: {}", job.error); } @@ -608,23 +592,6 @@ async fn main() -> Result<()> { } println!("Active workers: {}", job.active_workers); - // Display download rate - if job.download_rate_bytes_per_sec > 0.0 { - let rate = if job.download_rate_bytes_per_sec >= 1_000_000_000.0 { - format!( - "{:.2} GB/s", - job.download_rate_bytes_per_sec / 1_000_000_000.0 - ) - } else if job.download_rate_bytes_per_sec >= 1_000_000.0 { - format!("{:.1} MB/s", job.download_rate_bytes_per_sec / 1_000_000.0) - } else if job.download_rate_bytes_per_sec >= 1_000.0 { - format!("{:.1} KB/s", job.download_rate_bytes_per_sec / 1_000.0) - } else { - format!("{:.0} B/s", job.download_rate_bytes_per_sec) - }; - println!("Download rate: {}", rate); - } - if !job.error.is_empty() { println!("Error: {}", job.error); } diff --git a/crates/phaser-query/src/generated/phaser.admin.rs b/crates/phaser-query/src/generated/phaser.admin.rs index 4fd5cb5..99ad302 100644 --- a/crates/phaser-query/src/generated/phaser.admin.rs +++ b/crates/phaser-query/src/generated/phaser.admin.rs @@ -129,9 +129,6 @@ pub struct SyncStatusResponse { /// Detailed progress metrics #[prost(message, optional, tag = "13")] pub data_progress: ::core::option::Option, - /// Download rate (bytes per second) - #[prost(double, tag = "14")] - pub download_rate_bytes_per_sec: f64, } #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ListSyncJobsRequest { @@ -193,40 +190,23 @@ pub struct SyncProgressUpdate { /// Detailed progress metrics #[prost(message, optional, tag = "9")] pub data_progress: ::core::option::Option, - /// Download rate (bytes per second) - #[prost(double, tag = "10")] - pub download_rate_bytes_per_sec: f64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct WorkerProgress { /// Worker ID #[prost(uint32, tag = "1")] pub worker_id: u32, - /// Current stage: "blocks", "transactions", "logs" - #[prost(string, tag = "2")] - pub stage: ::prost::alloc::string::String, /// Block range 
for this worker - #[prost(uint64, tag = "3")] + #[prost(uint64, tag = "2")] pub from_block: u64, - #[prost(uint64, tag = "4")] + #[prost(uint64, tag = "3")] pub to_block: u64, - /// Current progress - #[prost(uint64, tag = "5")] - pub current_block: u64, - #[prost(uint64, tag = "6")] - pub blocks_processed: u64, - /// Rate (blocks per second) - #[prost(double, tag = "7")] - pub rate: f64, - /// Bytes written - #[prost(uint64, tag = "8")] - pub bytes_written: u64, - /// Number of parquet files created - #[prost(uint32, tag = "9")] - pub files_created: u32, /// When this worker started (Unix timestamp in seconds) - #[prost(int64, tag = "10")] + #[prost(int64, tag = "4")] pub started_at: i64, + /// Worker status: "running", "completed", "failed" + #[prost(string, tag = "5")] + pub status: ::prost::alloc::string::String, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnalyzeGapsRequest { diff --git a/crates/phaser-query/src/lib.rs b/crates/phaser-query/src/lib.rs index 19c99b9..3fdb946 100644 --- a/crates/phaser-query/src/lib.rs +++ b/crates/phaser-query/src/lib.rs @@ -210,6 +210,8 @@ pub struct PhaserConfig { pub metrics_port: u16, // Port for Prometheus metrics HTTP server (9091) #[serde(default = "default_sync_parallelism")] pub sync_parallelism: u32, // Number of parallel workers for historical sync (4) + #[serde(default = "default_max_concurrent_log_segments")] + pub max_concurrent_log_segments: u32, // Max segments syncing logs concurrently (16) #[serde(default)] pub parquet: Option, #[serde(default)] @@ -244,6 +246,10 @@ fn default_sync_parallelism() -> u32 { 4 } +fn default_max_concurrent_log_segments() -> u32 { + 16 +} + impl PhaserConfig { pub fn bridge_data_dir(&self, chain_id: u64, bridge_name: &str) -> PathBuf { self.data_root diff --git a/crates/phaser-query/src/sync/error.rs b/crates/phaser-query/src/sync/error.rs index c648f79..39786b5 100644 --- a/crates/phaser-query/src/sync/error.rs +++ b/crates/phaser-query/src/sync/error.rs @@ -87,6 +87,59 @@ pub struct SyncError { pub source: Option>, } +/// Error when multiple data types fail during parallel sync +#[derive(Debug)] +pub struct MultipleDataTypeErrors { + pub from_block: u64, + pub to_block: u64, + pub errors: Vec<(DataType, SyncError)>, +} + +impl fmt::Display for MultipleDataTypeErrors { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Multiple data types failed syncing blocks {}-{}: ", + self.from_block, self.to_block + )?; + for (i, (data_type, err)) in self.errors.iter().enumerate() { + if i > 0 { + write!(f, "; ")?; + } + write!(f, "{}: {}", data_type, err)?; + } + Ok(()) + } +} + +impl std::error::Error for MultipleDataTypeErrors { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + // Return first error as source + self.errors.first().and_then(|(_, e)| e.source()) + } +} + +impl From for SyncError { + fn from(multi_err: MultipleDataTypeErrors) -> Self { + // Aggregate into a single SyncError with Unknown data type + let message = format!("{}", multi_err); + let category = multi_err + .errors + .first() + .map(|(_, e)| e.category) + .unwrap_or(ErrorCategory::Unknown); + + SyncError { + data_type: DataType::Unknown, + category, + from_block: multi_err.from_block, + to_block: multi_err.to_block, + message, + source: Some(Box::new(multi_err)), + } + } +} + impl SyncError { pub fn new( data_type: DataType, diff --git a/crates/phaser-query/src/sync/mod.rs b/crates/phaser-query/src/sync/mod.rs index 3baa65b..130a886 100644 --- a/crates/phaser-query/src/sync/mod.rs 
+++ b/crates/phaser-query/src/sync/mod.rs @@ -5,6 +5,6 @@ mod service; mod worker; pub use data_scanner::DataScanner; -pub use error::{DataType, ErrorCategory, SyncError}; +pub use error::{DataType, ErrorCategory, MultipleDataTypeErrors, SyncError}; pub use service::SyncServer; pub use worker::SyncWorker; diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 3c8c446..0eaeb81 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -304,6 +304,15 @@ impl SyncServer { job_config.bridge_name.clone(), ); + // Create logs semaphore to sequence log processing across workers + let logs_semaphore = Arc::new(tokio::sync::Semaphore::new( + config.max_logs_segments_sync as usize, + )); + info!( + "Created logs semaphore with capacity {} (max segments syncing logs concurrently)", + config.max_logs_segments_sync + ); + for worker_id in 0..num_workers { let bridge_endpoint = job_config.bridge_endpoint.clone(); let data_dir = data_dir.clone(); @@ -314,6 +323,7 @@ impl SyncServer { let job_complete = job_complete.clone(); let historical_boundary = job_config.historical_boundary; let metrics = sync_metrics.clone(); + let logs_semaphore = logs_semaphore.clone(); let handle: tokio::task::JoinHandle> = tokio::spawn(async move { loop { @@ -377,6 +387,7 @@ impl SyncServer { batch_size: 1000, parquet_config: parquet_config.clone(), validation_stage, + logs_semaphore: logs_semaphore.clone(), }; let mut worker = @@ -786,21 +797,6 @@ impl SyncService for SyncServer { let progress = job.progress_tracker.read().await; let active_workers = progress.len() as u32; - // Calculate download rate from active workers - let now = std::time::SystemTime::now(); - let download_rate_bytes_per_sec = progress - .values() - .filter_map(|p| { - if let Ok(elapsed) = now.duration_since(p.started_at) { - let secs = elapsed.as_secs_f64(); - if secs > 0.0 { - return Some(p.bytes_written as f64 / secs); - } - } - None - }) - .sum(); - let total_blocks = job.to_block - job.from_block + 1; // Calculate detailed progress metrics @@ -824,7 +820,6 @@ impl SyncService for SyncServer { to_block: job.to_block, gap_analysis: Some(gap_analysis_to_proto(&analysis, self.config.segment_size)), data_progress, - download_rate_bytes_per_sec, })) } @@ -883,25 +878,10 @@ impl SyncService for SyncServer { Err(_) => (0, job.from_block, None, None), }; - // Count active workers and calculate download rate + // Count active workers let progress = job.progress_tracker.read().await; let active_workers = progress.len() as u32; - // Calculate download rate from active workers - let now = std::time::SystemTime::now(); - let download_rate_bytes_per_sec = progress - .values() - .filter_map(|p| { - if let Ok(elapsed) = now.duration_since(p.started_at) { - let secs = elapsed.as_secs_f64(); - if secs > 0.0 { - return Some(p.bytes_written as f64 / secs); - } - } - None - }) - .sum(); - job_list.push(SyncStatusResponse { job_id: job.job_id.clone(), status: job.status, @@ -916,7 +896,6 @@ impl SyncService for SyncServer { to_block: job.to_block, gap_analysis, data_progress, - download_rate_bytes_per_sec, }); } @@ -925,33 +904,9 @@ impl SyncService for SyncServer { async fn cancel_sync( &self, - request: Request, + _request: Request, ) -> Result, Status> { - let req = request.into_inner(); - - let mut jobs = self.jobs.write().await; - let job = jobs - .get_mut(&req.job_id) - .ok_or_else(|| Status::not_found(format!("Job {} not found", req.job_id)))?; - - // Check if job can be 
cancelled - if job.status == SyncStatus::Completed as i32 - || job.status == SyncStatus::Failed as i32 - || job.status == SyncStatus::Cancelled as i32 - { - return Ok(Response::new(CancelSyncResponse { - success: false, - message: format!("Job {} cannot be cancelled (already finished)", req.job_id), - })); - } - - // TODO: Actually cancel the running workers - job.status = SyncStatus::Cancelled as i32; - - Ok(Response::new(CancelSyncResponse { - success: true, - message: format!("Job {} cancelled", req.job_id), - })) + todo!("unimplemented - cancel sync job") } async fn analyze_gaps( @@ -1082,21 +1037,6 @@ impl SyncService for SyncServer { // Read worker progress for UX visibility let progress_lock = job.progress_tracker.read().await; - // Calculate download rate from active workers - let now = std::time::SystemTime::now(); - let download_rate_bytes_per_sec = progress_lock - .values() - .filter_map(|p| { - if let Ok(elapsed) = now.duration_since(p.started_at) { - let secs = elapsed.as_secs_f64(); - if secs > 0.0 { - return Some(p.bytes_written as f64 / secs); - } - } - None - }) - .sum(); - let workers: Vec = progress_lock .values() .map(|p| { @@ -1105,24 +1045,12 @@ impl SyncService for SyncServer { .unwrap_or_default() .as_secs() as i64; - let elapsed = p.started_at.elapsed().unwrap_or_default().as_secs(); - let rate = if elapsed > 0 { - p.blocks_processed as f64 / elapsed as f64 - } else { - 0.0 - }; - WorkerProgress { worker_id: p.worker_id, - stage: p.current_phase.clone(), from_block: p.from_block, to_block: p.to_block, - current_block: p.current_block, - blocks_processed: p.blocks_processed, - rate, - bytes_written: p.bytes_written, - files_created: p.files_created, started_at, + status: p.current_phase.clone(), } }) .collect(); @@ -1140,7 +1068,6 @@ impl SyncService for SyncServer { overall_rate: 0.0, total_bytes_written: 0, data_progress, - download_rate_bytes_per_sec, }) } else { None diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index 5edfe21..b3de868 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -71,6 +71,7 @@ pub struct SyncWorkerConfig { pub batch_size: u32, pub parquet_config: Option, pub validation_stage: ValidationStage, + pub logs_semaphore: Arc, } /// A worker that syncs a specific block range from erigon-bridge @@ -89,6 +90,7 @@ pub struct SyncWorker { validation_stage: ValidationStage, segment_work: crate::sync::data_scanner::SegmentWork, // Pre-computed missing ranges current_progress: Arc>, // Real-time progress state + logs_semaphore: Arc, } impl SyncWorker { @@ -147,6 +149,7 @@ impl SyncWorker { validation_stage: config.validation_stage, segment_work, current_progress, + logs_semaphore: config.logs_semaphore, } } @@ -230,10 +233,6 @@ impl SyncWorker { let missing_txs = self.segment_work.missing_transactions.clone(); let missing_logs = self.segment_work.missing_logs.clone(); - let blocks_complete = missing_blocks.is_empty(); - let txs_complete = missing_txs.is_empty(); - let logs_complete = missing_logs.is_empty(); - info!( "Worker {} segment work: blocks={} ranges, txs={} ranges, logs={} ranges", self.worker_id, @@ -242,26 +241,63 @@ impl SyncWorker { missing_logs.len() ); - // Initialize progress - self.update_progress(ProgressUpdate { - phase: "connecting".to_string(), - blocks_done: blocks_complete, - txs_done: txs_complete, - logs_done: logs_complete, - current_block: self.from_block, - blocks_processed: 0, - bytes_written: 0, - files_created: 0, - }) - .await; 
+ // Spawn all data types in parallel + let blocks_fut = self.sync_all_blocks(missing_blocks); + let txs_fut = self.sync_all_transactions(missing_txs); + let logs_fut = self.sync_all_logs(missing_logs); + + // Wait for all to complete and collect results + let (blocks_res, txs_res, logs_res) = tokio::join!(blocks_fut, txs_fut, logs_fut); + + // Collect all errors + let mut errors = Vec::new(); + if let Err(e) = blocks_res { + errors.push((DataType::Blocks, e)); + } + if let Err(e) = txs_res { + errors.push((DataType::Transactions, e)); + } + if let Err(e) = logs_res { + errors.push((DataType::Logs, e)); + } + + // If any failed, return MultipleDataTypeErrors + if !errors.is_empty() { + let multi_err = crate::sync::error::MultipleDataTypeErrors { + from_block: self.from_block, + to_block: self.to_block, + errors, + }; + + error!( + "Worker {} failed with {} data type error(s): {}", + self.worker_id, + multi_err.errors.len(), + multi_err + ); + + return Err(multi_err.into()); + } + + info!("Worker {} completed sync successfully", self.worker_id); + Ok(()) + } - // Connect to bridge via Arrow Flight + /// Sync all blocks ranges + async fn sync_all_blocks( + &self, + missing_blocks: Vec, + ) -> Result<(), SyncError> { + if missing_blocks.is_empty() { + return Ok(()); + } + + // Connect to bridge let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) .await .map_err(|e| { - // Don't know exact data type yet, use Unknown with block range SyncError::from_anyhow_with_context( - DataType::Unknown, + DataType::Blocks, self.from_block, self.to_block, "Failed to connect to bridge", @@ -269,121 +305,118 @@ impl SyncWorker { ) })?; - info!("Worker {} connected to bridge", self.worker_id); - - let mut total_batches = 0u64; - let mut total_bytes = 0u64; - let mut files_created = 0u32; - - // Sync missing blocks ranges - if !blocks_complete { - self.update_stage("blocks").await; - self.update_progress(ProgressUpdate { - phase: "blocks".to_string(), - blocks_done: false, - txs_done: txs_complete, - logs_done: logs_complete, - current_block: self.from_block, - blocks_processed: total_batches, - bytes_written: total_bytes, - files_created, - }) - .await; - - for range in &missing_blocks { - info!( - "Worker {} syncing blocks {}-{}", - self.worker_id, range.start, range.end - ); - let (batches, bytes) = self - .sync_blocks_range(&mut client, range.start, range.end) - .await?; - total_batches += batches; - total_bytes += bytes; - files_created += 1; - } + for range in &missing_blocks { + info!( + "Worker {} syncing blocks {}-{}", + self.worker_id, range.start, range.end + ); + self.sync_blocks_range(&mut client, range.start, range.end) + .await?; } - // Sync missing transaction ranges - if !txs_complete { - self.update_stage("transactions").await; - self.update_progress(ProgressUpdate { - phase: "transactions".to_string(), - blocks_done: true, - txs_done: false, - logs_done: logs_complete, - current_block: self.to_block, - blocks_processed: total_batches, - bytes_written: total_bytes, - files_created, - }) - .await; - - for range in &missing_txs { - info!( - "Worker {} syncing transactions {}-{}", - self.worker_id, range.start, range.end - ); - let (batches, bytes) = self - .sync_transactions_range(&mut client, range.start, range.end) - .await?; - total_batches += batches; - total_bytes += bytes; - files_created += 1; - } + Ok(()) + } + + /// Sync all transactions ranges + async fn sync_all_transactions( + &self, + missing_txs: Vec, + ) -> Result<(), SyncError> { + if missing_txs.is_empty() 
{ + return Ok(()); } - // Sync missing log ranges - if !logs_complete { - self.update_stage("logs").await; - self.update_progress(ProgressUpdate { - phase: "logs".to_string(), - blocks_done: true, - txs_done: true, - logs_done: false, - current_block: self.to_block, - blocks_processed: total_batches, - bytes_written: total_bytes, - files_created, - }) - .await; - - for range in &missing_logs { - info!( - "Worker {} syncing logs {}-{}", - self.worker_id, range.start, range.end - ); - let (batches, bytes) = self - .sync_logs_range(&mut client, range.start, range.end) - .await?; - total_batches += batches; - total_bytes += bytes; - files_created += 1; - } + // Connect to bridge + let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) + .await + .map_err(|e| { + SyncError::from_anyhow_with_context( + DataType::Transactions, + self.from_block, + self.to_block, + "Failed to connect to bridge", + e, + ) + })?; + + for range in &missing_txs { + info!( + "Worker {} syncing transactions {}-{}", + self.worker_id, range.start, range.end + ); + self.sync_transactions_range(&mut client, range.start, range.end) + .await?; } - self.update_progress(ProgressUpdate { - phase: "completed".to_string(), - blocks_done: true, - txs_done: true, - logs_done: true, - current_block: self.to_block, - blocks_processed: total_batches, - bytes_written: total_bytes, - files_created, - }) - .await; + Ok(()) + } + + /// Sync all logs ranges + async fn sync_all_logs( + &self, + missing_logs: Vec, + ) -> Result<(), SyncError> { + if missing_logs.is_empty() { + return Ok(()); + } + + // Acquire 1 permit for this entire segment + // This limits how many segments can be processing logs concurrently + info!( + "Worker {} acquiring log semaphore permit for segment (blocks {}-{})", + self.worker_id, self.from_block, self.to_block + ); + + let _permit = self.logs_semaphore.clone() + .acquire_owned() + .await + .map_err(|e| { + SyncError::from_anyhow_with_context( + DataType::Logs, + self.from_block, + self.to_block, + "Failed to acquire log semaphore permit for segment", + anyhow::anyhow!("{}", e), + ) + })?; + + info!( + "Worker {} acquired permit, starting log sync for {} ranges", + self.worker_id, missing_logs.len() + ); + + // Connect to bridge + let mut client = FlightBridgeClient::connect(self.bridge_endpoint.clone()) + .await + .map_err(|e| { + SyncError::from_anyhow_with_context( + DataType::Logs, + self.from_block, + self.to_block, + "Failed to connect to bridge", + e, + ) + })?; + + // Process all ranges sequentially while holding all permits + for range in &missing_logs { + info!( + "Worker {} syncing logs {}-{}", + self.worker_id, range.start, range.end + ); + self.sync_logs_range(&mut client, range.start, range.end) + .await?; + } - info!("Worker {} completed sync successfully", self.worker_id); Ok(()) } async fn sync_blocks_range( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Track phase self.metrics.active_workers_inc("blocks"); let metrics = self.metrics.clone(); @@ -416,12 +449,12 @@ impl SyncWorker { let mut resume_from = from_block; let mut last_resume_from = resume_from; - let (batches_processed, bytes_written) = loop { + loop { match self .try_sync_blocks_stream(client, resume_from, to_block, &mut writer, from_block) .await { - Ok(result) => break result, + Ok(()) => break, Err(e) if is_transient_error(&e) => { // Get the last block we successfully wrote let last_written = 
writer.last_written_block(); @@ -465,7 +498,7 @@ impl SyncWorker { "Worker {} completed blocks {}-{} before stream error", self.worker_id, from_block, to_block ); - break (0, 0); // Already finalized + break; // Already finalized } retry_count += 1; @@ -491,17 +524,17 @@ impl SyncWorker { } }; - Ok((batches_processed, bytes_written)) + Ok(()) } async fn try_sync_blocks_stream( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, writer: &mut ParquetWriter, original_from: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Create historical query descriptor with large message preferences use phaser_bridge::descriptors::StreamPreferences; let preferences = StreamPreferences { @@ -649,15 +682,15 @@ impl SyncWorker { // Treat as success with no data } - Ok((batches_processed, bytes_written)) + Ok(()) } async fn sync_transactions_range( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Track phase self.metrics.active_workers_inc("transactions"); let metrics = self.metrics.clone(); @@ -722,7 +755,7 @@ impl SyncWorker { let mut resume_from = from_block; let mut last_resume_from = resume_from; - let (batches_processed, bytes_written) = loop { + loop { match self .try_sync_transactions_stream( client, @@ -734,7 +767,7 @@ impl SyncWorker { ) .await { - Ok(result) => break result, + Ok(()) => break, Err(e) if is_transient_error(&e) => { // Get the last block we successfully wrote let last_written = writer.last_written_block(); @@ -778,7 +811,7 @@ impl SyncWorker { "Worker {} completed transactions {}-{} before stream error", self.worker_id, from_block, to_block ); - break (0, 0); // Already finalized + break; // Already finalized } retry_count += 1; @@ -804,18 +837,18 @@ impl SyncWorker { } }; - Ok((batches_processed, bytes_written)) + Ok(()) } async fn try_sync_transactions_stream( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, writer: &mut ParquetWriter, proof_writer: &mut Option, _original_from: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Create historical query descriptor for transactions with configured validation stage use phaser_bridge::descriptors::StreamPreferences; let preferences = StreamPreferences { @@ -1024,7 +1057,7 @@ impl SyncWorker { // Treat as success with no data } - Ok((batches_processed, bytes_written)) + Ok(()) } fn generate_proofs_for_batch( @@ -1085,11 +1118,11 @@ impl SyncWorker { } async fn sync_logs_range( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Track phase self.metrics.active_workers_inc("logs"); let metrics = self.metrics.clone(); @@ -1122,12 +1155,12 @@ impl SyncWorker { let mut resume_from = from_block; let mut last_resume_from = resume_from; - let (batches_processed, bytes_written) = loop { + loop { match self .try_sync_logs_stream(client, resume_from, to_block, &mut writer, from_block) .await { - Ok(result) => break result, + Ok(()) => break, Err(e) if is_transient_error(&e) => { // Get the last block we successfully wrote let last_written = writer.last_written_block(); @@ -1171,7 +1204,7 @@ impl SyncWorker { "Worker {} completed logs {}-{} before stream error", self.worker_id, from_block, to_block ); - break (0, 0); // Already finalized + break; // Already finalized } retry_count += 1; @@ -1197,17 +1230,17 
@@ impl SyncWorker { } }; - Ok((batches_processed, bytes_written)) + Ok(()) } async fn try_sync_logs_stream( - &mut self, + &self, client: &mut FlightBridgeClient, from_block: u64, to_block: u64, writer: &mut ParquetWriter, _original_from: u64, - ) -> Result<(u64, u64), SyncError> { + ) -> Result<(), SyncError> { // Create historical query descriptor for logs with configured validation stage use phaser_bridge::descriptors::StreamPreferences; let preferences = StreamPreferences { @@ -1355,6 +1388,6 @@ impl SyncWorker { // Treat as success with no data } - Ok((batches_processed, bytes_written)) + Ok(()) } } From a17f4bd301ed92d5f25b7139bd60e3fd43b7d1b4 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 10 Nov 2025 19:50:19 -0800 Subject: [PATCH 2/5] Update sync service to use max_concurrent_log_segments - Use renamed config field when creating logs semaphore - Update comment to reflect "limiting" rather than "sequencing" - Log message shows semaphore capacity for observability --- crates/phaser-query/src/sync/service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 0eaeb81..9e78220 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -304,13 +304,13 @@ impl SyncServer { job_config.bridge_name.clone(), ); - // Create logs semaphore to sequence log processing across workers + // Create logs semaphore to limit concurrent log segment processing let logs_semaphore = Arc::new(tokio::sync::Semaphore::new( - config.max_logs_segments_sync as usize, + config.max_concurrent_log_segments as usize, )); info!( "Created logs semaphore with capacity {} (max segments syncing logs concurrently)", - config.max_logs_segments_sync + config.max_concurrent_log_segments ); for worker_id in 0..num_workers { From 89da4535112272615e9c612bc552a2a2a59863bb Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 10 Nov 2025 19:50:19 -0800 Subject: [PATCH 3/5] Update worker semaphore usage and fix clippy warnings Semaphore changes: - Format semaphore acquire chain for readability - Permit is held for entire log sync operation per segment Clippy/dead_code fixes: - Rename bytes_written to _bytes_written (unused but kept for future use) - Add #[allow(dead_code)] to ProgressUpdate struct (legacy tracking code) - Add #[allow(dead_code)] to update_stage and update_progress methods (may be used in future for enhanced progress tracking) --- crates/phaser-query/src/sync/worker.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index b3de868..788dabe 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -48,6 +48,7 @@ pub struct WorkerProgress { pub type ProgressTracker = Arc>>; /// Progress update data for tracking worker state +#[allow(dead_code)] pub struct ProgressUpdate { pub phase: String, pub blocks_done: bool, @@ -187,11 +188,13 @@ impl SyncWorker { } /// Update the current stage + #[allow(dead_code)] async fn update_stage(&self, stage: &str) { let mut progress = self.current_progress.write().await; progress.current_phase = stage.to_string(); } + #[allow(dead_code)] async fn update_progress(&self, update: ProgressUpdate) { if let Some(tracker) = &self.progress_tracker { let mut tracker_lock = tracker.write().await; @@ -367,7 +370,9 @@ impl SyncWorker { self.worker_id, self.from_block, 
self.to_block ); - let _permit = self.logs_semaphore.clone() + let _permit = self + .logs_semaphore + .clone() .acquire_owned() .await .map_err(|e| { @@ -382,7 +387,8 @@ impl SyncWorker { info!( "Worker {} acquired permit, starting log sync for {} ranges", - self.worker_id, missing_logs.len() + self.worker_id, + missing_logs.len() ); // Connect to bridge @@ -522,7 +528,7 @@ impl SyncWorker { } Err(e) => return Err(e), } - }; + } Ok(()) } @@ -561,7 +567,7 @@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut bytes_written = 0u64; + let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; @@ -835,7 +841,7 @@ impl SyncWorker { } Err(e) => return Err(e), } - }; + } Ok(()) } @@ -878,7 +884,7 @@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut bytes_written = 0u64; + let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; @@ -1228,7 +1234,7 @@ impl SyncWorker { } Err(e) => return Err(e), } - }; + } Ok(()) } @@ -1268,7 +1274,7 @@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut bytes_written = 0u64; + let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; From d9507c3a70c418184811afe1a6c0f4a270b0a57d Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 10 Nov 2025 19:50:19 -0800 Subject: [PATCH 4/5] Format erigon-bridge segment_worker Debug impl Apply rustfmt formatting to multi-line field call. --- crates/bridges/evm/erigon-bridge/src/segment_worker.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index 5df9394..ed326ad 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -59,7 +59,10 @@ impl std::fmt::Debug for SegmentConfig { .field("validation_batch_size", &self.validation_batch_size) .field("max_concurrent_executions", &self.max_concurrent_executions) .field("global_max_execute_blocks", &self.global_max_execute_blocks) - .field("execute_blocks_semaphore", &self.execute_blocks_semaphore.is_some()) + .field( + "execute_blocks_semaphore", + &self.execute_blocks_semaphore.is_some(), + ) .finish() } } From d7670c7d425dc0e9225f3d8e8e5dec134508e400 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 11 Nov 2025 08:13:09 -0800 Subject: [PATCH 5/5] Remove unused bytes_written --- crates/phaser-query/src/sync/worker.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index 788dabe..811e78e 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -567,7 +567,6 @@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; @@ -631,8 +630,6 @@ impl SyncWorker { ) })?; - bytes_written += batch_bytes; - // Update progress with actual disk bytes self.update_current_progress(batch_bytes).await; @@ -884,7 +881,6 
@@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; @@ -983,8 +979,6 @@ impl SyncWorker { ) })?; - bytes_written += batch_bytes; - // Update progress with actual disk bytes self.update_current_progress(batch_bytes).await; @@ -992,14 +986,13 @@ impl SyncWorker { } info!( - "Worker {} received {} batches for transactions {}-{} (first_block: {:?}, last_block: {:?}, bytes: {})", + "Worker {} received {} batches for transactions {}-{} (first_block: {:?}, last_block: {:?})", self.worker_id, batches_processed, from_block, to_block, first_block_seen, - last_block_seen, - bytes_written + last_block_seen ); // Validate we got data within the expected range @@ -1274,7 +1267,6 @@ impl SyncWorker { let mut stream = Box::pin(stream); let mut batches_processed = 0u64; - let mut _bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; let mut max_responsibility_end: Option = None; @@ -1336,8 +1328,6 @@ impl SyncWorker { ) })?; - bytes_written += batch_bytes; - // Update progress with actual disk bytes self.update_current_progress(batch_bytes).await;
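
The concurrency machinery these patches introduce boils down to three tokio patterns. First, the segment worker's combination of futures' buffered() with a globally shared tokio::sync::Semaphore: buffered(n) drives up to n ExecuteBlocks futures at once while still yielding results in input order, and the global semaphore caps in-flight calls across every worker. The sketch below is a minimal, self-contained illustration of that shape, not code from the crates; execute_blocks is a hypothetical stand-in for the real gRPC call, and the limits (64 global, 4 per segment) are placeholders for global_max_execute_blocks and max_concurrent_executions.

    use std::sync::Arc;

    use futures::stream::{self, StreamExt};
    use tokio::sync::Semaphore;

    // Hypothetical stand-in for one ExecuteBlocks gRPC call.
    async fn execute_blocks(start: u64, end: u64) -> Result<u64, String> {
        Ok(end - start + 1) // pretend: receipt count for the range
    }

    #[tokio::main]
    async fn main() {
        // Global cap across ALL segments (stands in for global_max_execute_blocks).
        let global_permits = Arc::new(Semaphore::new(64));

        // Work units carved from one segment, as the validation_batch_size loop does.
        let work_units: Vec<(u64, u64)> = (0..8u64).map(|i| (i * 100, i * 100 + 99)).collect();

        // Per-segment cap (stands in for max_concurrent_executions).
        let mut results = stream::iter(work_units)
            .map(|(start, end)| {
                let permits = global_permits.clone();
                async move {
                    // Held until this future completes, bounding total
                    // in-flight ExecuteBlocks calls across all workers.
                    let _permit = permits.acquire_owned().await.expect("semaphore closed");
                    (start, end, execute_blocks(start, end).await)
                }
            })
            .buffered(4); // up to 4 in flight, results yielded in input order

        while let Some((start, end, res)) = results.next().await {
            println!("blocks {start}-{end}: {res:?}");
        }
    }

The choice of buffered() over buffer_unordered() is what lets the worker stream batches downstream without re-sorting: completion may happen out of order internally, but consumption does not.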
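
Second, the sync worker now runs the three data types concurrently with tokio::join! and aggregates failures instead of short-circuiting on the first one, which is what MultipleDataTypeErrors exists to carry. A minimal sketch of that shape, assuming stub sync functions in place of the real range syncs:

    #[derive(Debug)]
    enum DataType { Blocks, Transactions, Logs }

    // Hypothetical stubs for sync_all_blocks / sync_all_transactions / sync_all_logs.
    async fn sync_blocks() -> Result<(), String> { Ok(()) }
    async fn sync_transactions() -> Result<(), String> { Err("stream reset".into()) }
    async fn sync_logs() -> Result<(), String> { Ok(()) }

    #[tokio::main]
    async fn main() {
        // join! polls all three branches on one task and waits for every one,
        // so a failure in one data type never cancels the other two.
        let (blocks, txs, logs) = tokio::join!(sync_blocks(), sync_transactions(), sync_logs());

        // Collect every failure rather than returning at the first.
        let mut errors = Vec::new();
        if let Err(e) = blocks { errors.push((DataType::Blocks, e)); }
        if let Err(e) = txs { errors.push((DataType::Transactions, e)); }
        if let Err(e) = logs { errors.push((DataType::Logs, e)); }

        if !errors.is_empty() {
            eprintln!("sync failed for {} data type(s): {:?}", errors.len(), errors);
        }
    }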
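
Third, max_concurrent_log_segments works by having each worker hold one owned permit for the entire log phase of its segment: every missing range in the segment is synced under that single permit, so at most N segments are in the logs phase at once. A sketch under assumed numbers (32 workers, the default capacity of 16); sync_logs_range here is a placeholder, not the worker's method:

    use std::sync::Arc;

    use tokio::sync::Semaphore;

    // Hypothetical stand-in for syncing one missing log range.
    async fn sync_logs_range(worker_id: u32, start: u64, end: u64) {
        println!("worker {worker_id}: logs {start}-{end}");
    }

    #[tokio::main]
    async fn main() {
        // Mirrors max_concurrent_log_segments = 16.
        let logs_semaphore = Arc::new(Semaphore::new(16));

        let mut handles = Vec::new();
        for worker_id in 0..32u32 {
            let semaphore = logs_semaphore.clone();
            handles.push(tokio::spawn(async move {
                // One owned permit covers the whole segment; it is released
                // only when _permit drops, after every range has finished.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                for (start, end) in [(0u64, 99), (100, 199)] {
                    sync_logs_range(worker_id, start, end).await;
                }
            }));
        }
        for h in handles {
            h.await.expect("task panicked");
        }
    }

acquire_owned() returns a permit tied to the Arc rather than a borrow of the semaphore, which is why it can move into a spawned task and outlive the acquisition site — the same reason the worker calls self.logs_semaphore.clone().acquire_owned().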