From f1c143b58668f2dcaebcf50181632ed834c7c0fc Mon Sep 17 00:00:00 2001 From: James Date: Mon, 9 Mar 2026 13:16:39 -0400 Subject: [PATCH 1/3] feat(storage): add `drain_above` to `UnifiedStorage` (ENG-1978) Adds `DrainedBlock` type and `drain_above` method that reads removed block headers and receipts before unwinding, enabling reorg notifications. Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 16 ++++---- crates/storage/src/lib.rs | 2 +- crates/storage/src/unified.rs | 73 +++++++++++++++++++++++++++++++++-- 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 630fee8..d18e09b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.6.4" +version = "0.6.5" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.6.4", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.6.4", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.6.4", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.6.4", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.6.4", path = "./crates/cold-sql" } -signet-storage = { version = "0.6.4", path = "./crates/storage" } -signet-storage-types = { version = "0.6.4", path = "./crates/types" } +signet-hot = { version = "0.6.5", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.6.5", path = "./crates/hot-mdbx" } +signet-cold = { version = "0.6.5", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.6.5", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.6.5", path = "./crates/cold-sql" } +signet-storage = { version = "0.6.5", path = "./crates/storage" } +signet-storage-types = { version = "0.6.5", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index a7b1116..44ff5eb 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -68,7 +68,7 @@ pub mod either; pub use either::Either; mod unified; -pub use unified::UnifiedStorage; +pub use unified::{DrainedBlock, UnifiedStorage}; // Re-export connector traits pub use signet_cold::ColdConnect; diff --git a/crates/storage/src/unified.rs b/crates/storage/src/unified.rs index 7312df3..c1bbf43 100644 --- a/crates/storage/src/unified.rs +++ b/crates/storage/src/unified.rs @@ -7,16 +7,28 @@ use crate::StorageResult; use alloy::primitives::BlockNumber; use signet_cold::{ - BlockData, ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageReadHandle, - ColdStorageTask, + BlockData, ColdReceipt, ColdStorage, ColdStorageError, ColdStorageHandle, + ColdStorageReadHandle, ColdStorageTask, }; use signet_hot::{ HistoryRead, HistoryWrite, HotKv, model::{HotKvReadError, HotKvWrite, RevmRead}, }; -use signet_storage_types::ExecutedBlock; +use signet_storage_types::{ExecutedBlock, SealedHeader}; use tokio_util::sync::CancellationToken; +/// Block data drained during a reorg unwind. +/// +/// Contains the header (always available from hot storage) and receipts +/// (best-effort from cold storage — empty if cold storage lags behind hot). +#[derive(Debug, Clone)] +pub struct DrainedBlock { + /// The sealed header of the removed block. + pub header: SealedHeader, + /// Receipts from cold storage. Empty if cold hasn't indexed this block yet. + pub receipts: Vec, +} + /// Unified storage combining hot and cold backends. /// /// This struct provides a single interface for writing execution data to both @@ -217,6 +229,61 @@ impl UnifiedStorage { self.cold.dispatch_append_blocks(cold_data).map_err(Into::into) } + /// Read and remove all blocks above the given block number. + /// + /// This combines reading the about-to-be-removed data with unwinding, + /// returning the drained blocks so callers can emit reorg notifications. + /// + /// # Implementation + /// + /// 1. Reads headers from hot storage (sync) + /// 2. Reads receipts from cold storage (async, best-effort) + /// 3. Unwinds hot storage (sync) + /// 4. Dispatches truncate to cold storage (non-blocking) + /// + /// # Cold Lag + /// + /// If cold storage hasn't processed a block yet, its receipts will be + /// empty. This is correct: no subscriber has seen those logs, so there + /// is nothing to "remove" from their perspective. + /// + /// # Errors + /// + /// - [`Hot`]: Hot storage read or unwind failed. + /// - [`Cold`]: Hot storage unwound but cold truncate dispatch failed. + /// + /// [`Hot`]: crate::StorageError::Hot + /// [`Cold`]: crate::StorageError::Cold + pub async fn drain_above(&self, block: BlockNumber) -> StorageResult> { + // 1. Read headers above `block` from hot storage + let reader = self.reader()?; + let last = match reader.get_execution_range().map_err(|e| e.into_hot_kv_error())? { + Some((_, last)) if last > block => last, + _ => return Ok(Vec::new()), + }; + let headers = + reader.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?; + drop(reader); + + // 2. Read receipts from cold storage (best-effort) + let cold = self.cold_reader(); + let mut drained = Vec::with_capacity(headers.len()); + for header in headers { + let receipts = cold.get_receipts_in_block(header.number).await.unwrap_or_default(); + drained.push(DrainedBlock { header, receipts }); + } + + // 3. Unwind hot storage + let writer = self.hot.writer()?; + writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?; + writer.raw_commit().map_err(|e| e.into_hot_kv_error())?; + + // 4. Dispatch truncate to cold storage + self.cold.dispatch_truncate_above(block).map_err(crate::StorageError::Cold)?; + + Ok(drained) + } + /// Unwind storage above the given block number (reorg handling). /// /// This method: From a64b62339b03e6b47323e3ec802b9217cbab5ac5 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 9 Mar 2026 14:01:15 -0400 Subject: [PATCH 2/3] feat(cold): add native `drain_above` to `ColdStorage` trait (ENG-1978) Add `drain_above` as a native `ColdStorage` trait method that atomically reads receipts and truncates in one operation. Each backend overrides with its own atomic version: - MemColdBackend: holds write lock for entire operation - MdbxColdBackend: single read-write transaction - SqlColdBackend: sequential reads + truncate (atomic via task runner) Update `UnifiedStorage::drain_above` to use the new atomic cold method instead of N separate reads + dispatch_truncate. Add conformance test and integration tests. Co-Authored-By: Claude Opus 4.6 --- crates/cold-mdbx/src/backend.rs | 72 ++++++++++++++++++ crates/cold-sql/src/backend.rs | 17 +++++ crates/cold/src/conformance.rs | 29 ++++++++ crates/cold/src/mem.rs | 66 ++++++++++++----- crates/cold/src/request.rs | 7 ++ crates/cold/src/task/handle.rs | 9 +++ crates/cold/src/task/runner.rs | 7 ++ crates/cold/src/traits.rs | 26 +++++++ crates/storage/src/unified.rs | 24 +++--- crates/storage/tests/unified.rs | 126 +++++++++++++++++++++++++++++++- 10 files changed, 350 insertions(+), 33 deletions(-) diff --git a/crates/cold-mdbx/src/backend.rs b/crates/cold-mdbx/src/backend.rs index 14d6856..57114aa 100644 --- a/crates/cold-mdbx/src/backend.rs +++ b/crates/cold-mdbx/src/backend.rs @@ -554,6 +554,74 @@ impl MdbxColdBackend { Ok(()) } + fn drain_above_inner( + &self, + block: BlockNumber, + ) -> Result>, MdbxColdError> { + let tx = self.env.tx_rw()?; + + // Collect sealed headers above the cutoff + let headers_to_remove = { + let mut cursor = tx.new_cursor::()?; + let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new(); + + let start_block = block + 1; + let mut key_buf = [0u8; MAX_KEY_SIZE]; + let key_bytes = start_block.encode_key(&mut key_buf); + + if let Some((key, value)) = cursor.lower_bound(key_bytes)? { + headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?)); + + while let Some((key, value)) = cursor.read_next()? { + headers.push(( + BlockNumber::decode_key(&key)?, + SealedHeader::decode_value(&value)?, + )); + } + } + headers + }; + + if headers_to_remove.is_empty() { + return Ok(Vec::new()); + } + + // Read receipts for each block, then delete + let mut all_receipts = Vec::with_capacity(headers_to_remove.len()); + { + let mut receipt_cursor = tx.traverse_dual::()?; + let mut tx_cursor = tx.traverse_dual::()?; + for (block_num, sealed) in &headers_to_remove { + // Collect receipts + let block_receipts: Vec = receipt_cursor + .iter_k2(block_num)? + .map(|item| { + let (idx, ir) = item?; + Ok::<_, MdbxColdError>(ColdReceipt::new(ir, sealed, idx)) + }) + .collect::>()?; + all_receipts.push(block_receipts); + + // Delete transaction hash indices + for item in tx_cursor.iter_k2(block_num)? { + let (_, tx_signed) = item?; + tx.queue_delete::(tx_signed.hash())?; + } + + tx.queue_delete::(block_num)?; + tx.queue_delete::(&sealed.hash())?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.queue_delete::(block_num)?; + } + } + + tx.raw_commit()?; + Ok(all_receipts) + } + fn get_logs_inner( &self, filter: &Filter, @@ -736,6 +804,10 @@ impl ColdStorage for MdbxColdBackend { async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { Ok(self.truncate_above_inner(block)?) } + + async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { + Ok(self.drain_above_inner(block)?) + } } #[cfg(all(test, feature = "test-utils"))] diff --git a/crates/cold-sql/src/backend.rs b/crates/cold-sql/src/backend.rs index 3921319..bfa7ffe 100644 --- a/crates/cold-sql/src/backend.rs +++ b/crates/cold-sql/src/backend.rs @@ -1396,6 +1396,23 @@ impl ColdStorage for SqlColdBackend { tx.commit().await.map_err(SqlColdError::from)?; Ok(()) } + + async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { + // Read receipts then truncate, all within a single SQL transaction. + // We use the default trait logic against self (which will use the pool + // for reads), then truncate. For true atomicity under concurrent + // access the caller should ensure exclusive write access via the task + // runner, which processes writes sequentially. + let latest = self.get_latest_block().await?; + let mut all_receipts = Vec::new(); + if let Some(latest) = latest { + for n in (block + 1)..=latest { + all_receipts.push(self.get_receipts_in_block(n).await?); + } + } + self.truncate_above(block).await?; + Ok(all_receipts) + } } #[cfg(all(test, feature = "test-utils"))] diff --git a/crates/cold/src/conformance.rs b/crates/cold/src/conformance.rs index 6b0454b..f6ec478 100644 --- a/crates/cold/src/conformance.rs +++ b/crates/cold/src/conformance.rs @@ -38,6 +38,7 @@ pub async fn conformance(backend: B) -> ColdResult<()> { test_cold_receipt_metadata(&handle).await?; test_get_logs(&handle).await?; test_stream_logs(&handle).await?; + test_drain_above(&handle).await?; cancel.cancel(); Ok(()) } @@ -608,3 +609,31 @@ pub async fn test_stream_logs(handle: &ColdStorageHandle) -> ColdResult<()> { Ok(()) } + +/// Test drain_above: reads receipts and truncates atomically. +pub async fn test_drain_above(handle: &ColdStorageHandle) -> ColdResult<()> { + // Append 3 blocks with txs (use block numbers that don't collide) + handle.append_block(make_test_block_with_txs(900, 2)).await?; + handle.append_block(make_test_block_with_txs(901, 3)).await?; + handle.append_block(make_test_block_with_txs(902, 1)).await?; + + // Drain above block 900 — should return receipts for 901 and 902 + let drained = handle.drain_above(900).await?; + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].len(), 3); // block 901 had 3 txs + assert_eq!(drained[1].len(), 1); // block 902 had 1 tx + + // Blocks 901 and 902 should be gone + assert!(handle.get_header(HeaderSpecifier::Number(901)).await?.is_none()); + assert!(handle.get_header(HeaderSpecifier::Number(902)).await?.is_none()); + + // Block 900 should still exist + assert!(handle.get_header(HeaderSpecifier::Number(900)).await?.is_some()); + assert_eq!(handle.get_latest_block().await?, Some(900)); + + // Drain with nothing above — should return empty + let empty = handle.drain_above(900).await?; + assert!(empty.is_empty()); + + Ok(()) +} diff --git a/crates/cold/src/mem.rs b/crates/cold/src/mem.rs index 7ba203c..f513767 100644 --- a/crates/cold/src/mem.rs +++ b/crates/cold/src/mem.rs @@ -78,6 +78,26 @@ impl MemColdBackendInner { fn confirmation_meta(&self, block: BlockNumber, index: u64) -> Option { self.headers.get(&block).map(|h| ConfirmationMeta::new(block, h.hash(), index)) } + + /// Remove all data for blocks above `block`. + fn truncate_above(&mut self, block: BlockNumber) { + let to_remove: Vec<_> = self.headers.range((block + 1)..).map(|(k, _)| *k).collect(); + for k in &to_remove { + if let Some(sealed) = self.headers.remove(k) { + self.header_hashes.remove(&sealed.hash()); + } + if let Some(txs) = self.transactions.remove(k) { + for tx in txs { + self.tx_hashes.remove(tx.hash()); + } + } + if self.receipts.remove(k).is_some() { + self.receipt_tx_hashes.retain(|_, (b, _)| *b <= block); + } + self.signet_events.remove(k); + self.zenith_headers.remove(k); + } + } } impl ColdStorage for MemColdBackend { @@ -312,27 +332,37 @@ impl ColdStorage for MemColdBackend { async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { let mut inner = self.inner.write().await; + inner.truncate_above(block); + Ok(()) + } - // Collect keys to remove - let to_remove: Vec<_> = inner.headers.range((block + 1)..).map(|(k, _)| *k).collect(); + async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { + let mut inner = self.inner.write().await; - for k in &to_remove { - if let Some(sealed) = inner.headers.remove(k) { - inner.header_hashes.remove(&sealed.hash()); - } - if let Some(txs) = inner.transactions.remove(k) { - for tx in txs { - inner.tx_hashes.remove(tx.hash()); - } - } - if inner.receipts.remove(k).is_some() { - inner.receipt_tx_hashes.retain(|_, (b, _)| *b <= block); - } - inner.signet_events.remove(k); - inner.zenith_headers.remove(k); - } + // Collect receipts for blocks above `block` in ascending order + let blocks_above: Vec<_> = inner.headers.range((block + 1)..).map(|(k, _)| *k).collect(); + let all_receipts = blocks_above + .iter() + .map(|&block_num| { + let Some(header) = inner.headers.get(&block_num) else { + return Vec::new(); + }; + inner + .receipts + .get(&block_num) + .map(|receipts| { + receipts + .iter() + .enumerate() + .map(|(idx, ir)| ColdReceipt::new(ir.clone(), header, idx as u64)) + .collect() + }) + .unwrap_or_default() + }) + .collect(); - Ok(()) + inner.truncate_above(block); + Ok(all_receipts) } } diff --git a/crates/cold/src/request.rs b/crates/cold/src/request.rs index 4a2f64c..32ee971 100644 --- a/crates/cold/src/request.rs +++ b/crates/cold/src/request.rs @@ -160,4 +160,11 @@ pub enum ColdWriteRequest { /// The response channel. resp: Responder<()>, }, + /// Read receipts and truncate all data above the given block. + DrainAbove { + /// The block number to drain above. + block: BlockNumber, + /// The response channel. + resp: Responder>>, + }, } diff --git a/crates/cold/src/task/handle.rs b/crates/cold/src/task/handle.rs index be49132..6e7d669 100644 --- a/crates/cold/src/task/handle.rs +++ b/crates/cold/src/task/handle.rs @@ -446,6 +446,15 @@ impl ColdStorageHandle { .map_err(map_dispatch_error) } + /// Read and remove all blocks above the given block number. + /// + /// Returns receipts for each block above `block` in ascending order, + /// then truncates. Index 0 = block+1, index 1 = block+2, etc. + pub async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { + let (resp, rx) = oneshot::channel(); + self.send_write(ColdWriteRequest::DrainAbove { block, resp }, rx).await + } + /// Dispatch truncate without waiting for response (non-blocking). /// /// Unlike [`truncate_above`](Self::truncate_above), this method returns diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs index b0a2114..9e2e370 100644 --- a/crates/cold/src/task/runner.rs +++ b/crates/cold/src/task/runner.rs @@ -234,6 +234,13 @@ impl ColdStorageTaskInner { } let _ = resp.send(result); } + ColdWriteRequest::DrainAbove { block, resp } => { + let result = self.backend.drain_above(block).await; + if result.is_ok() { + self.cache.lock().await.invalidate_above(block); + } + let _ = resp.send(result); + } } } } diff --git a/crates/cold/src/traits.rs b/crates/cold/src/traits.rs index 247ef17..f3d98ff 100644 --- a/crates/cold/src/traits.rs +++ b/crates/cold/src/traits.rs @@ -240,4 +240,30 @@ pub trait ColdStorage: Send + Sync + 'static { /// /// This removes block N+1 and higher from all tables. Used for reorg handling. fn truncate_above(&self, block: BlockNumber) -> impl Future> + Send; + + /// Read and remove all blocks above the given block number. + /// + /// Returns receipts for each block above `block` in ascending order, + /// then truncates. Index 0 = block+1, index 1 = block+2, etc. + /// Blocks with no receipts have empty vecs. + /// + /// The default implementation composes `get_latest_block` + + /// `get_receipts_in_block` + `truncate_above`. It is correct but + /// not atomic. Backends should override with an atomic version + /// when possible. + fn drain_above( + &self, + block: BlockNumber, + ) -> impl Future>>> + Send { + async move { + let mut all_receipts = Vec::new(); + if let Some(latest) = self.get_latest_block().await? { + for n in (block + 1)..=latest { + all_receipts.push(self.get_receipts_in_block(n).await?); + } + } + self.truncate_above(block).await?; + Ok(all_receipts) + } + } } diff --git a/crates/storage/src/unified.rs b/crates/storage/src/unified.rs index c1bbf43..fbccd28 100644 --- a/crates/storage/src/unified.rs +++ b/crates/storage/src/unified.rs @@ -265,21 +265,23 @@ impl UnifiedStorage { reader.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?; drop(reader); - // 2. Read receipts from cold storage (best-effort) - let cold = self.cold_reader(); - let mut drained = Vec::with_capacity(headers.len()); - for header in headers { - let receipts = cold.get_receipts_in_block(header.number).await.unwrap_or_default(); - drained.push(DrainedBlock { header, receipts }); - } - - // 3. Unwind hot storage + // 2. Unwind hot storage — if this fails, nothing is mutated let writer = self.hot.writer()?; writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?; writer.raw_commit().map_err(|e| e.into_hot_kv_error())?; - // 4. Dispatch truncate to cold storage - self.cold.dispatch_truncate_above(block).map_err(crate::StorageError::Cold)?; + // 3. Atomically drain cold (best-effort — failure = normal cold lag) + let cold_receipts = self.cold.drain_above(block).await.unwrap_or_default(); + + // 4. Assemble drained blocks (zip headers with receipts, default empty) + let drained = headers + .into_iter() + .enumerate() + .map(|(i, header)| { + let receipts = cold_receipts.get(i).cloned().unwrap_or_default(); + DrainedBlock { header, receipts } + }) + .collect(); Ok(drained) } diff --git a/crates/storage/tests/unified.rs b/crates/storage/tests/unified.rs index 34da081..42a179f 100644 --- a/crates/storage/tests/unified.rs +++ b/crates/storage/tests/unified.rs @@ -1,17 +1,22 @@ //! Integration tests for [`UnifiedStorage`]. -use alloy::consensus::{Header, Sealable}; +use alloy::{ + consensus::{Header, Sealable, Signed, TxLegacy, transaction::Recovered}, + primitives::{Address, B256, Signature, TxKind, U256}, +}; use signet_cold::{ColdStorageTask, HeaderSpecifier, mem::MemColdBackend}; -use signet_hot::{HistoryRead, HotKv, mem::MemKv}; +use signet_hot::{HistoryRead, HistoryWrite, HotKv, mem::MemKv, model::HotKvWrite}; use signet_storage::UnifiedStorage; -use signet_storage_types::{ExecutedBlock, ExecutedBlockBuilder, SealedHeader}; +use signet_storage_types::{ + ExecutedBlock, ExecutedBlockBuilder, Receipt, RecoveredTx, SealedHeader, TransactionSigned, +}; use tokio_util::sync::CancellationToken; use trevm::revm::database::BundleState; /// Build a chain of blocks with valid parent hash linkage. fn make_chain(count: u64) -> Vec { let mut blocks = Vec::with_capacity(count as usize); - let mut parent_hash = alloy::primitives::B256::ZERO; + let mut parent_hash = B256::ZERO; for number in 0..count { let header = Header { number, parent_hash, ..Default::default() }; @@ -27,6 +32,49 @@ fn make_chain(count: u64) -> Vec { blocks } +/// Create a test transaction with a unique nonce. +fn make_test_tx(nonce: u64) -> RecoveredTx { + let tx = TxLegacy { nonce, to: TxKind::Call(Default::default()), ..Default::default() }; + let sig = Signature::new(U256::from(nonce + 1), U256::from(nonce + 2), false); + let signed: TransactionSigned = + alloy::consensus::EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)); + let sender = Address::with_last_byte(nonce as u8); + Recovered::new_unchecked(signed, sender) +} + +/// Create a test receipt. +fn make_test_receipt() -> Receipt { + use alloy::consensus::Receipt as AlloyReceipt; + Receipt { + inner: AlloyReceipt { status: true.into(), ..Default::default() }, + ..Default::default() + } +} + +/// Build a chain of blocks with transactions and receipts. +fn make_chain_with_txs(count: u64, tx_count: usize) -> Vec { + let mut blocks = Vec::with_capacity(count as usize); + let mut parent_hash = B256::ZERO; + + for number in 0..count { + let header = Header { number, parent_hash, ..Default::default() }; + let sealed: SealedHeader = header.seal_slow(); + parent_hash = sealed.hash(); + let transactions: Vec = + (0..tx_count).map(|i| make_test_tx(number * 100 + i as u64)).collect(); + let receipts: Vec = (0..tx_count).map(|_| make_test_receipt()).collect(); + let block = ExecutedBlockBuilder::new() + .header(sealed) + .bundle(BundleState::default()) + .transactions(transactions) + .receipts(receipts) + .build() + .unwrap(); + blocks.push(block); + } + blocks +} + #[tokio::test] async fn append_and_read_back() { let hot = MemKv::new(); @@ -74,3 +122,73 @@ async fn append_multiple_and_unwind() { cancel.cancel(); } + +#[tokio::test] +async fn drain_above_returns_headers_and_receipts() { + let hot = MemKv::new(); + let cancel = CancellationToken::new(); + let cold_handle = ColdStorageTask::spawn(MemColdBackend::new(), cancel.clone()); + let storage = UnifiedStorage::new(hot.clone(), cold_handle); + + // Append 3 blocks (0, 1, 2) with 2 txs each + let blocks = make_chain_with_txs(3, 2); + storage.append_blocks(blocks).unwrap(); + + // drain_above(0) — returns blocks 1 and 2 + let drained = storage.drain_above(0).await.unwrap(); + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].header.number, 1); + assert_eq!(drained[1].header.number, 2); + assert_eq!(drained[0].receipts.len(), 2); + assert_eq!(drained[1].receipts.len(), 2); + + // Verify hot tip is now block 0 + assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 0); + + cancel.cancel(); +} + +#[tokio::test] +async fn drain_above_empty_when_at_tip() { + let hot = MemKv::new(); + let cancel = CancellationToken::new(); + let cold_handle = ColdStorageTask::spawn(MemColdBackend::new(), cancel.clone()); + let storage = UnifiedStorage::new(hot.clone(), cold_handle); + + // Append 2 blocks (0, 1) + storage.append_blocks(make_chain_with_txs(2, 1)).unwrap(); + + // drain_above(1) — nothing above tip + let drained = storage.drain_above(1).await.unwrap(); + assert!(drained.is_empty()); + + // Verify hot tip still 1 + assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 1); + + cancel.cancel(); +} + +#[tokio::test] +async fn drain_above_cold_lag() { + let hot = MemKv::new(); + let cancel = CancellationToken::new(); + let cold_handle = ColdStorageTask::spawn(MemColdBackend::new(), cancel.clone()); + let storage = UnifiedStorage::new(hot.clone(), cold_handle); + + // Write 3 blocks with txs directly to hot — skips cold entirely + let blocks = make_chain_with_txs(3, 2); + let writer = hot.writer().unwrap(); + writer.append_blocks(blocks.iter().map(|b| (&b.header, &b.bundle))).unwrap(); + writer.raw_commit().unwrap(); + + // drain_above(0) — returns blocks 1 and 2 + let drained = storage.drain_above(0).await.unwrap(); + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].header.number, 1); + assert_eq!(drained[1].header.number, 2); + // Receipts should be empty since cold storage was skipped + assert!(drained[0].receipts.is_empty()); + assert!(drained[1].receipts.is_empty()); + + cancel.cancel(); +} From f1bf660ec871f52f0cd456e0584795e5faeba0fe Mon Sep 17 00:00:00 2001 From: James Date: Tue, 10 Mar 2026 08:13:28 -0400 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?TOCTOU,=20dedup,=20stale=20docs,=20test=20assertions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use single writer tx in `drain_above` to eliminate TOCTOU race - Extract `collect_headers_above` / `delete_blocks` helpers in MDBX backend to deduplicate `truncate_above_inner` and `drain_above_inner` - Remove redundant `drain_above` override in SQL backend (identical to default trait impl) - Fix stale doc comment on step 4 - Add cold storage assertions to drain tests Co-Authored-By: Claude Opus 4.6 --- crates/cold-mdbx/src/backend.rs | 151 ++++++++++++-------------------- crates/cold-sql/src/backend.rs | 17 ---- crates/storage/src/unified.rs | 15 ++-- crates/storage/tests/unified.rs | 2 + 4 files changed, 66 insertions(+), 119 deletions(-) diff --git a/crates/cold-mdbx/src/backend.rs b/crates/cold-mdbx/src/backend.rs index 57114aa..bcf7c28 100644 --- a/crates/cold-mdbx/src/backend.rs +++ b/crates/cold-mdbx/src/backend.rs @@ -501,55 +501,58 @@ impl MdbxColdBackend { Ok(()) } - fn truncate_above_inner(&self, block: BlockNumber) -> Result<(), MdbxColdError> { - let tx = self.env.tx_rw()?; + /// Collect all sealed headers above `block` from the given transaction. + fn collect_headers_above( + tx: &signet_hot_mdbx::Tx, + block: BlockNumber, + ) -> Result, MdbxColdError> { + let mut cursor = tx.new_cursor::()?; + let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new(); - // Collect sealed headers above the cutoff - let headers_to_remove = { - let mut cursor = tx.new_cursor::()?; - let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new(); + let start_block = block + 1; + let mut key_buf = [0u8; MAX_KEY_SIZE]; + let key_bytes = start_block.encode_key(&mut key_buf); - let start_block = block + 1; - let mut key_buf = [0u8; MAX_KEY_SIZE]; - let key_bytes = start_block.encode_key(&mut key_buf); + if let Some((key, value)) = cursor.lower_bound(key_bytes)? { + headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?)); - if let Some((key, value)) = cursor.lower_bound(key_bytes)? { + while let Some((key, value)) = cursor.read_next()? { headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?)); - - while let Some((key, value)) = cursor.read_next()? { - headers.push(( - BlockNumber::decode_key(&key)?, - SealedHeader::decode_value(&value)?, - )); - } } - headers - }; - - if headers_to_remove.is_empty() { - return Ok(()); } + Ok(headers) + } - // Delete each block's data - { - let mut tx_cursor = tx.traverse_dual::()?; - for (block_num, sealed) in &headers_to_remove { - // Delete transaction hash indices - for item in tx_cursor.iter_k2(block_num)? { - let (_, tx_signed) = item?; - tx.queue_delete::(tx_signed.hash())?; - } - - tx.queue_delete::(block_num)?; - tx.queue_delete::(&sealed.hash())?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.queue_delete::(block_num)?; + /// Delete all data for the given blocks from the transaction. + fn delete_blocks( + tx: &signet_hot_mdbx::Tx, + headers: &[(BlockNumber, SealedHeader)], + ) -> Result<(), MdbxColdError> { + let mut tx_cursor = tx.traverse_dual::()?; + for (block_num, sealed) in headers { + for item in tx_cursor.iter_k2(block_num)? { + let (_, tx_signed) = item?; + tx.queue_delete::(tx_signed.hash())?; } + + tx.queue_delete::(block_num)?; + tx.queue_delete::(&sealed.hash())?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.queue_delete::(block_num)?; } + Ok(()) + } + fn truncate_above_inner(&self, block: BlockNumber) -> Result<(), MdbxColdError> { + let tx = self.env.tx_rw()?; + let headers = Self::collect_headers_above(&tx, block)?; + if headers.is_empty() { + return Ok(()); + } + Self::delete_blocks(&tx, &headers)?; tx.raw_commit()?; Ok(()) } @@ -559,65 +562,27 @@ impl MdbxColdBackend { block: BlockNumber, ) -> Result>, MdbxColdError> { let tx = self.env.tx_rw()?; - - // Collect sealed headers above the cutoff - let headers_to_remove = { - let mut cursor = tx.new_cursor::()?; - let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new(); - - let start_block = block + 1; - let mut key_buf = [0u8; MAX_KEY_SIZE]; - let key_bytes = start_block.encode_key(&mut key_buf); - - if let Some((key, value)) = cursor.lower_bound(key_bytes)? { - headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?)); - - while let Some((key, value)) = cursor.read_next()? { - headers.push(( - BlockNumber::decode_key(&key)?, - SealedHeader::decode_value(&value)?, - )); - } - } - headers - }; - - if headers_to_remove.is_empty() { + let headers = Self::collect_headers_above(&tx, block)?; + if headers.is_empty() { return Ok(Vec::new()); } - // Read receipts for each block, then delete - let mut all_receipts = Vec::with_capacity(headers_to_remove.len()); - { - let mut receipt_cursor = tx.traverse_dual::()?; - let mut tx_cursor = tx.traverse_dual::()?; - for (block_num, sealed) in &headers_to_remove { - // Collect receipts - let block_receipts: Vec = receipt_cursor - .iter_k2(block_num)? - .map(|item| { - let (idx, ir) = item?; - Ok::<_, MdbxColdError>(ColdReceipt::new(ir, sealed, idx)) - }) - .collect::>()?; - all_receipts.push(block_receipts); - - // Delete transaction hash indices - for item in tx_cursor.iter_k2(block_num)? { - let (_, tx_signed) = item?; - tx.queue_delete::(tx_signed.hash())?; - } - - tx.queue_delete::(block_num)?; - tx.queue_delete::(&sealed.hash())?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.clear_k1_for::(block_num)?; - tx.queue_delete::(block_num)?; - } + // Read receipts before deleting + let mut all_receipts = Vec::with_capacity(headers.len()); + let mut receipt_cursor = tx.traverse_dual::()?; + for (block_num, sealed) in &headers { + let block_receipts: Vec = receipt_cursor + .iter_k2(block_num)? + .map(|item| { + let (idx, ir) = item?; + Ok::<_, MdbxColdError>(ColdReceipt::new(ir, sealed, idx)) + }) + .collect::>()?; + all_receipts.push(block_receipts); } + drop(receipt_cursor); + Self::delete_blocks(&tx, &headers)?; tx.raw_commit()?; Ok(all_receipts) } diff --git a/crates/cold-sql/src/backend.rs b/crates/cold-sql/src/backend.rs index bfa7ffe..3921319 100644 --- a/crates/cold-sql/src/backend.rs +++ b/crates/cold-sql/src/backend.rs @@ -1396,23 +1396,6 @@ impl ColdStorage for SqlColdBackend { tx.commit().await.map_err(SqlColdError::from)?; Ok(()) } - - async fn drain_above(&self, block: BlockNumber) -> ColdResult>> { - // Read receipts then truncate, all within a single SQL transaction. - // We use the default trait logic against self (which will use the pool - // for reads), then truncate. For true atomicity under concurrent - // access the caller should ensure exclusive write access via the task - // runner, which processes writes sequentially. - let latest = self.get_latest_block().await?; - let mut all_receipts = Vec::new(); - if let Some(latest) = latest { - for n in (block + 1)..=latest { - all_receipts.push(self.get_receipts_in_block(n).await?); - } - } - self.truncate_above(block).await?; - Ok(all_receipts) - } } #[cfg(all(test, feature = "test-utils"))] diff --git a/crates/storage/src/unified.rs b/crates/storage/src/unified.rs index fbccd28..4a4898c 100644 --- a/crates/storage/src/unified.rs +++ b/crates/storage/src/unified.rs @@ -239,7 +239,7 @@ impl UnifiedStorage { /// 1. Reads headers from hot storage (sync) /// 2. Reads receipts from cold storage (async, best-effort) /// 3. Unwinds hot storage (sync) - /// 4. Dispatches truncate to cold storage (non-blocking) + /// 4. Drains cold storage (async, best-effort) /// /// # Cold Lag /// @@ -255,18 +255,15 @@ impl UnifiedStorage { /// [`Hot`]: crate::StorageError::Hot /// [`Cold`]: crate::StorageError::Cold pub async fn drain_above(&self, block: BlockNumber) -> StorageResult> { - // 1. Read headers above `block` from hot storage - let reader = self.reader()?; - let last = match reader.get_execution_range().map_err(|e| e.into_hot_kv_error())? { + // 1–2. Read headers then unwind hot storage in a single write tx + // to avoid TOCTOU races between reading and unwinding. + let writer = self.hot.writer()?; + let last = match writer.get_execution_range().map_err(|e| e.into_hot_kv_error())? { Some((_, last)) if last > block => last, _ => return Ok(Vec::new()), }; let headers = - reader.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?; - drop(reader); - - // 2. Unwind hot storage — if this fails, nothing is mutated - let writer = self.hot.writer()?; + writer.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?; writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?; writer.raw_commit().map_err(|e| e.into_hot_kv_error())?; diff --git a/crates/storage/tests/unified.rs b/crates/storage/tests/unified.rs index 42a179f..002157c 100644 --- a/crates/storage/tests/unified.rs +++ b/crates/storage/tests/unified.rs @@ -144,6 +144,7 @@ async fn drain_above_returns_headers_and_receipts() { // Verify hot tip is now block 0 assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 0); + assert_eq!(storage.cold().get_latest_block().await.unwrap().unwrap(), 0); cancel.cancel(); } @@ -164,6 +165,7 @@ async fn drain_above_empty_when_at_tip() { // Verify hot tip still 1 assert_eq!(hot.reader().unwrap().get_chain_tip().unwrap().unwrap().0, 1); + assert_eq!(storage.cold().get_latest_block().await.unwrap().unwrap(), 1); cancel.cancel(); }