diff --git a/crates/bridges/evm/erigon-bridge/src/bridge.rs b/crates/bridges/evm/erigon-bridge/src/bridge.rs index 516098b..e1d3829 100644 --- a/crates/bridges/evm/erigon-bridge/src/bridge.rs +++ b/crates/bridges/evm/erigon-bridge/src/bridge.rs @@ -25,6 +25,7 @@ use crate::segment_worker::{split_into_segments, SegmentConfig, SegmentWorker}; use crate::streaming_service::StreamingService; use crate::trie_client::TrieClient; use crate::trie_converter; +use phaser_metrics::SegmentMetrics; use std::sync::Arc; use tonic::Status as TonicStatus; @@ -122,15 +123,21 @@ impl ErigonFlightBridge { }) } - pub fn bridge_info(&self) -> BridgeInfo { + pub async fn bridge_info(&self) -> BridgeInfo { + // Query current block from Erigon + let current_block = { + let mut client = self.client.lock().await; + client.get_latest_block().await.unwrap_or(0) + }; + BridgeInfo { name: "erigon-bridge".to_string(), node_type: "erigon".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), chain_id: self.chain_id, capabilities: vec!["streaming".to_string()], - current_block: 0, // Would need to query this from Erigon - oldest_block: 0, // Would need to query this from Erigon + current_block, + oldest_block: 0, // Could query this from Erigon if needed } } @@ -545,6 +552,7 @@ impl ErigonFlightBridge { let blockdata_pool = self.blockdata_pool.clone(); let batch_size = self.segment_config.validation_batch_size; + let metrics = self.metrics.clone(); let stream = async_stream::stream! 
{ match stream_type { @@ -621,8 +629,14 @@ impl ErigonFlightBridge { } Err(e) => { error!("Failed to receive block batch: {}", e); + + // Convert tonic::Status to ErigonBridgeError for proper categorization + let bridge_err = ErigonBridgeError::from(e); + let error_type = crate::segment_worker::SegmentWorker::categorize_error(&bridge_err); + metrics.error(&error_type, "blocks"); + yield Err(arrow_flight::error::FlightError::ExternalError( - Box::new(std::io::Error::other(e.to_string())) + Box::new(std::io::Error::other(bridge_err.to_string())) )); break; } @@ -674,7 +688,7 @@ impl ErigonFlightBridge { #[async_trait] impl FlightBridge for ErigonFlightBridge { async fn get_info(&self) -> Result { - Ok(self.bridge_info()) + Ok(self.bridge_info().await) } async fn get_capabilities(&self) -> Result { @@ -698,7 +712,7 @@ impl FlightBridge for ErigonFlightBridge { // Simple handshake - return bridge info let response = HandshakeResponse { protocol_version: 1, - payload: serde_json::to_vec(&self.bridge_info()) + payload: serde_json::to_vec(&self.bridge_info().await) .map_err(|e| Status::internal(format!("Failed to serialize bridge info: {}", e)))? 
.into(), }; @@ -882,6 +896,8 @@ impl FlightBridge for ErigonFlightBridge { }; let schema = Self::get_schema_for_type(stream_type); + let metrics_for_stream = self.metrics.clone(); + let stream_type_for_metrics = stream_type; // Compression is handled at the gRPC transport level via tonic // (configured in main.rs with accept_compressed/send_compressed) @@ -900,11 +916,11 @@ impl FlightBridge for ErigonFlightBridge { while let Some(batch_result) = batch_stream.next().await { match batch_result { Ok(batch_with_range) => { - // Encode the responsibility range metadata - let metadata = match batch_with_range.encode_range_metadata() { + // Encode the batch metadata (responsibility range) + let metadata = match batch_with_range.encode_metadata() { Ok(m) => m, Err(e) => { - error!("Failed to encode range metadata: {}", e); + error!("Failed to encode batch metadata: {}", e); yield Err(Status::internal(format!("Metadata encoding error: {}", e))); continue; } @@ -936,6 +952,30 @@ impl FlightBridge for ErigonFlightBridge { } Err(e) => { error!("Error in batch stream: {}", e); + + // Categorize error from string (since FlightError wraps the original error as string) + let err_str = e.to_string().to_lowercase(); + let error_type = if err_str.contains("timeout") || err_str.contains("timed out") { + "timeout" + } else if err_str.contains("header not found") || err_str.contains("block not found") || err_str.contains("not found") { + "not_found" + } else if err_str.contains("connection") || err_str.contains("connect") { + "connection" + } else if err_str.contains("unavailable") { + "unavailable" + } else { + "unknown" + }; + + let data_type = match stream_type_for_metrics { + StreamType::Blocks => "blocks", + StreamType::Transactions => "transactions", + StreamType::Logs => "logs", + StreamType::Trie => "trie", + }; + + metrics_for_stream.error(error_type, data_type); + yield Err(Status::internal(format!("Stream error: {}", e))); } } diff --git 
a/crates/bridges/evm/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs index 3846402..87cd31c 100644 --- a/crates/bridges/evm/erigon-bridge/src/main.rs +++ b/crates/bridges/evm/erigon-bridge/src/main.rs @@ -197,9 +197,9 @@ async fn main() -> Result<()> { // Create the Flight server let flight_server = FlightBridgeServer::new(bridge); - // Configure global maximum message size (128MB) + // Configure global maximum message size (256MB) // Per-stream limits are negotiated via StreamPreferences - const MAX_MESSAGE_SIZE: usize = 128 * 1024 * 1024; + const MAX_MESSAGE_SIZE: usize = 256 * 1024 * 1024; // Configure compression based on CLI flag let mut flight_service = flight_server diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index 8771aee..2ad977c 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -185,6 +185,11 @@ impl SegmentWorker { Err(e) => { error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", worker_id, segment_id, current_block, chunk_end, e); + + // Track error type for monitoring + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, "blocks"); + self.metrics.active_workers_dec("blocks"); self.metrics.segment_attempt(false); yield Err(e); @@ -241,10 +246,13 @@ impl SegmentWorker { worker_id, segment_id, start_block, end_block ); + let request_start = Instant::now(); let mut tx_stream = client.stream_transactions(start_block, end_block, self.config.validation_batch_size as u32).await.map_err(|e| { error!("Worker {} segment {}: Failed to create transaction stream: {}", worker_id, segment_id, e); e })?; + let duration_ms = request_start.elapsed().as_millis() as f64; + self.metrics.grpc_request_duration_transactions(segment_id, "stream_transactions", duration_ms); let mut all_transactions = Vec::new(); let mut batch_count = 0u64; @@ 
-335,6 +343,10 @@ impl SegmentWorker { .await { Ok(batch) => batch, Err(e) => { + // Track error type for monitoring + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, "transactions"); + // Clean up metrics before error return self.metrics.active_workers_dec("transactions"); self.metrics.segment_attempt(false); @@ -346,6 +358,10 @@ impl SegmentWorker { // Yield this batch immediately - no buffering! // Wrap with responsibility range metadata yielded_batches += 1; + info!( + "BRIDGE YIELD: Worker {} segment {}: Yielding transaction batch {}, blocks {}-{}", + worker_id, segment_id, yielded_batches, start_block, end_block + ); yield Ok(BatchWithRange::new(arrow_batch, start_block, end_block)); } @@ -365,6 +381,7 @@ impl SegmentWorker { segment_id, total_blocks ); + self.metrics.error("empty_stream", "transactions"); self.metrics.active_workers_dec("transactions"); self.metrics .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); @@ -441,6 +458,11 @@ impl SegmentWorker { Err(e) => { error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", worker_id, segment_id, current_block, chunk_end, e); + + // Track error type for monitoring + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, "logs"); + self.metrics.active_workers_dec("logs"); yield Err(e); return; @@ -461,9 +483,6 @@ impl SegmentWorker { let mut sorted_chunk: Vec<_> = chunk_headers.into_iter().collect(); sorted_chunk.sort_by_key(|(block_num, _)| *block_num); - // Batch blocks into sub-ranges for receipt fetching - // Instead of 1 request per block, do 1 request per 10-20 blocks - // This reduces gRPC overhead while maintaining good parallelism const RECEIPT_BATCH_SIZE: usize = 10; let receipt_futures: Vec<_> = sorted_chunk @@ -504,6 +523,10 @@ impl SegmentWorker { current_batch.extend(batch_data); } Err(e) => { + // Track error type for monitoring + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, 
"logs"); + self.metrics.active_workers_dec("logs"); yield Err(e); return; @@ -530,6 +553,10 @@ impl SegmentWorker { .await { Ok(b) => b, Err(e) => { + // Track error type for monitoring + let error_type = Self::categorize_error(&e); + self.metrics.error(&error_type, "logs"); + self.metrics.active_workers_dec("logs"); yield Err(e); return; @@ -567,6 +594,7 @@ impl SegmentWorker { segment_id, total_blocks ); + self.metrics.error("empty_stream", "logs"); self.metrics.active_workers_dec("logs"); yield Err(ErigonBridgeError::StreamProtocol(StreamError::ZeroBatchesConsumed { start: self.segment_start, @@ -603,6 +631,15 @@ impl SegmentWorker { batch_size: u32, metrics: &BridgeMetrics, ) -> Result, ErigonBridgeError> { + info!( + "ERIGON REQUEST: Requesting blocks {}-{} from Erigon (expecting {} blocks)", + segment_start, + segment_end, + segment_end - segment_start + 1 + ); + + let segment_id = segment_start / 500_000; + let request_start = Instant::now(); let mut block_stream = client .stream_blocks(segment_start, segment_end, batch_size) .await @@ -613,6 +650,8 @@ impl SegmentWorker { ); e })?; + let duration_ms = request_start.elapsed().as_millis() as f64; + metrics.grpc_request_duration_blocks(segment_id, "stream_blocks", duration_ms); // Track active gRPC stream metrics.grpc_stream_inc("blocks"); @@ -668,13 +707,28 @@ impl SegmentWorker { segment_start, segment_end, batch_count ); } else { - debug!( - "Blocks {}-{}: Header stream completed, fetched {} headers from {} batches", - segment_start, - segment_end, - headers.len(), - batch_count - ); + // Calculate actual range received + let block_numbers: Vec = headers.keys().copied().collect(); + let min_received = block_numbers.iter().min().copied().unwrap_or(0); + let max_received = block_numbers.iter().max().copied().unwrap_or(0); + let expected_count = segment_end - segment_start + 1; + let missing_count = expected_count - headers.len() as u64; + + if missing_count > 0 { + warn!( + "ERIGON RESPONSE: Blocks {}-{}: 
Received {} headers (expected {}), actual range {}-{}, MISSING {} blocks", + segment_start, segment_end, headers.len(), expected_count, min_received, max_received, missing_count + ); + } else { + info!( + "ERIGON RESPONSE: Blocks {}-{}: Received {} headers (complete), range {}-{}", + segment_start, + segment_end, + headers.len(), + min_received, + max_received + ); + } } Ok(headers) @@ -755,6 +809,7 @@ impl SegmentWorker { metrics: &BridgeMetrics, ) -> Result, Header)>, ErigonBridgeError> { let call_start = std::time::Instant::now(); + let segment_id = from_block / 500_000; // Spawn a monitoring task that warns if the call takes too long const SLOW_CALL_WARNING_THRESHOLD_SECS: u64 = 300; // 5 minutes @@ -773,6 +828,7 @@ impl SegmentWorker { }) }; + let request_start = Instant::now(); let mut receipt_stream = client .execute_blocks(from_block, to_block, 100) .await @@ -788,6 +844,8 @@ impl SegmentWorker { ); e })?; + let duration_ms = request_start.elapsed().as_millis() as f64; + metrics.grpc_request_duration_logs(segment_id, "execute_blocks", duration_ms); // Cancel the monitor task since the call succeeded monitor_handle.abort(); @@ -964,6 +1022,40 @@ impl SegmentWorker { // Convert directly without intermediate ReceiptBatch struct BlockDataConverter::receipts_to_logs_arrow(all_receipts, ×tamps) } + + /// Categorize error for metrics tracking + pub(crate) fn categorize_error(error: &ErigonBridgeError) -> String { + let err_str = error.to_string(); + let err_lower = err_str.to_lowercase(); + + // Check for known error patterns first + if err_lower.contains("timeout") || err_lower.contains("timed out") { + return "timeout".to_string(); + } else if err_lower.contains("header not found") || err_lower.contains("block not found") { + return "not_found".to_string(); + } else if err_lower.contains("connection") || err_lower.contains("connect") { + return "connection".to_string(); + } else if err_lower.contains("unavailable") { + return "unavailable".to_string(); + } else 
if err_lower.contains("rlp") || err_lower.contains("decoding") { + return "decode_error".to_string(); + } else if err_lower.contains("validation") { + return "validation".to_string(); + } + + // For unknown errors, include the actual error message (truncated to avoid extremely long labels) + // Take the first part up to first colon, or first 80 chars + let pattern = err_str + .split(':') + .next() + .unwrap_or(&err_str) + .trim() + .chars() + .take(80) + .collect::<String>(); + + format!("unknown:{}", pattern) + } } /// Split a block range into segments diff --git a/crates/phaser-bridge/src/client.rs b/crates/phaser-bridge/src/client.rs index 89f41fd..029e4c0 100644 --- a/crates/phaser-bridge/src/client.rs +++ b/crates/phaser-bridge/src/client.rs @@ -1,9 +1,6 @@ use arrow::datatypes::Schema; use arrow_array::RecordBatch; -use arrow_flight::{ - decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, FlightClient, - FlightInfo, -}; +use arrow_flight::{flight_service_client::FlightServiceClient, FlightClient, FlightInfo}; use futures::stream::StreamExt; use tonic::transport::Channel; use tracing::{debug, error, info}; @@ -79,9 +76,9 @@ impl FlightBridgeClient { Channel::from_shared(uri)?.connect().await?
}; - // Configure message size limits (128MB global max for large batches) + // Configure message size limits (256MB global max for large batches) // This allows the client to receive large messages from the bridge - const MAX_MESSAGE_SIZE: usize = 128 * 1024 * 1024; + const MAX_MESSAGE_SIZE: usize = 256 * 1024 * 1024; // Client accepts compression if server sends it // Compression is controlled by the bridge's --compression flag @@ -153,41 +150,25 @@ impl FlightBridgeClient { Ok(batches) } - /// Subscribe to real-time blockchain data - pub async fn subscribe( - &mut self, - descriptor: &BlockchainDescriptor, - ) -> Result { - let ticket = descriptor.to_ticket(); - - info!( - "Subscribing to real-time data from bridge ({})", - descriptor.stream_type.to_string() - ); - let stream = self.client.do_get(ticket).await?; - - // Return the stream for the caller to consume - Ok(stream) - } - - /// Subscribe with access to app_metadata (responsibility ranges) - /// Returns a stream of (RecordBatch, Option<(u64, u64)>) where the tuple is (start_block, end_block) + /// Subscribe with access to batch metadata from FlightData.app_metadata + /// + /// Returns a stream of (RecordBatch, BatchMetadata) tuples. /// /// This method accesses raw FlightData messages to preserve app_metadata that contains - /// responsibility range information (which blocks were processed, even if they had 0 rows). + /// batch metadata including responsibility range information (which blocks were processed, + /// even if the batch contains 0 rows). + /// + /// All batches are required to have metadata - if metadata is missing or invalid, + /// an error is returned. 
pub async fn subscribe_with_metadata( &mut self, descriptor: &BlockchainDescriptor, ) -> Result< impl futures::Stream< - Item = Result<(RecordBatch, Option<(u64, u64)>), arrow_flight::error::FlightError>, + Item = Result<(RecordBatch, crate::BatchMetadata), arrow_flight::error::FlightError>, >, anyhow::Error, > { - use crate::BatchWithRange; - use arrow_ipc::reader::StreamReader; - use std::io::Cursor; - let ticket = descriptor.to_ticket(); info!( @@ -196,50 +177,60 @@ impl FlightBridgeClient { ); // Access the inner FlightServiceClient to get raw FlightData stream - // We need mutable access to the client let mut inner_client = self.client.inner().clone(); let response = inner_client.do_get(ticket).await?; - let mut flight_data_stream = response.into_inner(); + let flight_data_stream = response.into_inner(); - // Manually decode FlightData to preserve app_metadata + // We need to decode FlightData messages while preserving app_metadata + // FlightRecordBatchStream would handle decoding correctly, but we'd lose app_metadata + // So we'll manually process FlightData using arrow_flight::utils for proper decoding let stream = async_stream::try_stream! 
{ - let mut schema_received = false; - let mut schema_bytes = Vec::new(); + use arrow_ipc::{root_as_message, convert::fb_to_schema}; + use std::sync::Arc; + + let mut flight_data_stream = flight_data_stream; + + let mut schema: Option<Arc<Schema>> = None; + let dictionaries_by_id = std::collections::HashMap::new(); while let Some(flight_data_result) = flight_data_stream.next().await { - let flight_data = flight_data_result.map_err(|e| { - arrow_flight::error::FlightError::Tonic(Box::new(e)) - })?; - - // First message contains the schema - if !schema_received { - schema_bytes.extend_from_slice(&flight_data.data_header); - schema_bytes.extend_from_slice(&flight_data.data_body); - schema_received = true; + let flight_data = flight_data_result + .map_err(|e| arrow_flight::error::FlightError::Tonic(Box::new(e)))?; + + // First message should be the schema + if schema.is_none() { + // Decode schema from FlightData header (based on flight_data_to_batches implementation) + let message = root_as_message(&flight_data.data_header[..]) + .map_err(|err| arrow_flight::error::FlightError::DecodeError( + format!("Cannot get root as message: {:?}", err) + ))?; + + let ipc_schema = message + .header_as_schema() + .ok_or_else(|| arrow_flight::error::FlightError::DecodeError( + "First message should be schema".to_string() + ))?; + + schema = Some(Arc::new(fb_to_schema(ipc_schema))); continue; } - // Extract app_metadata (responsibility range) - let responsibility_range = if !flight_data.app_metadata.is_empty() { - BatchWithRange::decode_range_metadata(&flight_data.app_metadata) - } else { - None - }; + // Extract and decode app_metadata (required) + let metadata = crate::BatchMetadata::decode(&flight_data.app_metadata) + .map_err(|e| arrow_flight::error::FlightError::DecodeError( + format!("Failed to decode batch metadata: {}", e) + ))?; - // Decode the RecordBatch from IPC data + // Decode the RecordBatch from FlightData using the schema if !flight_data.data_body.is_empty() { - // Combine header
and body into a single buffer for IPC decoding - let mut ipc_data = Vec::new(); - ipc_data.extend_from_slice(&flight_data.data_header); - ipc_data.extend_from_slice(&flight_data.data_body); - - // Decode using Arrow IPC - let cursor = Cursor::new(ipc_data); - let reader = StreamReader::try_new(cursor, None)?; - - for batch_result in reader { - let batch = batch_result?; - yield (batch, responsibility_range); + if let Some(ref schema_ref) = schema { + let batch = arrow_flight::utils::flight_data_to_arrow_batch( + &flight_data, + schema_ref.clone(), + &dictionaries_by_id + )?; + + yield (batch, metadata); } } } diff --git a/crates/phaser-bridge/src/lib.rs b/crates/phaser-bridge/src/lib.rs index f7805ed..73bcb28 100644 --- a/crates/phaser-bridge/src/lib.rs +++ b/crates/phaser-bridge/src/lib.rs @@ -25,6 +25,73 @@ pub struct ResponsibilityRange { pub end_block: u64, } +/// Metadata attached to each batch via FlightData.app_metadata +/// +/// This struct is encoded/decoded from FlightData.app_metadata and can be extended +/// over time with new fields. When adding fields: +/// - Use Option for truly optional metadata +/// - Add required fields carefully (breaks compatibility) +/// - Document the field's purpose and when it was added +/// - Update BatchMetadata::decode() to handle the new field +/// +/// Current usage: +/// - Bridge encodes this into FlightData.app_metadata via BatchMetadata::encode() +/// - Client decodes from FlightData.app_metadata via BatchMetadata::decode() +/// - Returned as part of subscribe_with_metadata() stream items +/// +/// Historical context: +/// - v1: Added responsibility_range (required) for gap detection +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchMetadata { + /// Block range this batch is responsible for (which blocks were checked) + /// + /// This range indicates what blocks the bridge processed, even if the batch + /// contains 0 rows. 
Critical for distinguishing between "unchecked blocks" + /// and "checked but empty blocks" in gap detection. + /// + /// When a bridge processes blocks X to Y, all resulting batches will carry + /// the same responsibility range (X, Y), even if split across multiple + /// FlightData messages due to size limits. + pub responsibility_range: ResponsibilityRange, + // Future fields can be added here: + // /// Compression ratio achieved for this batch (added v2) + // pub compression_ratio: Option, + // + // /// If batch was split, index of this piece (added v2) + // pub split_index: Option, +} + +impl BatchMetadata { + /// Create new metadata with responsibility range + pub fn new(start_block: u64, end_block: u64) -> Self { + Self { + responsibility_range: ResponsibilityRange { + start_block, + end_block, + }, + } + } + + /// Encode metadata to bytes for FlightData.app_metadata + pub fn encode(&self) -> Result<Vec<u8>, bincode::Error> { + bincode::serialize(self) + } + + /// Decode metadata from FlightData.app_metadata bytes + /// + /// Returns an error if metadata is missing, empty, or invalid. + /// This enforces that all batches from subscribe_with_metadata() must + /// include proper metadata.
+ pub fn decode(metadata: &[u8]) -> Result<Self, Box<dyn std::error::Error>> { + if metadata.is_empty() { + return Err("FlightData.app_metadata is empty - bridge must send BatchMetadata".into()); + } + + bincode::deserialize(metadata) + .map_err(|e| format!("Failed to decode BatchMetadata from app_metadata: {}", e).into()) + } +} + /// Wrapper for RecordBatch with responsibility range metadata /// /// The responsibility range indicates which blocks this batch was responsible for processing, @@ -52,24 +119,26 @@ impl BatchWithRange { } } - /// Encode the block range as bytes for app_metadata using bincode + /// Encode as BatchMetadata for app_metadata + pub fn encode_metadata(&self) -> Result<Vec<u8>, bincode::Error> { + let metadata = BatchMetadata::new(self.start_block, self.end_block); + metadata.encode() + } + + /// Legacy method - use BatchMetadata::encode() instead + #[deprecated(note = "Use encode_metadata() or BatchMetadata::encode() directly")] pub fn encode_range_metadata(&self) -> Result<Vec<u8>, bincode::Error> { - let range = ResponsibilityRange { - start_block: self.start_block, - end_block: self.end_block, - }; - bincode::serialize(&range) + self.encode_metadata() } - /// Decode block range from app_metadata bytes using bincode - /// - /// Returns None if metadata is invalid or empty + /// Legacy method - use BatchMetadata::decode() instead + #[deprecated(note = "Use BatchMetadata::decode() directly")] pub fn decode_range_metadata(metadata: &[u8]) -> Option<(u64, u64)> { - if metadata.is_empty() { - return None; - } - - let range: ResponsibilityRange = bincode::deserialize(metadata).ok()?; - Some((range.start_block, range.end_block)) + BatchMetadata::decode(metadata).ok().map(|m| { ( m.responsibility_range.start_block, m.responsibility_range.end_block, ) }) } } diff --git a/crates/phaser-metrics/src/segment_metrics.rs b/crates/phaser-metrics/src/segment_metrics.rs index a4ac0d3..9396b4c 100644 --- a/crates/phaser-metrics/src/segment_metrics.rs +++
b/crates/phaser-metrics/src/segment_metrics.rs @@ -102,6 +102,9 @@ pub trait SegmentMetrics { pub struct BridgeMetrics { pub base: Arc, grpc_streams_active: IntGaugeVec, + grpc_request_duration_blocks: HistogramVec, + grpc_request_duration_transactions: HistogramVec, + grpc_request_duration_logs: HistogramVec, } impl SegmentMetrics for BridgeMetrics { @@ -351,6 +354,49 @@ impl BridgeMetrics { &["chain_id", "bridge_name", "stream_type"] ) .unwrap(), + grpc_request_duration_blocks: register_histogram_vec!( + format!( + "{}_grpc_request_duration_blocks_milliseconds", + service_name_str + ), + "Duration of individual block fetch requests in milliseconds", + &["chain_id", "bridge_name", "segment_num", "method"], + // 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 30s, 1m, 2m, 5m + vec![ + 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, + 10000.0, 30000.0, 60000.0, 120000.0, 300000.0 + ] + ) + .unwrap(), + grpc_request_duration_transactions: register_histogram_vec!( + format!( + "{}_grpc_request_duration_transactions_milliseconds", + service_name_str + ), + "Duration of individual transaction fetch requests in milliseconds", + &["chain_id", "bridge_name", "segment_num", "method"], + // 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 30s, 1m, 2m, 5m, 10m + vec![ + 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, + 10000.0, 30000.0, 60000.0, 120000.0, 300000.0, 600000.0 + ] + ) + .unwrap(), + grpc_request_duration_logs: register_histogram_vec!( + format!( + "{}_grpc_request_duration_logs_milliseconds", + service_name_str + ), + "Duration of individual log fetch requests in milliseconds", + &["chain_id", "bridge_name", "segment_num", "method"], + // 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s, 30s, 1m, 2m, 5m, 10m, 20m, 30m, 1h + vec![ + 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, + 10000.0, 30000.0, 60000.0, 120000.0, 300000.0, 
600000.0, 1200000.0, 1800000.0, + 3600000.0 + ] + ) + .unwrap(), } } @@ -367,6 +413,52 @@ impl BridgeMetrics { .with_label_values(&[&self.base.chain_id, &self.base.bridge_name, stream_type]) .dec(); } + + /// Record gRPC request duration for blocks in milliseconds + pub fn grpc_request_duration_blocks( + &self, + segment_num: u64, + method: &str, + duration_millis: f64, + ) { + self.grpc_request_duration_blocks + .with_label_values(&[ + &self.base.chain_id, + &self.base.bridge_name, + &segment_num.to_string(), + method, + ]) + .observe(duration_millis); + } + + /// Record gRPC request duration for transactions in milliseconds + pub fn grpc_request_duration_transactions( + &self, + segment_num: u64, + method: &str, + duration_millis: f64, + ) { + self.grpc_request_duration_transactions + .with_label_values(&[ + &self.base.chain_id, + &self.base.bridge_name, + &segment_num.to_string(), + method, + ]) + .observe(duration_millis); + } + + /// Record gRPC request duration for logs in milliseconds + pub fn grpc_request_duration_logs(&self, segment_num: u64, method: &str, duration_millis: f64) { + self.grpc_request_duration_logs + .with_label_values(&[ + &self.base.chain_id, + &self.base.bridge_name, + &segment_num.to_string(), + method, + ]) + .observe(duration_millis); + } } /// Worker processing stages diff --git a/crates/phaser-query/src/parquet_writer.rs b/crates/phaser-query/src/parquet_writer.rs index c402eee..5118b1c 100644 --- a/crates/phaser-query/src/parquet_writer.rs +++ b/crates/phaser-query/src/parquet_writer.rs @@ -34,10 +34,9 @@ struct CurrentFile { writer: ArrowWriter, temp_path: PathBuf, row_count: usize, - start_block: u64, // Actual first block with data - end_block: u64, // Actual last block with data - bytes_written: u64, // Track actual bytes written to disk - responsibility_start: u64, // First block this file is responsible for (may not have data) + start_block: u64, // Actual first block with data + end_block: u64, // Actual last block with data 
+ bytes_written: u64, // Track actual bytes written to disk } impl ParquetWriter { @@ -119,6 +118,22 @@ impl ParquetWriter { } } + /// Update the responsibility end from bridge batch metadata + /// This allows the writer to claim responsibility for blocks beyond where data was found + pub fn update_responsibility_end(&mut self, new_end: u64) { + if let Some((start, current_end)) = self.responsibility_range { + if new_end > current_end { + self.responsibility_range = Some((start, new_end)); + debug!( + "Updated responsibility_end from {} to {} for {}", + current_end, new_end, self.data_type + ); + } + } else { + warn!("update_responsibility_end called but no responsibility_range set"); + } + } + /// Set both segment and responsibility to the same range (for backwards compatibility) pub fn set_block_range(&mut self, start: u64, end: u64) { if end < start { @@ -235,17 +250,6 @@ impl ParquetWriter { let writer = ArrowWriter::try_new(file, schema, Some(props))?; - // Determine responsibility_start based on whether this is the first file in the segment - let responsibility_start = if self.sequence_number == 0 { - // First file in segment - responsible from responsibility range start - self.responsibility_range - .map(|(start, _)| start) - .unwrap_or(block_num) - } else { - // Subsequent file - responsible from where data starts - block_num - }; - self.current_file = Some(CurrentFile { writer, temp_path, @@ -253,7 +257,6 @@ impl ParquetWriter { start_block: block_num, end_block: block_num, bytes_written: 0, - responsibility_start, }); Ok(()) @@ -401,26 +404,19 @@ impl ParquetWriter { // Update Phaser metadata in the file footer if let Some((segment_start, segment_end)) = self.segment_range { // Get responsibility range (defaults to segment range if not specified) - let (_resp_start, resp_end) = self + let (resp_start, resp_end) = self .responsibility_range .unwrap_or((segment_start, segment_end)); - // For responsibility_end: use resp_end if we're at the end of the 
responsibility range - // Check if current.end_block is at or beyond resp_end-1 (within last block of range) - let responsibility_end = - if current.end_block >= resp_end || current.end_block == resp_end - 1 { - resp_end - } else { - current.end_block - }; - + // Always use the full responsibility range from bridge metadata + // The bridge tells us which blocks were checked, even if they had no data let phaser_meta = PhaserMetadata::new( segment_start, segment_end, - current.responsibility_start, // responsibility_start (first block this file is responsible for) - responsibility_end, // responsibility_end (last block this file is responsible for) + resp_start, // responsibility_start from bridge + resp_end, // responsibility_end from bridge (may be > current.end_block) current.start_block, // data_start (actual first block with data) - current.end_block, // data_end (actual last block with data) + current.end_block, // data_end (actual last block with data) self.data_type.clone(), ) .with_is_live(self.is_live); diff --git a/crates/phaser-query/src/streaming_with_writer.rs b/crates/phaser-query/src/streaming_with_writer.rs index 8739eff..bbcc671 100644 --- a/crates/phaser-query/src/streaming_with_writer.rs +++ b/crates/phaser-query/src/streaming_with_writer.rs @@ -99,8 +99,12 @@ impl StreamingServiceWithWriter { /// Spawn a stream processor task for any stream type fn spawn_stream_processor( stream_type: StreamType, - mut stream: impl StreamExt> - + Send + mut stream: impl StreamExt< + Item = Result< + (RecordBatch, phaser_bridge::BatchMetadata), + arrow_flight::error::FlightError, + >, + > + Send + Unpin + 'static, sender: mpsc::Sender, @@ -114,7 +118,9 @@ impl StreamingServiceWithWriter { tokio::spawn(async move { while let Some(batch_result) = stream.next().await { match batch_result { - Ok(batch) => { + Ok((batch, _metadata)) => { + // TODO: Track responsibility ranges from metadata for live streaming + // For now, just process the batch as before // Special 
logging for blocks to show block number let _block_number = if matches!(stream_type, StreamType::Blocks) { let block_num = batch @@ -224,39 +230,39 @@ impl StreamingServiceWithWriter { continue; } - // Subscribe to blocks + // Subscribe to blocks with metadata let blocks_descriptor = BlockchainDescriptor::live(StreamType::Blocks); info!("Subscribing to blocks from bridge"); - let blocks_stream = bridge.subscribe(&blocks_descriptor).await?; + let blocks_stream = bridge.subscribe_with_metadata(&blocks_descriptor).await?; Self::spawn_stream_processor( StreamType::Blocks, - blocks_stream, + Box::pin(blocks_stream), blocks_tx.clone(), self.live_state.clone(), self.chain_id, self.bridge_name.clone(), ); - // Subscribe to transactions + // Subscribe to transactions with metadata let txs_descriptor = BlockchainDescriptor::live(StreamType::Transactions); info!("Subscribing to transactions from bridge"); - let txs_stream = bridge.subscribe(&txs_descriptor).await?; + let txs_stream = bridge.subscribe_with_metadata(&txs_descriptor).await?; Self::spawn_stream_processor( StreamType::Transactions, - txs_stream, + Box::pin(txs_stream), txs_tx.clone(), self.live_state.clone(), self.chain_id, self.bridge_name.clone(), ); - // Subscribe to logs + // Subscribe to logs with metadata let logs_descriptor = BlockchainDescriptor::live(StreamType::Logs); info!("Subscribing to logs from bridge"); - let logs_stream = bridge.subscribe(&logs_descriptor).await?; + let logs_stream = bridge.subscribe_with_metadata(&logs_descriptor).await?; Self::spawn_stream_processor( StreamType::Logs, - logs_stream, + Box::pin(logs_stream), logs_tx.clone(), self.live_state.clone(), self.chain_id, @@ -349,16 +355,16 @@ impl StreamingServiceWithWriter { continue; } - // Subscribe to trie stream + // Subscribe to trie stream with metadata let trie_descriptor = BlockchainDescriptor::live(StreamType::Trie); info!("Subscribing to trie data from bridge"); - match bridge.subscribe(&trie_descriptor).await { + match 
bridge.subscribe_with_metadata(&trie_descriptor).await { Ok(trie_stream) => { info!("Successfully subscribed to trie stream"); Self::spawn_stream_processor( StreamType::Trie, - trie_stream, + Box::pin(trie_stream), trie_tx.clone(), self.live_state.clone(), self.chain_id, diff --git a/crates/phaser-query/src/sync/error.rs b/crates/phaser-query/src/sync/error.rs index fd8bae1..c648f79 100644 --- a/crates/phaser-query/src/sync/error.rs +++ b/crates/phaser-query/src/sync/error.rs @@ -176,6 +176,9 @@ impl SyncError { ErrorCategory::Connection } else if err_lower.contains("timeout") || err_lower.contains("timed out") { ErrorCategory::Timeout + } else if err_lower.contains("header not found") || err_lower.contains("block not found") { + // Block doesn't exist - likely beyond chain tip, not a transient error for historical sync + ErrorCategory::NoData } else if err_lower.contains("validation") || err_lower.contains("invalid") { ErrorCategory::Validation } else if err_lower.contains("io error") || err_lower.contains("file") { @@ -215,6 +218,9 @@ impl SyncError { ErrorCategory::Connection } else if err_lower.contains("timeout") || err_lower.contains("timed out") { ErrorCategory::Timeout + } else if err_lower.contains("header not found") || err_lower.contains("block not found") { + // Block doesn't exist - likely beyond chain tip, not a transient error for historical sync + ErrorCategory::NoData } else if err_lower.contains("validation") || err_lower.contains("invalid") { ErrorCategory::Validation } else if err_lower.contains("io error") || err_lower.contains("file") { @@ -254,6 +260,9 @@ impl SyncError { ErrorCategory::Connection } else if err_lower.contains("timeout") || err_lower.contains("timed out") { ErrorCategory::Timeout + } else if err_lower.contains("header not found") || err_lower.contains("block not found") { + // Block doesn't exist - likely beyond chain tip, not a transient error for historical sync + ErrorCategory::NoData } else if 
err_lower.contains("validation") || err_lower.contains("invalid") { ErrorCategory::Validation } else if err_lower.contains("io error") || err_lower.contains("file") { diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index cf4cafd..3c8c446 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -6,6 +6,7 @@ use crate::sync::worker::{ProgressTracker, SyncWorker, SyncWorkerConfig}; use crate::PhaserConfig; use anyhow::Result; use core_executor::ThreadPoolExecutor; +use phaser_bridge::FlightBridgeClient; use phaser_metrics::SegmentMetrics; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -414,6 +415,34 @@ impl SyncServer { // Metrics metrics.sync_errors(error_category, data_type); + + // Check if error is retryable + use crate::sync::error::ErrorCategory; + let is_retryable = matches!( + sync_err.category, + ErrorCategory::Connection + | ErrorCategory::Timeout + | ErrorCategory::Cancelled + ); + + if !is_retryable { + // Non-retryable error - fail immediately + metrics.segment_attempts("failure"); + error!( + worker_id = worker_id, + segment_num = segment_num, + error_type = error_category, + data_type = data_type, + from_block = sync_err.from_block, + to_block = sync_err.to_block, + error = %sync_err, + "Segment failed with non-retryable error, not retrying" + ); + // Don't re-queue, move to next segment + continue; + } + + // Retryable error - proceed with retry logic metrics.segment_attempts("retry"); // Update retry count metric (gauge for current retry count) @@ -459,7 +488,7 @@ impl SyncServer { // Sleep for backoff period BEFORE re-queuing tokio::time::sleep(Duration::from_secs(backoff_secs)).await; - // Always re-queue (never abandon) + // Re-queue for retry let mut work_with_retry = work.clone(); work_with_retry.retry_count = Some(retry_count + 1); work_with_retry.last_attempt = std::time::Instant::now(); @@ -580,6 +609,28 @@ 
impl SyncService for SyncServer { )) })?; + // Validate that to_block doesn't exceed chain tip (only for historical syncs without live streaming) + if historical_boundary.is_none() { + // Connect to bridge to check chain tip + let mut client = FlightBridgeClient::connect(bridge.endpoint.clone()) + .await + .map_err(|e| Status::unavailable(format!("Failed to connect to bridge: {}", e)))?; + + let bridge_info = client + .get_info() + .await + .map_err(|e| Status::internal(format!("Failed to get bridge info: {}", e)))?; + + if bridge_info.current_block > 0 && to_block > bridge_info.current_block { + return Err(Status::invalid_argument(format!( + "Requested to_block ({}) exceeds chain tip ({}). \ + Historical sync cannot request blocks that don't exist yet. \ + Current chain tip is at block {}.", + to_block, bridge_info.current_block, bridge_info.current_block + ))); + } + } + // Generate job ID let job_id = Uuid::new_v4().to_string(); diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index fc81c98..5edfe21 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -25,10 +25,7 @@ fn is_transient_error(err: &SyncError) -> bool { ErrorCategory::Connection | ErrorCategory::Timeout | ErrorCategory::Cancelled ) || // Also check message for specific transient patterns - err.message.contains("txn 0 already rollback") - || err.message.contains("Timeout expired") - || err.message.contains("Failed to receive") - || err.message.contains("stream") + err.message.contains("Timeout expired") } /// Worker progress tracking @@ -515,25 +512,30 @@ impl SyncWorker { let descriptor = BlockchainDescriptor::historical(StreamType::Blocks, from_block, to_block) .with_preferences(preferences); - // Subscribe to the block stream (returns Arrow RecordBatches) - let mut stream = client.subscribe(&descriptor).await.map_err(|e| { - SyncError::from_anyhow_with_context( - DataType::Blocks, - from_block, - to_block, - 
"Failed to subscribe to block stream", - e, - ) - })?; + // Subscribe to the block stream with metadata (returns RecordBatch + responsibility range) + let stream = client + .subscribe_with_metadata(&descriptor) + .await + .map_err(|e| { + SyncError::from_anyhow_with_context( + DataType::Blocks, + from_block, + to_block, + "Failed to subscribe to block stream", + e, + ) + })?; + let mut stream = Box::pin(stream); let mut batches_processed = 0u64; let mut bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; + let mut max_responsibility_end: Option = None; while let Some(batch_result) = stream.next().await { - let batch = match batch_result { - Ok(batch) => batch, + let (batch, metadata) = match batch_result { + Ok(data) => data, Err(e) => { // Preserve the full gRPC/Flight error chain from bridge/erigon return Err(SyncError::from_error_with_context( @@ -546,6 +548,16 @@ impl SyncWorker { } }; + // Track the maximum responsibility end from batch metadata + let resp_end = metadata.responsibility_range.end_block; + max_responsibility_end = Some( + max_responsibility_end + .map(|current| current.max(resp_end)) + .unwrap_or(resp_end), + ); + // Update writer's responsibility range from bridge metadata + writer.update_responsibility_end(resp_end); + debug!( "Worker {} received block batch with {} rows", self.worker_id, @@ -825,20 +837,22 @@ impl SyncWorker { self.validation_stage ); - // Subscribe to the transaction stream (returns Arrow RecordBatches) - let mut stream = client - .subscribe(&descriptor) + // Subscribe to the transaction stream with metadata (returns RecordBatch + responsibility range) + let stream = client + .subscribe_with_metadata(&descriptor) .await .context("Failed to subscribe to transaction stream")?; + let mut stream = Box::pin(stream); let mut batches_processed = 0u64; let mut bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; + let mut 
max_responsibility_end: Option = None; while let Some(batch_result) = stream.next().await { - let batch = match batch_result { - Ok(batch) => batch, + let (batch, metadata) = match batch_result { + Ok(data) => data, Err(e) => { // Log the full error details from the bridge before wrapping error!( @@ -856,6 +870,19 @@ impl SyncWorker { } }; + // Track the maximum responsibility end from batch metadata + let resp_end = metadata.responsibility_range.end_block; + max_responsibility_end = Some( + max_responsibility_end + .map(|current| current.max(resp_end)) + .unwrap_or(resp_end), + ); + // Update writer's responsibility range from bridge metadata + writer.update_responsibility_end(resp_end); + if let Some(ref mut proof_w) = proof_writer { + proof_w.update_responsibility_end(resp_end); + } + debug!( "Worker {} received transaction batch with {} rows", self.worker_id, @@ -875,6 +902,17 @@ impl SyncWorker { if first_block_seen.is_none() { first_block_seen = Some(first); } + + // Log what we received from bridge + info!( + "PHASER RECEIVED: Worker {} transactions batch {}, blocks {}-{} ({} rows)", + self.worker_id, + batches_processed + 1, + first, + last, + batch.num_rows() + ); + last_block_seen = Some(last); } } @@ -1181,25 +1219,30 @@ impl SyncWorker { .with_validation(self.validation_stage) .with_preferences(preferences); - // Subscribe to the log stream (returns Arrow RecordBatches) - let mut stream = client.subscribe(&descriptor).await.map_err(|e| { - SyncError::from_anyhow_with_context( - DataType::Logs, - from_block, - to_block, - "Failed to subscribe to log stream", - e, - ) - })?; + // Subscribe to the log stream with metadata (returns RecordBatch + responsibility range) + let stream = client + .subscribe_with_metadata(&descriptor) + .await + .map_err(|e| { + SyncError::from_anyhow_with_context( + DataType::Logs, + from_block, + to_block, + "Failed to subscribe to log stream", + e, + ) + })?; + let mut stream = Box::pin(stream); let mut batches_processed = 
0u64; let mut bytes_written = 0u64; let mut first_block_seen: Option = None; let mut last_block_seen: Option = None; + let mut max_responsibility_end: Option = None; while let Some(batch_result) = stream.next().await { - let batch = match batch_result { - Ok(batch) => batch, + let (batch, metadata) = match batch_result { + Ok(data) => data, Err(e) => { // Preserve the full gRPC/Flight error chain from bridge/erigon return Err(SyncError::from_error_with_context( @@ -1212,6 +1255,16 @@ impl SyncWorker { } }; + // Track the maximum responsibility end from batch metadata + let resp_end = metadata.responsibility_range.end_block; + max_responsibility_end = Some( + max_responsibility_end + .map(|current| current.max(resp_end)) + .unwrap_or(resp_end), + ); + // Update writer's responsibility range from bridge metadata + writer.update_responsibility_end(resp_end); + debug!( "Worker {} received log batch with {} rows", self.worker_id,