From 09079d492415dd75563f4c014a4b720f53757994 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 9 Mar 2026 09:26:10 +0800 Subject: [PATCH 01/20] feat: sqlite indexer --- Cargo.toml | 15 +- src/chain/store/indexer.rs | 813 ++++++++++++++++++ src/chain/store/indexer/ddls.rs | 113 +++ src/chain/store/indexer/tests.rs | 40 + src/chain/store/mod.rs | 1 + src/daemon/context.rs | 74 +- src/daemon/mod.rs | 90 +- src/db/mod.rs | 2 + src/rpc/methods/chain.rs | 23 + src/rpc/methods/chain/types.rs | 22 + src/rpc/methods/sync.rs | 1 + src/rpc/mod.rs | 2 + src/shim/address.rs | 4 + src/shim/mod.rs | 2 + src/tool/offline_server/server.rs | 1 + .../subcommands/api_cmd/api_compare_tests.rs | 1 + .../api_cmd/generate_test_snapshot.rs | 1 + src/tool/subcommands/api_cmd/test_snapshot.rs | 1 + src/utils/mod.rs | 1 - src/utils/sqlite/mod.rs | 19 +- 20 files changed, 1167 insertions(+), 59 deletions(-) create mode 100644 src/chain/store/indexer.rs create mode 100644 src/chain/store/indexer/ddls.rs create mode 100644 src/chain/store/indexer/tests.rs diff --git a/Cargo.toml b/Cargo.toml index 75a021dc8767..1caecd2984ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,6 +200,7 @@ smallvec = "1" smart-default = "0.7" sonic-rs = "0.5" spire_enum = "1" +sqlx = { version = "0.8", default-features = false, features = ["sqlite", "runtime-tokio", "macros"] } stacker = "0.1" static_assertions = "1" statrs = "0.18" @@ -228,7 +229,6 @@ zstd = "0.13" # optional dependencies console-subscriber = { version = "0.5", features = ["parking_lot"], optional = true } -sqlx = { version = "0.8", default-features = false, features = ["sqlite", "runtime-tokio", "macros"], optional = true } tikv-jemallocator = { version = "0.6", optional = true } tracing-loki = { version = "0.2", default-features = false, features = ["compat-0-2-1", "rustls"], optional = true } @@ -319,14 +319,13 @@ lto = "fat" # These should be refactored (probably removed) in #2984 [features] -default = ["jemalloc", "tracing-loki", "sqlite"] -test = [] # default feature set for unit tests +default = ["jemalloc", "tracing-loki"] +test = [] # default feature set for unit tests slim = ["rustalloc"] -cargo-test = [] # group of tests that is recommended to run with `cargo test` instead of `nextest` -doctest-private = [] # see lib.rs::doctest_private -benchmark-private = ["dep:criterion"] # see lib.rs::benchmark_private -interop-tests-private = [] # see lib.rs::interop_tests_private -sqlite = ["dep:sqlx"] +cargo-test = [] # group of tests that is recommended to run with `cargo test` instead of `nextest` +doctest-private = [] # see lib.rs::doctest_private +benchmark-private = ["dep:criterion"] # see lib.rs::benchmark_private +interop-tests-private = [] # see lib.rs::interop_tests_private # Allocator. Use at most one of these. rustalloc = [] diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs new file mode 100644 index 000000000000..b987b6ed0f08 --- /dev/null +++ b/src/chain/store/indexer.rs @@ -0,0 +1,813 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +mod ddls; +#[cfg(test)] +mod tests; + +use ahash::HashMap; +use anyhow::Context as _; +use cid::Cid; +pub use ddls::{DDLS, PreparedStatements}; +use fvm_ipld_blockstore::Blockstore; +use sqlx::Row as _; + +use crate::{ + blocks::Tipset, + chain::{ChainStore, HeadChange, index::ResolveNullTipset}, + message::{ChainMessage, SignedMessage}, + rpc::{ + chain::types::ChainIndexValidation, + eth::{eth_tx_from_signed_eth_message, types::EthHash}, + }, + shim::{ + ActorID, + address::Address, + clock::{ChainEpoch, EPOCHS_IN_DAY}, + executor::{Receipt, StampedEvent}, + }, + utils::sqlite, +}; +use std::{ + ops::DerefMut as _, + sync::Arc, + time::{Duration, Instant}, +}; + +type ActorToDelegatedAddressFunc = + Arc anyhow::Result
+ Send + Sync + 'static>; + +type RecomputeTipsetStateFunc = Arc anyhow::Result<()> + Send + Sync + 'static>; + +type ExecutedMessage = (ChainMessage, Receipt, Option>); + +struct IndexedTipsetData { + pub indexed_messages_count: u64, + pub indexed_events_count: u64, + pub indexed_event_entries_count: u64, +} + +#[derive(Debug, smart_default::SmartDefault)] +pub struct SqliteIndexerOptions { + pub gc_retention_epochs: ChainEpoch, + pub reconcile_empty_index: bool, + #[default(3 * EPOCHS_IN_DAY as u64)] + pub max_reconcile_tipsets: u64, +} + +impl SqliteIndexerOptions { + fn validate(&self) -> anyhow::Result<()> { + anyhow::ensure!( + self.gc_retention_epochs == 0 || self.gc_retention_epochs >= EPOCHS_IN_DAY, + "gc retention epochs must be 0 or greater than {EPOCHS_IN_DAY}" + ); + Ok(()) + } + + pub fn with_gc_retention_epochs(mut self, gc_retention_epochs: ChainEpoch) -> Self { + self.gc_retention_epochs = gc_retention_epochs; + self + } +} + +pub struct SqliteIndexer { + // ensures writes are serialized so backfilling does not race with index updates + mu: tokio::sync::Mutex<()>, + options: SqliteIndexerOptions, + cs: Arc>, + db: sqlx::SqlitePool, + stmts: PreparedStatements, + actor_to_delegated_address_func: Option, + recompute_tipset_state_func: Option, +} + +impl SqliteIndexer +where + BS: Blockstore, +{ + pub async fn new( + db: sqlx::SqlitePool, + cs: Arc>, + options: SqliteIndexerOptions, + ) -> anyhow::Result { + options.validate()?; + sqlite::init_db( + &db, + "chain index", + DDLS.iter().cloned().map(sqlx::query), + vec![], + ) + .await?; + let stmts = PreparedStatements::default(); + Ok(Self { + mu: tokio::sync::Mutex::const_new(()), + options, + cs, + db, + stmts, + actor_to_delegated_address_func: None, + recompute_tipset_state_func: None, + }) + } + + pub fn with_actor_to_delegated_address_func(mut self, f: ActorToDelegatedAddressFunc) -> Self { + self.actor_to_delegated_address_func = Some(f); + self + } + + pub fn with_recompute_tipset_state_func(mut self, f: RecomputeTipsetStateFunc) -> Self { + self.recompute_tipset_state_func = Some(f); + self + } + + pub async fn index_loop( + &self, + mut head_change_subscriber: tokio::sync::broadcast::Receiver, + ) -> anyhow::Result<()> { + loop { + match head_change_subscriber.recv().await? { + HeadChange::Apply(ts) => { + if let Err(e) = self.index_tipset(&ts).await { + tracing::warn!( + "failed to index new head@{}({}): {e}", + ts.epoch(), + ts.key() + ); + } + } + } + } + } + + pub async fn gc_loop(&self) { + if self.options.gc_retention_epochs <= 0 { + tracing::info!("gc retention epochs is not set, skipping gc"); + return; + } + + let mut ticker = tokio::time::interval(Duration::from_hours(4)); + loop { + ticker.tick().await; + self.gc().await; + } + } + + async fn gc(&self) { + tracing::info!("starting index gc"); + let head = self.cs.heaviest_tipset(); + let removal_epoch = head.epoch() - self.options.gc_retention_epochs - 10; // 10 is for some grace period + if removal_epoch <= 0 { + tracing::info!("no tipsets to gc"); + return; + } + + tracing::info!( + "gc'ing all (reverted and non-reverted) tipsets before epoch {removal_epoch}" + ); + match sqlx::query(self.stmts.remove_tipsets_before_height) + .execute(&self.db) + .await + { + Ok(r) => { + tracing::info!( + "gc'd {} entries before epoch {removal_epoch}", + r.rows_affected() + ); + } + Err(e) => { + tracing::error!( + "failed to remove reverted tipsets before height {removal_epoch}: {e}" + ); + return; + } + } + + // ------------------------------------------------------------------------------------------------- + // Also GC eth hashes + + // Convert `gc_retention_epochs` to number of days + let gc_retention_days = self.options.gc_retention_epochs / EPOCHS_IN_DAY; + if gc_retention_days < 1 { + tracing::info!("skipping gc of eth hashes as retention days is less than 1"); + return; + } + + tracing::info!("gc'ing eth hashes older than {gc_retention_days} days"); + match sqlx::query(self.stmts.remove_eth_hashes_older_than) + .execute(&self.db) + .await + { + Ok(r) => { + tracing::info!( + "gc'd {} eth hashes older than {gc_retention_days} days", + r.rows_affected() + ); + } + Err(e) => { + tracing::error!("failed to gc eth hashes older than {gc_retention_days} days: {e}"); + } + } + } + + pub async fn validate_index( + &self, + epoch: ChainEpoch, + backfill: bool, + ) -> anyhow::Result { + let head = self.cs.heaviest_tipset(); + anyhow::ensure!( + epoch < head.epoch(), + "cannot validate index at epoch {epoch}, can only validate at an epoch less than chain head epoch {}", + head.epoch() + ); + let ts = + self.cs + .chain_index() + .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?; + let is_index_empty: bool = sqlx::query(self.stmts.is_index_empty) + .fetch_one(&self.db) + .await? + .get(0); + + // Canonical chain has a null round at the epoch -> return if index is empty otherwise validate that index also + // has a null round at this epoch i.e. it does not have anything indexed at all for this epoch + if ts.epoch() != epoch { + if is_index_empty { + return Ok(ChainIndexValidation { + height: epoch, + is_null_round: true, + ..Default::default() + }); + } + // validate the db has a hole here and error if not, we don't attempt to repair because something must be very wrong for this to fail + return self.validate_is_null_round(epoch).await; + } + + // if the index is empty -> short-circuit and simply backfill if applicable + if is_index_empty { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } + + // see if the tipset at this epoch is already indexed or if we need to backfill + if let Some((reverted_count, non_reverted_count)) = + self.get_tipset_counts_at_height(epoch).await? + { + if reverted_count == 0 && non_reverted_count == 0 { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } else if reverted_count > 0 && non_reverted_count == 0 { + anyhow::bail!("index corruption: height {epoch} only has reverted tipsets"); + } else if non_reverted_count > 1 { + anyhow::bail!("index corruption: height {epoch} has multiple non-reverted tipsets"); + } + } else { + check_backfill_required(epoch, backfill)?; + return self.backfill_missing_tipset(&ts).await; + } + + // fetch the non-reverted tipset at this epoch + let indexed_tsk_cid_bytes: Vec = + sqlx::query(self.stmts.get_non_reverted_tipset_at_height) + .bind(epoch) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_tsk_cid = Cid::read_bytes(indexed_tsk_cid_bytes.as_slice())?; + let expected_tsk_cid = ts.key().cid()?; + anyhow::ensure!( + indexed_tsk_cid == expected_tsk_cid, + "index corruption: indexed tipset at height {epoch} has key {indexed_tsk_cid}, but canonical chain has {expected_tsk_cid}", + ); + let ( + IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + }, + backfilled, + ) = if let Ok(r) = self.get_and_verify_indexed_data(&ts).await { + (r, false) + } else { + self.backfill_missing_tipset(&ts).await?; + (self.get_and_verify_indexed_data(&ts).await?, true) + }; + Ok(ChainIndexValidation { + tip_set_key: ts.key().clone().into(), + height: ts.epoch(), + backfilled, + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + is_null_round: false, + }) + } + + async fn validate_is_null_round( + &self, + epoch: ChainEpoch, + ) -> anyhow::Result { + // make sure we do not have tipset(reverted or non-reverted) indexed at this epoch + let is_null_round: bool = sqlx::query(self.stmts.has_null_round_at_height) + .bind(epoch) + .fetch_one(&self.db) + .await? + .get(0); + anyhow::ensure!( + is_null_round, + "index corruption: height {epoch} should be a null round but is not" + ); + Ok(ChainIndexValidation { + height: epoch, + is_null_round: true, + ..Default::default() + }) + } + + async fn backfill_missing_tipset(&self, ts: &Tipset) -> anyhow::Result { + let execution_ts = self.get_next_tipset(ts)?; + let mut tx = self.db.begin().await?; + self.index_tipset_and_parent_events_with_tx(&mut tx, &execution_ts) + .await?; + tx.commit().await?; + let IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + } = self.get_indexed_tipset_data(ts).await?; + Ok(ChainIndexValidation { + tip_set_key: ts.key().clone().into(), + height: ts.epoch(), + backfilled: true, + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + is_null_round: false, + }) + } + + fn get_next_tipset(&self, ts: &Tipset) -> anyhow::Result { + let child = self.cs.chain_index().tipset_by_height( + ts.epoch() + 1, + self.cs.heaviest_tipset(), + ResolveNullTipset::TakeNewer, + )?; + anyhow::ensure!( + child.parents() == ts.key(), + "chain forked at height {}; please retry your request; err: chain forked", + ts.epoch() + ); + Ok(child) + } + + async fn get_and_verify_indexed_data(&self, ts: &Tipset) -> anyhow::Result { + let indexed_tipset_data = self.get_indexed_tipset_data(&ts).await?; + self.verify_indexed_data(ts, &indexed_tipset_data).await?; + Ok(indexed_tipset_data) + } + + /// verifies that the indexed data for a tipset is correct by comparing the number of messages and events + /// in the chainstore to the number of messages and events indexed. + async fn verify_indexed_data( + &self, + ts: &Tipset, + indexed_tipset_data: &IndexedTipsetData, + ) -> anyhow::Result<()> { + let tsk_cid = ts.key().cid()?; + let tsk_cid_bytes = tsk_cid.to_bytes(); + let execution_ts = self.get_next_tipset(ts)?; + + // given that `ts` is on the canonical chain and `execution_ts` is the next tipset in the chain + // `ts` can not have reverted events + let has_reverted_events_in_tipset: bool = + sqlx::query(self.stmts.has_reverted_events_in_tipset) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + anyhow::ensure!( + !has_reverted_events_in_tipset, + "index corruption: reverted events found for an executed tipset {tsk_cid} at height {}", + ts.epoch() + ); + let executed_messages = self.load_executed_messages(ts, &execution_ts)?; + anyhow::ensure!( + executed_messages.len() as u64 == indexed_tipset_data.indexed_messages_count, + "message count mismatch for height {}: chainstore has {}, index has {}", + ts.epoch(), + executed_messages.len(), + indexed_tipset_data.indexed_messages_count + ); + let mut events_count = 0; + let mut event_entries_count = 0; + for (_, _, events) in &executed_messages { + if let Some(events) = events { + events_count += events.len(); + for event in events { + event_entries_count += event.event().entries().len(); + } + } + } + anyhow::ensure!( + events_count as u64 == indexed_tipset_data.indexed_events_count, + "event count mismatch for height {}: chainstore has {events_count}, index has {}", + ts.epoch(), + indexed_tipset_data.indexed_events_count + ); + anyhow::ensure!( + event_entries_count as u64 == indexed_tipset_data.indexed_event_entries_count, + "event entries count mismatch for height {}: chainstore has {event_entries_count}, index has {}", + ts.epoch(), + indexed_tipset_data.indexed_event_entries_count + ); + + // compare the events AMT root between the indexed events and the events in the chain state + for (message, _, _) in executed_messages {} + + Ok(()) + } + + async fn get_indexed_tipset_data(&self, ts: &Tipset) -> anyhow::Result { + let tsk_cid_bytes = ts.key().cid()?.to_bytes(); + let indexed_messages_count = sqlx::query(self.stmts.get_non_reverted_tipset_message_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_events_count = sqlx::query(self.stmts.get_non_reverted_tipset_event_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + let indexed_event_entries_count = + sqlx::query(self.stmts.get_non_reverted_tipset_event_entries_count) + .bind(&tsk_cid_bytes) + .fetch_one(&self.db) + .await? + .get(0); + Ok(IndexedTipsetData { + indexed_messages_count, + indexed_events_count, + indexed_event_entries_count, + }) + } + + async fn get_tipset_counts_at_height( + &self, + epoch: ChainEpoch, + ) -> anyhow::Result> { + let row = sqlx::query(self.stmts.count_tipsets_at_height) + .bind(epoch) + .fetch_optional(&self.db) + .await?; + Ok(row.map(|r| (r.get(0), r.get(1)))) + } + + fn load_executed_messages( + &self, + msg_ts: &Tipset, + receipt_ts: &Tipset, + ) -> anyhow::Result> { + let recompute_tipset_state_func = self + .recompute_tipset_state_func + .as_ref() + .context("recompute_tipset_state_func not set")?; + let msgs = self.cs.messages_for_tipset(msg_ts)?; + if msgs.is_empty() { + return Ok(vec![]); + } + let mut recomputed = false; + let recompute = || { + let tsk_cid = receipt_ts.key().cid()?; + tracing::warn!( + "failed to load receipts for tipset {tsk_cid} (epoch {}); recomputing tipset state", + receipt_ts.epoch() + ); + recompute_tipset_state_func(msg_ts.clone())?; + tracing::warn!( + "successfully recomputed tipset state and loaded events for tipset {tsk_cid} (epoch {})", + receipt_ts.epoch() + ); + anyhow::Ok(()) + }; + let receipts = match Receipt::get_receipts( + self.cs.blockstore(), + *receipt_ts.parent_message_receipts(), + ) { + Ok(receipts) => receipts, + Err(_) => { + recompute()?; + recomputed = true; + Receipt::get_receipts(self.cs.blockstore(), *receipt_ts.parent_message_receipts())? + } + }; + anyhow::ensure!( + msgs.len() == receipts.len(), + "mismatching message and receipt counts ({} msgs, {} rcts)", + msgs.len(), + receipts.len() + ); + let mut executed = Vec::with_capacity(msgs.len()); + for (message, receipt) in msgs.into_iter().zip(receipts.into_iter()) { + let events = if let Some(events_root) = receipt.events_root() { + Some( + match StampedEvent::get_events(self.cs.blockstore(), &events_root) { + Ok(events) => events, + Err(e) if recomputed => return Err(e), + Err(_) => { + recompute()?; + recomputed = true; + StampedEvent::get_events(self.cs.blockstore(), &events_root)? + } + }, + ) + } else { + None + }; + executed.push((message, receipt, events)); + } + Ok(executed) + } + + pub async fn populate(&self) -> anyhow::Result<()> { + let start = Instant::now(); + let head = self.cs.heaviest_tipset(); + tracing::info!( + "starting to populate chainindex at head epoch {}", + head.epoch() + ); + let mut tx = self.db.begin().await?; + let mut total_indexed = 0; + for ts in head.chain(self.cs.blockstore()) { + if let Err(e) = self.index_tipset_with_tx(&mut tx, &ts).await { + tracing::info!( + "stopping chainindex population at epoch {}: {e}", + ts.epoch() + ); + break; + } + total_indexed += 1; + } + tx.commit().await?; + tracing::info!( + "successfully populated chain index with {total_indexed} tipsets, took {}", + humantime::format_duration(start.elapsed()) + ); + Ok(()) + } + + pub async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + let mut tx = self.db.begin().await?; + self.index_tipset_and_parent_events_with_tx(&mut tx, ts) + .await?; + tx.commit().await?; + Ok(()) + } + + pub async fn index_tipset_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + ts: &Tipset, + ) -> anyhow::Result<()> { + let tsk_cid_bytes = ts.key().cid()?.to_bytes(); + if self + .restore_tipset_if_exists_with_tx(tx, &tsk_cid_bytes) + .await? + { + Ok(()) + } else { + let msgs = self + .cs + .messages_for_tipset(ts) + .map_err(|e| anyhow::anyhow!("failed to get messages for tipset: {e}"))?; + if msgs.is_empty() { + // If there are no messages, just insert the tipset and return + sqlx::query(self.stmts.insert_tipset_message) + .bind(&tsk_cid_bytes) + .bind(ts.epoch()) + .bind(0) + .bind(None::<&[u8]>) + .bind(-1) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to insert empty tipset: {e}"))?; + } else { + for (i, msg) in msgs.into_iter().enumerate() { + sqlx::query(self.stmts.insert_tipset_message) + .bind(&tsk_cid_bytes) + .bind(ts.epoch()) + .bind(0) + .bind(msg.cid().to_bytes()) + .bind(i as i64) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to insert tipset message: {e}"))?; + } + + for block in ts.block_headers() { + let (_, smsgs) = crate::chain::block_messages(self.cs.blockstore(), block) + .map_err(|e| anyhow::anyhow!("failed to get messages for block: {e}"))?; + for smsg in smsgs.into_iter().filter(SignedMessage::is_delegated) { + self.index_signed_message_with_tx(tx, &smsg) + .await + .map_err(|e| anyhow::anyhow!("failed to index eth tx hash: {e}"))?; + } + } + } + Ok(()) + } + } + + pub async fn index_tipset_and_parent_events_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + ts: &Tipset, + ) -> anyhow::Result<()> { + self.index_tipset_with_tx(tx, ts) + .await + .map_err(|e| anyhow::anyhow!("failed to index tipset: {e}"))?; + if ts.epoch() == 0 { + // Skip parent if ts is genesis + return Ok(()); + } + let pts = Tipset::load_required(self.cs.blockstore(), ts.parents())?; + // Index the parent tipset if it doesn't exist yet. + // This is necessary to properly index events produced by executing + // messages included in the parent tipset by the current tipset (deferred execution). + self.index_tipset_with_tx(tx, &pts) + .await + .map_err(|e| anyhow::anyhow!("failed to index parent tipset: {e}"))?; + // Now Index events + self.index_events_with_tx(tx, &pts, ts) + .await + .map_err(|e| anyhow::anyhow!("failed to index events: {e}")) + } + + pub async fn index_events_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + msg_ts: &Tipset, + execution_ts: &Tipset, + ) -> anyhow::Result<()> { + let actor_to_delegated_address_func = self + .actor_to_delegated_address_func + .as_ref() + .context("indexer can not index events without an address resolver")?; + // check if we have an event indexed for any message in the `msg_ts` tipset -> if so, there's nothig to do here + // this makes event inserts idempotent + let msg_tsk_cid_bytes = msg_ts.key().cid()?.to_bytes(); + + // if we've already indexed events for this tipset, mark them as unreverted and return + let rows_affected = sqlx::query(self.stmts.update_events_to_non_reverted) + .bind(&msg_tsk_cid_bytes) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to unrevert events for tipset: {e}"))? + .rows_affected(); + if rows_affected > 0 { + tracing::debug!( + "unreverted {rows_affected} events for tipset {}", + msg_ts.key() + ); + return Ok(()); + } + let executed_messages = self + .load_executed_messages(msg_ts, execution_ts) + .map_err(|e| anyhow::anyhow!("failed to load executed message: {e}"))?; + let mut event_count = 0; + let mut address_lookups = HashMap::default(); + for (message, _, events) in executed_messages { + let msg_cid_bytes = message.cid().to_bytes(); + + // read message id for this message cid and tipset key cid + let message_id: i64 = sqlx::query(self.stmts.get_msg_id_for_msg_cid_and_tipset) + .bind(&msg_tsk_cid_bytes) + .bind(&msg_cid_bytes) + .fetch_optional(tx.deref_mut()) + .await? + .with_context(|| { + format!( + "message id not found for message cid {} and tipset key {}", + message.cid(), + msg_ts.key() + ) + })? + .get(0); + + // Insert events for this message + if let Some(events) = events { + for event in events { + let emitter = event.emitter(); + let addr = if let Some(addr) = address_lookups.get(&emitter) { + *addr + } else { + let addr = actor_to_delegated_address_func(emitter, execution_ts)?; + address_lookups.insert(emitter, addr); + addr + }; + + let robust_addr_bytes = if addr.is_delegated() { + addr.to_bytes() + } else { + vec![] + }; + + // Insert event into events table + let event_id = sqlx::query(self.stmts.insert_event) + .bind(message_id) + .bind(event_count) + .bind(emitter as i64) + .bind(robust_addr_bytes) + .bind(0) + .execute(tx.deref_mut()) + .await? + .last_insert_rowid(); + + for entry in event.event().entries() { + let (flags, key, codec, value) = entry.into_parts(); + sqlx::query(self.stmts.insert_event_entry) + .bind(event_id) + .bind(is_indexed_flag(flags)) + .bind([flags as u8].as_slice()) + .bind(key) + .bind(codec as i64) + .bind(&value) + .execute(tx.deref_mut()) + .await?; + } + + event_count += 1; + } + } + } + Ok(()) + } + + pub async fn restore_tipset_if_exists_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + tsk_cid_bytes: &[u8], + ) -> anyhow::Result { + match sqlx::query(self.stmts.has_tipset) + .bind(tsk_cid_bytes) + .fetch_one(tx.deref_mut()) + .await + .map(|r| r.get(0)) + { + Ok(exists) => { + if exists { + sqlx::query(self.stmts.update_tipset_to_non_reverted) + .bind(tsk_cid_bytes) + .execute(tx.deref_mut()) + .await + .map_err(|e| anyhow::anyhow!("failed to restore tipset: {e}"))?; + } + Ok(exists) + } + Err(e) => anyhow::bail!("failed to check if tipset exists: {e}"), + } + } + + pub async fn index_signed_message_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + smsg: &SignedMessage, + ) -> anyhow::Result<()> { + let (_, eth_tx) = eth_tx_from_signed_eth_message(smsg, self.cs.chain_config().eth_chain_id) + .map_err(|e| anyhow::anyhow!("failed to convert filecoin message to eth tx: {e}"))?; + let tx_hash = EthHash( + eth_tx + .eth_hash() + .map_err(|e| anyhow::anyhow!("failed to hash transaction: {e}"))?, + ); + self.index_eth_tx_hash_with_tx(tx, tx_hash, smsg.cid()) + .await + } + + pub async fn index_eth_tx_hash_with_tx<'a>( + &self, + tx: &mut sqlx::SqliteTransaction<'a>, + tx_hash: EthHash, + msg_cid: Cid, + ) -> anyhow::Result<()> { + _ = sqlx::query(self.stmts.insert_eth_tx_hash) + .bind(tx_hash.to_string()) + .bind(msg_cid.to_string()) + .execute(tx.deref_mut()) + .await?; + Ok(()) + } +} + +fn is_indexed_flag(flag: u64) -> bool { + use crate::shim::fvm_shared_latest::event::Flags; + flag & (Flags::FLAG_INDEXED_KEY.bits() | Flags::FLAG_INDEXED_VALUE.bits()) > 0 +} + +fn check_backfill_required(epoch: ChainEpoch, backfill: bool) -> anyhow::Result<()> { + anyhow::ensure!( + backfill, + "missing tipset at height {epoch} in the chain index, set backfill flag to true to fix" + ); + Ok(()) +} diff --git a/src/chain/store/indexer/ddls.rs b/src/chain/store/indexer/ddls.rs new file mode 100644 index 000000000000..b4a416de067b --- /dev/null +++ b/src/chain/store/indexer/ddls.rs @@ -0,0 +1,113 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +pub static DDLS: [&str; 10] = [ + r#"CREATE TABLE IF NOT EXISTS tipset_message ( + id INTEGER PRIMARY KEY, + tipset_key_cid BLOB NOT NULL, + height INTEGER NOT NULL, + reverted INTEGER NOT NULL, + message_cid BLOB, + message_index INTEGER, + UNIQUE (tipset_key_cid, message_cid) + )"#, + r#"CREATE TABLE IF NOT EXISTS eth_tx_hash ( + tx_hash TEXT PRIMARY KEY, + message_cid BLOB NOT NULL, + inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + )"#, + r#"CREATE TABLE IF NOT EXISTS event ( + id INTEGER PRIMARY KEY, + message_id INTEGER NOT NULL, + event_index INTEGER NOT NULL, + emitter_id INTEGER NOT NULL, + emitter_addr BLOB, + reverted INTEGER NOT NULL, + FOREIGN KEY (message_id) REFERENCES tipset_message(id) ON DELETE CASCADE, + UNIQUE (message_id, event_index) + )"#, + r#"CREATE TABLE IF NOT EXISTS event_entry ( + event_id INTEGER NOT NULL, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + codec INTEGER, + value BLOB NOT NULL, + FOREIGN KEY (event_id) REFERENCES event(id) ON DELETE CASCADE + )"#, + "CREATE INDEX IF NOT EXISTS insertion_time_index ON eth_tx_hash (inserted_at)", + "CREATE INDEX IF NOT EXISTS idx_message_cid ON tipset_message (message_cid)", + "CREATE INDEX IF NOT EXISTS idx_tipset_key_cid ON tipset_message (tipset_key_cid)", + "CREATE INDEX IF NOT EXISTS idx_event_message_id ON event (message_id)", + "CREATE INDEX IF NOT EXISTS idx_height ON tipset_message (height)", + "CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id)", +]; + +pub struct PreparedStatements { + pub has_tipset: &'static str, + pub is_index_empty: &'static str, + pub has_null_round_at_height: &'static str, + pub get_non_reverted_tipset_at_height: &'static str, + pub count_tipsets_at_height: &'static str, + pub get_non_reverted_tipset_message_count: &'static str, + pub get_non_reverted_tipset_event_count: &'static str, + pub has_reverted_events_in_tipset: &'static str, + pub get_non_reverted_tipset_event_entries_count: &'static str, + pub insert_eth_tx_hash: &'static str, + pub insert_tipset_message: &'static str, + pub update_tipset_to_non_reverted: &'static str, + pub update_events_to_non_reverted: &'static str, + pub get_msg_id_for_msg_cid_and_tipset: &'static str, + pub insert_event: &'static str, + pub insert_event_entry: &'static str, + pub remove_tipsets_before_height: &'static str, + pub remove_eth_hashes_older_than: &'static str, +} + +impl Default for PreparedStatements { + fn default() -> Self { + let has_tipset = "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)"; + let is_index_empty = "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)"; + let has_null_round_at_height = + "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)"; + let get_non_reverted_tipset_at_height = + "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1"; + let count_tipsets_at_height = "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets"; + let get_non_reverted_tipset_message_count = "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL"; + let get_non_reverted_tipset_event_count = "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)"; + let has_reverted_events_in_tipset = "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ?))"; + let get_non_reverted_tipset_event_entries_count = "SELECT COUNT(ee.event_id) AS entry_count FROM event_entry ee JOIN event e ON ee.event_id = e.id JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.reverted = 0"; + let insert_eth_tx_hash = "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP"; + let insert_tipset_message = "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0"; + let update_tipset_to_non_reverted = + "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?"; + let update_events_to_non_reverted = "UPDATE event SET reverted = 0 WHERE message_id IN (SELECT id FROM tipset_message WHERE tipset_key_cid = ?)"; + let get_msg_id_for_msg_cid_and_tipset = "SELECT id FROM tipset_message WHERE tipset_key_cid = ? AND message_cid = ? AND reverted = 0"; + let insert_event = "INSERT INTO event (message_id, event_index, emitter_id, emitter_addr, reverted) VALUES (?, ?, ?, ?, ?)"; + let insert_event_entry = "INSERT INTO event_entry (event_id, indexed, flags, key, codec, value) VALUES (?, ?, ?, ?, ?, ?)"; + let remove_tipsets_before_height = "DELETE FROM tipset_message WHERE height < ?"; + let remove_eth_hashes_older_than = + "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)"; + + Self { + has_tipset, + is_index_empty, + has_null_round_at_height, + get_non_reverted_tipset_at_height, + count_tipsets_at_height, + get_non_reverted_tipset_message_count, + get_non_reverted_tipset_event_count, + has_reverted_events_in_tipset, + get_non_reverted_tipset_event_entries_count, + insert_eth_tx_hash, + insert_tipset_message, + update_tipset_to_non_reverted, + update_events_to_non_reverted, + get_msg_id_for_msg_cid_and_tipset, + insert_event, + insert_event_entry, + remove_tipsets_before_height, + remove_eth_hashes_older_than, + } + } +} diff --git a/src/chain/store/indexer/tests.rs b/src/chain/store/indexer/tests.rs new file mode 100644 index 000000000000..f0947f4105c8 --- /dev/null +++ b/src/chain/store/indexer/tests.rs @@ -0,0 +1,40 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::{ + blocks::{Chain4U, chain4u}, + db::MemoryDB, + networks::ChainConfig, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_indexer_new() { + let c4u = Chain4U::new(); + chain4u! { + in c4u; + t0 @ [_b0] + }; + + let bs = Arc::new(MemoryDB::default()); + let cs = Arc::new( + ChainStore::new( + bs.clone(), + bs.clone(), + bs, + Arc::new(ChainConfig::devnet()), + t0.min_ticket_block().clone(), + ) + .unwrap(), + ); + let temp_db_path = tempfile::Builder::new() + .suffix(".sqlite3") + .tempfile_in(std::env::temp_dir()) + .unwrap(); + let db = crate::utils::sqlite::open_file(temp_db_path.path()) + .await + .unwrap(); + SqliteIndexer::new(db, cs, Default::default()) + .await + .unwrap(); +} diff --git a/src/chain/store/mod.rs b/src/chain/store/mod.rs index 58ca468ac366..0713f473cc35 100644 --- a/src/chain/store/mod.rs +++ b/src/chain/store/mod.rs @@ -5,6 +5,7 @@ pub mod base_fee; mod chain_store; mod errors; pub mod index; +pub mod indexer; mod tipset_tracker; pub use self::{base_fee::*, chain_store::*, errors::*}; diff --git a/src/daemon/context.rs b/src/daemon/context.rs index f6a60538967d..de52fee0fa8e 100644 --- a/src/daemon/context.rs +++ b/src/daemon/context.rs @@ -3,6 +3,7 @@ use crate::auth::{ADMIN, create_token, generate_priv_key}; use crate::chain::ChainStore; +use crate::chain::indexer::{SqliteIndexer, SqliteIndexerOptions}; use crate::cli_shared::chain_path; use crate::cli_shared::cli::CliOpts; use crate::daemon::asyncify; @@ -11,13 +12,16 @@ use crate::daemon::db_util::load_all_forest_cars_with_cleanup; use crate::db::car::ManyCar; use crate::db::db_engine::{db_root, open_db}; use crate::db::parity_db::ParityDb; -use crate::db::{CAR_DB_DIR_NAME, DummyStore, EthMappingsStore}; +use crate::db::{ + CAR_DB_DIR_NAME, DummyStore, EthMappingsStore, INDEX_DB_DIR_NAME, INDEX_DB_FILE_NAME, +}; use crate::genesis::read_genesis_header; +use crate::interpreter::VMTrace; use crate::libp2p::{Keypair, PeerId}; use crate::networks::ChainConfig; use crate::rpc::sync::SnapshotProgressTracker; -use crate::shim::address::CurrentNetwork; -use crate::state_manager::StateManager; +use crate::shim::address::{Address, CurrentNetwork}; +use crate::state_manager::{NO_CALLBACK, StateManager}; use crate::{ Config, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, JWT_IDENTIFIER, KeyStore, KeyStoreConfig, @@ -27,7 +31,7 @@ use dialoguer::console::Term; use fvm_shared4::address::Network; use parking_lot::RwLock; use std::cell::RefCell; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tracing::{info, warn}; @@ -37,6 +41,7 @@ pub struct AppContext { pub db: Arc, pub db_meta_data: DbMetadata, pub state_manager: Arc>, + pub chain_indexer: Option>>, pub keystore: Arc>, pub admin_jwt: String, pub snapshot_progress_tracker: SnapshotProgressTracker, @@ -50,12 +55,53 @@ impl AppContext { let state_manager = create_state_manager(cfg, &db, &chain_cfg).await?; let (keystore, admin_jwt) = load_or_create_keystore_and_configure_jwt(opts, cfg).await?; let snapshot_progress_tracker = SnapshotProgressTracker::default(); + let chain_indexer = if cfg.chain_indexer.enable_indexer { + Some(Arc::new( + SqliteIndexer::new( + crate::utils::sqlite::open_file(db_meta_data.index_db_path()).await?, + state_manager.chain_store().clone(), + SqliteIndexerOptions::default().with_gc_retention_epochs( + cfg.chain_indexer.gc_retention_epochs.unwrap_or_default() as _, + ), + ) + .await? + .with_actor_to_delegated_address_func(Arc::new({ + let state_manager = state_manager.clone(); + move |actor_id, ts| { + let id_addr = Address::new_id(actor_id); + Ok( + match state_manager.get_required_actor(&id_addr, *ts.parent_state()) { + Ok(actor) => actor + .delegated_address + .map(Address::from) + .unwrap_or(id_addr), + Err(_) => id_addr, + }, + ) + } + })) + .with_recompute_tipset_state_func(Arc::new({ + let state_manager = state_manager.clone(); + move |ts| { + state_manager.compute_tipset_state_blocking( + ts, + NO_CALLBACK, + VMTrace::NotTraced, + )?; + Ok(()) + } + })), + )) + } else { + None + }; Ok(Self { net_keypair, p2p_peer_id, db, db_meta_data, state_manager, + chain_indexer, keystore, admin_jwt, snapshot_progress_tracker, @@ -184,15 +230,20 @@ pub type DbType = ManyCar>; pub(crate) struct DbMetadata { db_root_dir: PathBuf, forest_car_db_dir: PathBuf, + index_db_path: PathBuf, } impl DbMetadata { - pub(crate) fn get_root_dir(&self) -> PathBuf { - self.db_root_dir.clone() + pub(crate) fn root_dir(&self) -> &Path { + &self.db_root_dir } - pub(crate) fn get_forest_car_db_dir(&self) -> PathBuf { - self.forest_car_db_dir.clone() + pub(crate) fn forest_car_db_dir(&self) -> &Path { + &self.forest_car_db_dir + } + + pub(crate) fn index_db_path(&self) -> &Path { + &self.index_db_path } } @@ -201,6 +252,7 @@ impl DbMetadata { /// - load parity-db /// - load CAR database /// - load actor bundles +/// - setup index db folder and file async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc, DbMetadata)> { maybe_migrate_db(config); let chain_data_path = chain_path(config); @@ -209,6 +261,11 @@ async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc anyhow::Result<(Arc( }) } +pub enum ChainValidateIndex {} +impl RpcMethod<2> for ChainValidateIndex { + const NAME: &'static str = "Filecoin.ChainValidateIndex"; + const PARAM_NAMES: [&'static str; 2] = ["epoch", "backfill"]; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Write; + + type Params = (ChainEpoch, bool); + type Ok = ChainIndexValidation; + + async fn handle( + ctx: Ctx, + (epoch, backfill): Self::Params, + _: &http::Extensions, + ) -> Result { + if let Some(ci) = &ctx.chain_indexer { + Ok(ci.validate_index(epoch, backfill).await?) + } else { + Err(anyhow::anyhow!("chain indexer is disabled").into()) + } + } +} + pub enum ChainGetMessage {} impl RpcMethod<1> for ChainGetMessage { const NAME: &'static str = "Filecoin.ChainGetMessage"; diff --git a/src/rpc/methods/chain/types.rs b/src/rpc/methods/chain/types.rs index 54b728981ea0..0162a57b36f0 100644 --- a/src/rpc/methods/chain/types.rs +++ b/src/rpc/methods/chain/types.rs @@ -10,3 +10,25 @@ pub struct ObjStat { pub links: usize, } lotus_json_with_self!(ObjStat); + +#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq, Default)] +#[serde(rename_all = "PascalCase")] +pub struct ChainIndexValidation { + /// the key of the canonical tipset for this epoch + #[serde(with = "crate::lotus_json")] + #[schemars(with = "LotusJson")] + pub tip_set_key: ApiTipsetKey, + /// the epoch height at which the validation is performed. + pub height: ChainEpoch, + /// the number of indexed messages for the canonical tipset at this epoch + pub indexed_messages_count: u64, + /// the number of indexed events for the canonical tipset at this epoch + pub indexed_events_count: u64, + /// the number of indexed event entries for the canonical tipset at this epoch + pub indexed_event_entries_count: u64, + /// whether missing data was successfully backfilled into the index during validation + pub backfilled: bool, + /// if the epoch corresponds to a null round and therefore does not have any indexed messages or events + pub is_null_round: bool, +} +lotus_json_with_self!(ChainIndexValidation); diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4e7da8d7be18..ad57606c7388 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -238,6 +238,7 @@ mod tests { state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), mpool: Arc::new(pool), + chain_indexer: Default::default(), bad_blocks: Some(Default::default()), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d759728fbbd5..bf7c41459d8d 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -92,6 +92,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::chain::ForestChainExportStatus); $callback!($crate::rpc::chain::ForestChainExportCancel); $callback!($crate::rpc::chain::ChainGetTipsetByParentState); + $callback!($crate::rpc::chain::ChainValidateIndex); // common vertical $callback!($crate::rpc::common::Session); @@ -472,6 +473,7 @@ pub struct RPCState { pub keystore: Arc>, pub state_manager: Arc>, pub mpool: Arc>>, + pub chain_indexer: Option>>, pub bad_blocks: Option>, pub msgs_in_tipset: Arc, pub sync_status: crate::chain_sync::SyncStatus, diff --git a/src/shim/address.rs b/src/shim/address.rs index 503fb5abcc3d..904576132295 100644 --- a/src/shim/address.rs +++ b/src/shim/address.rs @@ -174,6 +174,10 @@ impl Address { self.0.protocol() } + pub fn is_delegated(&self) -> bool { + self.protocol() == Protocol::Delegated + } + pub fn into_payload(self) -> Payload { self.0.into_payload() } diff --git a/src/shim/mod.rs b/src/shim/mod.rs index 346bfb8b5572..99c65227356c 100644 --- a/src/shim/mod.rs +++ b/src/shim/mod.rs @@ -37,3 +37,5 @@ pub mod fvm_latest { } pub type MethodNum = fvm_shared_latest::MethodNum; + +pub type ActorID = fvm_shared_latest::ActorID; diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 84b6a110d877..92878d1f032b 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -120,6 +120,7 @@ where state_manager, keystore: Arc::new(RwLock::new(keystore)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index 869365dfd887..03f7701674d0 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -488,6 +488,7 @@ fn chain_tests_with_tipset( tipset: &Tipset, ) -> anyhow::Result> { let mut tests = vec![ + RpcTest::identity(ChainValidateIndex::request((tipset.epoch(), true))?), RpcTest::identity(ChainGetTipSetByHeight::request(( tipset.epoch(), Default::default(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 6cedaef505e7..941b0135ee70 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -147,6 +147,7 @@ async fn ctx( state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 86feb82e3ec8..e7addcbdca1b 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -165,6 +165,7 @@ async fn ctx( state_manager, keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), + chain_indexer: Default::default(), bad_blocks: Default::default(), msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d724536e4c20..9e55375b23a3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -15,7 +15,6 @@ pub mod p2p; pub mod proofs_api; pub mod rand; pub mod reqwest_resume; -#[cfg(feature = "sqlite")] pub mod sqlite; pub mod stats; pub mod stream; diff --git a/src/utils/sqlite/mod.rs b/src/utils/sqlite/mod.rs index 0a252649d291..0118bc4ddbe3 100644 --- a/src/utils/sqlite/mod.rs +++ b/src/utils/sqlite/mod.rs @@ -34,16 +34,6 @@ pub async fn open_file(file: &Path) -> anyhow::Result { Ok(open(options).await?) } -/// Opens for creates an in-memory database -pub async fn open_memory() -> sqlx::Result { - open( - SqliteConnectOptions::new() - .in_memory(true) - .shared_cache(true), - ) - .await -} - /// Opens a database at the given path. If the database does not exist, it will be created. pub async fn open(options: SqliteConnectOptions) -> sqlx::Result { let options = options @@ -54,7 +44,8 @@ pub async fn open(options: SqliteConnectOptions) -> sqlx::Result { .journal_mode(SqliteJournalMode::Wal) .pragma("journal_size_limit", "0") // always reset journal and wal files .foreign_keys(true) - .read_only(false); + .read_only(false) + .create_if_missing(true); SqlitePool::connect_with(options).await } @@ -89,13 +80,15 @@ pub async fn init_db<'q>( tx.commit().await }; - if sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") + if sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta'") .fetch_optional(db) .await .map_err(|e| anyhow::anyhow!("error looking for {name} database _meta table: {e}"))? .is_none() { - init(db, schema_version).await?; + init(db, schema_version).await.map_err(|e| { + anyhow::anyhow!("failed to initialize db version {schema_version}: {e}") + })?; } let found_version: u64 = sqlx::query_scalar("SELECT max(version) FROM _meta") From cf02a814dac3f6a874faae8f45bde8e55590d737 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 11 Mar 2026 23:50:46 +0800 Subject: [PATCH 02/20] fix spellcheck --- src/chain/store/indexer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs index b987b6ed0f08..3ec61583a53b 100644 --- a/src/chain/store/indexer.rs +++ b/src/chain/store/indexer.rs @@ -367,7 +367,7 @@ where } /// verifies that the indexed data for a tipset is correct by comparing the number of messages and events - /// in the chainstore to the number of messages and events indexed. + /// in the chain store to the number of messages and events indexed. async fn verify_indexed_data( &self, ts: &Tipset, From 0a5f498a4dfb16d0b09d5cd5af0a35a621a77dfe Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 17:39:11 +0800 Subject: [PATCH 03/20] cli tool --- scripts/tests/api_compare/docker-compose.yml | 6 +- src/cli/subcommands/index_cmd.rs | 92 ++++++++++++++++++++ src/cli/subcommands/mod.rs | 13 ++- 3 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 src/cli/subcommands/index_cmd.rs diff --git a/scripts/tests/api_compare/docker-compose.yml b/scripts/tests/api_compare/docker-compose.yml index 3221ce40ae84..4af1a22df317 100644 --- a/scripts/tests/api_compare/docker-compose.yml +++ b/scripts/tests/api_compare/docker-compose.yml @@ -58,7 +58,7 @@ services: --halt-after-import SNAPSHOT_EPOCH="$(ls /data/*.car.zst | tail -n 1 | grep -Eo '[0-9]+' | tail -n 1)" - # backfill the index db + # backfill the index db (nosql) forest-tool index backfill --from $$SNAPSHOT_EPOCH --n-tipsets 200 --chain ${CHAIN} forest --chain ${CHAIN} --encrypt-keystore false --no-gc \ @@ -90,6 +90,10 @@ services: export FULLNODE_API_INFO="$(cat /data/forest-token):/dns/forest/tcp/${FOREST_RPC_PORT}/http" echo "Waiting till Forest is ready" forest-cli healthcheck ready --healthcheck-port ${FOREST_HEALTHZ_RPC_PORT} --wait + + # backfill the index db (sql) + SNAPSHOT_EPOCH="$(ls /data/*.car.zst | tail -n 1 | grep -Eo '[0-9]+' | tail -n 1)" + forest-cli index validate-backfill --from $$SNAPSHOT_EPOCH --to $(($$SNAPSHOT_EPOCH - 200)) forest-wallet-import: depends_on: forest: diff --git a/src/cli/subcommands/index_cmd.rs b/src/cli/subcommands/index_cmd.rs new file mode 100644 index 000000000000..05bc2c27fb38 --- /dev/null +++ b/src/cli/subcommands/index_cmd.rs @@ -0,0 +1,92 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::{ + rpc::{ + self, RpcMethodExt as _, + chain::{ChainHead, ChainValidateIndex}, + }, + shim::clock::{ChainEpoch, EPOCHS_IN_DAY}, +}; +use clap::Subcommand; +use std::time::Instant; + +/// Manage the chain index +#[derive(Debug, Subcommand)] +pub enum IndexCommands { + /// validates the chain index entries for each epoch in descending order in the specified range, checking for missing or + /// inconsistent entries (i.e. the indexed data does not match the actual chain state). If '--backfill' is enabled + /// (which it is by default), it will attempt to backfill any missing entries using the `ChainValidateIndex` API. + ValidateBackfill { + /// specifies the starting tipset epoch for validation (inclusive) + #[arg(long, required = true)] + from: ChainEpoch, + /// specifies the ending tipset epoch for validation (inclusive) + #[arg(long, required = true)] + to: ChainEpoch, + /// determines whether to backfill missing index entries during validation + #[arg(long, default_value_t = true)] + backfill: bool, + }, +} + +impl IndexCommands { + pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> { + match self { + Self::ValidateBackfill { from, to, backfill } => { + validate_backfill(&client, from, to, backfill).await + } + } + } +} + +async fn validate_backfill( + client: &rpc::Client, + from: ChainEpoch, + to: ChainEpoch, + backfill: bool, +) -> anyhow::Result<()> { + anyhow::ensure!( + from > 0, + "invalid from epoch: {from}, must be greater than 0" + ); + anyhow::ensure!(to > 0, "invalid to epoch: {to}, must be greater than 0"); + anyhow::ensure!( + to <= from, + "to epoch ({to}) must be less than or equal to from epoch ({from})" + ); + let head = ChainHead::call(client, ()).await?; + anyhow::ensure!( + from < head.epoch(), + "from epoch ({from}) must be less than chain head ({})", + head.epoch() + ); + let start = Instant::now(); + tracing::info!( + "starting chainindex validation; from epoch: {from}; to epoch: {to}; backfill: {backfill};" + ); + let mut backfills = 0; + let mut null_rounds = 0; + let mut validations = 0; + for epoch in (to..=from).rev() { + match ChainValidateIndex::call(client, (epoch, backfill)).await { + Ok(r) => { + if r.backfilled { + backfills += 1; + } else if r.is_null_round { + null_rounds += 1; + } else { + validations += 1; + } + } + Err(e) => { + tracing::warn!("Failed to validate index at epoch {epoch}: {e}"); + } + } + } + tracing::info!( + "done with {backfills} backfills, {null_rounds} null rounds, {validations} validations, took {}", + humantime::format_duration(start.elapsed()) + ); + Ok(()) +} diff --git a/src/cli/subcommands/mod.rs b/src/cli/subcommands/mod.rs index 29546f14b625..48dd6a2dbe2d 100644 --- a/src/cli/subcommands/mod.rs +++ b/src/cli/subcommands/mod.rs @@ -11,6 +11,7 @@ mod chain_cmd; mod config_cmd; mod f3_cmd; mod healthcheck_cmd; +mod index_cmd; mod info_cmd; mod mpool_cmd; mod net_cmd; @@ -22,9 +23,10 @@ mod wait_api_cmd; pub(super) use self::{ auth_cmd::AuthCommands, chain_cmd::ChainCommands, config_cmd::ConfigCommands, - f3_cmd::F3Commands, healthcheck_cmd::HealthcheckCommand, mpool_cmd::MpoolCommands, - net_cmd::NetCommands, shutdown_cmd::ShutdownCommand, snapshot_cmd::SnapshotCommands, - state_cmd::StateCommands, sync_cmd::SyncCommands, wait_api_cmd::WaitApiCommand, + f3_cmd::F3Commands, healthcheck_cmd::HealthcheckCommand, index_cmd::IndexCommands, + mpool_cmd::MpoolCommands, net_cmd::NetCommands, shutdown_cmd::ShutdownCommand, + snapshot_cmd::SnapshotCommands, state_cmd::StateCommands, sync_cmd::SyncCommands, + wait_api_cmd::WaitApiCommand, }; use crate::cli::subcommands::info_cmd::InfoCommand; pub(crate) use crate::cli_shared::cli::Config; @@ -95,12 +97,15 @@ pub enum Subcommand { #[command(subcommand)] Healthcheck(HealthcheckCommand), - /// Manages Filecoin Fast Finality (F3) interactions + /// Manage Filecoin Fast Finality (F3) interactions #[command(subcommand)] F3(F3Commands), /// Wait for lotus API to come online WaitApi(WaitApiCommand), + + #[command(subcommand)] + Index(IndexCommands), } impl Subcommand { From 3888ede959a908e9b0c19169506762eb819cef35 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 20:29:48 +0800 Subject: [PATCH 04/20] fix: broadcast HeadChange::Revert(Tipset) in set_heaviest_tipset --- src/chain/store/chain_store.rs | 38 ++++++++++++++++++++-------- src/daemon/mod.rs | 14 +++++----- src/message_pool/msgpool/msg_pool.rs | 17 ++++++------- src/rpc/methods/chain.rs | 33 ++++++++++++------------ src/state_manager/mod.rs | 3 ++- 5 files changed, 60 insertions(+), 45 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index fb18955ed0f5..0a732758d31e 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -6,7 +6,6 @@ use super::{ index::{ChainIndex, ResolveNullTipset}, tipset_tracker::TipsetTracker, }; -use crate::db::{EthMappingsStore, EthMappingsStoreExt}; use crate::interpreter::{BlockMessages, VMTrace}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; @@ -23,6 +22,10 @@ use crate::{ blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta}, db::HeaviestTipsetKeyProvider, }; +use crate::{ + db::{EthMappingsStore, EthMappingsStoreExt}, + rpc::chain::PathChange, +}; use crate::{fil_cns, utils::cache::SizeTrackingLruCache}; use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Context as _; @@ -47,10 +50,7 @@ pub type ChainEpochDelta = ChainEpoch; /// `Enum` for `pubsub` channel that defines message type variant and data /// contained in message type. -#[derive(Clone, Debug)] -pub enum HeadChange { - Apply(Tipset), -} +pub type HeadChange = PathChange; /// Stores chain data such as heaviest tipset and cached tipset info at each /// epoch. This structure is thread-safe, and all caches are wrapped in a mutex @@ -142,14 +142,30 @@ where } /// Sets heaviest tipset - pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> { + pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { self.heaviest_tipset_key_provider - .set_heaviest_tipset_key(ts.key())?; - *self.heaviest_tipset_cache.write() = Some(ts.clone()); - ts.key().save(self.blockstore())?; - if self.publisher.send(HeadChange::Apply(ts)).is_err() { - debug!("did not publish head change, no active receivers"); + .set_heaviest_tipset_key(head.key())?; + let old_head = (*self.heaviest_tipset_cache.write()).replace(head.clone()); + head.key().save(self.blockstore())?; + if let Some(old_head) = old_head { + match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { + Ok(changes) => { + for change in changes { + if self.publisher.send(change).is_err() { + debug!("did not publish change, no active receivers"); + } + } + } + Err(e) => { + warn!("failed to get chain path changes: {e}") + } + } + } else { + if self.publisher.send(HeadChange::Apply(head)).is_err() { + debug!("did not publish head change, no active receivers"); + } } + Ok(()) } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 765519caba4b..a25efa1caf34 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -505,14 +505,12 @@ fn maybe_start_indexer_service( // Continuously listen for head changes loop { - let HeadChange::Apply(ts) = receiver.recv().await?; - - tracing::debug!("Indexing tipset {}", ts.key()); - - let delegated_messages = - chain_store.headers_delegated_messages(ts.block_headers().iter())?; - - chain_store.process_signed_messages(&delegated_messages)?; + if let HeadChange::Apply(ts) = receiver.recv().await? { + tracing::debug!("Indexing tipset {}", ts.key()); + let delegated_messages = + chain_store.headers_delegated_messages(ts.block_headers().iter())?; + chain_store.process_signed_messages(&delegated_messages)?; + } } }); diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 6333df24debd..a20801af2482 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -551,18 +551,17 @@ where let pending = mp.pending.clone(); let republished = mp.republished.clone(); - let cur_tipset = mp.cur_tipset.clone(); + let current_ts = mp.cur_tipset.clone(); let repub_trigger = mp.repub_trigger.clone(); // Reacts to new HeadChanges services.spawn(async move { loop { match subscriber.recv().await { - Ok(ts) => { - let (cur, rev, app) = match ts { - HeadChange::Apply(tipset) => { - (cur_tipset.clone(), Vec::new(), vec![tipset]) - } + Ok(change) => { + let (reverts, applies) = match change { + HeadChange::Apply(ts) => (vec![], vec![ts]), + HeadChange::Revert(ts) => (vec![ts], vec![]), }; head_change( api.as_ref(), @@ -570,9 +569,9 @@ where repub_trigger.clone(), republished.as_ref(), pending.as_ref(), - cur.as_ref(), - rev, - app, + ¤t_ts, + reverts, + applies, ) .await .context("Error changing head")?; diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 136c386cecdb..6cd9145b0f7f 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -88,22 +88,20 @@ pub(crate) fn new_heads( let handle = tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { - let headers = match v { - HeadChange::Apply(ts) => { - // Convert the tipset to an Ethereum block with full transaction info - // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block - match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { - Ok(block) => ApiHeaders(block), - Err(e) => { - tracing::error!("Failed to convert tipset to eth block: {}", e); - continue; + if let HeadChange::Apply(ts) = v { + // Convert the tipset to an Ethereum block with full transaction info + // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block + match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { + Ok(block) => { + if let Err(e) = sender.send(ApiHeaders(block)) { + tracing::error!("Failed to send headers: {}", e); + break; } } + Err(e) => { + tracing::error!("Failed to convert tipset to eth block: {}", e); + } } - }; - if let Err(e) = sender.send(headers) { - tracing::error!("Failed to send headers: {}", e); - break; } } }); @@ -149,6 +147,7 @@ pub(crate) fn logs( } } } + HeadChange::Revert(_) => {} } } }); @@ -850,7 +849,7 @@ impl RpcMethod<2> for ChainGetPath { (from, to): Self::Params, _: &http::Extensions, ) -> Result { - impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into) + chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into) } } @@ -871,7 +870,7 @@ impl RpcMethod<2> for ChainGetPath { /// ``` /// /// Exposes errors from the [`Blockstore`], and returns an error if there is no common ancestor. -fn impl_chain_get_path( +pub fn chain_get_path( chain_store: &ChainStore, from: &TipsetKey, to: &TipsetKey, @@ -903,6 +902,7 @@ fn impl_chain_get_path( to_apply = next; } } + Ok(all_reverts .into_iter() .map(PathChange::Revert) @@ -1332,6 +1332,7 @@ pub(crate) fn chain_notify( while let Ok(v) = subscriber.recv().await { let (change, tipset) = match v { HeadChange::Apply(ts) => ("apply".into(), ts), + HeadChange::Revert(ts) => ("revert".into(), ts), }; if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() { @@ -1791,7 +1792,7 @@ mod tests { } let actual = - impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap(); + chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap(); let expected = expected .into_iter() .map(|change| match change { diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index c042351222c5..053ef3dfe261 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -1210,7 +1210,7 @@ where let mut subscriber_poll = tokio::task::spawn(async move { loop { match subscriber.recv().await { - Ok(subscriber) => match subscriber { + Ok(head_change) => match head_change { HeadChange::Apply(tipset) => { if candidate_tipset .as_ref() @@ -1237,6 +1237,7 @@ where candidate_receipt = Some(receipt) } } + HeadChange::Revert(_) => {} }, Err(RecvError::Lagged(i)) => { warn!( From 4c5900807c0bef88877c80585f6894f5bc23aec9 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 21:00:27 +0800 Subject: [PATCH 05/20] resolve AI comments --- src/chain/store/chain_store.rs | 51 +++++++++++++++++----------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 0a732758d31e..2e936397c859 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -66,7 +66,7 @@ pub struct ChainStore { heaviest_tipset_key_provider: Arc, /// Heaviest tipset cache - heaviest_tipset_cache: Arc>>, + heaviest_tipset_cache: Arc>, /// Used as a cache for tipset `lookbacks`. chain_index: Arc>>, @@ -124,14 +124,20 @@ where let (publisher, _) = broadcast::channel(SINK_CAP); let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); - + let head = if let Ok(head_tsk) = heaviest_tipset_key_provider.heaviest_tipset_key() + && let Ok(head) = chain_index.load_required_tipset(&head_tsk) + { + head + } else { + Tipset::from(&genesis_block_header) + }; let cs = Self { publisher, chain_index, tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, heaviest_tipset_key_provider, - heaviest_tipset_cache: Default::default(), + heaviest_tipset_cache: Arc::new(RwLock::new(head)), genesis_block_header, validated_blocks, eth_mappings, @@ -145,24 +151,26 @@ where pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { self.heaviest_tipset_key_provider .set_heaviest_tipset_key(head.key())?; - let old_head = (*self.heaviest_tipset_cache.write()).replace(head.clone()); + let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); head.key().save(self.blockstore())?; - if let Some(old_head) = old_head { - match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { - Ok(changes) => { - for change in changes { - if self.publisher.send(change).is_err() { - debug!("did not publish change, no active receivers"); + + match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { + Ok(changes) => { + for change in changes { + let change_text = match &change { + HeadChange::Apply(ts) => format!("apply@{}: {}", ts.epoch(), ts.key()), + HeadChange::Revert(ts) => { + format!("revert@{}: {}", ts.epoch(), ts.key()) } + }; + tracing::info!("head change: {change_text}"); + if self.publisher.send(change).is_err() { + debug!("did not publish change, no active receivers"); } } - Err(e) => { - warn!("failed to get chain path changes: {e}") - } } - } else { - if self.publisher.send(HeadChange::Apply(head)).is_err() { - debug!("did not publish head change, no active receivers"); + Err(e) => { + warn!("failed to get chain path changes: {e}") } } @@ -216,16 +224,7 @@ where /// Returns the currently tracked heaviest tipset. pub fn heaviest_tipset(&self) -> Tipset { - if let Some(ts) = &*self.heaviest_tipset_cache.read() { - return ts.clone(); - } - let tsk = self - .heaviest_tipset_key_provider - .heaviest_tipset_key() - .unwrap_or_else(|_| TipsetKey::from(nunny::vec![*self.genesis_block_header.cid()])); - self.chain_index - .load_required_tipset(&tsk) - .expect("failed to load heaviest tipset") + self.heaviest_tipset_cache.read().clone() } /// Returns the genesis tipset. From b26e0c2e88a96dff37b7c6a2d55a33fb965329be Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 21:34:04 +0800 Subject: [PATCH 06/20] resolve comments --- src/chain/store/chain_store.rs | 28 +++--- src/daemon/mod.rs | 3 +- src/message_pool/msgpool/msg_pool.rs | 8 +- src/message_pool/msgpool/provider.rs | 8 +- src/message_pool/msgpool/test_provider.rs | 13 ++- src/rpc/methods/chain.rs | 106 +++++++++++++--------- src/state_manager/mod.rs | 9 +- 7 files changed, 95 insertions(+), 80 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 2e936397c859..7b8d94eaef1f 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -6,7 +6,6 @@ use super::{ index::{ChainIndex, ResolveNullTipset}, tipset_tracker::TipsetTracker, }; -use crate::interpreter::{BlockMessages, VMTrace}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::networks::{ChainConfig, Height}; @@ -27,6 +26,10 @@ use crate::{ rpc::chain::PathChange, }; use crate::{fil_cns, utils::cache::SizeTrackingLruCache}; +use crate::{ + interpreter::{BlockMessages, VMTrace}, + rpc::chain::PathChanges, +}; use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Context as _; use cid::Cid; @@ -52,12 +55,14 @@ pub type ChainEpochDelta = ChainEpoch; /// contained in message type. pub type HeadChange = PathChange; +pub type HeadChanges = PathChanges; + /// Stores chain data such as heaviest tipset and cached tipset info at each /// epoch. This structure is thread-safe, and all caches are wrapped in a mutex /// to allow a consistent `ChainStore` to be shared across tasks. pub struct ChainStore { /// Publisher for head change events - publisher: Publisher, + publisher: Publisher, /// key-value `datastore`. db: Arc, @@ -125,7 +130,9 @@ where let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); let head = if let Ok(head_tsk) = heaviest_tipset_key_provider.heaviest_tipset_key() - && let Ok(head) = chain_index.load_required_tipset(&head_tsk) + && let Some(head) = chain_index + .load_tipset(&head_tsk) + .context("failed to load head tipset")? { head } else { @@ -156,17 +163,8 @@ where match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { Ok(changes) => { - for change in changes { - let change_text = match &change { - HeadChange::Apply(ts) => format!("apply@{}: {}", ts.epoch(), ts.key()), - HeadChange::Revert(ts) => { - format!("revert@{}: {}", ts.epoch(), ts.key()) - } - }; - tracing::info!("head change: {change_text}"); - if self.publisher.send(change).is_err() { - debug!("did not publish change, no active receivers"); - } + if self.publisher.send(changes).is_err() { + debug!("did not publish changes, no active receivers"); } } Err(e) => { @@ -233,7 +231,7 @@ where } /// Returns a reference to the publisher of head changes. - pub fn publisher(&self) -> &Publisher { + pub fn publisher(&self) -> &Publisher { &self.publisher } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index a25efa1caf34..339441bcc20f 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -7,7 +7,6 @@ pub mod db_util; pub mod main; use crate::blocks::Tipset; -use crate::chain::HeadChange; use crate::chain::index::ResolveNullTipset; use crate::chain_sync::network_context::SyncNetworkContext; use crate::chain_sync::{ChainFollower, SyncStatus}; @@ -505,7 +504,7 @@ fn maybe_start_indexer_service( // Continuously listen for head changes loop { - if let HeadChange::Apply(ts) = receiver.recv().await? { + for ts in receiver.recv().await?.applies { tracing::debug!("Indexing tipset {}", ts.key()); let delegated_messages = chain_store.headers_delegated_messages(ts.block_headers().iter())?; diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index a20801af2482..6371fc16c342 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -9,7 +9,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use crate::blocks::{CachingBlockHeader, Tipset}; -use crate::chain::{HeadChange, MINIMUM_BASE_FEE}; +use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; #[cfg(test)] use crate::db::SettingsStore; use crate::eth::is_valid_eth_tx_for_sending; @@ -558,11 +558,7 @@ where services.spawn(async move { loop { match subscriber.recv().await { - Ok(change) => { - let (reverts, applies) = match change { - HeadChange::Apply(ts) => (vec![], vec![ts]), - HeadChange::Revert(ts) => (vec![ts], vec![]), - }; + Ok(HeadChanges { reverts, applies }) => { head_change( api.as_ref(), bls_sig_cache.as_ref(), diff --git a/src/message_pool/msgpool/provider.rs b/src/message_pool/msgpool/provider.rs index a685810344dc..8180b71d19eb 100644 --- a/src/message_pool/msgpool/provider.rs +++ b/src/message_pool/msgpool/provider.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; -use crate::chain::HeadChange; +use crate::chain::HeadChanges; use crate::message::{ChainMessage, SignedMessage}; use crate::message_pool::msg_pool::{ MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES, @@ -31,7 +31,7 @@ use crate::message_pool::errors::Error; #[async_trait] pub trait Provider { /// Update `Mpool`'s `cur_tipset` whenever there is a change to the provider - fn subscribe_head_changes(&self) -> Subscriber; + fn subscribe_head_changes(&self) -> Subscriber; /// Get the heaviest Tipset in the provider fn get_heaviest_tipset(&self) -> Tipset; /// Add a message to the `MpoolProvider`, return either Cid or Error @@ -64,7 +64,7 @@ pub trait Provider { /// `mpool` RPC. #[derive(derive_more::Constructor)] pub struct MpoolRpcProvider { - subscriber: Publisher, + subscriber: Publisher, sm: Arc>, } @@ -73,7 +73,7 @@ impl Provider for MpoolRpcProvider where DB: Blockstore + Sync + Send + 'static, { - fn subscribe_head_changes(&self) -> Subscriber { + fn subscribe_head_changes(&self) -> Subscriber { self.subscriber.subscribe() } diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index c00f7be60b33..0e18816deabf 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -8,7 +8,7 @@ use std::convert::TryFrom; use crate::blocks::{ CachingBlockHeader, ElectionProof, RawBlockHeader, Ticket, Tipset, TipsetKey, VRFProof, }; -use crate::chain::HeadChange; +use crate::chain::HeadChanges; use crate::cid_collections::CidHashMap; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::message_pool::{Error, provider::Provider}; @@ -25,7 +25,7 @@ use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; /// pool pub struct TestApi { pub inner: Mutex, - pub publisher: Publisher, + pub publisher: Publisher, } #[derive(Default)] @@ -81,7 +81,12 @@ impl TestApi { /// Set the heaviest tipset for `TestApi` pub fn set_heaviest_tipset(&self, ts: Tipset) { - self.publisher.send(HeadChange::Apply(ts)).unwrap(); + self.publisher + .send(HeadChanges { + applies: vec![ts], + reverts: vec![], + }) + .unwrap(); } pub fn next_block(&self) -> CachingBlockHeader { @@ -119,7 +124,7 @@ impl TestApiInner { #[async_trait] impl Provider for TestApi { - fn subscribe_head_changes(&self) -> Subscriber { + fn subscribe_head_changes(&self) -> Subscriber { self.publisher.subscribe() } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 6cd9145b0f7f..61152c4fc8ae 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -87,8 +87,8 @@ pub(crate) fn new_heads( let mut subscriber = data.chain_store().publisher().subscribe(); let handle = tokio::spawn(async move { - while let Ok(v) = subscriber.recv().await { - if let HeadChange::Apply(ts) = v { + while let Ok(changes) = subscriber.recv().await { + for ts in changes.applies { // Convert the tipset to an Ethereum block with full transaction info // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await { @@ -126,28 +126,21 @@ pub(crate) fn logs( let ctx = ctx.clone(); let handle = tokio::spawn(async move { - while let Ok(v) = subscriber.recv().await { - match v { - HeadChange::Apply(ts) => { - match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { - Ok(logs) => { - if !logs.is_empty() - && let Err(e) = sender.send(logs) - { - tracing::error!( - "Failed to send logs for tipset {}: {}", - ts.key(), - e - ); - break; - } - } - Err(e) => { - tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); + while let Ok(changes) = subscriber.recv().await { + for ts in changes.applies { + match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { + Ok(logs) => { + if !logs.is_empty() + && let Err(e) = sender.send(logs) + { + tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e); + break; } } + Err(e) => { + tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); + } } - HeadChange::Revert(_) => {} } } }); @@ -849,11 +842,11 @@ impl RpcMethod<2> for ChainGetPath { (from, to): Self::Params, _: &http::Extensions, ) -> Result { - chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into) + Ok(chain_get_path(ctx.chain_store(), &from, &to)?.into_change_vec()) } } -/// Find the path between two tipsets, as a series of [`PathChange`]s. +/// Find the path between two tipsets, as [`PathChanges`]. /// /// ```text /// 0 - A - B - C - D @@ -874,7 +867,7 @@ pub fn chain_get_path( chain_store: &ChainStore, from: &TipsetKey, to: &TipsetKey, -) -> anyhow::Result> { +) -> anyhow::Result { let mut to_revert = chain_store .load_required_tipset_or_heaviest(from) .context("couldn't load `from`")?; @@ -882,8 +875,8 @@ pub fn chain_get_path( .load_required_tipset_or_heaviest(to) .context("couldn't load `to`")?; - let mut all_reverts = vec![]; - let mut all_applies = vec![]; + let mut reverts = vec![]; + let mut applies = vec![]; // This loop is guaranteed to terminate if the blockstore contain no cycles. // This is currently computationally infeasible. @@ -892,22 +885,18 @@ pub fn chain_get_path( let next = chain_store .load_required_tipset_or_heaviest(to_revert.parents()) .context("couldn't load ancestor of `from`")?; - all_reverts.push(to_revert); + reverts.push(to_revert); to_revert = next; } else { let next = chain_store .load_required_tipset_or_heaviest(to_apply.parents()) .context("couldn't load ancestor of `to`")?; - all_applies.push(to_apply); + applies.push(to_apply); to_apply = next; } } - - Ok(all_reverts - .into_iter() - .map(PathChange::Revert) - .chain(all_applies.into_iter().rev().map(PathChange::Apply)) - .collect()) + applies.reverse(); + Ok(PathChanges { reverts, applies }) } /// Get tipset at epoch. Pick younger tipset if epoch points to a @@ -1329,14 +1318,15 @@ pub(crate) fn chain_notify( // Skip first message let _ = subscriber.recv().await; - while let Ok(v) = subscriber.recv().await { - let (change, tipset) = match v { - HeadChange::Apply(ts) => ("apply".into(), ts), - HeadChange::Revert(ts) => ("revert".into(), ts), - }; - - if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() { - break; + while let Ok(changes) = subscriber.recv().await { + for change in changes.into_change_vec() { + let (change, tipset) = match change { + HeadChange::Apply(ts) => ("apply".into(), ts), + HeadChange::Revert(ts) => ("revert".into(), ts), + }; + if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() { + break; + } } } }); @@ -1564,6 +1554,33 @@ impl HasLotusJson for PathChange { } } +#[derive(Debug)] +pub struct PathChanges { + pub reverts: Vec, + pub applies: Vec, +} + +impl Clone for PathChanges { + fn clone(&self) -> Self { + let Self { reverts, applies } = self; + Self { + reverts: reverts.clone(), + applies: applies.clone(), + } + } +} + +impl PathChanges { + pub fn into_change_vec(self) -> Vec> { + let Self { reverts, applies } = self; + reverts + .into_iter() + .map(PathChange::Revert) + .chain(applies.into_iter().map(PathChange::Apply)) + .collect() + } +} + #[cfg(test)] impl quickcheck::Arbitrary for PathChange where @@ -1791,8 +1808,9 @@ mod tests { ) } - let actual = - chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap(); + let actual = chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()) + .unwrap() + .into_change_vec(); let expected = expected .into_iter() .map(|change| match change { diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 053ef3dfe261..0333cfaf31a5 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -16,7 +16,7 @@ use self::utils::structured; use crate::beacon::{BeaconEntry, BeaconSchedule}; use crate::blocks::{Tipset, TipsetKey}; use crate::chain::{ - ChainStore, HeadChange, + ChainStore, index::{ChainIndex, ResolveNullTipset}, }; use crate::interpreter::{ @@ -1210,8 +1210,8 @@ where let mut subscriber_poll = tokio::task::spawn(async move { loop { match subscriber.recv().await { - Ok(head_change) => match head_change { - HeadChange::Apply(tipset) => { + Ok(head_changes) => { + for tipset in head_changes.applies { if candidate_tipset .as_ref() .map(|s| tipset.epoch() >= s.epoch() + confidence) @@ -1237,8 +1237,7 @@ where candidate_receipt = Some(receipt) } } - HeadChange::Revert(_) => {} - }, + } Err(RecvError::Lagged(i)) => { warn!( "wait for message head change subscriber lagged, skipped {} events", From a7581831bc544aa86a5375977138de7eefcd5259 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 21:44:17 +0800 Subject: [PATCH 07/20] resolve comments --- src/chain/store/chain_store.rs | 6 ++++-- src/db/car/many.rs | 12 ++++++------ src/db/gc/snapshot.rs | 2 +- src/db/memory.rs | 5 ++--- src/db/mod.rs | 4 ++-- src/db/parity_db.rs | 5 ++--- .../subcommands/api_cmd/generate_test_snapshot.rs | 2 +- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 7b8d94eaef1f..1218e23e1687 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -129,7 +129,9 @@ where let (publisher, _) = broadcast::channel(SINK_CAP); let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db))); let validated_blocks = Mutex::new(HashSet::default()); - let head = if let Ok(head_tsk) = heaviest_tipset_key_provider.heaviest_tipset_key() + let head = if let Some(head_tsk) = heaviest_tipset_key_provider + .heaviest_tipset_key() + .context("failed to load head tipset key")? && let Some(head) = chain_index .load_tipset(&head_tsk) .context("failed to load head tipset")? @@ -156,10 +158,10 @@ where /// Sets heaviest tipset pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { + head.key().save(self.blockstore())?; self.heaviest_tipset_key_provider .set_heaviest_tipset_key(head.key())?; let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); - head.key().save(self.blockstore())?; match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { Ok(changes) => { diff --git a/src/db/car/many.rs b/src/db/car/many.rs index 6b53c3ba6747..28184d03bebe 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -130,12 +130,12 @@ impl ManyCar { Ok(()) } - pub fn heaviest_tipset_key(&self) -> anyhow::Result { - self.read_only + pub fn heaviest_tipset_key(&self) -> anyhow::Result> { + Ok(self + .read_only .read() .peek() - .map(|w| AnyCar::heaviest_tipset_key(&w.car)) - .context("ManyCar store doesn't have a heaviest tipset key") + .map(|w| AnyCar::heaviest_tipset_key(&w.car))) } pub fn heaviest_tipset(&self) -> anyhow::Result { @@ -252,9 +252,9 @@ impl EthMappingsStore for ManyCar { } impl super::super::HeaviestTipsetKeyProvider for ManyCar { - fn heaviest_tipset_key(&self) -> anyhow::Result { + fn heaviest_tipset_key(&self) -> anyhow::Result> { match SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? { - Some(tsk) => Ok(tsk), + Some(tsk) => Ok(Some(tsk)), None => self.heaviest_tipset_key(), } } diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index 6417214988dc..60c53e266b63 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -282,7 +282,7 @@ where tracing::warn!("{e}"); } - *self.memory_db_head_key.write() = db.heaviest_tipset_key().ok(); + *self.memory_db_head_key.write() = db.heaviest_tipset_key()?; db.unsubscribe_write_ops(); match joinset.join_next().await { Some(Ok(map)) => { diff --git a/src/db/memory.rs b/src/db/memory.rs index 5419d06f2ff7..42a4fa70f2cb 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -154,9 +154,8 @@ impl BitswapStoreReadWrite for MemoryDB { } impl super::HeaviestTipsetKeyProvider for MemoryDB { - fn heaviest_tipset_key(&self) -> anyhow::Result { - SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? - .context("head key not found") + fn heaviest_tipset_key(&self) -> anyhow::Result> { + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY) } fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> { diff --git a/src/db/mod.rs b/src/db/mod.rs index 48cd6d9d4ff8..4dec56e97129 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -237,14 +237,14 @@ impl PersistentStore for &Arc { pub trait HeaviestTipsetKeyProvider { /// Returns the currently tracked heaviest tipset. - fn heaviest_tipset_key(&self) -> anyhow::Result; + fn heaviest_tipset_key(&self) -> anyhow::Result>; /// Sets heaviest tipset. fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()>; } impl HeaviestTipsetKeyProvider for Arc { - fn heaviest_tipset_key(&self) -> anyhow::Result { + fn heaviest_tipset_key(&self) -> anyhow::Result> { self.as_ref().heaviest_tipset_key() } diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index 8c6970c1ef10..1d80e78482a4 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -173,9 +173,8 @@ impl SettingsStore for ParityDb { } impl super::HeaviestTipsetKeyProvider for ParityDb { - fn heaviest_tipset_key(&self) -> anyhow::Result { - super::SettingsStoreExt::read_obj::(self, super::setting_keys::HEAD_KEY)? - .context("head key not found") + fn heaviest_tipset_key(&self) -> anyhow::Result> { + super::SettingsStoreExt::read_obj::(self, super::setting_keys::HEAD_KEY) } fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> { diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 6cedaef505e7..9b99378cd763 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -223,7 +223,7 @@ where } impl HeaviestTipsetKeyProvider for ReadOpsTrackingStore { - fn heaviest_tipset_key(&self) -> anyhow::Result { + fn heaviest_tipset_key(&self) -> anyhow::Result> { self.inner.heaviest_tipset_key() } From b7c0d1984af57e6de920878d1faddfd0b61b4bf7 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 21:53:13 +0800 Subject: [PATCH 08/20] fix --- src/rpc/methods/chain.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 61152c4fc8ae..13e23024376f 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -95,7 +95,7 @@ pub(crate) fn new_heads( Ok(block) => { if let Err(e) = sender.send(ApiHeaders(block)) { tracing::error!("Failed to send headers: {}", e); - break; + return; } } Err(e) => { From 20df24cfe77d0cc000128bdd765a4aed979c6886 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 22:06:46 +0800 Subject: [PATCH 09/20] fix --- src/rpc/methods/chain.rs | 30 ++++++++++++++----- .../api_cmd/generate_test_snapshot.rs | 5 +++- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 13e23024376f..a7cd68fcc93f 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1319,14 +1319,13 @@ pub(crate) fn chain_notify( let _ = subscriber.recv().await; while let Ok(changes) = subscriber.recv().await { - for change in changes.into_change_vec() { - let (change, tipset) = match change { - HeadChange::Apply(ts) => ("apply".into(), ts), - HeadChange::Revert(ts) => ("revert".into(), ts), - }; - if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() { - break; - } + let api_changes = changes + .into_change_vec() + .into_iter() + .map(From::from) + .collect(); + if sender.send(api_changes).is_err() { + break; } } }); @@ -1497,6 +1496,21 @@ pub struct ApiHeadChange { } lotus_json_with_self!(ApiHeadChange); +impl From for ApiHeadChange { + fn from(change: HeadChange) -> Self { + match change { + HeadChange::Apply(tipset) => Self { + change: "apply".into(), + tipset, + }, + HeadChange::Revert(tipset) => Self { + change: "revert".into(), + tipset, + }, + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "Type", content = "Val", rename_all = "snake_case")] pub enum PathChange { diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 9b99378cd763..c7a39a99b3e8 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -194,7 +194,10 @@ where SettingsStoreExt::write_obj( &self.tracker, crate::db::setting_keys::HEAD_KEY, - &self.inner.heaviest_tipset_key()?, + &self + .inner + .heaviest_tipset_key()? + .context("heaviest tipset key not found")?, )?; } From d753d31652a8bf7aed4a47a53079db68a14f75c3 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 22:17:46 +0800 Subject: [PATCH 10/20] fix --- src/rpc/methods/chain.rs | 1 - src/state_manager/mod.rs | 9 +++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index a7cd68fcc93f..42525daae444 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1317,7 +1317,6 @@ pub(crate) fn chain_notify( tokio::spawn(async move { // Skip first message let _ = subscriber.recv().await; - while let Ok(changes) = subscriber.recv().await { let api_changes = changes .into_change_vec() diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 0333cfaf31a5..7f2c5179fdd3 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -1211,6 +1211,15 @@ where loop { match subscriber.recv().await { Ok(head_changes) => { + for tipset in head_changes.reverts { + if candidate_tipset + .as_ref() + .is_some_and(|candidate| candidate.key() == tipset.key()) + { + candidate_tipset = None; + candidate_receipt = None; + } + } for tipset in head_changes.applies { if candidate_tipset .as_ref() From c6b0c0ca92ec1c8d1e5a6e9b1c0e6483a0911b43 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 22:30:57 +0800 Subject: [PATCH 11/20] fix --- src/message_pool/msgpool/msg_pool.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 6371fc16c342..26e9736aad92 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -559,7 +559,7 @@ where loop { match subscriber.recv().await { Ok(HeadChanges { reverts, applies }) => { - head_change( + if let Err(e) = head_change( api.as_ref(), bls_sig_cache.as_ref(), repub_trigger.clone(), @@ -570,10 +570,12 @@ where applies, ) .await - .context("Error changing head")?; + { + tracing::warn!("Error changing head: {e}"); + } } Err(RecvError::Lagged(e)) => { - warn!("Head change subscriber lagged: skipping {} events", e); + warn!("Head change subscriber lagged: skipping {e} events"); } Err(RecvError::Closed) => { break Ok(()); From 843febee88a2f30bd8449601131c9725d120f066 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 12 Mar 2026 22:51:44 +0800 Subject: [PATCH 12/20] fix --- src/message_pool/msgpool/mod.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 720d39c36154..6fe48064d8bb 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -226,15 +226,25 @@ where let mut repub = false; let mut rmsgs: HashMap> = HashMap::new(); for ts in revert { - let pts = api.load_tipset(ts.parents())?; + let Ok(pts) = api.load_tipset(ts.parents()) else { + tracing::error!("error loading reverted tipset parent"); + continue; + }; *cur_tipset.write() = pts; let mut msgs: Vec = Vec::new(); for block in ts.block_headers() { - let (umsg, smsgs) = api.messages_for_block(block)?; + let Ok((umsg, smsgs)) = api.messages_for_block(block) else { + tracing::error!("error retrieving messages for reverted block"); + continue; + }; msgs.extend(smsgs); for msg in umsg { - let smsg = recover_sig(bls_sig_cache, msg)?; + let msg_cid = msg.cid(); + let Ok(smsg) = recover_sig(bls_sig_cache, msg) else { + tracing::debug!("could not recover signature for bls message {}", msg_cid); + continue; + }; msgs.push(smsg) } } @@ -246,7 +256,10 @@ where for ts in apply { for b in ts.block_headers() { - let (msgs, smsgs) = api.messages_for_block(b)?; + let Ok((msgs, smsgs)) = api.messages_for_block(b) else { + tracing::error!("error retrieving messages for block"); + continue; + }; for msg in smsgs { remove_from_selected_msgs( From 4aa7cc50a3bc8bdca90db511b114c1ca4005d9c6 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 13 Mar 2026 06:17:26 +0800 Subject: [PATCH 13/20] fix --- scripts/tests/api_compare/filter-list-gateway | 1 + src/chain/store/indexer.rs | 2 +- src/cli/subcommands/index_cmd.rs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/tests/api_compare/filter-list-gateway b/scripts/tests/api_compare/filter-list-gateway index df81739d886b..54b4fc9f582d 100644 --- a/scripts/tests/api_compare/filter-list-gateway +++ b/scripts/tests/api_compare/filter-list-gateway @@ -7,6 +7,7 @@ !Filecoin.ChainSetHead !Filecoin.ChainStatObj !Filecoin.ChainTipSetWeight +!Filecoin.ChainValidateIndex !Filecoin.EthGetBlockReceiptsLimited !Filecoin.F3 !Filecoin.GasEstimateGasLimit diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs index 3ec61583a53b..a39f36666e8f 100644 --- a/src/chain/store/indexer.rs +++ b/src/chain/store/indexer.rs @@ -361,7 +361,7 @@ where } async fn get_and_verify_indexed_data(&self, ts: &Tipset) -> anyhow::Result { - let indexed_tipset_data = self.get_indexed_tipset_data(&ts).await?; + let indexed_tipset_data = self.get_indexed_tipset_data(ts).await?; self.verify_indexed_data(ts, &indexed_tipset_data).await?; Ok(indexed_tipset_data) } diff --git a/src/cli/subcommands/index_cmd.rs b/src/cli/subcommands/index_cmd.rs index 05bc2c27fb38..d16cfadb54b0 100644 --- a/src/cli/subcommands/index_cmd.rs +++ b/src/cli/subcommands/index_cmd.rs @@ -6,7 +6,7 @@ use crate::{ self, RpcMethodExt as _, chain::{ChainHead, ChainValidateIndex}, }, - shim::clock::{ChainEpoch, EPOCHS_IN_DAY}, + shim::clock::ChainEpoch, }; use clap::Subcommand; use std::time::Instant; From 59f9c0b0024dc6f8ab65ed0b916d4d542b3fd055 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 13 Mar 2026 07:35:05 +0800 Subject: [PATCH 14/20] update filter-list-offline --- scripts/tests/api_compare/filter-list-offline | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/tests/api_compare/filter-list-offline b/scripts/tests/api_compare/filter-list-offline index 3f2dafc80728..f073808064ec 100644 --- a/scripts/tests/api_compare/filter-list-offline +++ b/scripts/tests/api_compare/filter-list-offline @@ -1,5 +1,7 @@ # This list contains potentially broken methods (or tests) that are ignored. # They should be considered bugged, and not used until the root cause is resolved. +# Indexer disabled +!Filecoin.ChainValidateIndex !Filecoin.EthSyncing !eth_syncing !Filecoin.NetAddrsListen From 279d588a8913bc2f827c0565600e043b4e8e5e8b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 13 Mar 2026 07:13:46 +0800 Subject: [PATCH 15/20] checks for chain_get_path --- src/chain/store/chain_store.rs | 22 +++++++++++++++------- src/rpc/methods/chain.rs | 8 ++++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 1218e23e1687..d2815890a6c1 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -163,15 +163,23 @@ where .set_heaviest_tipset_key(head.key())?; let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); - match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { - Ok(changes) => { - if self.publisher.send(changes).is_err() { - debug!("did not publish changes, no active receivers"); - } - } + let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { + Ok(changes) => changes, Err(e) => { - warn!("failed to get chain path changes: {e}") + // Do not warn when the old head is genesis + if old_head.epoch() > 0 { + warn!("failed to get chain path changes: {e}"); + } + // Fallback to single apply + PathChanges { + applies: vec![head], + reverts: vec![], + } } + }; + + if self.publisher.send(changes).is_err() { + debug!("did not publish changes, no active receivers"); } Ok(()) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 42525daae444..34318f09f9d7 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -868,6 +868,7 @@ pub fn chain_get_path( from: &TipsetKey, to: &TipsetKey, ) -> anyhow::Result { + let finality = chain_store.chain_config().policy.chain_finality; let mut to_revert = chain_store .load_required_tipset_or_heaviest(from) .context("couldn't load `from`")?; @@ -875,6 +876,13 @@ pub fn chain_get_path( .load_required_tipset_or_heaviest(to) .context("couldn't load `to`")?; + anyhow::ensure!( + (to_apply.epoch() - to_revert.epoch()).abs() <= finality, + "the gap between the new head ({}) and the old head ({}) is larger than chain finality ({finality})", + to_apply.epoch(), + to_revert.epoch() + ); + let mut reverts = vec![]; let mut applies = vec![]; From 93510e1307604a6bd45c733e7044a4dc1b6ac0f5 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Mar 2026 07:40:47 +0800 Subject: [PATCH 16/20] refactor --- Cargo.lock | 12 +++++ Cargo.toml | 1 + src/chain/store/chain_store.rs | 14 ++--- src/chain_sync/chain_follower.rs | 8 +-- src/daemon/mod.rs | 17 +++---- src/message_pool/mod.rs | 2 +- src/message_pool/msgpool/msg_pool.rs | 4 +- src/message_pool/msgpool/provider.rs | 51 ++++++------------- src/message_pool/msgpool/test_provider.rs | 17 +++---- src/rpc/methods/chain.rs | 14 ++--- src/rpc/methods/sync.rs | 9 ++-- src/rpc/mod.rs | 2 +- src/state_manager/mod.rs | 4 +- src/tool/offline_server/server.rs | 4 +- .../api_cmd/generate_test_snapshot.rs | 4 +- src/tool/subcommands/api_cmd/test_snapshot.rs | 4 +- 16 files changed, 76 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8244eb40b3fb..ae3e5903dc3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,6 +471,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "auto_impl" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -3243,6 +3254,7 @@ dependencies = [ "async-fs", "async-trait", "asynchronous-codec", + "auto_impl", "axum", "backon", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index e398bd941388..0f73a9a41369 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ async-compression = { version = "0.4", features = ["tokio", "zstd"] } async-fs = "2" async-trait = "0.1" asynchronous-codec = "0.7" +auto_impl = "1" axum = "0.8" backon = "1" base64 = "0.22" diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index d2815890a6c1..7741971e9dd7 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -41,7 +41,7 @@ use nonzero_ext::nonzero; use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; use std::{num::NonZeroUsize, sync::Arc}; -use tokio::sync::broadcast::{self, Sender as Publisher}; +use tokio::sync::broadcast; use tracing::{debug, trace, warn}; // A cap on the size of the future_sink @@ -62,7 +62,7 @@ pub type HeadChanges = PathChanges; /// to allow a consistent `ChainStore` to be shared across tasks. pub struct ChainStore { /// Publisher for head change events - publisher: Publisher, + head_changes_tx: broadcast::Sender, /// key-value `datastore`. db: Arc, @@ -141,7 +141,7 @@ where Tipset::from(&genesis_block_header) }; let cs = Self { - publisher, + head_changes_tx: publisher, chain_index, tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, @@ -178,7 +178,7 @@ where } }; - if self.publisher.send(changes).is_err() { + if self.head_changes_tx.send(changes).is_err() { debug!("did not publish changes, no active receivers"); } @@ -240,9 +240,9 @@ where Tipset::from(self.genesis_block_header()) } - /// Returns a reference to the publisher of head changes. - pub fn publisher(&self) -> &Publisher { - &self.publisher + /// Subscribes head changes. + pub fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.head_changes_tx.subscribe() } /// Returns key-value store instance. diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index 4764046293e1..ee52cdcb677c 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -27,7 +27,7 @@ use crate::{ tipset_syncer::{TipsetSyncerError, validate_tipset}, }, libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest}, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::calculate_expected_epoch, shim::clock::ChainEpoch, state_manager::StateManager, @@ -78,7 +78,7 @@ pub struct ChainFollower { stateless_mode: bool, /// Message pool - mem_pool: Arc>>, + mem_pool: Arc>>>, } impl ChainFollower { @@ -88,7 +88,7 @@ impl ChainFollower { genesis: Tipset, net_handler: flume::Receiver, stateless_mode: bool, - mem_pool: Arc>>, + mem_pool: Arc>>>, ) -> Self { let (tipset_sender, tipset_receiver) = flume::bounded(20); let disable_bad_block_cache = is_env_truthy("FOREST_DISABLE_BAD_BLOCK_CACHE"); @@ -135,7 +135,7 @@ pub async fn chain_follower( network_rx: flume::Receiver, tipset_receiver: flume::Receiver, network: SyncNetworkContext, - mem_pool: Arc>>, + mem_pool: Arc>>>, sync_status: SyncStatus, genesis: Tipset, stateless_mode: bool, diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 339441bcc20f..91c0a974010e 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -7,6 +7,7 @@ pub mod db_util; pub mod main; use crate::blocks::Tipset; +use crate::chain::ChainStore; use crate::chain::index::ResolveNullTipset; use crate::chain_sync::network_context::SyncNetworkContext; use crate::chain_sync::{ChainFollower, SyncStatus}; @@ -22,7 +23,7 @@ use crate::daemon::{ use crate::db::gc::SnapshotGarbageCollector; use crate::db::ttl::EthMappingCollector; use crate::libp2p::{Libp2pService, PeerManager}; -use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; +use crate::message_pool::{MessagePool, MpoolConfig}; use crate::networks::{self, ChainConfig}; use crate::rpc::RPCState; use crate::rpc::eth::filter::EthEventHandler; @@ -292,11 +293,9 @@ fn create_mpool( services: &mut JoinSet>, p2p_service: &Libp2pService, ctx: &AppContext, -) -> anyhow::Result>>> { - let publisher = ctx.state_manager.chain_store().publisher(); - let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone()); +) -> anyhow::Result>>>> { Ok(MessagePool::new( - provider, + ctx.state_manager.chain_store().clone(), p2p_service.network_sender().clone(), MpoolConfig::load_config(ctx.db.writer().as_ref())?, ctx.state_manager.chain_config().clone(), @@ -308,7 +307,7 @@ fn create_mpool( fn create_chain_follower( opts: &CliOpts, p2p_service: &Libp2pService, - mpool: Arc>>, + mpool: Arc>>>, ctx: &AppContext, ) -> anyhow::Result> { let network_send = p2p_service.network_sender().clone(); @@ -369,7 +368,7 @@ async fn maybe_start_health_check_service( fn maybe_start_rpc_service( services: &mut JoinSet>, config: &Config, - mpool: Arc>>, + mpool: Arc>>>, chain_follower: &ChainFollower, start_time: chrono::DateTime, shutdown: mpsc::Sender<()>, @@ -497,14 +496,14 @@ fn maybe_start_indexer_service( && !opts.stateless && !ctx.state_manager.chain_config().is_devnet() { - let mut receiver = ctx.state_manager.chain_store().publisher().subscribe(); + let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes(); let chain_store = ctx.state_manager.chain_store().clone(); services.spawn(async move { tracing::info!("Starting indexer service"); // Continuously listen for head changes loop { - for ts in receiver.recv().await?.applies { + for ts in head_changes_rx.recv().await?.applies { tracing::debug!("Indexing tipset {}", ts.key()); let delegated_messages = chain_store.headers_delegated_messages(ts.block_headers().iter())?; diff --git a/src/message_pool/mod.rs b/src/message_pool/mod.rs index abcbae2807ec..e30066e2449e 100644 --- a/src/message_pool/mod.rs +++ b/src/message_pool/mod.rs @@ -9,7 +9,7 @@ mod msgpool; pub use self::{ config::*, errors::*, - msgpool::{msg_pool::MessagePool, provider::MpoolRpcProvider, *}, + msgpool::{msg_pool::MessagePool, *}, }; pub use block_prob::block_probabilities; diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 26e9736aad92..427cd989705d 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -544,7 +544,7 @@ where mp.load_local()?; - let mut subscriber = mp.api.subscribe_head_changes(); + let mut head_changes_rx = mp.api.subscribe_head_changes(); let api = mp.api.clone(); let bls_sig_cache = mp.bls_sig_cache.clone(); @@ -557,7 +557,7 @@ where // Reacts to new HeadChanges services.spawn(async move { loop { - match subscriber.recv().await { + match head_changes_rx.recv().await { Ok(HeadChanges { reverts, applies }) => { if let Err(e) = head_change( api.as_ref(), diff --git a/src/message_pool/msgpool/provider.rs b/src/message_pool/msgpool/provider.rs index 8180b71d19eb..0005f2edcc65 100644 --- a/src/message_pool/msgpool/provider.rs +++ b/src/message_pool/msgpool/provider.rs @@ -1,11 +1,10 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::sync::Arc; - use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; -use crate::chain::HeadChanges; +use crate::chain::{ChainStore, HeadChanges}; use crate::message::{ChainMessage, SignedMessage}; +use crate::message_pool::errors::Error; use crate::message_pool::msg_pool::{ MAX_ACTOR_PENDING_MESSAGES, MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES, }; @@ -16,22 +15,19 @@ use crate::shim::{ message::Message, state_tree::{ActorState, StateTree}, }; -use crate::state_manager::StateManager; use crate::utils::db::CborStoreExt; -use async_trait::async_trait; +use auto_impl::auto_impl; use cid::Cid; use fvm_ipld_blockstore::Blockstore; -use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; - -use crate::message_pool::errors::Error; +use tokio::sync::broadcast; /// Provider Trait. This trait will be used by the message pool to interact with /// some medium in order to do the operations that are listed below that are /// required for the message pool. -#[async_trait] +#[auto_impl(Arc)] pub trait Provider { /// Update `Mpool`'s `cur_tipset` whenever there is a change to the provider - fn subscribe_head_changes(&self) -> Subscriber; + fn subscribe_head_changes(&self) -> broadcast::Receiver; /// Get the heaviest Tipset in the provider fn get_heaviest_tipset(&self) -> Tipset; /// Add a message to the `MpoolProvider`, return either Cid or Error @@ -60,30 +56,17 @@ pub trait Provider { } } -/// This is the default Provider implementation that will be used for the -/// `mpool` RPC. -#[derive(derive_more::Constructor)] -pub struct MpoolRpcProvider { - subscriber: Publisher, - sm: Arc>, -} - -#[async_trait] -impl Provider for MpoolRpcProvider -where - DB: Blockstore + Sync + Send + 'static, -{ - fn subscribe_head_changes(&self) -> Subscriber { - self.subscriber.subscribe() +impl Provider for ChainStore { + fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.subscribe_head_changes() } fn get_heaviest_tipset(&self) -> Tipset { - self.sm.chain_store().heaviest_tipset() + self.heaviest_tipset() } fn put_message(&self, msg: &ChainMessage) -> Result { let cid = self - .sm .blockstore() .put_cbor_default(msg) .map_err(|err| Error::Other(err.to_string()))?; @@ -91,7 +74,7 @@ where } fn get_actor_after(&self, addr: &Address, ts: &Tipset) -> Result { - let state = StateTree::new_from_root(self.sm.blockstore_owned(), ts.parent_state()) + let state = StateTree::new_from_root(self.blockstore().clone(), ts.parent_state()) .map_err(|e| Error::Other(e.to_string()))?; Ok(state.get_required_actor(addr)?) } @@ -100,20 +83,16 @@ where &self, h: &CachingBlockHeader, ) -> Result<(Vec, Vec), Error> { - crate::chain::block_messages(self.sm.blockstore(), h).map_err(|err| err.into()) + crate::chain::block_messages(self.blockstore(), h).map_err(|err| err.into()) } fn load_tipset(&self, tsk: &TipsetKey) -> Result { - Ok(self - .sm - .chain_store() - .chain_index() - .load_required_tipset(tsk)?) + Ok(self.chain_index().load_required_tipset(tsk)?) } fn chain_compute_base_fee(&self, ts: &Tipset) -> Result { - let smoke_height = self.sm.chain_config().epoch(Height::Smoke); - crate::chain::compute_base_fee(self.sm.blockstore(), ts, smoke_height) + let smoke_height = self.chain_config().epoch(Height::Smoke); + crate::chain::compute_base_fee(self.blockstore(), ts, smoke_height) .map_err(|err| err.into()) } } diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index 0e18816deabf..5dd08dff2a1f 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -14,18 +14,16 @@ use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::message_pool::{Error, provider::Provider}; use crate::shim::{address::Address, econ::TokenAmount, message::Message, state_tree::ActorState}; use ahash::HashMap; -use async_trait::async_trait; use cid::Cid; use num::BigInt; use parking_lot::Mutex; use tokio::sync::broadcast; -use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; /// Structure used for creating a provider when writing tests involving message /// pool pub struct TestApi { pub inner: Mutex, - pub publisher: Publisher, + pub head_changes_tx: broadcast::Sender, } #[derive(Default)] @@ -40,13 +38,13 @@ pub struct TestApiInner { impl Default for TestApi { /// Create a new `TestApi` fn default() -> Self { - let (publisher, _) = broadcast::channel(1); + let (head_changes_tx, _) = broadcast::channel(1); TestApi { inner: Mutex::new(TestApiInner { max_actor_pending_messages: 20000, ..TestApiInner::default() }), - publisher, + head_changes_tx, } } } @@ -60,7 +58,7 @@ impl TestApi { max_actor_pending_messages, ..TestApiInner::default() }), - publisher, + head_changes_tx: publisher, } } @@ -81,7 +79,7 @@ impl TestApi { /// Set the heaviest tipset for `TestApi` pub fn set_heaviest_tipset(&self, ts: Tipset) { - self.publisher + self.head_changes_tx .send(HeadChanges { applies: vec![ts], reverts: vec![], @@ -122,10 +120,9 @@ impl TestApiInner { } } -#[async_trait] impl Provider for TestApi { - fn subscribe_head_changes(&self) -> Subscriber { - self.publisher.subscribe() + fn subscribe_head_changes(&self) -> broadcast::Receiver { + self.head_changes_tx.subscribe() } fn get_heaviest_tipset(&self) -> Tipset { diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 34318f09f9d7..535d6877a580 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -84,10 +84,10 @@ pub(crate) fn new_heads( ) -> (Subscriber, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - let mut subscriber = data.chain_store().publisher().subscribe(); + let mut head_changes_rx = data.chain_store().subscribe_head_changes(); let handle = tokio::spawn(async move { - while let Ok(changes) = subscriber.recv().await { + while let Ok(changes) = head_changes_rx.recv().await { for ts in changes.applies { // Convert the tipset to an Ethereum block with full transaction info // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block @@ -121,12 +121,12 @@ pub(crate) fn logs( ) -> (Subscriber>, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - let mut subscriber = ctx.chain_store().publisher().subscribe(); + let mut head_changes_rx = ctx.chain_store().subscribe_head_changes(); let ctx = ctx.clone(); let handle = tokio::spawn(async move { - while let Ok(changes) = subscriber.recv().await { + while let Ok(changes) = head_changes_rx.recv().await { for ts in changes.applies { match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { Ok(logs) => { @@ -1320,12 +1320,12 @@ pub(crate) fn chain_notify( .send(vec![ApiHeadChange { change, tipset }]) .expect("receiver is not dropped"); - let mut subscriber = data.chain_store().publisher().subscribe(); + let mut head_changes_rx = data.chain_store().subscribe_head_changes(); tokio::spawn(async move { // Skip first message - let _ = subscriber.recv().await; - while let Ok(changes) = subscriber.recv().await { + let _ = head_changes_rx.recv().await; + while let Ok(changes) = head_changes_rx.recv().await { let api_changes = changes .into_change_vec() .into_iter() diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 4e7da8d7be18..070614ff78a4 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -173,7 +173,7 @@ mod tests { use crate::db::MemoryDB; use crate::key_management::{KeyStore, KeyStoreConfig}; use crate::libp2p::{NetworkMessage, PeerManager}; - use crate::message_pool::{MessagePool, MpoolRpcProvider}; + use crate::message_pool::MessagePool; use crate::networks::ChainConfig; use crate::rpc::RPCState; use crate::rpc::eth::filter::EthEventHandler; @@ -202,7 +202,6 @@ mod tests { ); let state_manager = Arc::new(StateManager::new(cs_arc.clone()).unwrap()); - let state_manager_for_thread = state_manager.clone(); let cs_for_test = &cs_arc; let mpool_network_send = network_send.clone(); let pool = { @@ -218,13 +217,11 @@ mod tests { db.put_keyed(&i, &bz2).unwrap(); } - let provider = - MpoolRpcProvider::new(cs_arc.publisher().clone(), state_manager_for_thread.clone()); MessagePool::new( - provider, + cs_arc, mpool_network_send, Default::default(), - state_manager_for_thread.chain_config().clone(), + state_manager.chain_config().clone(), &mut services, ) .unwrap() diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d759728fbbd5..f7cc4bef31be 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -471,7 +471,7 @@ const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; pub struct RPCState { pub keystore: Arc>, pub state_manager: Arc>, - pub mpool: Arc>>, + pub mpool: Arc>>>, pub bad_blocks: Option>, pub msgs_in_tipset: Arc, pub sync_status: crate::chain_sync::SyncStatus, diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 7f2c5179fdd3..2293b5b0ddf6 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -1171,7 +1171,7 @@ where look_back_limit: Option, allow_replaced: Option, ) -> Result<(Option, Option), Error> { - let mut subscriber = self.cs.publisher().subscribe(); + let mut head_changes_rx = self.cs.subscribe_head_changes(); let (sender, mut receiver) = oneshot::channel::<()>(); let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) .map_err(|err| Error::Other(format!("failed to load message {err:}")))?; @@ -1209,7 +1209,7 @@ where // Wait for message to be included in head change. let mut subscriber_poll = tokio::task::spawn(async move { loop { - match subscriber.recv().await { + match head_changes_rx.recv().await { Ok(head_changes) => { for tipset in head_changes.reverts { if candidate_tipset diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 84b6a110d877..01b273eaa55c 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -15,7 +15,7 @@ use crate::db::{ use crate::genesis::read_genesis_header; use crate::key_management::{KeyStore, KeyStoreConfig}; use crate::libp2p::PeerManager; -use crate::message_pool::{MessagePool, MpoolRpcProvider}; +use crate::message_pool::MessagePool; use crate::networks::{ChainConfig, NetworkChain}; use crate::rpc::eth::filter::EthEventHandler; use crate::rpc::{RPCState, start_rpc}; @@ -83,7 +83,7 @@ where let (tipset_send, _) = flume::bounded(5); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(), diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index c7a39a99b3e8..e20f7e4f866f 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -17,7 +17,7 @@ use crate::{ genesis::read_genesis_header, libp2p::{NetworkMessage, PeerManager}, libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite, Block64}, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::ChainConfig, shim::address::CurrentNetwork, state_manager::StateManager, @@ -132,7 +132,7 @@ async fn ctx( let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap()); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(), diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 86feb82e3ec8..463f95e220e0 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -12,7 +12,7 @@ use crate::{ genesis::read_genesis_header, libp2p::{NetworkMessage, PeerManager}, lotus_json::HasLotusJson, - message_pool::{MessagePool, MpoolRpcProvider}, + message_pool::MessagePool, networks::{ChainConfig, NetworkChain}, rpc::{ ApiPaths, RPCState, RpcMethod, RpcMethodExt as _, @@ -150,7 +150,7 @@ async fn ctx( )?); let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap()); let message_pool = MessagePool::new( - MpoolRpcProvider::new(chain_store.publisher().clone(), state_manager.clone()), + chain_store.clone(), network_send.clone(), Default::default(), state_manager.chain_config().clone(), From c6258a6660a52343d2c3520d1fa7ccd05f44de97 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Mar 2026 15:08:08 +0800 Subject: [PATCH 17/20] fix --- src/chain/store/indexer.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs index 539354f039b3..ce4567b022e9 100644 --- a/src/chain/store/indexer.rs +++ b/src/chain/store/indexer.rs @@ -71,8 +71,6 @@ impl SqliteIndexerOptions { } pub struct SqliteIndexer { - // ensures writes are serialized so backfilling does not race with index updates - mu: tokio::sync::Mutex<()>, options: SqliteIndexerOptions, cs: Arc>, db: sqlx::SqlitePool, @@ -100,7 +98,6 @@ where .await?; let stmts = PreparedStatements::default(); Ok(Self { - mu: tokio::sync::Mutex::const_new(()), options, cs, db, @@ -128,7 +125,11 @@ where let HeadChanges { reverts, applies } = head_change_subscriber.recv().await?; for ts in reverts { if let Err(e) = self.revert_tipset(&ts).await { - tracing::warn!("failed to index new head@{}({}): {e}", ts.epoch(), ts.key()); + tracing::warn!( + "failed to revert new head@{}({}): {e}", + ts.epoch(), + ts.key() + ); } } for ts in applies { @@ -557,13 +558,14 @@ where } pub async fn revert_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + tracing::debug!("reverting tipset@{}[{}]", ts.epoch(), ts.key().terse()); let tsk_cid_bytes = ts.key().cid()?.to_bytes(); // Because of deferred execution in Filecoin, events at tipset T are reverted when a tipset T+1 is reverted. // However, the tipet `T` itself is not reverted. let pts = Tipset::load_required(self.cs.blockstore(), ts.parents())?; let events_tsk_cid_bytes = pts.key().cid()?.to_bytes(); let mut tx = self.db.begin().await?; - sqlx::query(self.stmts.update_events_to_reverted) + sqlx::query(self.stmts.update_tipset_to_reverted) .bind(&tsk_cid_bytes) .execute(tx.deref_mut()) .await?; @@ -578,6 +580,7 @@ where } pub async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + tracing::debug!("indexing tipset@{}[{}]", ts.epoch(), ts.key().terse()); let mut tx = self.db.begin().await?; self.index_tipset_and_parent_events_with_tx(&mut tx, ts) .await?; From 664ffc522785920aa9fdd7927072cc86917ce7ad Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Mar 2026 16:29:45 +0800 Subject: [PATCH 18/20] complete index validation --- src/chain/store/indexer.rs | 82 +++++++++++++++++++++++++++++++-- src/chain/store/indexer/ddls.rs | 6 +++ 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs index ce4567b022e9..c953676ee505 100644 --- a/src/chain/store/indexer.rs +++ b/src/chain/store/indexer.rs @@ -50,9 +50,6 @@ struct IndexedTipsetData { #[derive(Debug, smart_default::SmartDefault)] pub struct SqliteIndexerOptions { pub gc_retention_epochs: ChainEpoch, - pub reconcile_empty_index: bool, - #[default(3 * EPOCHS_IN_DAY as u64)] - pub max_reconcile_tipsets: u64, } impl SqliteIndexerOptions { @@ -423,7 +420,32 @@ where ); // compare the events AMT root between the indexed events and the events in the chain state - for (message, _, _) in executed_messages {} + for (message, receipt, _) in executed_messages { + let msg_cid = message.cid(); + let indexed_events_root = self.amt_root_for_events(&tsk_cid, &msg_cid).await?; + match (indexed_events_root, receipt.events_root()) { + (Some(a), Some(b)) => { + anyhow::ensure!( + a == b, + "index corruption: events AMT root mismatch for message {msg_cid} at height {}. Index root: {a}, Receipt root: {b}", + ts.epoch() + ); + } + (None, None) => continue, + (Some(_), None) => { + anyhow::bail!( + "index corruption: events found in index for message {msg_cid} at height {}, but message receipt has no events root", + ts.epoch() + ) + } + (None, Some(b)) => { + anyhow::bail!( + "index corruption: no events found in index for message {msg_cid} at height {}, but message receipt has events root {b}", + ts.epoch() + ) + } + } + } Ok(()) } @@ -464,6 +486,58 @@ where Ok(row.map(|r| (r.get(0), r.get(1)))) } + async fn amt_root_for_events( + &self, + tsk_cid: &Cid, + msg_cid: &Cid, + ) -> anyhow::Result> { + use crate::state_manager::EVENTS_AMT_BITWIDTH; + use fil_actors_shared::fvm_ipld_amt::Amt; + use fvm_ipld_blockstore::MemoryBlockstore; + use fvm_shared4::event::{ActorEvent, Entry, Flags, StampedEvent}; + + let mut tx = self.db.begin().await?; + let rows = sqlx::query(self.stmts.get_event_id_and_emitter_id) + .bind(tsk_cid.to_bytes()) + .bind(msg_cid.to_bytes()) + .fetch_all(tx.deref_mut()) + .await?; + if rows.is_empty() { + Ok(None) + } else { + let mut events = Amt::new_with_bit_width(MemoryBlockstore::new(), EVENTS_AMT_BITWIDTH); + for (i, row) in rows.iter().enumerate() { + let event_id: i64 = row.get(0); + let actor_id: i64 = row.get(1); + let mut event = StampedEvent { + emitter: actor_id as _, + event: ActorEvent { entries: vec![] }, + }; + let rows2 = sqlx::query(self.stmts.get_event_entries) + .bind(event_id) + .fetch_all(tx.deref_mut()) + .await?; + for row2 in rows2 { + let flags: Vec = row2.get(0); + if let Some(&flags) = flags.first() { + let key: String = row2.get(1); + let codec: i64 = row2.get(2); + let value: Vec = row2.get(3); + event.event.entries.push(Entry { + flags: Flags::from_bits_retain(flags as _), + key, + codec: codec as _, + value, + }); + } + } + events.set(i as _, event)?; + } + + Ok(Some(events.flush()?)) + } + } + fn load_executed_messages( &self, msg_ts: &Tipset, diff --git a/src/chain/store/indexer/ddls.rs b/src/chain/store/indexer/ddls.rs index 2f92d0d4ae10..64000cb7e95e 100644 --- a/src/chain/store/indexer/ddls.rs +++ b/src/chain/store/indexer/ddls.rs @@ -64,6 +64,8 @@ pub struct PreparedStatements { pub insert_event_entry: &'static str, pub remove_tipsets_before_height: &'static str, pub remove_eth_hashes_older_than: &'static str, + pub get_event_entries: &'static str, + pub get_event_id_and_emitter_id: &'static str, } impl Default for PreparedStatements { @@ -93,6 +95,8 @@ impl Default for PreparedStatements { let remove_tipsets_before_height = "DELETE FROM tipset_message WHERE height < ?"; let remove_eth_hashes_older_than = "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)"; + let get_event_entries = "SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC"; + let get_event_id_and_emitter_id = "SELECT e.id, e.emitter_id FROM event e JOIN tipset_message tm ON e.message_id = tm.id WHERE tm.tipset_key_cid = ? AND tm.message_cid = ? ORDER BY e.event_index ASC"; Self { has_tipset, @@ -115,6 +119,8 @@ impl Default for PreparedStatements { insert_event_entry, remove_tipsets_before_height, remove_eth_hashes_older_than, + get_event_entries, + get_event_id_and_emitter_id, } } } From 5594a0dc4fcc4ff0b9d3efcfc3d388c204417fb4 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Mar 2026 17:06:54 +0800 Subject: [PATCH 19/20] promote get_chain_path failure log level --- src/chain/store/chain_store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 7741971e9dd7..84086620907b 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -42,7 +42,7 @@ use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; use std::{num::NonZeroUsize, sync::Arc}; use tokio::sync::broadcast; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; // A cap on the size of the future_sink const SINK_CAP: usize = 200; @@ -168,7 +168,7 @@ where Err(e) => { // Do not warn when the old head is genesis if old_head.epoch() > 0 { - warn!("failed to get chain path changes: {e}"); + error!("failed to get chain path changes: {e}"); } // Fallback to single apply PathChanges { From b63a8a57b9d7ec316e105fb6a5c0895eb027321d Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Mar 2026 19:21:55 +0800 Subject: [PATCH 20/20] write lock for sqlite --- src/chain/store/indexer.rs | 86 +++++++++++++++++++------------- src/cli/subcommands/index_cmd.rs | 6 +-- 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/src/chain/store/indexer.rs b/src/chain/store/indexer.rs index b5be7bfc869b..d0e7e86711c0 100644 --- a/src/chain/store/indexer.rs +++ b/src/chain/store/indexer.rs @@ -68,6 +68,7 @@ impl SqliteIndexerOptions { } pub struct SqliteIndexer { + lock: tokio::sync::Mutex<()>, options: SqliteIndexerOptions, cs: Arc>, db: sqlx::SqlitePool, @@ -95,6 +96,7 @@ where .await?; let stmts = PreparedStatements::default(); Ok(Self { + lock: Default::default(), options, cs, db, @@ -104,6 +106,12 @@ where }) } + /// Acquires write lock. Note that `WAL` (Write-Ahead Logging) mode is enabled to allow + /// concurrent reads to occur while a single write transaction is active + pub async fn aquire_write_lock(&self) -> tokio::sync::MutexGuard<'_, ()> { + self.lock.lock().await + } + pub fn with_actor_to_delegated_address_func(mut self, f: ActorToDelegatedAddressFunc) -> Self { self.actor_to_delegated_address_func = Some(f); self @@ -120,6 +128,7 @@ where ) -> anyhow::Result<()> { loop { let HeadChanges { reverts, applies } = head_changes_rx.recv().await?; + let _lock = self.aquire_write_lock().await; for ts in reverts { if let Err(e) = self.revert_tipset(&ts).await { tracing::warn!( @@ -151,6 +160,7 @@ where } async fn gc(&self) { + let _lock = self.aquire_write_lock().await; tracing::info!("starting index gc"); let head = self.cs.heaviest_tipset(); let removal_epoch = head.epoch() - self.options.gc_retention_epochs - 10; // 10 is for some grace period @@ -207,11 +217,44 @@ where } } + pub async fn populate(&self) -> anyhow::Result<()> { + let _lock = self.aquire_write_lock().await; + let start = Instant::now(); + let head = self.cs.heaviest_tipset(); + tracing::info!( + "starting to populate chainindex at head epoch {}", + head.epoch() + ); + let mut tx = self.db.begin().await?; + let mut total_indexed = 0; + for ts in head.chain(self.cs.blockstore()) { + if let Err(e) = self.index_tipset_with_tx(&mut tx, &ts).await { + tracing::info!( + "stopping chainindex population at epoch {}: {e}", + ts.epoch() + ); + break; + } + total_indexed += 1; + } + tx.commit().await?; + tracing::info!( + "successfully populated chain index with {total_indexed} tipsets, took {}", + humantime::format_duration(start.elapsed()) + ); + Ok(()) + } + pub async fn validate_index( &self, epoch: ChainEpoch, backfill: bool, ) -> anyhow::Result { + let _lock = if backfill { + Some(self.aquire_write_lock().await) + } else { + None + }; let head = self.cs.heaviest_tipset(); anyhow::ensure!( epoch < head.epoch(), @@ -604,34 +647,7 @@ where Ok(executed) } - pub async fn populate(&self) -> anyhow::Result<()> { - let start = Instant::now(); - let head = self.cs.heaviest_tipset(); - tracing::info!( - "starting to populate chainindex at head epoch {}", - head.epoch() - ); - let mut tx = self.db.begin().await?; - let mut total_indexed = 0; - for ts in head.chain(self.cs.blockstore()) { - if let Err(e) = self.index_tipset_with_tx(&mut tx, &ts).await { - tracing::info!( - "stopping chainindex population at epoch {}: {e}", - ts.epoch() - ); - break; - } - total_indexed += 1; - } - tx.commit().await?; - tracing::info!( - "successfully populated chain index with {total_indexed} tipsets, took {}", - humantime::format_duration(start.elapsed()) - ); - Ok(()) - } - - pub async fn revert_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + async fn revert_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { tracing::debug!("reverting tipset@{}[{}]", ts.epoch(), ts.key().terse()); let tsk_cid_bytes = ts.key().cid()?.to_bytes(); // Because of deferred execution in Filecoin, events at tipset T are reverted when a tipset T+1 is reverted. @@ -653,7 +669,7 @@ where Ok(()) } - pub async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { + async fn index_tipset(&self, ts: &Tipset) -> anyhow::Result<()> { tracing::debug!("indexing tipset@{}[{}]", ts.epoch(), ts.key().terse()); let mut tx = self.db.begin().await?; self.index_tipset_and_parent_events_with_tx(&mut tx, ts) @@ -662,7 +678,7 @@ where Ok(()) } - pub async fn index_tipset_with_tx<'a>( + async fn index_tipset_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, ts: &Tipset, @@ -716,7 +732,7 @@ where } } - pub async fn index_tipset_and_parent_events_with_tx<'a>( + async fn index_tipset_and_parent_events_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, ts: &Tipset, @@ -741,7 +757,7 @@ where .map_err(|e| anyhow::anyhow!("failed to index events: {e}")) } - pub async fn index_events_with_tx<'a>( + async fn index_events_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, msg_ts: &Tipset, @@ -841,7 +857,7 @@ where Ok(()) } - pub async fn restore_tipset_if_exists_with_tx<'a>( + async fn restore_tipset_if_exists_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, tsk_cid_bytes: &[u8], @@ -866,7 +882,7 @@ where } } - pub async fn index_signed_message_with_tx<'a>( + async fn index_signed_message_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, smsg: &SignedMessage, @@ -882,7 +898,7 @@ where .await } - pub async fn index_eth_tx_hash_with_tx<'a>( + async fn index_eth_tx_hash_with_tx<'a>( &self, tx: &mut sqlx::SqliteTransaction<'a>, tx_hash: EthHash, diff --git a/src/cli/subcommands/index_cmd.rs b/src/cli/subcommands/index_cmd.rs index d16cfadb54b0..73bb586f62c9 100644 --- a/src/cli/subcommands/index_cmd.rs +++ b/src/cli/subcommands/index_cmd.rs @@ -25,8 +25,8 @@ pub enum IndexCommands { #[arg(long, required = true)] to: ChainEpoch, /// determines whether to backfill missing index entries during validation - #[arg(long, default_value_t = true)] - backfill: bool, + #[arg(long, default_missing_value = "true", default_value = "true")] + backfill: Option, }, } @@ -34,7 +34,7 @@ impl IndexCommands { pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> { match self { Self::ValidateBackfill { from, to, backfill } => { - validate_backfill(&client, from, to, backfill).await + validate_backfill(&client, from, to, backfill.unwrap_or_default()).await } } }