Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.6.4"
version = "0.6.5"
edition = "2024"
rust-version = "1.92"
authors = ["init4"]
Expand Down Expand Up @@ -35,13 +35,13 @@ incremental = false

[workspace.dependencies]
# internal
signet-hot = { version = "0.6.4", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.6.4", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.6.4", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.6.4", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.6.4", path = "./crates/cold-sql" }
signet-storage = { version = "0.6.4", path = "./crates/storage" }
signet-storage-types = { version = "0.6.4", path = "./crates/types" }
signet-hot = { version = "0.6.5", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.6.5", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.6.5", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.6.5", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.6.5", path = "./crates/cold-sql" }
signet-storage = { version = "0.6.5", path = "./crates/storage" }
signet-storage-types = { version = "0.6.5", path = "./crates/types" }

# External, in-house
signet-libmdbx = { version = "0.8.0" }
Expand Down
111 changes: 74 additions & 37 deletions crates/cold-mdbx/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,57 +501,90 @@ impl MdbxColdBackend {
Ok(())
}

fn truncate_above_inner(&self, block: BlockNumber) -> Result<(), MdbxColdError> {
let tx = self.env.tx_rw()?;
/// Collect all sealed headers above `block` from the given transaction.
fn collect_headers_above(
tx: &signet_hot_mdbx::Tx<signet_libmdbx::Rw>,
block: BlockNumber,
) -> Result<Vec<(BlockNumber, SealedHeader)>, MdbxColdError> {
let mut cursor = tx.new_cursor::<ColdHeaders>()?;
let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new();

// Collect sealed headers above the cutoff
let headers_to_remove = {
let mut cursor = tx.new_cursor::<ColdHeaders>()?;
let mut headers: Vec<(BlockNumber, SealedHeader)> = Vec::new();
let start_block = block + 1;
let mut key_buf = [0u8; MAX_KEY_SIZE];
let key_bytes = start_block.encode_key(&mut key_buf);

let start_block = block + 1;
let mut key_buf = [0u8; MAX_KEY_SIZE];
let key_bytes = start_block.encode_key(&mut key_buf);
if let Some((key, value)) = cursor.lower_bound(key_bytes)? {
headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?));

if let Some((key, value)) = cursor.lower_bound(key_bytes)? {
while let Some((key, value)) = cursor.read_next()? {
headers.push((BlockNumber::decode_key(&key)?, SealedHeader::decode_value(&value)?));
}
}
Ok(headers)
}

while let Some((key, value)) = cursor.read_next()? {
headers.push((
BlockNumber::decode_key(&key)?,
SealedHeader::decode_value(&value)?,
));
}
/// Delete all data for the given blocks from the transaction.
fn delete_blocks(
tx: &signet_hot_mdbx::Tx<signet_libmdbx::Rw>,
headers: &[(BlockNumber, SealedHeader)],
) -> Result<(), MdbxColdError> {
let mut tx_cursor = tx.traverse_dual::<ColdTransactions>()?;
for (block_num, sealed) in headers {
for item in tx_cursor.iter_k2(block_num)? {
let (_, tx_signed) = item?;
tx.queue_delete::<ColdTxHashIndex>(tx_signed.hash())?;
}
headers
};

if headers_to_remove.is_empty() {
tx.queue_delete::<ColdHeaders>(block_num)?;
tx.queue_delete::<ColdBlockHashIndex>(&sealed.hash())?;
tx.clear_k1_for::<ColdTransactions>(block_num)?;
tx.clear_k1_for::<ColdTxSenders>(block_num)?;
tx.clear_k1_for::<ColdReceipts>(block_num)?;
tx.clear_k1_for::<ColdSignetEvents>(block_num)?;
tx.queue_delete::<ColdZenithHeaders>(block_num)?;
}
Ok(())
}

fn truncate_above_inner(&self, block: BlockNumber) -> Result<(), MdbxColdError> {
let tx = self.env.tx_rw()?;
let headers = Self::collect_headers_above(&tx, block)?;
if headers.is_empty() {
return Ok(());
}
Self::delete_blocks(&tx, &headers)?;
tx.raw_commit()?;
Ok(())
}

// Delete each block's data
{
let mut tx_cursor = tx.traverse_dual::<ColdTransactions>()?;
for (block_num, sealed) in &headers_to_remove {
// Delete transaction hash indices
for item in tx_cursor.iter_k2(block_num)? {
let (_, tx_signed) = item?;
tx.queue_delete::<ColdTxHashIndex>(tx_signed.hash())?;
}
fn drain_above_inner(
&self,
block: BlockNumber,
) -> Result<Vec<Vec<ColdReceipt>>, MdbxColdError> {
let tx = self.env.tx_rw()?;
let headers = Self::collect_headers_above(&tx, block)?;
if headers.is_empty() {
return Ok(Vec::new());
}

tx.queue_delete::<ColdHeaders>(block_num)?;
tx.queue_delete::<ColdBlockHashIndex>(&sealed.hash())?;
tx.clear_k1_for::<ColdTransactions>(block_num)?;
tx.clear_k1_for::<ColdTxSenders>(block_num)?;
tx.clear_k1_for::<ColdReceipts>(block_num)?;
tx.clear_k1_for::<ColdSignetEvents>(block_num)?;
tx.queue_delete::<ColdZenithHeaders>(block_num)?;
}
// Read receipts before deleting
let mut all_receipts = Vec::with_capacity(headers.len());
let mut receipt_cursor = tx.traverse_dual::<ColdReceipts>()?;
for (block_num, sealed) in &headers {
let block_receipts: Vec<ColdReceipt> = receipt_cursor
.iter_k2(block_num)?
.map(|item| {
let (idx, ir) = item?;
Ok::<_, MdbxColdError>(ColdReceipt::new(ir, sealed, idx))
})
.collect::<Result<_, _>>()?;
all_receipts.push(block_receipts);
}
drop(receipt_cursor);

Self::delete_blocks(&tx, &headers)?;
tx.raw_commit()?;
Ok(())
Ok(all_receipts)
}

fn get_logs_inner(
Expand Down Expand Up @@ -736,6 +769,10 @@ impl ColdStorage for MdbxColdBackend {
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
Ok(self.truncate_above_inner(block)?)
}

async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
Ok(self.drain_above_inner(block)?)
}
}

#[cfg(all(test, feature = "test-utils"))]
Expand Down
29 changes: 29 additions & 0 deletions crates/cold/src/conformance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub async fn conformance<B: ColdStorage>(backend: B) -> ColdResult<()> {
test_cold_receipt_metadata(&handle).await?;
test_get_logs(&handle).await?;
test_stream_logs(&handle).await?;
test_drain_above(&handle).await?;
cancel.cancel();
Ok(())
}
Expand Down Expand Up @@ -608,3 +609,31 @@ pub async fn test_stream_logs(handle: &ColdStorageHandle) -> ColdResult<()> {

Ok(())
}

/// Test drain_above: reads receipts and truncates atomically.
pub async fn test_drain_above(handle: &ColdStorageHandle) -> ColdResult<()> {
// Append 3 blocks with txs (use block numbers that don't collide)
handle.append_block(make_test_block_with_txs(900, 2)).await?;
handle.append_block(make_test_block_with_txs(901, 3)).await?;
handle.append_block(make_test_block_with_txs(902, 1)).await?;

// Drain above block 900 — should return receipts for 901 and 902
let drained = handle.drain_above(900).await?;
assert_eq!(drained.len(), 2);
assert_eq!(drained[0].len(), 3); // block 901 had 3 txs
assert_eq!(drained[1].len(), 1); // block 902 had 1 tx

// Blocks 901 and 902 should be gone
assert!(handle.get_header(HeaderSpecifier::Number(901)).await?.is_none());
assert!(handle.get_header(HeaderSpecifier::Number(902)).await?.is_none());

// Block 900 should still exist
assert!(handle.get_header(HeaderSpecifier::Number(900)).await?.is_some());
assert_eq!(handle.get_latest_block().await?, Some(900));

// Drain with nothing above — should return empty
let empty = handle.drain_above(900).await?;
assert!(empty.is_empty());

Ok(())
}
66 changes: 48 additions & 18 deletions crates/cold/src/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ impl MemColdBackendInner {
fn confirmation_meta(&self, block: BlockNumber, index: u64) -> Option<ConfirmationMeta> {
self.headers.get(&block).map(|h| ConfirmationMeta::new(block, h.hash(), index))
}

/// Remove all data for blocks above `block`.
fn truncate_above(&mut self, block: BlockNumber) {
let to_remove: Vec<_> = self.headers.range((block + 1)..).map(|(k, _)| *k).collect();
for k in &to_remove {
if let Some(sealed) = self.headers.remove(k) {
self.header_hashes.remove(&sealed.hash());
}
if let Some(txs) = self.transactions.remove(k) {
for tx in txs {
self.tx_hashes.remove(tx.hash());
}
}
if self.receipts.remove(k).is_some() {
self.receipt_tx_hashes.retain(|_, (b, _)| *b <= block);
}
self.signet_events.remove(k);
self.zenith_headers.remove(k);
}
}
}

impl ColdStorage for MemColdBackend {
Expand Down Expand Up @@ -312,27 +332,37 @@ impl ColdStorage for MemColdBackend {

async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
let mut inner = self.inner.write().await;
inner.truncate_above(block);
Ok(())
}

// Collect keys to remove
let to_remove: Vec<_> = inner.headers.range((block + 1)..).map(|(k, _)| *k).collect();
async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
let mut inner = self.inner.write().await;

for k in &to_remove {
if let Some(sealed) = inner.headers.remove(k) {
inner.header_hashes.remove(&sealed.hash());
}
if let Some(txs) = inner.transactions.remove(k) {
for tx in txs {
inner.tx_hashes.remove(tx.hash());
}
}
if inner.receipts.remove(k).is_some() {
inner.receipt_tx_hashes.retain(|_, (b, _)| *b <= block);
}
inner.signet_events.remove(k);
inner.zenith_headers.remove(k);
}
// Collect receipts for blocks above `block` in ascending order
let blocks_above: Vec<_> = inner.headers.range((block + 1)..).map(|(k, _)| *k).collect();
let all_receipts = blocks_above
.iter()
.map(|&block_num| {
let Some(header) = inner.headers.get(&block_num) else {
return Vec::new();
};
inner
.receipts
.get(&block_num)
.map(|receipts| {
receipts
.iter()
.enumerate()
.map(|(idx, ir)| ColdReceipt::new(ir.clone(), header, idx as u64))
.collect()
})
.unwrap_or_default()
})
.collect();

Ok(())
inner.truncate_above(block);
Ok(all_receipts)
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/cold/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,11 @@ pub enum ColdWriteRequest {
/// The response channel.
resp: Responder<()>,
},
/// Read receipts and truncate all data above the given block.
DrainAbove {
/// The block number to drain above.
block: BlockNumber,
/// The response channel.
resp: Responder<Vec<Vec<ColdReceipt>>>,
},
}
9 changes: 9 additions & 0 deletions crates/cold/src/task/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,15 @@ impl ColdStorageHandle {
.map_err(map_dispatch_error)
}

/// Read and remove all blocks above the given block number.
///
/// Returns receipts for each block above `block` in ascending order,
/// then truncates. Index 0 = block+1, index 1 = block+2, etc.
pub async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
let (resp, rx) = oneshot::channel();
self.send_write(ColdWriteRequest::DrainAbove { block, resp }, rx).await
}

/// Dispatch truncate without waiting for response (non-blocking).
///
/// Unlike [`truncate_above`](Self::truncate_above), this method returns
Expand Down
7 changes: 7 additions & 0 deletions crates/cold/src/task/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ impl<B: ColdStorage> ColdStorageTaskInner<B> {
}
let _ = resp.send(result);
}
ColdWriteRequest::DrainAbove { block, resp } => {
let result = self.backend.drain_above(block).await;
if result.is_ok() {
self.cache.lock().await.invalidate_above(block);
}
let _ = resp.send(result);
}
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions crates/cold/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,30 @@ pub trait ColdStorage: Send + Sync + 'static {
///
/// This removes block N+1 and higher from all tables. Used for reorg handling.
fn truncate_above(&self, block: BlockNumber) -> impl Future<Output = ColdResult<()>> + Send;

/// Read and remove all blocks above the given block number.
///
/// Returns receipts for each block above `block` in ascending order,
/// then truncates. Index 0 = block+1, index 1 = block+2, etc.
/// Blocks with no receipts have empty vecs.
///
/// The default implementation composes `get_latest_block` +
/// `get_receipts_in_block` + `truncate_above`. It is correct but
/// not atomic. Backends should override with an atomic version
/// when possible.
fn drain_above(
&self,
block: BlockNumber,
) -> impl Future<Output = ColdResult<Vec<Vec<ColdReceipt>>>> + Send {
async move {
let mut all_receipts = Vec::new();
if let Some(latest) = self.get_latest_block().await? {
for n in (block + 1)..=latest {
all_receipts.push(self.get_receipts_in_block(n).await?);
}
}
self.truncate_above(block).await?;
Ok(all_receipts)
}
}
}
2 changes: 1 addition & 1 deletion crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub mod either;
pub use either::Either;

mod unified;
pub use unified::UnifiedStorage;
pub use unified::{DrainedBlock, UnifiedStorage};

// Re-export connector traits
pub use signet_cold::ColdConnect;
Expand Down
Loading