Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions forester-utils/src/address_staging_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl AddressStagingTree {
low_element_next_values: &[[u8; 32]],
low_element_indices: &[u64],
low_element_next_indices: &[u64],
low_element_proofs: &[Vec<[u8; 32]>],
low_element_proofs: &[[[u8; 32]; HEIGHT]],
leaves_hashchain: [u8; 32],
zkp_batch_size: usize,
epoch: u64,
Expand All @@ -145,15 +145,12 @@ impl AddressStagingTree {
let inputs = get_batch_address_append_circuit_inputs::<HEIGHT>(
next_index,
old_root,
low_element_values.to_vec(),
low_element_next_values.to_vec(),
low_element_indices.iter().map(|v| *v as usize).collect(),
low_element_next_indices
.iter()
.map(|v| *v as usize)
.collect(),
low_element_proofs.to_vec(),
addresses.to_vec(),
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
&mut self.sparse_tree,
leaves_hashchain,
zkp_batch_size,
Expand Down
63 changes: 49 additions & 14 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use light_client::{
indexer::{AddressQueueData, Indexer, QueueElementsV2Options, StateQueueData},
rpc::Rpc,
};
use light_hasher::hash_chain::create_hash_chain_from_slice;

use crate::processor::v2::{common::clamp_to_u16, BatchContext};

Expand All @@ -22,6 +23,17 @@ pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> Mu
}
}

/// One ZKP batch worth of address-queue data, copied out of the streaming
/// queue (`StreamingAddressQueue::get_batch_snapshot`) so the caller owns the
/// data and no queue lock is held while the batch is processed.
///
/// The const generic `HEIGHT` is the indexed Merkle tree height: every
/// low-element non-inclusion proof carries exactly `HEIGHT` sibling hashes.
/// All five per-element vectors plus `addresses` are expected to have equal
/// length (one entry per address in the batch) — NOTE(review): this invariant
/// is produced by the snapshot constructor; confirm it is enforced there.
#[derive(Debug, Clone)]
pub struct AddressBatchSnapshot<const HEIGHT: usize> {
// Addresses to append in this batch, as 32-byte values.
pub addresses: Vec<[u8; 32]>,
// Value of each address's low element in the indexed tree.
pub low_element_values: Vec<[u8; 32]>,
// Next-value pointer of each low element (the value it currently links to).
pub low_element_next_values: Vec<[u8; 32]>,
// Leaf index of each low element.
pub low_element_indices: Vec<u64>,
// Leaf index each low element's next pointer refers to.
pub low_element_next_indices: Vec<u64>,
// Fixed-size Merkle proof (HEIGHT siblings) for each low element.
pub low_element_proofs: Vec<[[u8; 32]; HEIGHT]>,
// Hash chain over the batch's leaves; used as the circuit public input.
// Taken from the indexer's precomputed chains, or derived from `addresses`
// when the chain for this batch index is missing.
pub leaves_hashchain: [u8; 32],
}

pub async fn fetch_zkp_batch_size<R: Rpc>(context: &BatchContext<R>) -> crate::Result<u64> {
let rpc = context.rpc_pool.get_connection().await?;
let mut account = rpc
Expand Down Expand Up @@ -474,20 +486,52 @@ impl StreamingAddressQueue {
}
}

pub fn get_batch_data(&self, start: usize, end: usize) -> Option<BatchDataSlice> {
pub fn get_batch_snapshot<const HEIGHT: usize>(
&self,
start: usize,
end: usize,
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
let available = self.wait_for_batch(end);
if start >= available {
return None;
return Ok(None);
}
let actual_end = end.min(available);
let data = lock_recover(&self.data, "streaming_address_queue.data");
Some(BatchDataSlice {
addresses: data.addresses[start..actual_end].to_vec(),

let addresses = data.addresses[start..actual_end].to_vec();
if addresses.is_empty() {
return Err(anyhow!("Empty batch at start={}", start));
}

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
None => {
tracing::debug!(
"Missing leaves_hash_chain for batch {} (available: {}), deriving from addresses",
hashchain_idx,
data.leaves_hash_chains.len()
);
create_hash_chain_from_slice(&addresses).map_err(|error| {
anyhow!(
"Failed to derive leaves_hash_chain for batch {} from {} addresses: {}",
hashchain_idx,
addresses.len(),
error
)
})?
}
};

Ok(Some(AddressBatchSnapshot {
low_element_values: data.low_element_values[start..actual_end].to_vec(),
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
})
low_element_proofs: data.reconstruct_proofs::<HEIGHT>(start..actual_end)?,
addresses,
leaves_hashchain,
}))
Comment on lines +489 to +534
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject truncated snapshots before constructing proofs.

Line 499 truncates to actual_end = end.min(available), so this helper can return a short batch when wait_for_batch() times out or the fetch completes with fewer than end elements. The caller then feeds addresses.len() into process_batch, which turns a slow fetch or partial tail into an unsupported address-append proof size instead of a clean "not ready yet" result. The same path also slices low_element_* vectors without checking they reach actual_end, so malformed indexer data would panic the worker.

Return Ok(None) (or a specific incomplete-batch error) unless available >= end, and validate that the parallel low_element_* metadata vectors reach actual_end before slicing them. Based on learnings: ZKP batch sizes for address trees must be 10 or 250, and tree updates require batch proofs with the batch hash chain as a public input.

🩹 Suggested guardrails
     let available = self.wait_for_batch(end);
-    if start >= available {
+    if start >= available || available < end {
         return Ok(None);
     }
-    let actual_end = end.min(available);
+    let actual_end = end;
     let data = lock_recover(&self.data, "streaming_address_queue.data");
+
+    for (name, len) in [
+        ("low_element_values", data.low_element_values.len()),
+        ("low_element_next_values", data.low_element_next_values.len()),
+        ("low_element_indices", data.low_element_indices.len()),
+        ("low_element_next_indices", data.low_element_next_indices.len()),
+    ] {
+        if len < actual_end {
+            return Err(anyhow!(
+                "Incomplete address batch metadata: {} has {}, need {}",
+                name,
+                len,
+                actual_end
+            ));
+        }
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/helpers.rs` around lines 489 - 534, In
get_batch_snapshot: reject truncated snapshots early by changing the available
check to return Ok(None) unless available >= end (i.e., if
self.wait_for_batch(end) < end -> Ok(None)) so we never construct proofs for
partial batches; before slicing validate that data.low_element_values,
low_element_next_values, low_element_indices, low_element_next_indices and
data.addresses have length >= actual_end and that
data.reconstruct_proofs::<HEIGHT>(start..actual_end) can be produced (otherwise
return an error like "incomplete batch data"); keep the existing
leaves_hash_chains fallback logic but do not proceed to build
AddressBatchSnapshot when any of these sibling/vector length checks fail.

}

pub fn into_data(self) -> AddressQueueData {
Expand Down Expand Up @@ -553,15 +597,6 @@ impl StreamingAddressQueue {
}
}

/// Owned copy of one batch's address-queue data, sliced out of the streaming
/// queue. Unlike `AddressBatchSnapshot`, this carries neither the low-element
/// Merkle proofs nor the leaves hash chain — callers fetch those separately
/// from the queue's shared state.
///
/// All five vectors are expected to have equal length (one entry per address
/// in the batch) — NOTE(review): produced by slicing parallel vectors over
/// the same range; confirm the source vectors are kept in lockstep.
#[derive(Debug, Clone)]
pub struct BatchDataSlice {
// Addresses to append in this batch, as 32-byte values.
pub addresses: Vec<[u8; 32]>,
// Value of each address's low element in the indexed tree.
pub low_element_values: Vec<[u8; 32]>,
// Next-value pointer of each low element.
pub low_element_next_values: Vec<[u8; 32]>,
// Leaf index of each low element.
pub low_element_indices: Vec<u64>,
// Leaf index each low element's next pointer refers to.
pub low_element_next_indices: Vec<u64>,
}

pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
context: &BatchContext<R>,
total_elements: u64,
Expand Down
79 changes: 32 additions & 47 deletions forester/src/processor/v2/strategy/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use tracing::{debug, info, instrument};

use crate::processor::v2::{
batch_job_builder::BatchJobBuilder,
common::get_leaves_hashchain,
errors::V2Error,
helpers::{
fetch_address_zkp_batch_size, fetch_onchain_address_root, fetch_streaming_address_batches,
lock_recover, StreamingAddressQueue,
AddressBatchSnapshot, StreamingAddressQueue,
},
proof_worker::ProofInput,
root_guard::{reconcile_alignment, AlignmentDecision},
Expand Down Expand Up @@ -267,9 +266,23 @@ impl BatchJobBuilder for AddressQueueData {

let batch_end = start + zkp_batch_size_usize;

let batch_data = self
.streaming_queue
.get_batch_data(start, batch_end)
let streaming_queue = &self.streaming_queue;
let staging_tree = &mut self.staging_tree;
let hashchain_idx = start / zkp_batch_size_usize;
let AddressBatchSnapshot {
addresses,
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
leaves_hashchain,
} = streaming_queue
.get_batch_snapshot::<{ DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }>(
start,
batch_end,
hashchain_idx,
)?
.ok_or_else(|| {
anyhow!(
"Batch data not available: start={}, end={}, available={}",
Expand All @@ -278,31 +291,21 @@ impl BatchJobBuilder for AddressQueueData {
self.streaming_queue.available_batches() * zkp_batch_size_usize
)
})?;

let addresses = &batch_data.addresses;
let zkp_batch_size_actual = addresses.len();

if zkp_batch_size_actual == 0 {
return Err(anyhow!("Empty batch at start={}", start));
}

let low_element_values = &batch_data.low_element_values;
let low_element_next_values = &batch_data.low_element_next_values;
let low_element_indices = &batch_data.low_element_indices;
let low_element_next_indices = &batch_data.low_element_next_indices;

let low_element_proofs: Vec<Vec<[u8; 32]>> = {
let data = lock_recover(self.streaming_queue.data.as_ref(), "streaming_queue.data");
(start..start + zkp_batch_size_actual)
.map(|i| data.reconstruct_proof(i, DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as u8))
.collect::<Result<Vec<_>, _>>()?
};

let hashchain_idx = start / zkp_batch_size_usize;
let leaves_hashchain = {
let data = lock_recover(self.streaming_queue.data.as_ref(), "streaming_queue.data");
get_leaves_hashchain(&data.leaves_hash_chains, hashchain_idx)?
};
let result = staging_tree
.process_batch(
&addresses,
&low_element_values,
&low_element_next_values,
&low_element_indices,
&low_element_next_indices,
&low_element_proofs,
leaves_hashchain,
zkp_batch_size_actual,
epoch,
tree,
)
.map_err(|err| map_address_staging_error(tree, err))?;

let tree_batch = tree_next_index / zkp_batch_size_usize;
let absolute_index = data_start + start;
Expand All @@ -318,24 +321,6 @@ impl BatchJobBuilder for AddressQueueData {
self.streaming_queue.is_complete()
);

let result = self.staging_tree.process_batch(
addresses,
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
&low_element_proofs,
leaves_hashchain,
zkp_batch_size_actual,
epoch,
tree,
);

let result = match result {
Ok(r) => r,
Err(err) => return Err(map_address_staging_error(tree, err)),
};

Ok(Some((
ProofInput::AddressAppend(result.circuit_inputs),
result.new_root,
Expand Down
Loading
Loading