diff --git a/Cargo.toml b/Cargo.toml index 1685fa97e802..aca39db811d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,6 +204,7 @@ smallvec = "1" smart-default = "0.7" sonic-rs = "0.5" spire_enum = "1" +sqlx = { version = "0.8", default-features = false, features = ["sqlite", "runtime-tokio", "macros"] } stacker = "0.1" static_assertions = "1" statrs = "0.18" @@ -232,7 +233,6 @@ zstd = "0.13" # optional dependencies console-subscriber = { version = "0.5", features = ["parking_lot"], optional = true } -sqlx = { version = "0.8", default-features = false, features = ["sqlite", "runtime-tokio", "macros"], optional = true } tikv-jemallocator = { version = "0.6", optional = true } tracing-loki = { version = "0.2", default-features = false, features = ["compat-0-2-1", "rustls"], optional = true } @@ -324,14 +324,13 @@ lto = "fat" # These should be refactored (probably removed) in #2984 [features] -default = ["jemalloc", "tracing-loki", "sqlite"] -test = [] # default feature set for unit tests +default = ["jemalloc", "tracing-loki"] +test = [] # default feature set for unit tests slim = ["rustalloc"] -cargo-test = [] # group of tests that is recommended to run with `cargo test` instead of `nextest` -doctest-private = [] # see lib.rs::doctest_private -benchmark-private = ["dep:criterion"] # see lib.rs::benchmark_private -interop-tests-private = [] # see lib.rs::interop_tests_private -sqlite = ["dep:sqlx"] +cargo-test = [] # group of tests that is recommended to run with `cargo test` instead of `nextest` +doctest-private = [] # see lib.rs::doctest_private +benchmark-private = ["dep:criterion"] # see lib.rs::benchmark_private +interop-tests-private = [] # see lib.rs::interop_tests_private # Allocator. Use at most one of these. rustalloc = [] diff --git a/scripts/tests/api_compare/docker-compose.yml b/scripts/tests/api_compare/docker-compose.yml index 3221ce40ae84..4af1a22df317 100644 --- a/scripts/tests/api_compare/docker-compose.yml +++ b/scripts/tests/api_compare/docker-compose.yml @@ -58,7 +58,7 @@ services: --halt-after-import SNAPSHOT_EPOCH="$(ls /data/*.car.zst | tail -n 1 | grep -Eo '[0-9]+' | tail -n 1)" - # backfill the index db + # backfill the index db (nosql) forest-tool index backfill --from $$SNAPSHOT_EPOCH --n-tipsets 200 --chain ${CHAIN} forest --chain ${CHAIN} --encrypt-keystore false --no-gc \ @@ -90,6 +90,10 @@ services: export FULLNODE_API_INFO="$(cat /data/forest-token):/dns/forest/tcp/${FOREST_RPC_PORT}/http" echo "Waiting till Forest is ready" forest-cli healthcheck ready --healthcheck-port ${FOREST_HEALTHZ_RPC_PORT} --wait + + # backfill the index db (sql) + SNAPSHOT_EPOCH="$(ls /data/*.car.zst | tail -n 1 | grep -Eo '[0-9]+' | tail -n 1)" + forest-cli index validate-backfill --from $$SNAPSHOT_EPOCH --to $(($$SNAPSHOT_EPOCH - 200)) forest-wallet-import: depends_on: forest: diff --git a/scripts/tests/api_compare/filter-list-gateway b/scripts/tests/api_compare/filter-list-gateway index df81739d886b..54b4fc9f582d 100644 --- a/scripts/tests/api_compare/filter-list-gateway +++ b/scripts/tests/api_compare/filter-list-gateway @@ -7,6 +7,7 @@ !Filecoin.ChainSetHead !Filecoin.ChainStatObj !Filecoin.ChainTipSetWeight +!Filecoin.ChainValidateIndex !Filecoin.EthGetBlockReceiptsLimited !Filecoin.F3 !Filecoin.GasEstimateGasLimit diff --git a/scripts/tests/api_compare/filter-list-offline b/scripts/tests/api_compare/filter-list-offline index 3f2dafc80728..f073808064ec 100644 --- a/scripts/tests/api_compare/filter-list-offline +++ b/scripts/tests/api_compare/filter-list-offline @@ -1,5 +1,7 @@ # This list contains potentially broken methods (or tests) that are ignored. # They should be considered bugged, and not used until the root cause is resolved. +# Indexer disabled +!Filecoin.ChainValidateIndex !Filecoin.EthSyncing !eth_syncing !Filecoin.NetAddrsListen diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 84086620907b..65e131deac27 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -126,7 +126,7 @@ where chain_config: Arc, genesis_block_header: CachingBlockHeader, ) -> anyhow::Result { - let (publisher, _) = broadcast::channel(SINK_CAP); + let (head_changes_tx, _) = broadcast::channel(SINK_CAP); let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); let head = if let Some(head_tsk) = heaviest_tipset_key_provider @@ -141,7 +141,7 @@ where Tipset::from(&genesis_block_header) }; let cs = Self { - head_changes_tx: publisher, + head_changes_tx, chain_index, tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs new file mode 100644 index 000000000000..d0e7e86711c0 --- /dev/null +++ b/src/chain/store/indexer.rs @@ -0,0 +1,927 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod ddls; +#[cfg(test)] +mod tests; + +use ahash::HashMap; +use anyhow::Context as _; +use cid::Cid; +pub use ddls::{DDLS, PreparedStatements}; +use fvm_ipld_blockstore::Blockstore; +use sqlx::Row as _; + +use crate::{ + blocks::Tipset, + chain::{ChainStore, HeadChanges, index::ResolveNullTipset}, + message::{ChainMessage, SignedMessage}, + rpc::{ + chain::types::ChainIndexValidation, + eth::{eth_tx_from_signed_eth_message, types::EthHash}, + }, + shim::{ + ActorID, + address::Address, + clock::{ChainEpoch, EPOCHS_IN_DAY}, + executor::{Receipt, StampedEvent}, + }, + utils::sqlite, +}; +use std::{ + ops::DerefMut as _, + sync::Arc, + time::{Duration, Instant}, +}; + +type ActorToDelegatedAddressFunc = + Arc anyhow::Result
+ Send + Sync + 'static>; + +type RecomputeTipsetStateFunc = Arc anyhow::Result<()> + Send + Sync + 'static>; + +type ExecutedMessage = (ChainMessage, Receipt, Option>); + +struct IndexedTipsetData { + pub indexed_messages_count: u64, + pub indexed_events_count: u64, + pub indexed_event_entries_count: u64, +} + +#[derive(Debug, smart_default::SmartDefault)] +pub struct SqliteIndexerOptions { + pub gc_retention_epochs: ChainEpoch, +} + +impl SqliteIndexerOptions { + fn validate(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.gc_retention_epochs == 0 || self.gc_retention_epochs >= EPOCHS_IN_DAY, + "gc retention epochs must be 0 or greater than {EPOCHS_IN_DAY}" + ); + Ok(()) + } + + pub fn with_gc_retention_epochs(mut self, gc_retention_epochs: ChainEpoch) -> Self { + self.gc_retention_epochs = gc_retention_epochs; + self + } +} + +pub struct SqliteIndexer { + lock: tokio::sync::Mutex<()>, + options: SqliteIndexerOptions, + cs: Arc>, + db: sqlx::SqlitePool, + stmts: PreparedStatements, + actor_to_delegated_address_func: Option, + recompute_tipset_state_func: Option, +} + +impl SqliteIndexer +where + BS: Blockstore, +{ + pub async fn new( + db: sqlx::SqlitePool, + cs: Arc>, + options: SqliteIndexerOptions, + ) -> anyhow::Result { + options.validate()?; + sqlite::init_db( + &db, + "chain index", + DDLS.iter().cloned().map(sqlx::query), + vec![], + ) + .await?; + let stmts = PreparedStatements::default(); + Ok(Self { + lock: Default::default(), + options, + cs, + db, + stmts, + actor_to_delegated_address_func: None, + recompute_tipset_state_func: None, + }) + } + + /// Acquires write lock. Note that `WAL` (Write-Ahead Logging) mode is enabled to allow + /// concurrent reads to occur while a single write transaction is active + pub async fn aquire_write_lock(&self) -> tokio::sync::MutexGuard<'_, ()> { + self.lock.lock().await + } + + pub fn with_actor_to_delegated_address_func(mut self, f: ActorToDelegatedAddressFunc) -> Self { + self.actor_to_delegated_address_func = Some(f); + self + } + + pub fn with_recompute_tipset_state_func(mut self, f: RecomputeTipsetStateFunc) -> Self { + self.recompute_tipset_state_func = Some(f); + self + } + + pub async fn index_loop( + &self, + mut head_changes_rx: tokio::sync::broadcast::Receiver, + ) -> anyhow::Result<()> { + loop { + let HeadChanges { reverts, applies } = head_changes_rx.recv().await?; + let _lock = self.aquire_write_lock().await; + for ts in reverts { + if let Err(e) = self.revert_tipset(&ts).await { + tracing::warn!( + "failed to revert new head@{}({}): {e}", + ts.epoch(), + ts.key() + ); + } + } + for ts in applies { + if let Err(e) = self.index_tipset(&ts).await { + tracing::warn!("failed to index new head@{}({}): {e}", ts.epoch(), ts.key()); + } + } + } + } + + pub async fn gc_loop(&self) { + if self.options.gc_retention_epochs <= 0 { + tracing::info!("gc retention epochs is not set, skipping gc"); + return; + } + + let mut ticker = tokio::time::interval(Duration::from_hours(4)); + loop { + ticker.tick().await; + self.gc().await; + } + } + + async fn gc(&self) { + let _lock = self.aquire_write_lock().await; + tracing::info!("starting index gc"); + let head = self.cs.heaviest_tipset(); + let removal_epoch = head.epoch() - self.options.gc_retention_epochs - 10; // 10 is for some grace period + if removal_epoch <= 0 { + tracing::info!("no tipsets to gc"); + return; + } + + tracing::info!( + "gc'ing all (reverted and non-reverted) tipsets before epoch {removal_epoch}" + ); + match sqlx::query(self.stmts.remove_tipsets_before_height) + .execute(&self.db) + .await + { + Ok(r) => { + tracing::info!( + "gc'd {} entries before epoch {removal_epoch}", + r.rows_affected() + ); + } + Err(e) => { + tracing::error!( + "failed to remove reverted tipsets before height {removal_epoch}: {e}" + ); + return; + } + } + + // ------------------------------------------------------------------------------------------------- + // Also GC eth hashes + + // Convert `gc_retention_epochs` to number of days + let gc_retention_days = self.options.gc_retention_epochs / EPOCHS_IN_DAY; + if gc_retention_days < 1 { + tracing::info!("skipping gc of eth hashes as retention days is less than 1"); + return; + } + + tracing::info!("gc'ing eth hashes older than {gc_retention_days} days"); + match sqlx::query(self.stmts.remove_eth_hashes_older_than) + .execute(&self.db) + .await + { + Ok(r) => { + tracing::info!( + "gc'd {} eth hashes older than {gc_retention_days} days", + r.rows_affected() + ); + } + Err(e) => { + tracing::error!("failed to gc eth hashes older than {gc_retention_days} days: {e}"); + } + } + } + + pub async fn populate(&self) -> anyhow::Result<()> { + let _lock = self.aquire_write_lock().await; + let start = Instant::now(); + let head = self.cs.heaviest_tipset(); + tracing::info!( + "starting to populate chainindex at head epoch {}", + head.epoch() + ); + let mut tx = self.db.begin().await?; + let mut total_indexed = 0; + for ts in head.chain(self.cs.blockstore()) { + if let Err(e) = self.index_tipset_with_tx(&mut tx, &ts).await { + tracing::info!( + "stopping chainindex population at epoch {}: {e}", + ts.epoch() + ); + break; + } + total_indexed += 1; + } + tx.commit().await?; + tracing::info!( + "successfully populated chain index with {total_indexed} tipsets, took {}", + humantime::format_duration(start.elapsed()) + ); + Ok(()) + } + + pub async fn validate_index( + &self, + epoch: ChainEpoch, + backfill: bool, + ) -> anyhow::Result { + let _lock = if backfill { + Some(self.aquire_write_lock().await) + } else { + None + }; + let head = self.cs.heaviest_tipset(); + anyhow::ensure!( + epoch < head.epoch(), + "cannot validate index at epoch {epoch}, can only validate at an epoch less than chain head epoch {}", + head.epoch() + ); + let ts = + self.cs + .chain_index() + .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?; + let is_index_empty: bool = sqlx::query(self.stmts.is_index_empty) + .fetch_one(&self.db) + .await? + .get(0); + + // Canonical chain has a null round at the epoch -> return if index is empty otherwise validate that index also + // has a null round at this epoch i.e. it does not have anything indexed at all for this epoch + if ts.epoch() != epoch { + if is_index_empty { + return Ok(ChainIndexValidation { + height: epoch, + is_null_round: true, + ..Default::default() + }); + } + // validate the db has a hole here and error if not, we don't attempt to repair because something must be very wrong for this to fail + return self.validate_is_null_round(epoch).await; + } + + // if the index is empty -> short-circuit and simply backfill if applicable + if is_index_empty { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } + + // see if the tipset at this epoch is already indexed or if we need to backfill + if let Some((reverted_count, non_reverted_count)) = + self.get_tipset_counts_at_height(epoch).await? + { + if reverted_count == 0 && non_reverted_count == 0 { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } else if reverted_count > 0 && non_reverted_count == 0 { + anyhow::bail!("index corruption: height {epoch} only has reverted tipsets"); + } else if non_reverted_count > 1 { + anyhow::bail!("index corruption: height {epoch} has multiple non-reverted tipsets"); + } + } else { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } + + // fetch the non-reverted tipset at this epoch + let indexed_tsk_cid_bytes: Vec = + sqlx::query(self.stmts.get_non_reverted_tipset_at_height) + .bind(epoch) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_tsk_cid = Cid::read_bytes(indexed_tsk_cid_bytes.as_slice())?; + let expected_tsk_cid = ts.key().cid()?; + anyhow::ensure!( + indexed_tsk_cid == expected_tsk_cid, + "index corruption: indexed tipset at height {epoch} has key {indexed_tsk_cid}, but canonical chain has {expected_tsk_cid}", + ); + let ( + IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + }, + backfilled, + ) = if let Ok(r) = self.get_and_verify_indexed_data(&ts).await { + (r, false) + } else { + self.backfill_missing_tipset(&ts).await?; + (self.get_and_verify_indexed_data(&ts).await?, true) + }; + Ok(ChainIndexValidation { + tip_set_key: ts.key().clone().into(), + height: ts.epoch(), + backfilled, + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + is_null_round: false, + }) + } + + async fn validate_is_null_round( + &self, + epoch: ChainEpoch, + ) -> anyhow::Result { + // make sure we do not have tipset(reverted or non-reverted) indexed at this epoch + let is_null_round: bool = sqlx::query(self.stmts.has_null_round_at_height) + .bind(epoch) + .fetch_one(&self.db) + .await? + .get(0); + anyhow::ensure!( + is_null_round, + "index corruption: height {epoch} should be a null round but is not" + ); + Ok(ChainIndexValidation { + height: epoch, + is_null_round: true, + ..Default::default() + }) + } + + async fn backfill_missing_tipset(&self, ts: &Tipset) -> anyhow::Result { + let execution_ts = self.get_next_tipset(ts)?; + let mut tx = self.db.begin().await?; + self.index_tipset_and_parent_events_with_tx(&mut tx, &execution_ts) + .await?; + tx.commit().await?; + let IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + } = self.get_indexed_tipset_data(ts).await?; + Ok(ChainIndexValidation { + tip_set_key: ts.key().clone().into(), + height: ts.epoch(), + backfilled: true, + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + is_null_round: false, + }) + } + + fn get_next_tipset(&self, ts: &Tipset) -> anyhow::Result { + let child = self.cs.chain_index().tipset_by_height( + ts.epoch() + 1, + self.cs.heaviest_tipset(), + ResolveNullTipset::TakeNewer, + )?; + anyhow::ensure!( + child.parents() == ts.key(), + "chain forked at height {}; please retry your request; err: chain forked", + ts.epoch() + ); + Ok(child) + } + + async fn get_and_verify_indexed_data(&self, ts: &Tipset) -> anyhow::Result { + let indexed_tipset_data = self.get_indexed_tipset_data(ts).await?; + self.verify_indexed_data(ts, &indexed_tipset_data).await?; + Ok(indexed_tipset_data) + } + + /// verifies that the indexed data for a tipset is correct by comparing the number of messages and events + /// in the chain store to the number of messages and events indexed. + async fn verify_indexed_data( + &self, + ts: &Tipset, + indexed_tipset_data: &IndexedTipsetData, + ) -> anyhow::Result<()> { + let tsk_cid = ts.key().cid()?; + let tsk_cid_bytes = tsk_cid.to_bytes(); + let execution_ts = self.get_next_tipset(ts)?; + + // given that `ts` is on the canonical chain and `execution_ts` is the next tipset in the chain + // `ts` can not have reverted events + let has_reverted_events_in_tipset: bool = + sqlx::query(self.stmts.has_reverted_events_in_tipset) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + anyhow::ensure!( + !has_reverted_events_in_tipset, + "index corruption: reverted events found for an executed tipset {tsk_cid} at height {}", + ts.epoch() + ); + let executed_messages = self.load_executed_messages(ts, &execution_ts)?; + anyhow::ensure!( + executed_messages.len() as u64 == indexed_tipset_data.indexed_messages_count, + "message count mismatch for height {}: chainstore has {}, index has {}", + ts.epoch(), + executed_messages.len(), + indexed_tipset_data.indexed_messages_count + ); + let mut events_count = 0; + let mut event_entries_count = 0; + for (_, _, events) in &executed_messages { + if let Some(events) = events { + events_count += events.len(); + for event in events { + event_entries_count += event.event().entries().len(); + } + } + } + anyhow::ensure!( + events_count as u64 == indexed_tipset_data.indexed_events_count, + "event count mismatch for height {}: chainstore has {events_count}, index has {}", + ts.epoch(), + indexed_tipset_data.indexed_events_count + ); + anyhow::ensure!( + event_entries_count as u64 == indexed_tipset_data.indexed_event_entries_count, + "event entries count mismatch for height {}: chainstore has {event_entries_count}, index has {}", + ts.epoch(), + indexed_tipset_data.indexed_event_entries_count + ); + + // compare the events AMT root between the indexed events and the events in the chain state + for (message, receipt, _) in executed_messages { + let msg_cid = message.cid(); + let indexed_events_root = self.amt_root_for_events(&tsk_cid, &msg_cid).await?; + match (indexed_events_root, receipt.events_root()) { + (Some(a), Some(b)) => { + anyhow::ensure!( + a == b, + "index corruption: events AMT root mismatch for message {msg_cid} at height {}. Index root: {a}, Receipt root: {b}", + ts.epoch() + ); + } + (None, None) => continue, + (Some(_), None) => { + anyhow::bail!( + "index corruption: events found in index for message {msg_cid} at height {}, but message receipt has no events root", + ts.epoch() + ) + } + (None, Some(b)) => { + anyhow::bail!( + "index corruption: no events found in index for message {msg_cid} at height {}, but message receipt has events root {b}", + ts.epoch() + ) + } + } + } + + Ok(()) + } + + async fn get_indexed_tipset_data(&self, ts: &Tipset) -> anyhow::Result { + let tsk_cid_bytes = ts.key().cid()?.to_bytes(); + let indexed_messages_count = sqlx::query(self.stmts.get_non_reverted_tipset_message_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_events_count = sqlx::query(self.stmts.get_non_reverted_tipset_event_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_event_entries_count = + sqlx::query(self.stmts.get_non_reverted_tipset_event_entries_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + Ok(IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + }) + } + + async fn get_tipset_counts_at_height( + &self, + epoch: ChainEpoch, + ) -> anyhow::Result> { + let row = sqlx::query(self.stmts.count_tipsets_at_height) + .bind(epoch) + .fetch_optional(&self.db) + .await?; + Ok(row.map(|r| (r.get(0), r.get(1)))) + } + + async fn amt_root_for_events( + &self, + tsk_cid: &Cid, + msg_cid: &Cid, + ) -> anyhow::Result> { + use crate::state_manager::EVENTS_AMT_BITWIDTH; + use fil_actors_shared::fvm_ipld_amt::Amt; + use fvm_ipld_blockstore::MemoryBlockstore; + use fvm_shared4::event::{ActorEvent, Entry, Flags, StampedEvent}; + + let mut tx = self.db.begin().await?; + let rows = sqlx::query(self.stmts.get_event_id_and_emitter_id) + .bind(tsk_cid.to_bytes()) + .bind(msg_cid.to_bytes()) + .fetch_all(tx.deref_mut()) + .await?; + if rows.is_empty() { + Ok(None) + } else { + let mut events = Amt::new_with_bit_width(MemoryBlockstore::new(), EVENTS_AMT_BITWIDTH); + for (i, row) in rows.iter().enumerate() { + let event_id: i64 = row.get(0); + let actor_id: i64 = row.get(1); + let mut event = StampedEvent { + emitter: actor_id as _, + event: ActorEvent { entries: vec![] }, + }; + let rows2 = sqlx::query(self.stmts.get_event_entries) + .bind(event_id) + .fetch_all(tx.deref_mut()) + .await?; + for row2 in rows2 { + let flags: Vec = row2.get(0); + if let Some(&flags) = flags.first() { + let key: String = row2.get(1); + let codec: i64 = row2.get(2); + let value: Vec = row2.get(3); + event.event.entries.push(Entry { + flags: Flags::from_bits_retain(flags as _), + key, + codec: codec as _, + value, + }); + } + } + events.set(i as _, event)?; + } + + Ok(Some(events.flush()?)) + } + } + + fn load_executed_messages( + &self, + msg_ts: &Tipset, + receipt_ts: &Tipset, + ) -> anyhow::Result> { + let recompute_tipset_state_func = self + .recompute_tipset_state_func + .as_ref() + .context("recompute_tipset_state_func not set")?; + let msgs = self.cs.messages_for_tipset(msg_ts)?; + if msgs.is_empty() { + return Ok(vec![]); + } + let mut recomputed = false; + let recompute = || { + let tsk_cid = receipt_ts.key().cid()?; + tracing::warn!( + "failed to load receipts for tipset {tsk_cid} (epoch {}); recomputing tipset state", + receipt_ts.epoch() + ); + recompute_tipset_state_func(msg_ts.clone())?; + tracing::warn!( + "successfully recomputed tipset state and loaded events for tipset {tsk_cid} (epoch {})", + receipt_ts.epoch() + ); + anyhow::Ok(()) + }; + let receipts = match Receipt::get_receipts( + self.cs.blockstore(), + *receipt_ts.parent_message_receipts(), + ) { + Ok(receipts) => receipts, + Err(_) => { + recompute()?; + recomputed = true; + Receipt::get_receipts(self.cs.blockstore(), *receipt_ts.parent_message_receipts())? + } + }; + anyhow::ensure!( + msgs.len() == receipts.len(), + "mismatching message and receipt counts ({} msgs, {} rcts)", + msgs.len(), + receipts.len() + ); + let mut executed = Vec::with_capacity(msgs.len()); + for (message, receipt) in msgs.into_iter().zip(receipts.into_iter()) { + let events = if let Some(events_root) = receipt.events_root() { + Some( + match StampedEvent::get_events(self.cs.blockstore(), &events_root) { + Ok(events) => events, + Err(e) if recomputed => return Err(e), + Err(_) => { + recompute()?; + recomputed = true; + StampedEvent::get_events(self.cs.blockstore(), &events_root)? + } + }, + ) + } else { + None + }; + executed.push((message, receipt, events)); + } + Ok(executed) + } + + async fn revert_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + tracing::debug!("reverting tipset@{}[{}]", ts.epoch(), ts.key().terse()); + let tsk_cid_bytes = ts.key().cid()?.to_bytes(); + // Because of deferred execution in Filecoin, events at tipset T are reverted when a tipset T+1 is reverted. + // However, the tipet `T` itself is not reverted. + let pts = Tipset::load_required(self.cs.blockstore(), ts.parents())?; + let events_tsk_cid_bytes = pts.key().cid()?.to_bytes(); + let mut tx = self.db.begin().await?; + sqlx::query(self.stmts.update_tipset_to_reverted) + .bind(&tsk_cid_bytes) + .execute(tx.deref_mut()) + .await?; + // events are indexed against the message inclusion tipset, not the message execution tipset. + // So we need to revert the events for the message inclusion tipset + sqlx::query(self.stmts.update_events_to_reverted) + .bind(&events_tsk_cid_bytes) + .execute(tx.deref_mut()) + .await?; + tx.commit().await?; + Ok(()) + } + + async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + tracing::debug!("indexing tipset@{}[{}]", ts.epoch(), ts.key().terse()); + let mut tx = self.db.begin().await?; + self.index_tipset_and_parent_events_with_tx(&mut tx, ts) + .await?; + tx.commit().await?; + Ok(()) + } + + async fn index_tipset_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + ts: &Tipset, + ) -> anyhow::Result<()> { + let tsk_cid_bytes = ts.key().cid()?.to_bytes(); + if self + .restore_tipset_if_exists_with_tx(tx, &tsk_cid_bytes) + .await? + { + Ok(()) + } else { + let msgs = self + .cs + .messages_for_tipset(ts) + .map_err(|e| anyhow::anyhow!("failed to get messages for tipset: {e}"))?; + if msgs.is_empty() { + // If there are no messages, just insert the tipset and return + sqlx::query(self.stmts.insert_tipset_message) + .bind(&tsk_cid_bytes) + .bind(ts.epoch()) + .bind(0) + .bind(None::<&[u8]>) + .bind(-1) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to insert empty tipset: {e}"))?; + } else { + for (i, msg) in msgs.into_iter().enumerate() { + sqlx::query(self.stmts.insert_tipset_message) + .bind(&tsk_cid_bytes) + .bind(ts.epoch()) + .bind(0) + .bind(msg.cid().to_bytes()) + .bind(i as i64) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to insert tipset message: {e}"))?; + } + + for block in ts.block_headers() { + let (_, smsgs) = crate::chain::block_messages(self.cs.blockstore(), block) + .map_err(|e| anyhow::anyhow!("failed to get messages for block: {e}"))?; + for smsg in smsgs.into_iter().filter(SignedMessage::is_delegated) { + self.index_signed_message_with_tx(tx, &smsg) + .await + .map_err(|e| anyhow::anyhow!("failed to index eth tx hash: {e}"))?; + } + } + } + Ok(()) + } + } + + async fn index_tipset_and_parent_events_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + ts: &Tipset, + ) -> anyhow::Result<()> { + self.index_tipset_with_tx(tx, ts) + .await + .map_err(|e| anyhow::anyhow!("failed to index tipset: {e}"))?; + if ts.epoch() == 0 { + // Skip parent if ts is genesis + return Ok(()); + } + let pts = Tipset::load_required(self.cs.blockstore(), ts.parents())?; + // Index the parent tipset if it doesn't exist yet. + // This is necessary to properly index events produced by executing + // messages included in the parent tipset by the current tipset (deferred execution). + self.index_tipset_with_tx(tx, &pts) + .await + .map_err(|e| anyhow::anyhow!("failed to index parent tipset: {e}"))?; + // Now Index events + self.index_events_with_tx(tx, &pts, ts) + .await + .map_err(|e| anyhow::anyhow!("failed to index events: {e}")) + } + + async fn index_events_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + msg_ts: &Tipset, + execution_ts: &Tipset, + ) -> anyhow::Result<()> { + let actor_to_delegated_address_func = self + .actor_to_delegated_address_func + .as_ref() + .context("indexer can not index events without an address resolver")?; + // check if we have an event indexed for any message in the `msg_ts` tipset -> if so, there's nothig to do here + // this makes event inserts idempotent + let msg_tsk_cid_bytes = msg_ts.key().cid()?.to_bytes(); + + // if we've already indexed events for this tipset, mark them as unreverted and return + let rows_affected = sqlx::query(self.stmts.update_events_to_non_reverted) + .bind(&msg_tsk_cid_bytes) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to unrevert events for tipset: {e}"))? + .rows_affected(); + if rows_affected > 0 { + tracing::debug!( + "unreverted {rows_affected} events for tipset {}", + msg_ts.key() + ); + return Ok(()); + } + let executed_messages = self + .load_executed_messages(msg_ts, execution_ts) + .map_err(|e| anyhow::anyhow!("failed to load executed message: {e}"))?; + let mut event_count = 0; + let mut address_lookups = HashMap::default(); + for (message, _, events) in executed_messages { + let msg_cid_bytes = message.cid().to_bytes(); + + // read message id for this message cid and tipset key cid + let message_id: i64 = sqlx::query(self.stmts.get_msg_id_for_msg_cid_and_tipset) + .bind(&msg_tsk_cid_bytes) + .bind(&msg_cid_bytes) + .fetch_optional(tx.deref_mut()) + .await? + .with_context(|| { + format!( + "message id not found for message cid {} and tipset key {}", + message.cid(), + msg_ts.key() + ) + })? + .get(0); + + // Insert events for this message + if let Some(events) = events { + for event in events { + let emitter = event.emitter(); + let addr = if let Some(addr) = address_lookups.get(&emitter) { + *addr + } else { + let addr = actor_to_delegated_address_func(emitter, execution_ts)?; + address_lookups.insert(emitter, addr); + addr + }; + + let robust_addr_bytes = if addr.is_delegated() { + addr.to_bytes() + } else { + vec![] + }; + + // Insert event into events table + let event_id = sqlx::query(self.stmts.insert_event) + .bind(message_id) + .bind(event_count) + .bind(emitter as i64) + .bind(robust_addr_bytes) + .bind(0) + .execute(tx.deref_mut()) + .await? + .last_insert_rowid(); + + for entry in event.event().entries() { + let (flags, key, codec, value) = entry.into_parts(); + sqlx::query(self.stmts.insert_event_entry) + .bind(event_id) + .bind(is_indexed_flag(flags)) + .bind([flags as u8].as_slice()) + .bind(key) + .bind(codec as i64) + .bind(&value) + .execute(tx.deref_mut()) + .await?; + } + + event_count += 1; + } + } + } + Ok(()) + } + + async fn restore_tipset_if_exists_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + tsk_cid_bytes: &[u8], + ) -> anyhow::Result { + match sqlx::query(self.stmts.has_tipset) + .bind(tsk_cid_bytes) + .fetch_one(tx.deref_mut()) + .await + .map(|r| r.get(0)) + { + Ok(exists) => { + if exists { + sqlx::query(self.stmts.update_tipset_to_non_reverted) + .bind(tsk_cid_bytes) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to restore tipset: {e}"))?; + } + Ok(exists) + } + Err(e) => anyhow::bail!("failed to check if tipset exists: {e}"), + } + } + + async fn index_signed_message_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + smsg: &SignedMessage, + ) -> anyhow::Result<()> { + let (_, eth_tx) = eth_tx_from_signed_eth_message(smsg, self.cs.chain_config().eth_chain_id) + .map_err(|e| anyhow::anyhow!("failed to convert filecoin message to eth tx: {e}"))?; + let tx_hash = EthHash( + eth_tx + .eth_hash() + .map_err(|e| anyhow::anyhow!("failed to hash transaction: {e}"))?, + ); + self.index_eth_tx_hash_with_tx(tx, tx_hash, smsg.cid()) + .await + } + + async fn index_eth_tx_hash_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + tx_hash: EthHash, + msg_cid: Cid, + ) -> anyhow::Result<()> { + _ = sqlx::query(self.stmts.insert_eth_tx_hash) + .bind(tx_hash.to_string()) + .bind(msg_cid.to_string()) + .execute(tx.deref_mut()) + .await?; + Ok(()) + } +} + +fn is_indexed_flag(flag: u64) -> bool { + use crate::shim::fvm_shared_latest::event::Flags; + flag & (Flags::FLAG_INDEXED_KEY.bits() | Flags::FLAG_INDEXED_VALUE.bits()) > 0 +} + +fn check_backfill_required(epoch: ChainEpoch, backfill: bool) -> anyhow::Result<()> { + anyhow::ensure!( + backfill, + "missing tipset at height {epoch} in the chain index, set backfill flag to true to fix" + ); + Ok(()) +} diff --git a/src/chain/store/indexer/ddls.rs b/src/chain/store/indexer/ddls.rs new file mode 100644 index 000000000000..64000cb7e95e --- /dev/null +++ b/src/chain/store/indexer/ddls.rs @@ -0,0 +1,126 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +pub static DDLS: [&str; 10] = [ + r#"CREATE TABLE IF NOT EXISTS tipset_message ( + id INTEGER PRIMARY KEY, + tipset_key_cid BLOB NOT NULL, + height INTEGER NOT NULL, + reverted INTEGER NOT NULL, + message_cid BLOB, + message_index INTEGER, + UNIQUE (tipset_key_cid, message_cid) + )"#, + r#"CREATE TABLE IF NOT EXISTS eth_tx_hash ( + tx_hash TEXT PRIMARY KEY, + message_cid BLOB NOT NULL, + inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + )"#, + r#"CREATE TABLE IF NOT EXISTS event ( + id INTEGER PRIMARY KEY, + message_id INTEGER NOT NULL, + event_index INTEGER NOT NULL, + emitter_id INTEGER NOT NULL, + emitter_addr BLOB, + reverted INTEGER NOT NULL, + FOREIGN KEY (message_id) REFERENCES tipset_message(id) ON DELETE CASCADE, + UNIQUE (message_id, event_index) + )"#, + r#"CREATE TABLE IF NOT EXISTS event_entry ( + event_id INTEGER NOT NULL, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + codec INTEGER, + value BLOB NOT NULL, + FOREIGN KEY (event_id) REFERENCES event(id) ON DELETE CASCADE + )"#, + "CREATE INDEX IF NOT EXISTS insertion_time_index ON eth_tx_hash (inserted_at)", + "CREATE INDEX IF NOT EXISTS idx_message_cid ON tipset_message (message_cid)", + "CREATE INDEX IF NOT EXISTS idx_tipset_key_cid ON tipset_message (tipset_key_cid)", + "CREATE INDEX IF NOT EXISTS idx_event_message_id ON event (message_id)", + "CREATE INDEX IF NOT EXISTS idx_height ON tipset_message (height)", + "CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id)", +]; + +pub struct PreparedStatements { + pub has_tipset: &'static str, + pub is_index_empty: &'static str, + pub has_null_round_at_height: &'static str, + pub get_non_reverted_tipset_at_height: &'static str, + pub count_tipsets_at_height: &'static str, + pub get_non_reverted_tipset_message_count: &'static str, + pub get_non_reverted_tipset_event_count: &'static str, + pub has_reverted_events_in_tipset: &'static str, + pub get_non_reverted_tipset_event_entries_count: &'static str, + pub insert_eth_tx_hash: &'static str, + pub insert_tipset_message: &'static str, + pub update_tipset_to_non_reverted: &'static str, + pub update_tipset_to_reverted: &'static str, + pub update_events_to_non_reverted: &'static str, + pub update_events_to_reverted: &'static str, + pub get_msg_id_for_msg_cid_and_tipset: &'static str, + pub insert_event: &'static str, + pub insert_event_entry: &'static str, + pub remove_tipsets_before_height: &'static str, + pub remove_eth_hashes_older_than: &'static str, + pub get_event_entries: &'static str, + pub get_event_id_and_emitter_id: &'static str, +} + +impl Default for PreparedStatements { + fn default() -> Self { + let has_tipset = "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)"; + let is_index_empty = "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)"; + let has_null_round_at_height = + "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)"; + let get_non_reverted_tipset_at_height = + "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1"; + let count_tipsets_at_height = "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets"; + let get_non_reverted_tipset_message_count = "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL"; + let get_non_reverted_tipset_event_count = "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)"; + let has_reverted_events_in_tipset = "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ?))"; + let get_non_reverted_tipset_event_entries_count = "SELECT COUNT(ee.event_id) AS entry_count FROM event_entry ee JOIN event e ON ee.event_id = e.id JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.reverted = 0"; + let insert_eth_tx_hash = "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP"; + let insert_tipset_message = "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0"; + let update_tipset_to_non_reverted = + "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?"; + let update_tipset_to_reverted = + "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?"; + let update_events_to_non_reverted = "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ?)"; + let update_events_to_reverted = "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT id FROM tipset_message WHERE height >= ?)"; + let get_msg_id_for_msg_cid_and_tipset = "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0"; + let insert_event = "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)"; + let insert_event_entry = "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)"; + let remove_tipsets_before_height = "DELETE FROM tipset_message WHERE height < ?"; + let remove_eth_hashes_older_than = + "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)"; + let get_event_entries = "SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC"; + let get_event_id_and_emitter_id = "SELECT e.id, e.emitter_id FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.message_cid = ? ORDER BY e.event_index ASC"; + + Self { + has_tipset, + is_index_empty, + has_null_round_at_height, + get_non_reverted_tipset_at_height, + count_tipsets_at_height, + get_non_reverted_tipset_message_count, + get_non_reverted_tipset_event_count, + has_reverted_events_in_tipset, + get_non_reverted_tipset_event_entries_count, + insert_eth_tx_hash, + insert_tipset_message, + update_tipset_to_non_reverted, + update_tipset_to_reverted, + update_events_to_non_reverted, + update_events_to_reverted, + get_msg_id_for_msg_cid_and_tipset, + insert_event, + insert_event_entry, + remove_tipsets_before_height, + remove_eth_hashes_older_than, + get_event_entries, + get_event_id_and_emitter_id, + } + } +} diff --git a/src/chain/store/indexer/tests.rs b/src/chain/store/indexer/tests.rs new file mode 100644 index 000000000000..f0947f4105c8 --- /dev/null +++ b/src/chain/store/indexer/tests.rs @@ -0,0 +1,40 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::{ + blocks::{Chain4U, chain4u}, + db::MemoryDB, + networks::ChainConfig, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_indexer_new() { + let c4u = Chain4U::new(); + chain4u! { + in c4u; + t0 @ [_b0] + }; + + let bs = Arc::new(MemoryDB::default()); + let cs = Arc::new( + ChainStore::new( + bs.clone(), + bs.clone(), + bs, + Arc::new(ChainConfig::devnet()), + t0.min_ticket_block().clone(), + ) + .unwrap(), + ); + let temp_db_path = tempfile::Builder::new() + .suffix(".sqlite3") + .tempfile_in(std::env::temp_dir()) + .unwrap(); + let db = crate::utils::sqlite::open_file(temp_db_path.path()) + .await + .unwrap(); + SqliteIndexer::new(db, cs, Default::default()) + .await + .unwrap(); +} diff --git a/src/chain/store/mod.rs b/src/chain/store/mod.rs index 0aba76672816..dc6665a6aa05 100644 --- a/src/chain/store/mod.rs +++ b/src/chain/store/mod.rs @@ -5,6 +5,7 @@ pub mod base_fee; mod chain_store; mod errors; pub mod index; +pub mod indexer; mod tipset_tracker; mod weighted_quick_select; diff --git a/src/cli/subcommands/index_cmd.rs b/src/cli/subcommands/index_cmd.rs new file mode 100644 index 000000000000..73bb586f62c9 --- /dev/null +++ b/src/cli/subcommands/index_cmd.rs @@ -0,0 +1,92 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::{ + rpc::{ + self, RpcMethodExt as _, + chain::{ChainHead, ChainValidateIndex}, + }, + shim::clock::ChainEpoch, +}; +use clap::Subcommand; +use std::time::Instant; + +/// Manage the chain index +#[derive(Debug, Subcommand)] +pub enum IndexCommands { + /// validates the chain index entries for each epoch in descending order in the specified range, checking for missing or + /// inconsistent entries (i.e. the indexed data does not match the actual chain state). If '--backfill' is enabled + /// (which it is by default), it will attempt to backfill any missing entries using the `ChainValidateIndex` API. + ValidateBackfill { + /// specifies the starting tipset epoch for validation (inclusive) + #[arg(long, required = true)] + from: ChainEpoch, + /// specifies the ending tipset epoch for validation (inclusive) + #[arg(long, required = true)] + to: ChainEpoch, + /// determines whether to backfill missing index entries during validation + #[arg(long, default_missing_value = "true", default_value = "true")] + backfill: Option, + }, +} + +impl IndexCommands { + pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> { + match self { + Self::ValidateBackfill { from, to, backfill } => { + validate_backfill(&client, from, to, backfill.unwrap_or_default()).await + } + } + } +} + +async fn validate_backfill( + client: &rpc::Client, + from: ChainEpoch, + to: ChainEpoch, + backfill: bool, +) -> anyhow::Result<()> { + anyhow::ensure!( + from > 0, + "invalid from epoch: {from}, must be greater than 0" + ); + anyhow::ensure!(to > 0, "invalid to epoch: {to}, must be greater than 0"); + anyhow::ensure!( + to <= from, + "to epoch ({to}) must be less than or equal to from epoch ({from})" + ); + let head = ChainHead::call(client, ()).await?; + anyhow::ensure!( + from < head.epoch(), + "from epoch ({from}) must be less than chain head ({})", + head.epoch() + ); + let start = Instant::now(); + tracing::info!( + "starting chainindex validation; from epoch: {from}; to epoch: {to}; backfill: {backfill};" + ); + let mut backfills = 0; + let mut null_rounds = 0; + let mut validations = 0; + for epoch in (to..=from).rev() { + match ChainValidateIndex::call(client, (epoch, backfill)).await { + Ok(r) => { + if r.backfilled { + backfills += 1; + } else if r.is_null_round { + null_rounds += 1; + } else { + validations += 1; + } + } + Err(e) => { + tracing::warn!("Failed to validate index at epoch {epoch}: {e}"); + } + } + } + tracing::info!( + "done with {backfills} backfills, {null_rounds} null rounds, {validations} validations, took {}", + humantime::format_duration(start.elapsed()) + ); + Ok(()) +} diff --git a/src/cli/subcommands/mod.rs b/src/cli/subcommands/mod.rs index 29546f14b625..48dd6a2dbe2d 100644 --- a/src/cli/subcommands/mod.rs +++ b/src/cli/subcommands/mod.rs @@ -11,6 +11,7 @@ mod chain_cmd; mod config_cmd; mod f3_cmd; mod healthcheck_cmd; +mod index_cmd; mod info_cmd; mod mpool_cmd; mod net_cmd; @@ -22,9 +23,10 @@ mod wait_api_cmd; pub(super) use self::{ auth_cmd::AuthCommands, chain_cmd::ChainCommands, config_cmd::ConfigCommands, - f3_cmd::F3Commands, healthcheck_cmd::HealthcheckCommand, mpool_cmd::MpoolCommands, - net_cmd::NetCommands, shutdown_cmd::ShutdownCommand, snapshot_cmd::SnapshotCommands, - state_cmd::StateCommands, sync_cmd::SyncCommands, wait_api_cmd::WaitApiCommand, + f3_cmd::F3Commands, healthcheck_cmd::HealthcheckCommand, index_cmd::IndexCommands, + mpool_cmd::MpoolCommands, net_cmd::NetCommands, shutdown_cmd::ShutdownCommand, + snapshot_cmd::SnapshotCommands, state_cmd::StateCommands, sync_cmd::SyncCommands, + wait_api_cmd::WaitApiCommand, }; use crate::cli::subcommands::info_cmd::InfoCommand; pub(crate) use crate::cli_shared::cli::Config; @@ -95,12 +97,15 @@ pub enum Subcommand { #[command(subcommand)] Healthcheck(HealthcheckCommand), - /// Manages Filecoin Fast Finality (F3) interactions + /// Manage Filecoin Fast Finality (F3) interactions #[command(subcommand)] F3(F3Commands), /// Wait for lotus API to come online WaitApi(WaitApiCommand), + + #[command(subcommand)] + Index(IndexCommands), } impl Subcommand { diff --git a/src/daemon/context.rs b/src/daemon/context.rs index f6a60538967d..de52fee0fa8e 100644 --- a/src/daemon/context.rs +++ b/src/daemon/context.rs @@ -3,6 +3,7 @@ use crate::auth::{ADMIN, create_token, generate_priv_key}; use crate::chain::ChainStore; +use crate::chain::indexer::{SqliteIndexer, SqliteIndexerOptions}; use crate::cli_shared::chain_path; use crate::cli_shared::cli::CliOpts; use crate::daemon::asyncify; @@ -11,13 +12,16 @@ use crate::daemon::db_util::load_all_forest_cars_with_cleanup; use crate::db::car::ManyCar; use crate::db::db_engine::{db_root, open_db}; use crate::db::parity_db::ParityDb; -use crate::db::{CAR_DB_DIR_NAME, DummyStore, EthMappingsStore}; +use crate::db::{ + CAR_DB_DIR_NAME, DummyStore, EthMappingsStore, INDEX_DB_DIR_NAME, INDEX_DB_FILE_NAME, +}; use crate::genesis::read_genesis_header; +use crate::interpreter::VMTrace; use crate::libp2p::{Keypair, PeerId}; use crate::networks::ChainConfig; use crate::rpc::sync::SnapshotProgressTracker; -use crate::shim::address::CurrentNetwork; -use crate::state_manager::StateManager; +use crate::shim::address::{Address, CurrentNetwork}; +use crate::state_manager::{NO_CALLBACK, StateManager}; use crate::{ Config, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, JWT_IDENTIFIER, KeyStore, KeyStoreConfig, @@ -27,7 +31,7 @@ use dialoguer::console::Term; use fvm_shared4::address::Network; use parking_lot::RwLock; use std::cell::RefCell; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tracing::{info, warn}; @@ -37,6 +41,7 @@ pub struct AppContext { pub db: Arc, pub db_meta_data: DbMetadata, pub state_manager: Arc>, + pub chain_indexer: Option>>, pub keystore: Arc>, pub admin_jwt: String, pub snapshot_progress_tracker: SnapshotProgressTracker, @@ -50,12 +55,53 @@ impl AppContext { let state_manager = create_state_manager(cfg, &db, &chain_cfg).await?; let (keystore, admin_jwt) = load_or_create_keystore_and_configure_jwt(opts, cfg).await?; let snapshot_progress_tracker = SnapshotProgressTracker::default(); + let chain_indexer = if cfg.chain_indexer.enable_indexer { + Some(Arc::new( + SqliteIndexer::new( + crate::utils::sqlite::open_file(db_meta_data.index_db_path()).await?, + state_manager.chain_store().clone(), + SqliteIndexerOptions::default().with_gc_retention_epochs( + cfg.chain_indexer.gc_retention_epochs.unwrap_or_default() as _, + ), + ) + .await? + .with_actor_to_delegated_address_func(Arc::new({ + let state_manager = state_manager.clone(); + move |actor_id, ts| { + let id_addr = Address::new_id(actor_id); + Ok( + match state_manager.get_required_actor(&id_addr, *ts.parent_state()) { + Ok(actor) => actor + .delegated_address + .map(Address::from) + .unwrap_or(id_addr), + Err(_) => id_addr, + }, + ) + } + })) + .with_recompute_tipset_state_func(Arc::new({ + let state_manager = state_manager.clone(); + move |ts| { + state_manager.compute_tipset_state_blocking( + ts, + NO_CALLBACK, + VMTrace::NotTraced, + )?; + Ok(()) + } + })), + )) + } else { + None + }; Ok(Self { net_keypair, p2p_peer_id, db, db_meta_data, state_manager, + chain_indexer, keystore, admin_jwt, snapshot_progress_tracker, @@ -184,15 +230,20 @@ pub type DbType = ManyCar>; pub(crate) struct DbMetadata { db_root_dir: PathBuf, forest_car_db_dir: PathBuf, + index_db_path: PathBuf, } impl DbMetadata { - pub(crate) fn get_root_dir(&self) -> PathBuf { - self.db_root_dir.clone() + pub(crate) fn root_dir(&self) -> &Path { + &self.db_root_dir } - pub(crate) fn get_forest_car_db_dir(&self) -> PathBuf { - self.forest_car_db_dir.clone() + pub(crate) fn forest_car_db_dir(&self) -> &Path { + &self.forest_car_db_dir + } + + pub(crate) fn index_db_path(&self) -> &Path { + &self.index_db_path } } @@ -201,6 +252,7 @@ impl DbMetadata { /// - load parity-db /// - load CAR database /// - load actor bundles +/// - setup index db folder and file async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc, DbMetadata)> { maybe_migrate_db(config); let chain_data_path = chain_path(config); @@ -209,6 +261,11 @@ async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc anyhow::Result<(Arc Self { - let (publisher, _) = broadcast::channel(1); + let (head_changes_tx, _) = broadcast::channel(1); TestApi { inner: Mutex::new(TestApiInner { max_actor_pending_messages, ..TestApiInner::default() }), - head_changes_tx: publisher, + head_changes_tx, } } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 35b9dfa1eac9..30789f6cb460 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -218,6 +218,29 @@ async fn get_f3_finality_tipset( }) } +pub enum ChainValidateIndex {} +impl RpcMethod<2> for ChainValidateIndex { + const NAME: &'static str = "Filecoin.ChainValidateIndex"; + const PARAM_NAMES: [&'static str; 2] = ["epoch", "backfill"]; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Write; + + type Params = (ChainEpoch, bool); + type Ok = ChainIndexValidation; + + async fn handle( + ctx: Ctx, + (epoch, backfill): Self::Params, + _: &http::Extensions, + ) -> Result { + if let Some(ci) = &ctx.chain_indexer { + Ok(ci.validate_index(epoch, backfill).await?) + } else { + Err(anyhow::anyhow!("chain indexer is disabled").into()) + } + } +} + pub enum ChainGetMessage {} impl RpcMethod<1> for ChainGetMessage { const NAME: &'static str = "Filecoin.ChainGetMessage"; diff --git a/src/rpc/methods/chain/types.rs b/src/rpc/methods/chain/types.rs index 54b728981ea0..0162a57b36f0 100644 --- a/src/rpc/methods/chain/types.rs +++ b/src/rpc/methods/chain/types.rs @@ -10,3 +10,25 @@ pub struct ObjStat { pub links: usize, } lotus_json_with_self!(ObjStat); + +#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq, Default)] +#[serde(rename_all = "PascalCase")] +pub struct ChainIndexValidation { + /// the key of the canonical tipset for this epoch + #[serde(with = "crate::lotus_json")] + #[schemars(with = "LotusJson")] + pub tip_set_key: ApiTipsetKey, + /// the epoch height at which the validation is performed. + pub height: ChainEpoch, + /// the number of indexed messages for the canonical tipset at this epoch + pub indexed_messages_count: u64, + /// the number of indexed events for the canonical tipset at this epoch + pub indexed_events_count: u64, + /// the number of indexed event entries for the canonical tipset at this epoch + pub indexed_event_entries_count: u64, + /// whether missing data was successfully backfilled into the index during validation + pub backfilled: bool, + /// if the epoch corresponds to a null round and therefore does not have any indexed messages or events + pub is_null_round: bool, +} +lotus_json_with_self!(ChainIndexValidation); diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 070614ff78a4..5d600413abe7 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -235,6 +235,7 @@ mod tests { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), mpool: Arc::new(pool), + chain_indexer: Default::default(), bad_blocks: Some(Default::default()), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f7cc4bef31be..477bb295e75d 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -92,6 +92,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::chain::ForestChainExportStatus); $callback!($crate::rpc::chain::ForestChainExportCancel); $callback!($crate::rpc::chain::ChainGetTipsetByParentState); + $callback!($crate::rpc::chain::ChainValidateIndex); // common vertical $callback!($crate::rpc::common::Session); @@ -472,6 +473,7 @@ pub struct RPCState { pub keystore: Arc>, pub state_manager: Arc>, pub mpool: Arc>>>, + pub chain_indexer: Option>>, pub bad_blocks: Option>, pub msgs_in_tipset: Arc, pub sync_status: crate::chain_sync::SyncStatus, diff --git a/src/shim/address.rs b/src/shim/address.rs index 503fb5abcc3d..904576132295 100644 --- a/src/shim/address.rs +++ b/src/shim/address.rs @@ -174,6 +174,10 @@ impl Address { self.0.protocol() } + pub fn is_delegated(&self) -> bool { + self.protocol() == Protocol::Delegated + } + pub fn into_payload(self) -> Payload { self.0.into_payload() } diff --git a/src/shim/mod.rs b/src/shim/mod.rs index 346bfb8b5572..99c65227356c 100644 --- a/src/shim/mod.rs +++ b/src/shim/mod.rs @@ -37,3 +37,5 @@ pub mod fvm_latest { } pub type MethodNum = fvm_shared_latest::MethodNum; + +pub type ActorID = fvm_shared_latest::ActorID; diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 01b273eaa55c..6c3cd4b69ca0 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -120,6 +120,7 @@ where state_manager, keystore: Arc::new(RwLock::new(keystore)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index 3648e6ef7c98..14720a52a757 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -489,6 +489,7 @@ fn chain_tests_with_tipset( tipset: &Tipset, ) -> anyhow::Result> { let mut tests = vec![ + RpcTest::identity(ChainValidateIndex::request((tipset.epoch(), true))?), RpcTest::identity(ChainGetTipSetByHeight::request(( tipset.epoch(), Default::default(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index e20f7e4f866f..63c7f9bc36d1 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -147,6 +147,7 @@ async fn ctx( state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 463f95e220e0..e00a86566f18 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -165,6 +165,7 @@ async fn ctx( state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d724536e4c20..9e55375b23a3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -15,7 +15,6 @@ pub mod p2p; pub mod proofs_api; pub mod rand; pub mod reqwest_resume; -#[cfg(feature = "sqlite")] pub mod sqlite; pub mod stats; pub mod stream; diff --git a/src/utils/sqlite/mod.rs b/src/utils/sqlite/mod.rs index 0a252649d291..0118bc4ddbe3 100644 --- a/src/utils/sqlite/mod.rs +++ b/src/utils/sqlite/mod.rs @@ -34,16 +34,6 @@ pub async fn open_file(file: &Path) -> anyhow::Result { Ok(open(options).await?) } -/// Opens for creates an in-memory database -pub async fn open_memory() -> sqlx::Result { - open( - SqliteConnectOptions::new() - .in_memory(true) - .shared_cache(true), - ) - .await -} - /// Opens a database at the given path. If the database does not exist, it will be created. pub async fn open(options: SqliteConnectOptions) -> sqlx::Result { let options = options @@ -54,7 +44,8 @@ pub async fn open(options: SqliteConnectOptions) -> sqlx::Result { .journal_mode(SqliteJournalMode::Wal) .pragma("journal_size_limit", "0") // always reset journal and wal files .foreign_keys(true) - .read_only(false); + .read_only(false) + .create_if_missing(true); SqlitePool::connect_with(options).await } @@ -89,13 +80,15 @@ pub async fn init_db<'q>( tx.commit().await }; - if sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") + if sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") .fetch_optional(db) .await .map_err(|e| anyhow::anyhow!("error looking for {name} database _meta table: {e}"))? .is_none() { - init(db, schema_version).await?; + init(db, schema_version).await.map_err(|e| { + anyhow::anyhow!("failed to initialize db version {schema_version}: {e}") + })?; } let found_version: u64 = sqlx::query_scalar("SELECT max(version) FROM _meta")