Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions crates/bridges/evm/erigon-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::segment_worker::{split_into_segments, SegmentConfig, SegmentWorker};
use crate::streaming_service::StreamingService;
use crate::trie_client::TrieClient;
use crate::trie_converter;
use phaser_metrics::SegmentMetrics;
use std::sync::Arc;
use tonic::Status as TonicStatus;

Expand Down Expand Up @@ -122,15 +123,21 @@ impl ErigonFlightBridge {
})
}

pub fn bridge_info(&self) -> BridgeInfo {
pub async fn bridge_info(&self) -> BridgeInfo {
// Query current block from Erigon
let current_block = {
let mut client = self.client.lock().await;
client.get_latest_block().await.unwrap_or(0)
};

BridgeInfo {
name: "erigon-bridge".to_string(),
node_type: "erigon".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
chain_id: self.chain_id,
capabilities: vec!["streaming".to_string()],
current_block: 0, // Would need to query this from Erigon
oldest_block: 0, // Would need to query this from Erigon
current_block,
oldest_block: 0, // Could query this from Erigon if needed
}
}

Expand Down Expand Up @@ -545,6 +552,7 @@ impl ErigonFlightBridge {

let blockdata_pool = self.blockdata_pool.clone();
let batch_size = self.segment_config.validation_batch_size;
let metrics = self.metrics.clone();

let stream = async_stream::stream! {
match stream_type {
Expand Down Expand Up @@ -621,8 +629,14 @@ impl ErigonFlightBridge {
}
Err(e) => {
error!("Failed to receive block batch: {}", e);

// Convert tonic::Status to ErigonBridgeError for proper categorization
let bridge_err = ErigonBridgeError::from(e);
let error_type = crate::segment_worker::SegmentWorker::categorize_error(&bridge_err);
metrics.error(&error_type, "blocks");

yield Err(arrow_flight::error::FlightError::ExternalError(
Box::new(std::io::Error::other(e.to_string()))
Box::new(std::io::Error::other(bridge_err.to_string()))
));
break;
}
Expand Down Expand Up @@ -674,7 +688,7 @@ impl ErigonFlightBridge {
#[async_trait]
impl FlightBridge for ErigonFlightBridge {
async fn get_info(&self) -> Result<BridgeInfo, Status> {
Ok(self.bridge_info())
Ok(self.bridge_info().await)
}

async fn get_capabilities(&self) -> Result<BridgeCapabilities, Status> {
Expand All @@ -698,7 +712,7 @@ impl FlightBridge for ErigonFlightBridge {
// Simple handshake - return bridge info
let response = HandshakeResponse {
protocol_version: 1,
payload: serde_json::to_vec(&self.bridge_info())
payload: serde_json::to_vec(&self.bridge_info().await)
.map_err(|e| Status::internal(format!("Failed to serialize bridge info: {}", e)))?
.into(),
};
Expand Down Expand Up @@ -882,6 +896,8 @@ impl FlightBridge for ErigonFlightBridge {
};

let schema = Self::get_schema_for_type(stream_type);
let metrics_for_stream = self.metrics.clone();
let stream_type_for_metrics = stream_type;

// Compression is handled at the gRPC transport level via tonic
// (configured in main.rs with accept_compressed/send_compressed)
Expand All @@ -900,11 +916,11 @@ impl FlightBridge for ErigonFlightBridge {
while let Some(batch_result) = batch_stream.next().await {
match batch_result {
Ok(batch_with_range) => {
// Encode the responsibility range metadata
let metadata = match batch_with_range.encode_range_metadata() {
// Encode the batch metadata (responsibility range)
let metadata = match batch_with_range.encode_metadata() {
Ok(m) => m,
Err(e) => {
error!("Failed to encode range metadata: {}", e);
error!("Failed to encode batch metadata: {}", e);
yield Err(Status::internal(format!("Metadata encoding error: {}", e)));
continue;
}
Expand Down Expand Up @@ -936,6 +952,30 @@ impl FlightBridge for ErigonFlightBridge {
}
Err(e) => {
error!("Error in batch stream: {}", e);

// Categorize error from string (since FlightError wraps the original error as string)
let err_str = e.to_string().to_lowercase();
let error_type = if err_str.contains("timeout") || err_str.contains("timed out") {
"timeout"
} else if err_str.contains("header not found") || err_str.contains("block not found") || err_str.contains("not found") {
"not_found"
} else if err_str.contains("connection") || err_str.contains("connect") {
"connection"
} else if err_str.contains("unavailable") {
"unavailable"
} else {
"unknown"
};

let data_type = match stream_type_for_metrics {
StreamType::Blocks => "blocks",
StreamType::Transactions => "transactions",
StreamType::Logs => "logs",
StreamType::Trie => "trie",
};

metrics_for_stream.error(error_type, data_type);

yield Err(Status::internal(format!("Stream error: {}", e)));
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bridges/evm/erigon-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ async fn main() -> Result<()> {
// Create the Flight server
let flight_server = FlightBridgeServer::new(bridge);

// Configure global maximum message size (128MB)
// Configure global maximum message size (256MB)
// Per-stream limits are negotiated via StreamPreferences
const MAX_MESSAGE_SIZE: usize = 128 * 1024 * 1024;
const MAX_MESSAGE_SIZE: usize = 256 * 1024 * 1024;

// Configure compression based on CLI flag
let mut flight_service = flight_server
Expand Down
112 changes: 102 additions & 10 deletions crates/bridges/evm/erigon-bridge/src/segment_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl SegmentWorker {
Err(e) => {
error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}",
worker_id, segment_id, current_block, chunk_end, e);

// Track error type for monitoring
let error_type = Self::categorize_error(&e);
self.metrics.error(&error_type, "blocks");

self.metrics.active_workers_dec("blocks");
self.metrics.segment_attempt(false);
yield Err(e);
Expand Down Expand Up @@ -241,10 +246,13 @@ impl SegmentWorker {
worker_id, segment_id, start_block, end_block
);

let request_start = Instant::now();
let mut tx_stream = client.stream_transactions(start_block, end_block, self.config.validation_batch_size as u32).await.map_err(|e| {
error!("Worker {} segment {}: Failed to create transaction stream: {}", worker_id, segment_id, e);
e
})?;
let duration_ms = request_start.elapsed().as_millis() as f64;
self.metrics.grpc_request_duration_transactions(segment_id, "stream_transactions", duration_ms);
let mut all_transactions = Vec::new();

let mut batch_count = 0u64;
Expand Down Expand Up @@ -335,6 +343,10 @@ impl SegmentWorker {
.await {
Ok(batch) => batch,
Err(e) => {
// Track error type for monitoring
let error_type = Self::categorize_error(&e);
self.metrics.error(&error_type, "transactions");

// Clean up metrics before error return
self.metrics.active_workers_dec("transactions");
self.metrics.segment_attempt(false);
Expand All @@ -346,6 +358,10 @@ impl SegmentWorker {
// Yield this batch immediately - no buffering!
// Wrap with responsibility range metadata
yielded_batches += 1;
info!(
"BRIDGE YIELD: Worker {} segment {}: Yielding transaction batch {}, blocks {}-{}",
worker_id, segment_id, yielded_batches, start_block, end_block
);
yield Ok(BatchWithRange::new(arrow_batch, start_block, end_block));
}

Expand All @@ -365,6 +381,7 @@ impl SegmentWorker {
segment_id,
total_blocks
);
self.metrics.error("empty_stream", "transactions");
self.metrics.active_workers_dec("transactions");
self.metrics
.set_worker_stage(worker_id, segment_id, WorkerStage::Idle);
Expand Down Expand Up @@ -441,6 +458,11 @@ impl SegmentWorker {
Err(e) => {
error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}",
worker_id, segment_id, current_block, chunk_end, e);

// Track error type for monitoring
let error_type = Self::categorize_error(&e);
self.metrics.error(&error_type, "logs");

self.metrics.active_workers_dec("logs");
yield Err(e);
return;
Expand All @@ -461,9 +483,6 @@ impl SegmentWorker {
let mut sorted_chunk: Vec<_> = chunk_headers.into_iter().collect();
sorted_chunk.sort_by_key(|(block_num, _)| *block_num);

// Batch blocks into sub-ranges for receipt fetching
// Instead of 1 request per block, do 1 request per 10-20 blocks
// This reduces gRPC overhead while maintaining good parallelism
const RECEIPT_BATCH_SIZE: usize = 10;

let receipt_futures: Vec<_> = sorted_chunk
Expand Down Expand Up @@ -504,6 +523,10 @@ impl SegmentWorker {
current_batch.extend(batch_data);
}
Err(e) => {
// Track error type for monitoring
let error_type = Self::categorize_error(&e);
self.metrics.error(&error_type, "logs");

self.metrics.active_workers_dec("logs");
yield Err(e);
return;
Expand All @@ -530,6 +553,10 @@ impl SegmentWorker {
.await {
Ok(b) => b,
Err(e) => {
// Track error type for monitoring
let error_type = Self::categorize_error(&e);
self.metrics.error(&error_type, "logs");

self.metrics.active_workers_dec("logs");
yield Err(e);
return;
Expand Down Expand Up @@ -567,6 +594,7 @@ impl SegmentWorker {
segment_id,
total_blocks
);
self.metrics.error("empty_stream", "logs");
self.metrics.active_workers_dec("logs");
yield Err(ErigonBridgeError::StreamProtocol(StreamError::ZeroBatchesConsumed {
start: self.segment_start,
Expand Down Expand Up @@ -603,6 +631,15 @@ impl SegmentWorker {
batch_size: u32,
metrics: &BridgeMetrics,
) -> Result<HashMap<u64, (Header, i64)>, ErigonBridgeError> {
info!(
"ERIGON REQUEST: Requesting blocks {}-{} from Erigon (expecting {} blocks)",
segment_start,
segment_end,
segment_end - segment_start + 1
);

let segment_id = segment_start / 500_000;
let request_start = Instant::now();
let mut block_stream = client
.stream_blocks(segment_start, segment_end, batch_size)
.await
Expand All @@ -613,6 +650,8 @@ impl SegmentWorker {
);
e
})?;
let duration_ms = request_start.elapsed().as_millis() as f64;
metrics.grpc_request_duration_blocks(segment_id, "stream_blocks", duration_ms);

// Track active gRPC stream
metrics.grpc_stream_inc("blocks");
Expand Down Expand Up @@ -668,13 +707,28 @@ impl SegmentWorker {
segment_start, segment_end, batch_count
);
} else {
debug!(
"Blocks {}-{}: Header stream completed, fetched {} headers from {} batches",
segment_start,
segment_end,
headers.len(),
batch_count
);
// Calculate actual range received
let block_numbers: Vec<u64> = headers.keys().copied().collect();
let min_received = block_numbers.iter().min().copied().unwrap_or(0);
let max_received = block_numbers.iter().max().copied().unwrap_or(0);
let expected_count = segment_end - segment_start + 1;
let missing_count = expected_count - headers.len() as u64;

if missing_count > 0 {
warn!(
"ERIGON RESPONSE: Blocks {}-{}: Received {} headers (expected {}), actual range {}-{}, MISSING {} blocks",
segment_start, segment_end, headers.len(), expected_count, min_received, max_received, missing_count
);
} else {
info!(
"ERIGON RESPONSE: Blocks {}-{}: Received {} headers (complete), range {}-{}",
segment_start,
segment_end,
headers.len(),
min_received,
max_received
);
}
}

Ok(headers)
Expand Down Expand Up @@ -755,6 +809,7 @@ impl SegmentWorker {
metrics: &BridgeMetrics,
) -> Result<Vec<(u64, Vec<crate::proto::custom::ReceiptData>, Header)>, ErigonBridgeError> {
let call_start = std::time::Instant::now();
let segment_id = from_block / 500_000;

// Spawn a monitoring task that warns if the call takes too long
const SLOW_CALL_WARNING_THRESHOLD_SECS: u64 = 300; // 5 minutes
Expand All @@ -773,6 +828,7 @@ impl SegmentWorker {
})
};

let request_start = Instant::now();
let mut receipt_stream = client
.execute_blocks(from_block, to_block, 100)
.await
Expand All @@ -788,6 +844,8 @@ impl SegmentWorker {
);
e
})?;
let duration_ms = request_start.elapsed().as_millis() as f64;
metrics.grpc_request_duration_logs(segment_id, "execute_blocks", duration_ms);

// Cancel the monitor task since the call succeeded
monitor_handle.abort();
Expand Down Expand Up @@ -964,6 +1022,40 @@ impl SegmentWorker {
// Convert directly without intermediate ReceiptBatch struct
BlockDataConverter::receipts_to_logs_arrow(all_receipts, &timestamps)
}

/// Categorize error for metrics tracking.
///
/// Maps an [`ErigonBridgeError`] to a stable, low-cardinality label string
/// for the metrics `error` counter. Known failure modes are matched by
/// substring against the lowercased error text; anything unrecognized is
/// reported as `unknown:<prefix>` where the prefix is a bounded slice of the
/// original message (so dashboards stay diagnosable without exploding label
/// cardinality).
pub(crate) fn categorize_error(error: &ErigonBridgeError) -> String {
    let err_str = error.to_string();
    let err_lower = err_str.to_lowercase();

    // Substring patterns mapped to stable metric labels.
    // Order matters: the first group with any matching pattern wins,
    // mirroring the precedence of the original if/else-if chain.
    const KNOWN: &[(&[&str], &str)] = &[
        (&["timeout", "timed out"], "timeout"),
        (&["header not found", "block not found"], "not_found"),
        (&["connection", "connect"], "connection"),
        (&["unavailable"], "unavailable"),
        (&["rlp", "decoding"], "decode_error"),
        (&["validation"], "validation"),
    ];

    for (patterns, label) in KNOWN {
        if patterns.iter().any(|p| err_lower.contains(p)) {
            return (*label).to_string();
        }
    }

    // Unknown error: keep the label bounded by taking the text up to the
    // first colon (split always yields at least one piece, so the fallback
    // to the full string is defensive), trimmed and capped at 80 chars.
    let prefix: String = err_str
        .split(':')
        .next()
        .unwrap_or(&err_str)
        .trim()
        .chars()
        .take(80)
        .collect();

    format!("unknown:{}", prefix)
}
}

/// Split a block range into segments
Expand Down
Loading