Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.8.2", features = ["release_max_level_trace", "max_level_trace"] }
slog-async = "2.5.0"
slog-term = "2.7.0"
sqlparser = { version = "0.60.0", features = ["visitor"] }
strum = { version = "0.27", features = ["derive"] }
syn = { version = "2.0.114", features = ["full"] }
Expand Down
4 changes: 1 addition & 3 deletions core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::polling_monitor::{
use anyhow::{self, Error};
use bytes::Bytes;
use graph::{
blockchain::{Blockchain, TriggerFilterWrapper},
blockchain::Blockchain,
components::{store::DeploymentId, subgraph::HostMetrics},
data::subgraph::SubgraphManifest,
data_source::{
Expand Down Expand Up @@ -74,7 +74,6 @@ where
pub(crate) instance: SubgraphInstance<C, T>,
pub instances: SubgraphKeepAlive,
pub offchain_monitor: OffchainMonitor,
pub filter: Option<TriggerFilterWrapper<C>>,
pub(crate) trigger_processor: Box<dyn TriggerProcessor<C, T>>,
pub(crate) decoder: Box<Decoder<C, T>>,
}
Expand All @@ -101,7 +100,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
instance,
instances,
offchain_monitor,
filter: None,
trigger_processor,
decoder,
}
Expand Down
95 changes: 94 additions & 1 deletion core/src/subgraph/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,37 @@ impl DeterministicError for StoreError {}

impl DeterministicError for anyhow::Error {}

/// Classification of processing errors for unified error handling.
///
/// This enum provides a consistent way to categorize errors and determine
/// the appropriate response. The error handling invariants are:
///
/// - **Deterministic**: Stop processing the current block, persist PoI only.
/// The subgraph will be marked as failed. These errors are reproducible
/// and indicate a bug in the subgraph or a permanent data issue.
///
/// - **NonDeterministic**: Retry with exponential backoff. These errors are
/// transient (network issues, temporary database problems) and may succeed
/// on retry.
///
/// - **PossibleReorg**: Restart the block stream cleanly without persisting.
/// The block stream needs to be restarted to detect and handle a potential
/// blockchain reorganization.
///
/// - **Canceled**: The subgraph was canceled (unassigned or shut down).
/// No error should be recorded; this is a clean shutdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingErrorKind {
/// Error is deterministic - stop processing, persist PoI only
Deterministic,
/// Error is non-deterministic - retry with backoff
NonDeterministic,
/// Possible blockchain reorg detected - restart block stream cleanly
PossibleReorg,
/// Processing was canceled - clean shutdown
Canceled,
}

/// An error happened during processing and we need to classify errors into
/// deterministic and non-deterministic errors. This struct holds the result
/// of that classification
Expand All @@ -23,13 +54,72 @@ pub enum ProcessingError {
#[error("{0}")]
Deterministic(Box<dyn DeterministicError>),

/// A possible blockchain reorganization was detected.
/// The block stream should be restarted to detect and handle the reorg.
#[error("possible reorg detected: {0:#}")]
PossibleReorg(Error),

#[error("subgraph stopped while processing triggers")]
Canceled,
}

impl ProcessingError {
/// Classify the error into one of the defined error kinds.
///
/// This method provides a unified way to determine how to handle an error:
/// - `Deterministic`: Stop processing, persist PoI only
/// - `NonDeterministic`: Retry with backoff
/// - `PossibleReorg`: Restart block stream cleanly
/// - `Canceled`: Clean shutdown, no error recording
pub fn kind(&self) -> ProcessingErrorKind {
match self {
ProcessingError::Unknown(_) => ProcessingErrorKind::NonDeterministic,
ProcessingError::Deterministic(_) => ProcessingErrorKind::Deterministic,
ProcessingError::PossibleReorg(_) => ProcessingErrorKind::PossibleReorg,
ProcessingError::Canceled => ProcessingErrorKind::Canceled,
}
}

#[allow(dead_code)]
pub fn is_deterministic(&self) -> bool {
matches!(self, ProcessingError::Deterministic(_))
matches!(self.kind(), ProcessingErrorKind::Deterministic)
}

/// Returns true if this error should stop processing the current block.
///
/// Deterministic errors stop processing because continuing would produce
/// incorrect results. The PoI is still persisted for debugging purposes.
#[allow(dead_code)]
pub fn should_stop_processing(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::Deterministic)
}

/// Returns true if this error requires a clean restart of the block stream.
///
/// Possible reorgs require restarting to allow the block stream to detect
/// and properly handle the reorganization. No state should be persisted
/// in this case.
#[allow(dead_code)]
pub fn should_restart(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::PossibleReorg)
}

/// Returns true if this error is retryable with exponential backoff.
///
/// Non-deterministic errors (network issues, temporary failures) may
/// succeed on retry and should not immediately fail the subgraph.
#[allow(dead_code)]
pub fn is_retryable(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::NonDeterministic)
}

/// Returns true if processing was canceled (clean shutdown).
///
/// Canceled errors indicate the subgraph was unassigned or shut down
/// intentionally and should not be treated as failures.
#[allow(dead_code)]
pub fn is_canceled(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::Canceled)
}

pub fn detail(self, ctx: &str) -> ProcessingError {
Expand All @@ -41,6 +131,9 @@ impl ProcessingError {
ProcessingError::Deterministic(e) => {
ProcessingError::Deterministic(Box::new(anyhow!("{e}").context(ctx.to_string())))
}
ProcessingError::PossibleReorg(e) => {
ProcessingError::PossibleReorg(e.context(ctx.to_string()))
}
ProcessingError::Canceled => ProcessingError::Canceled,
}
}
Expand Down
Loading