Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions forester-utils/src/address_staging_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl AddressStagingTree {
low_element_next_values: &[[u8; 32]],
low_element_indices: &[u64],
low_element_next_indices: &[u64],
low_element_proofs: &[Vec<[u8; 32]>],
low_element_proofs: &[[[u8; 32]; HEIGHT]],
leaves_hashchain: [u8; 32],
zkp_batch_size: usize,
epoch: u64,
Expand All @@ -145,15 +145,12 @@ impl AddressStagingTree {
let inputs = get_batch_address_append_circuit_inputs::<HEIGHT>(
next_index,
old_root,
low_element_values.to_vec(),
low_element_next_values.to_vec(),
low_element_indices.iter().map(|v| *v as usize).collect(),
low_element_next_indices
.iter()
.map(|v| *v as usize)
.collect(),
low_element_proofs.to_vec(),
addresses.to_vec(),
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
&mut self.sparse_tree,
leaves_hashchain,
zkp_batch_size,
Expand Down
63 changes: 49 additions & 14 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use light_client::{
indexer::{AddressQueueData, Indexer, QueueElementsV2Options, StateQueueData},
rpc::Rpc,
};
use light_hasher::hash_chain::create_hash_chain_from_slice;

use crate::processor::v2::{common::clamp_to_u16, BatchContext};

Expand All @@ -22,6 +23,17 @@ pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> Mu
}
}

#[derive(Debug, Clone)]
pub struct AddressBatchSnapshot<const HEIGHT: usize> {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
pub low_element_proofs: Vec<[[u8; 32]; HEIGHT]>,
pub leaves_hashchain: [u8; 32],
}

pub async fn fetch_zkp_batch_size<R: Rpc>(context: &BatchContext<R>) -> crate::Result<u64> {
let rpc = context.rpc_pool.get_connection().await?;
let mut account = rpc
Expand Down Expand Up @@ -474,20 +486,52 @@ impl StreamingAddressQueue {
}
}

pub fn get_batch_data(&self, start: usize, end: usize) -> Option<BatchDataSlice> {
pub fn get_batch_snapshot<const HEIGHT: usize>(
&self,
start: usize,
end: usize,
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
let available = self.wait_for_batch(end);
if start >= available {
return None;
return Ok(None);
}
let actual_end = end.min(available);
let data = lock_recover(&self.data, "streaming_address_queue.data");
Some(BatchDataSlice {
addresses: data.addresses[start..actual_end].to_vec(),

let addresses = data.addresses[start..actual_end].to_vec();
if addresses.is_empty() {
return Err(anyhow!("Empty batch at start={}", start));
}

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
None => {
tracing::debug!(
"Missing leaves_hash_chain for batch {} (available: {}), deriving from addresses",
hashchain_idx,
data.leaves_hash_chains.len()
);
create_hash_chain_from_slice(&addresses).map_err(|error| {
anyhow!(
"Failed to derive leaves_hash_chain for batch {} from {} addresses: {}",
hashchain_idx,
addresses.len(),
error
)
})?
}
};

Ok(Some(AddressBatchSnapshot {
low_element_values: data.low_element_values[start..actual_end].to_vec(),
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
})
low_element_proofs: data.reconstruct_proofs::<HEIGHT>(start..actual_end)?,
addresses,
leaves_hashchain,
}))
}

pub fn into_data(self) -> AddressQueueData {
Expand Down Expand Up @@ -553,15 +597,6 @@ impl StreamingAddressQueue {
}
}

#[derive(Debug, Clone)]
pub struct BatchDataSlice {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
}

pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
context: &BatchContext<R>,
total_elements: u64,
Expand Down
79 changes: 32 additions & 47 deletions forester/src/processor/v2/strategy/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use tracing::{debug, info, instrument};

use crate::processor::v2::{
batch_job_builder::BatchJobBuilder,
common::get_leaves_hashchain,
errors::V2Error,
helpers::{
fetch_address_zkp_batch_size, fetch_onchain_address_root, fetch_streaming_address_batches,
lock_recover, StreamingAddressQueue,
AddressBatchSnapshot, StreamingAddressQueue,
},
proof_worker::ProofInput,
root_guard::{reconcile_alignment, AlignmentDecision},
Expand Down Expand Up @@ -267,9 +266,23 @@ impl BatchJobBuilder for AddressQueueData {

let batch_end = start + zkp_batch_size_usize;

let batch_data = self
.streaming_queue
.get_batch_data(start, batch_end)
let streaming_queue = &self.streaming_queue;
let staging_tree = &mut self.staging_tree;
let hashchain_idx = start / zkp_batch_size_usize;
let AddressBatchSnapshot {
addresses,
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
leaves_hashchain,
} = streaming_queue
.get_batch_snapshot::<{ DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }>(
start,
batch_end,
hashchain_idx,
)?
.ok_or_else(|| {
anyhow!(
"Batch data not available: start={}, end={}, available={}",
Expand All @@ -278,31 +291,21 @@ impl BatchJobBuilder for AddressQueueData {
self.streaming_queue.available_batches() * zkp_batch_size_usize
)
})?;

let addresses = &batch_data.addresses;
let zkp_batch_size_actual = addresses.len();

if zkp_batch_size_actual == 0 {
return Err(anyhow!("Empty batch at start={}", start));
}

let low_element_values = &batch_data.low_element_values;
let low_element_next_values = &batch_data.low_element_next_values;
let low_element_indices = &batch_data.low_element_indices;
let low_element_next_indices = &batch_data.low_element_next_indices;

let low_element_proofs: Vec<Vec<[u8; 32]>> = {
let data = lock_recover(self.streaming_queue.data.as_ref(), "streaming_queue.data");
(start..start + zkp_batch_size_actual)
.map(|i| data.reconstruct_proof(i, DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as u8))
.collect::<Result<Vec<_>, _>>()?
};

let hashchain_idx = start / zkp_batch_size_usize;
let leaves_hashchain = {
let data = lock_recover(self.streaming_queue.data.as_ref(), "streaming_queue.data");
get_leaves_hashchain(&data.leaves_hash_chains, hashchain_idx)?
};
let result = staging_tree
.process_batch(
&addresses,
&low_element_values,
&low_element_next_values,
&low_element_indices,
&low_element_next_indices,
&low_element_proofs,
leaves_hashchain,
zkp_batch_size_actual,
epoch,
tree,
)
.map_err(|err| map_address_staging_error(tree, err))?;

let tree_batch = tree_next_index / zkp_batch_size_usize;
let absolute_index = data_start + start;
Expand All @@ -318,24 +321,6 @@ impl BatchJobBuilder for AddressQueueData {
self.streaming_queue.is_complete()
);

let result = self.staging_tree.process_batch(
addresses,
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
&low_element_proofs,
leaves_hashchain,
zkp_batch_size_actual,
epoch,
tree,
);

let result = match result {
Ok(r) => r,
Err(err) => return Err(map_address_staging_error(tree, err)),
};

Ok(Some((
ProofInput::AddressAppend(result.circuit_inputs),
result.new_root,
Expand Down
Loading
Loading