From 926288a2374816df0794e91c4344d30524e5188b Mon Sep 17 00:00:00 2001 From: numinex Date: Mon, 9 Mar 2026 13:09:04 +0100 Subject: [PATCH 1/6] temp --- .../PARTITION_JOURNAL2_STORAGE_PLAN.md | 141 ++++++++++++++ core/partitions/src/journal.rs | 182 ++++++++++++++++-- 2 files changed, 312 insertions(+), 11 deletions(-) create mode 100644 core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md diff --git a/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md new file mode 100644 index 0000000000..ec42f4d7d6 --- /dev/null +++ b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md @@ -0,0 +1,141 @@ +# PartitionJournal2 Storage Proposal + +## Objective +Use `journal::Storage` as the actual backing store for serialized prepare entries (`Bytes`) and decode to `Message` on read. + +## Current Problem +- `PartitionJournal2Impl` currently stores entries directly in: + - `UnsafeCell>>` +- `Noop` storage is unused for real data. +- The old `Buffer = Entry` idea is too rigid for this path. +- `Storage::read(&self, offset, buffer)` still requires a fallback buffer argument. + +## Design Direction + +### 1. Use serialized buffer (`Bytes`) in storage +For `PartitionJournal2`, enforce: + +```rust +S: Storage +``` + +Journal entry remains: + +```rust +type Entry = Message; +``` + +Conversion boundary: +- write path: `Message -> Bytes` (serialize/store) +- read path: `Bytes -> Message` via `Message::from_bytes` + +### 2. Replace `Noop` with in-memory prepare-message storage +Introduce a dedicated storage: + +```rust +pub struct InMemoryPrepareStorage { + entries: UnsafeCell>, +} +``` + +Behavior: +- `write(bytes)` appends serialized message bytes to `entries`. +- `read(offset, ...)` treats `offset` as `op_number` and returns that op entry. + +This keeps storage raw and simple, while typed decoding happens at journal boundary. 
+ +### 2.1 `offset` semantics for this journal: `offset == op_number` +For `PartitionJournal2`, define: +- `Storage::read(offset, ...)` where `offset` is VSR `PrepareHeader.op` (op number), not byte offset. +- Journal append path stores entries in op order, so op lookup is O(1)-ish via index map (or direct vec index if contiguous). + +Implementation detail: +- Maintain `op_to_index: HashMap` (or rely on contiguous `op` if guaranteed). +- On `append(entry)`, cache `op_to_index.insert(entry.header().op, vec_index)`. +- On `entry(header)`, call storage read using `header.op as usize` or map-resolved index. + +### 3. Make `PartitionJournal2Impl` storage-backed +Refactor `PartitionJournal2Impl` to own a storage instance: + +```rust +pub struct PartitionJournal2Impl> { + storage: S, + // metadata/indexes only + headers: UnsafeCell>, + message_offset_to_op: UnsafeCell>, + timestamp_to_op: UnsafeCell>, +} +``` + +Responsibilities split: +- `storage`: source of truth for serialized entry bytes +- `headers`: lightweight header cache to satisfy `header()` / `previous_header()` reference semantics +- maps: query acceleration for `get` + +### 4. Journal method behavior with storage +- `append(entry)`: + - decode send-messages info for maps + - serialize: `let bytes = entry.into_inner()` (or equivalent) + - `storage.write(bytes)` + - push `*entry.header()` into `headers` + - cache `op_number -> storage position` +- `entry(header)`: + - resolve by `op_number` (from `header.op`) + - fetch bytes from storage via read + - decode immediately: `Message::::from_bytes(bytes)` +- `header(idx)` and `previous_header(header)`: + - use `headers` vector (no full entry decode needed) + +### 5. `get` path stays batch-conversion based +`get` still performs: +- collect candidate bytes from storage +- decode into `Message` +- convert `Vec> -> IggyMessagesBatchSet` +- apply `MessageLookup` filter (`get_by_offset` / `get_by_timestamp`) + +The existing conversion helpers remain valid. 
+ +## Storage Trait Consideration + +Current trait: + +```rust +fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future; +``` + +For typed object lookups this is awkward. Recommended adjustment: + +```rust +fn read(&self, offset: usize) -> impl Future>; +``` + +Why: +- avoids fake fallback buffer construction +- maps naturally to indexed storage +- still works for byte storage by returning `Option` + +If trait-wide change is too large right now, keep current signature temporarily and ignore/use the incoming `buffer` only as a fallback for out-of-range reads. + +For this phase, no trait change is required: just interpret the existing `offset` argument as `op_number` in `PartitionJournal2` storage. + +## Proposed File-Level Changes +- `core/partitions/src/journal.rs` + - add `InMemoryPrepareStorage` + - make `PartitionJournal2Impl` generic over storage (or concrete to `InMemoryPrepareStorage`) + - remove direct `inner.set: Vec>` as primary store + - keep lightweight header metadata cache + - keep/adjust current lookup maps +- `core/partitions/src/iggy_partition.rs` (only if/when wiring `PartitionJournal2` into partition log) + - replace `Noop` for this path with `InMemoryPrepareStorage` + +## Migration Sequence (Low Risk) +1. Add `InMemoryPrepareStorage` without removing existing fields. +2. Mirror writes to both old `inner.set` and storage. +3. Switch reads (`entry`, `get`) to storage-backed path. +4. Remove old `inner.set` once parity is confirmed. +5. Optionally evolve `Storage::read` signature in a separate PR. + +## Expected Outcome +- `Storage` is no longer a no-op for `PartitionJournal2`. +- Storage buffer is raw serialized data (`Bytes`), and decoding to `Message` happens at read boundary. +- The in-memory backend stays simple (`UnsafeCell>`) and aligned with your proposed flow. 
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index ab281e940d..5f964b46b0 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,9 +15,14 @@ // specific language governing permissions and limitations // under the License. -use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet}; +use bytes::BytesMut; +use iggy_common::{ + INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer, + header::{Operation, PrepareHeader}, + message::Message, +}; use journal::{Journal, Storage}; -use std::cell::UnsafeCell; +use std::{cell::UnsafeCell, collections::HashMap}; // TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. /// No-op storage backend for the in-memory partition journal. @@ -31,7 +36,7 @@ impl Storage for Noop { 0 } - async fn read(&self, _offset: usize, _buffer: ()) {} + async fn read(&self, _offset: usize, _buffer: ()) -> () { ()} } /// Lookup key for querying messages from the journal. @@ -49,14 +54,7 @@ impl std::ops::Deref for MessageLookup { } } -/// In-memory journal that accumulates message batches as an `IggyMessagesBatchSet`. -/// -/// This is a pure storage layer — it holds batches and supports lookups via -/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives -/// outside the journal in the `SegmentedLog`'s `JournalInfo`. -/// -/// Uses `UnsafeCell` for interior mutability, matching the single-threaded -/// per-shard execution model. 
+// [LEGACY] pub struct PartitionJournal { batch_set: UnsafeCell, } @@ -92,6 +90,168 @@ impl std::fmt::Debug for PartitionJournal { } } +pub trait PartitionJournal2: Journal +where + S: Storage, +{ + type Query; + + fn get(&self, query: &Self::Query) -> impl Future>; +} + +pub struct PartitionJournal2Impl { + message_offset_to_op: UnsafeCell>, + timestamp_to_op: UnsafeCell>, + inner: UnsafeCell, +} + +struct JournalInner { + set: Vec>, +} + +impl Default for PartitionJournal2Impl { + fn default() -> Self { + Self { + message_offset_to_op: UnsafeCell::new(HashMap::new()), + timestamp_to_op: UnsafeCell::new(HashMap::new()), + inner: UnsafeCell::new(JournalInner { set: Vec::new() }), + } + } +} + +impl PartitionJournal2Impl { + fn decode_send_messages_batch(body: bytes::Bytes) -> Option { + // TODO: This is bad, + let mut body = body + .try_into_mut() + .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + + if body.len() < 4 { + return None; + } + + let count_bytes = body.split_to(4); + let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); + let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + + if body.len() < indexes_len { + return None; + } + + let indexes_bytes = body.split_to(indexes_len); + let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); + let messages = PooledBuffer::from(body); + + Some(IggyMessagesBatchMut::from_indexes_and_messages( + indexes, messages, + )) + } + + fn message_to_batch(message: &Message) -> Option { + if message.header().operation != Operation::SendMessages { + return None; + } + + Self::decode_send_messages_batch(message.body_bytes()) + } + + fn messages_to_batch_set<'a>(messages: impl Iterator>) -> IggyMessagesBatchSet { + let mut batch_set = IggyMessagesBatchSet::empty(); + + for message in messages { + if let Some(batch) = Self::message_to_batch(message) { + batch_set.add_batch(batch); + } + } + + batch_set + } + + fn candidate_start_op(&self, query: &MessageLookup) -> usize 
{ + match query { + MessageLookup::Offset { offset, .. } => { + let offsets = unsafe { &*self.message_offset_to_op.get() }; + offsets.get(offset).copied().unwrap_or_default() + } + MessageLookup::Timestamp { timestamp, .. } => { + let timestamps = unsafe { &*self.timestamp_to_op.get() }; + timestamps.get(timestamp).copied().unwrap_or_default() + } + } + } + + fn messages_from_op(&self, start_op: usize) -> impl Iterator> { + let inner = unsafe { &*self.inner.get() }; + inner.set.iter().skip(start_op) + } +} + +impl Journal for PartitionJournal2Impl { + type Header = PrepareHeader; + type Entry = Message; + type HeaderRef<'a> = &'a Self::Header; + + fn header(&self, idx: usize) -> Option> { + // TODO: Fixes + let inner = unsafe { &*self.inner.get() }; + inner.set.get(idx).map(|msg| msg.header()) + } + + fn previous_header(&self, header: &Self::Header) -> Option> { + // TODO: Fixes + let prev_idx = header.op.saturating_sub(1) as usize; + let inner = unsafe { &*self.inner.get() }; + inner.set.get(prev_idx).map(|msg| msg.header()) + } + + async fn append(&self, entry: Self::Entry) { + let first_offset_and_timestamp = Self::message_to_batch(&entry) + .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?))); + + let inner = unsafe { &mut *self.inner.get() }; + let op = inner.set.len(); + inner.set.push(entry); + + if let Some((offset, timestamp)) = first_offset_and_timestamp { + let offsets = unsafe { &mut *self.message_offset_to_op.get() }; + offsets.insert(offset, op); + + let timestamps = unsafe { &mut *self.timestamp_to_op.get() }; + timestamps.insert(timestamp, op); + } + } + + async fn entry(&self, header: &Self::Header) -> Option { + let op = header.op as usize; + let inner = unsafe { &*self.inner.get() }; + inner.set.get(op).cloned() + } +} + +impl PartitionJournal2 for PartitionJournal2Impl { + type Query = MessageLookup; + + async fn get(&self, query: &Self::Query) -> Option { + let query = *query; + let start_op = 
self.candidate_start_op(&query); + let messages = self.messages_from_op(start_op); + let batch_set = Self::messages_to_batch_set(messages); + + let result = match query { + MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), + MessageLookup::Timestamp { timestamp, count } => { + batch_set.get_by_timestamp(timestamp, count) + } + }; + + if result.is_empty() { + None + } else { + Some(result) + } + } +} + impl Journal for PartitionJournal { type Header = MessageLookup; type Entry = IggyMessagesBatchMut; From 90d982cf8bce44ff1a4b3a3ccf982d3341ee4e8b Mon Sep 17 00:00:00 2001 From: numinex Date: Mon, 9 Mar 2026 21:00:50 +0100 Subject: [PATCH 2/6] smth --- core/journal/src/lib.rs | 5 +- core/partitions/src/journal.rs | 220 ++++++++++++++++++++++++--------- core/simulator/src/deps.rs | 6 +- 3 files changed, 171 insertions(+), 60 deletions(-) diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs index f1b4081dd2..55a6b14fbd 100644 --- a/core/journal/src/lib.rs +++ b/core/journal/src/lib.rs @@ -39,7 +39,10 @@ pub trait Storage { type Buffer; fn write(&self, buf: Self::Buffer) -> impl Future; - fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future; + // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` in order to support that. + // Maybe we should go back to passing in the `Buffer` again, but I am not sure how to handle it in the `Partitions Journal`, since we use in-memory impl + // which extracts the buffer out of the `Vec` and we don't need to allocate a new buffer. + fn read(&self, offset: usize, len: usize) -> impl Future; } pub trait JournalHandle { diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 5f964b46b0..93d3733fc5 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,14 +15,19 @@ // specific language governing permissions and limitations // under the License. 
-use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use iggy_common::{ INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer, header::{Operation, PrepareHeader}, message::Message, }; use journal::{Journal, Storage}; -use std::{cell::UnsafeCell, collections::HashMap}; +use std::{ + cell::UnsafeCell, + collections::{BTreeMap, HashMap}, +}; + +const ZERO_LEN: usize = 0; // TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. /// No-op storage backend for the in-memory partition journal. @@ -36,7 +41,7 @@ impl Storage for Noop { 0 } - async fn read(&self, _offset: usize, _buffer: ()) -> () { ()} + async fn read(&self, _offset: usize, _len: usize) -> () {} } /// Lookup key for querying messages from the journal. @@ -99,29 +104,69 @@ where fn get(&self, query: &Self::Query) -> impl Future>; } -pub struct PartitionJournal2Impl { - message_offset_to_op: UnsafeCell>, - timestamp_to_op: UnsafeCell>, - inner: UnsafeCell, +#[derive(Default)] +pub struct PartitionJournalMemStorage { + entries: UnsafeCell>, + op_to_index: UnsafeCell>, } -struct JournalInner { - set: Vec>, -} +impl Storage for PartitionJournalMemStorage { + type Buffer = Bytes; -impl Default for PartitionJournal2Impl { - fn default() -> Self { - Self { - message_offset_to_op: UnsafeCell::new(HashMap::new()), - timestamp_to_op: UnsafeCell::new(HashMap::new()), - inner: UnsafeCell::new(JournalInner { set: Vec::new() }), + async fn write(&self, buf: Self::Buffer) -> usize { + let op = Message::::from_bytes(buf.clone()) + .ok() + .map(|message| message.header().op); + + let entries = unsafe { &mut *self.entries.get() }; + let index = entries.len(); + entries.push(buf.clone()); + + if let Some(op) = op { + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.insert(op, index); } + + buf.len() + } + + async fn read(&self, offset: usize, _len: usize) -> Self::Buffer { + let op = offset as u64; + let Some(index) = ({ + let op_to_index = unsafe 
{ &*self.op_to_index.get() }; + op_to_index.get(&op).copied() + }) else { + return Bytes::new(); + }; + + let entries = unsafe { &*self.entries.get() }; + entries.get(index).cloned().unwrap_or_default() } } -impl PartitionJournal2Impl { +pub struct PartitionJournal2Impl +where + S: Storage, +{ + message_offset_to_op: UnsafeCell>, + timestamp_to_op: UnsafeCell>, + headers: UnsafeCell>, + inner: UnsafeCell>, +} + +struct JournalInner +where + S: Storage, +{ + storage: S, +} + +impl PartitionJournal2Impl +where + S: Storage, +{ fn decode_send_messages_batch(body: bytes::Bytes) -> Option { - // TODO: This is bad, + // TODO: This is bad, let mut body = body .try_into_mut() .unwrap_or_else(|body| BytesMut::from(body.as_ref())); @@ -155,10 +200,10 @@ impl PartitionJournal2Impl { Self::decode_send_messages_batch(message.body_bytes()) } - fn messages_to_batch_set<'a>(messages: impl Iterator>) -> IggyMessagesBatchSet { + fn messages_to_batch_set(messages: Vec>) -> IggyMessagesBatchSet { let mut batch_set = IggyMessagesBatchSet::empty(); - for message in messages { + for message in &messages { if let Some(batch) = Self::message_to_batch(message) { batch_set.add_batch(batch); } @@ -167,50 +212,108 @@ impl PartitionJournal2Impl { batch_set } - fn candidate_start_op(&self, query: &MessageLookup) -> usize { + fn candidate_start_op(&self, query: &MessageLookup) -> Option { match query { MessageLookup::Offset { offset, .. } => { let offsets = unsafe { &*self.message_offset_to_op.get() }; - offsets.get(offset).copied().unwrap_or_default() + offsets + .range(..=*offset) + .next_back() + .or_else(|| offsets.range(*offset..).next()) + .map(|(_, op)| *op) } MessageLookup::Timestamp { timestamp, .. 
} => { let timestamps = unsafe { &*self.timestamp_to_op.get() }; - timestamps.get(timestamp).copied().unwrap_or_default() + timestamps + .range(..=*timestamp) + .next_back() + .or_else(|| timestamps.range(*timestamp..).next()) + .map(|(_, op)| *op) } } } - fn messages_from_op(&self, start_op: usize) -> impl Iterator> { - let inner = unsafe { &*self.inner.get() }; - inner.set.iter().skip(start_op) + async fn message_by_op(&self, op: u64) -> Option> { + let offset = usize::try_from(op).ok()?; + let bytes = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.read(offset, ZERO_LEN).await + }; + + Some(Message::from_bytes(bytes).expect("invalid message bytes read from storage")) + } + + async fn load_messages_from_storage( + &self, + start_op: u64, + count: u32, + ) -> Vec> { + if count == 0 { + return Vec::new(); + } + + let mut messages = Vec::new(); + let mut loaded_messages = 0u32; + let mut op = start_op; + + while loaded_messages < count { + let Some(message) = self.message_by_op(op).await else { + break; + }; + + if let Some(batch) = Self::message_to_batch(&message) { + loaded_messages = loaded_messages.saturating_add(batch.count()); + messages.push(message); + } + + op += 1; + } + + messages } } -impl Journal for PartitionJournal2Impl { +impl Journal for PartitionJournal2Impl +where + S: Storage, +{ type Header = PrepareHeader; type Entry = Message; - type HeaderRef<'a> = &'a Self::Header; + #[rustfmt::skip] // Scuffed formatter. 
+ type HeaderRef<'a> = &'a Self::Header where S: 'a; fn header(&self, idx: usize) -> Option> { - // TODO: Fixes - let inner = unsafe { &*self.inner.get() }; - inner.set.get(idx).map(|msg| msg.header()) + let headers = unsafe { &mut *self.headers.get() }; + headers.get(idx) } fn previous_header(&self, header: &Self::Header) -> Option> { - // TODO: Fixes - let prev_idx = header.op.saturating_sub(1) as usize; - let inner = unsafe { &*self.inner.get() }; - inner.set.get(prev_idx).map(|msg| msg.header()) + if header.op == 0 { + return None; + } + + let prev_op = header.op - 1; + let headers = unsafe { &*self.headers.get() }; + headers.get(prev_op as usize) } async fn append(&self, entry: Self::Entry) { let first_offset_and_timestamp = Self::message_to_batch(&entry) .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?))); - let inner = unsafe { &mut *self.inner.get() }; - let op = inner.set.len(); - inner.set.push(entry); + let header = *entry.header(); + let op = header.op; + + { + let headers = unsafe { &mut *self.headers.get() }; + headers.push(header); + }; + + let bytes = entry.into_inner(); + { + let inner = unsafe { &*self.inner.get() }; + let _ = inner.storage.write(bytes).await; + } if let Some((offset, timestamp)) = first_offset_and_timestamp { let offsets = unsafe { &mut *self.message_offset_to_op.get() }; @@ -222,33 +325,38 @@ impl Journal for PartitionJournal2Impl { } async fn entry(&self, header: &Self::Header) -> Option { - let op = header.op as usize; - let inner = unsafe { &*self.inner.get() }; - inner.set.get(op).cloned() + self.message_by_op(header.op).await } } -impl PartitionJournal2 for PartitionJournal2Impl { +impl PartitionJournal2 for PartitionJournal2Impl +where + S: Storage, +{ type Query = MessageLookup; async fn get(&self, query: &Self::Query) -> Option { let query = *query; - let start_op = self.candidate_start_op(&query); - let messages = self.messages_from_op(start_op); - let batch_set = 
Self::messages_to_batch_set(messages); - - let result = match query { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), - MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(timestamp, count) - } - }; + let start_op = self.candidate_start_op(&query)?; + let count = match query { + MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count, + }; + + let messages = self.load_messages_from_storage(start_op, count).await; - if result.is_empty() { - None - } else { - Some(result) + let batch_set = Self::messages_to_batch_set(messages); + let result = match query { + MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), + MessageLookup::Timestamp { timestamp, count } => { + batch_set.get_by_timestamp(timestamp, count) } + }; + + if result.is_empty() { + None + } else { + Some(result) + } } } diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 12e78bb051..316a54fe74 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -45,7 +45,8 @@ impl Storage for MemStorage { len } - async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> Self::Buffer { + async fn read(&self, offset: usize, len: usize) -> Self::Buffer { + let buffer = vec![0; len]; let data = self.data.borrow(); let end = offset + buffer.len(); if offset < data.len() && end <= data.len() { @@ -106,8 +107,7 @@ impl>> Journal for SimJournal { let header = headers.get(&header.op)?; let offset = *offsets.get(&header.op)?; - let buffer = vec![0; header.size as usize]; - let buffer = self.storage.read(offset, buffer).await; + let buffer = self.storage.read(offset, header.size as usize).await; let message = Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid"); Some(message) From 8283f8ef4688fa0a1a8b8e7ae161a1d14597c7c6 Mon Sep 17 00:00:00 2001 From: numinex Date: Tue, 10 Mar 2026 10:01:03 +0100 Subject: [PATCH 3/6] fixes --- 
core/partitions/src/iggy_partition.rs | 51 +++++- core/partitions/src/iggy_partitions.rs | 54 ++----- core/partitions/src/journal.rs | 211 ++++++++++--------------- core/partitions/src/lib.rs | 35 +++- core/simulator/src/deps.rs | 2 +- 5 files changed, 177 insertions(+), 176 deletions(-) diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 49c820230d..bf309a7c87 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::journal::{Noop, PartitionJournal}; +use crate::journal::{PartitionJournal2Impl, PartitionJournalMemStorage}; use crate::log::SegmentedLog; -use crate::{AppendResult, Partition}; +use crate::{AppendResult, Partition, decode_send_messages_batch}; use iggy_common::{ ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut, IggyTimestamp, PartitionStats, + header::{Operation, PrepareHeader}, + message::Message, }; use journal::Journal as _; use std::sync::Arc; @@ -30,7 +32,8 @@ use tokio::sync::Mutex as TokioMutex; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. #[derive(Debug)] pub struct IggyPartition { - pub log: SegmentedLog, + pub log: + SegmentedLog, PartitionJournalMemStorage>, /// Committed offset — advanced only after quorum ack. pub offset: Arc, /// Dirty offset — advanced on every prepare (before commit). 
@@ -46,6 +49,35 @@ pub struct IggyPartition { } impl IggyPartition { + fn prepare_message_from_batch( + mut header: PrepareHeader, + batch: &IggyMessagesBatchMut, + ) -> Message { + let indexes = batch.indexes(); + let count = batch.count(); + let body_len = 4 + indexes.len() + batch.len(); + let total_size = std::mem::size_of::() + body_len; + header.size = total_size as u32; + + let message = Message::::new(total_size).transmute_header(|_old, new| { + *new = header; + }); + + let mut bytes = message + .into_inner() + .try_into_mut() + .expect("prepare_message_from_batch: expected unique bytes buffer"); + let header_size = std::mem::size_of::(); + bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes()); + let mut position = header_size + 4; + bytes[position..position + indexes.len()].copy_from_slice(indexes); + position += indexes.len(); + bytes[position..position + batch.len()].copy_from_slice(batch); + + Message::::from_bytes(bytes.freeze()) + .expect("prepare_message_from_batch: invalid prepared message bytes") + } + pub fn new(stats: Arc) -> Self { Self { log: SegmentedLog::default(), @@ -65,8 +97,16 @@ impl IggyPartition { impl Partition for IggyPartition { async fn append_messages( &mut self, - mut batch: IggyMessagesBatchMut, + message: Message, ) -> Result { + let header = *message.header(); + if header.operation != Operation::SendMessages { + return Err(IggyError::CannotAppendMessage); + } + + let mut batch = decode_send_messages_batch(message.body_bytes()) + .ok_or(IggyError::CannotAppendMessage)?; + if batch.count() == 0 { return Ok(AppendResult::new(0, 0, 0)); } @@ -116,7 +156,8 @@ impl Partition for IggyPartition { journal.info.end_timestamp = ts; } - journal.inner.append(batch).await; + let message = Self::prepare_message_from_batch(header, &batch); + journal.inner.append(message).await; Ok(AppendResult::new( dirty_offset, diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 
c3b1e0bc63..fda970d445 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -30,8 +30,7 @@ use consensus::{ }; use iggy_common::header::Command2; use iggy_common::{ - INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, - Segment, SegmentStorage, + IggyByteSize, PartitionStats, Segment, SegmentStorage, header::{ ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader, }, @@ -355,13 +354,13 @@ where } async fn on_replicate(&self, message: as Consensus>::Message) { - let header = message.header(); + let header = *message.header(); let namespace = IggyNamespace::from_raw(header.namespace); let consensus = self .consensus() .expect("on_replicate: consensus not initialized"); - let current_op = match replicate_preflight(consensus, header) { + let current_op = match replicate_preflight(consensus, &header) { Ok(current_op) => current_op, Err(reason) => { warn!( @@ -372,7 +371,7 @@ where } }; - let is_old_prepare = fence_old_prepare_by_commit(consensus, header); + let is_old_prepare = fence_old_prepare_by_commit(consensus, &header); if is_old_prepare { warn!("received old prepare, not replicating"); } else { @@ -387,9 +386,9 @@ where // TODO: Figure out the flow of the partition operations. // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! 
- self.apply_replicated_operation(&namespace, &message).await; + self.apply_replicated_operation(&namespace, message).await; - self.send_prepare_ok(header).await; + self.send_prepare_ok(&header).await; if consensus.is_follower() { self.commit_journal(namespace); @@ -539,37 +538,21 @@ where .register_namespace(ns); } - // TODO: Move this elsewhere, also do not reallocate, we do reallocationg now becauise we use PooledBuffer for the batch body - // but `Bytes` for `Message` payload. - fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut { - assert!(body.len() >= 4, "prepare body too small for batch header"); - let count = u32::from_le_bytes(body[0..4].try_into().unwrap()); - let indexes_len = count as usize * INDEX_SIZE; - let indexes_end = 4 + indexes_len; - assert!( - body.len() >= indexes_end, - "prepare body too small for {count} indexes", - ); - - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0); - let messages = PooledBuffer::from(&body[indexes_end..]); - IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages) - } - async fn apply_replicated_operation( &self, namespace: &IggyNamespace, - message: &Message, + message: Message, ) { let consensus = self .consensus() .expect("apply_replicated_operation: consensus not initialized"); - let header = message.header(); + let header = *message.header(); + // TODO: WE have to distinguish between an `message` recv by leader and follower. + // In the follower path, we have to skip the `prepare_for_persistance` path, just append to journal. 
match header.operation { Operation::SendMessages => { - let body = message.body_bytes(); - self.append_send_messages_to_journal(namespace, body.as_ref()) + self.append_send_messages_to_journal(namespace, message) .await; debug!( replica = consensus.replica(), @@ -598,12 +581,7 @@ where } } - async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) { - let batch = Self::batch_from_body(body); - self.append_messages_to_journal(namespace, batch).await; - } - - /// Append a batch to a partition's journal with offset assignment. + /// Append a prepare message to a partition's journal with offset assignment. /// /// Updates `segment.current_position` (logical position for indexing) but /// not `segment.end_offset` or `segment.end_timestamp` (committed state). @@ -611,15 +589,15 @@ where /// /// Uses `dirty_offset` for offset assignment so that multiple prepares /// can be pipelined before any commit. - async fn append_messages_to_journal( + async fn append_send_messages_to_journal( &self, namespace: &IggyNamespace, - batch: IggyMessagesBatchMut, + message: Message, ) { let partition = self .get_mut_by_ns(namespace) - .expect("append_messages_to_journal: partition not found for namespace"); - let _ = partition.append_messages(batch).await; + .expect("append_send_messages_to_journal: partition not found for namespace"); + let _ = partition.append_messages(message).await; } /// Replicate a prepare message to the next replica in the chain. diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 93d3733fc5..2139290d31 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use iggy_common::{ - INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer, + IggyMessagesBatchMut, IggyMessagesBatchSet, header::{Operation, PrepareHeader}, message::Message, }; @@ -29,21 +29,6 @@ use std::{ const ZERO_LEN: usize = 0; -// TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. -/// No-op storage backend for the in-memory partition journal. -#[derive(Debug)] -pub struct Noop; - -impl Storage for Noop { - type Buffer = (); - - async fn write(&self, _buf: ()) -> usize { - 0 - } - - async fn read(&self, _offset: usize, _len: usize) -> () {} -} - /// Lookup key for querying messages from the journal. #[derive(Debug, Clone, Copy)] pub enum MessageLookup { @@ -59,42 +44,7 @@ impl std::ops::Deref for MessageLookup { } } -// [LEGACY] -pub struct PartitionJournal { - batch_set: UnsafeCell, -} - -impl PartitionJournal { - pub fn new() -> Self { - Self { - batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()), - } - } - - /// Drain all accumulated batches, returning the batch set. 
- pub fn commit(&self) -> IggyMessagesBatchSet { - let batch_set = unsafe { &mut *self.batch_set.get() }; - std::mem::take(batch_set) - } - - pub fn is_empty(&self) -> bool { - let batch_set = unsafe { &*self.batch_set.get() }; - batch_set.is_empty() - } -} - -impl Default for PartitionJournal { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for PartitionJournal { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PartitionJournal").finish() - } -} - +#[allow(dead_code)] pub trait PartitionJournal2: Journal where S: Storage, @@ -104,7 +54,7 @@ where fn get(&self, query: &Self::Query) -> impl Future>; } -#[derive(Default)] +#[derive(Debug, Default)] pub struct PartitionJournalMemStorage { entries: UnsafeCell>, op_to_index: UnsafeCell>, @@ -154,56 +104,100 @@ where inner: UnsafeCell>, } -struct JournalInner +impl Default for PartitionJournal2Impl +where + S: Storage + Default, +{ + fn default() -> Self { + Self { + message_offset_to_op: UnsafeCell::new(BTreeMap::new()), + timestamp_to_op: UnsafeCell::new(BTreeMap::new()), + headers: UnsafeCell::new(Vec::new()), + inner: UnsafeCell::new(JournalInner { + storage: S::default(), + }), + } + } +} + +impl std::fmt::Debug for PartitionJournal2Impl where S: Storage, { - storage: S, + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionJournal2Impl").finish() + } } -impl PartitionJournal2Impl +struct JournalInner where S: Storage, { - fn decode_send_messages_batch(body: bytes::Bytes) -> Option { - // TODO: This is bad, - let mut body = body - .try_into_mut() - .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + storage: S, +} - if body.len() < 4 { - return None; - } +impl PartitionJournalMemStorage { + fn drain(&self) -> Vec { + let entries = unsafe { &mut *self.entries.get() }; + let drained = std::mem::take(entries); + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.clear(); + drained + } - 
let count_bytes = body.split_to(4); - let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); - let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + fn is_empty(&self) -> bool { + let entries = unsafe { &*self.entries.get() }; + entries.is_empty() + } +} - if body.len() < indexes_len { - return None; +impl PartitionJournal2Impl { + /// Drain all accumulated batches, matching the legacy PartitionJournal API. + pub fn commit(&self) -> IggyMessagesBatchSet { + let entries = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.drain() + }; + + let mut messages = Vec::with_capacity(entries.len()); + for bytes in entries { + if let Ok(message) = Message::from_bytes(bytes) { + messages.push(message); + } } - let indexes_bytes = body.split_to(indexes_len); - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); - let messages = PooledBuffer::from(body); + let headers = unsafe { &mut *self.headers.get() }; + headers.clear(); + let offsets = unsafe { &mut *self.message_offset_to_op.get() }; + offsets.clear(); + let timestamps = unsafe { &mut *self.timestamp_to_op.get() }; + timestamps.clear(); + + Self::messages_to_batch_set(&messages) + } - Some(IggyMessagesBatchMut::from_indexes_and_messages( - indexes, messages, - )) + pub fn is_empty(&self) -> bool { + let inner = unsafe { &*self.inner.get() }; + inner.storage.is_empty() } +} +impl PartitionJournal2Impl +where + S: Storage, +{ fn message_to_batch(message: &Message) -> Option { if message.header().operation != Operation::SendMessages { return None; } - Self::decode_send_messages_batch(message.body_bytes()) + crate::decode_send_messages_batch(message.body_bytes()) } - fn messages_to_batch_set(messages: Vec>) -> IggyMessagesBatchSet { + fn messages_to_batch_set(messages: &[Message]) -> IggyMessagesBatchSet { let mut batch_set = IggyMessagesBatchSet::empty(); - for message in &messages { + for message in messages { if let Some(batch) = 
Self::message_to_batch(message) { batch_set.add_batch(batch); } @@ -212,6 +206,7 @@ where batch_set } + #[allow(dead_code)] fn candidate_start_op(&self, query: &MessageLookup) -> Option { match query { MessageLookup::Offset { offset, .. } => { @@ -240,9 +235,13 @@ where inner.storage.read(offset, ZERO_LEN).await }; - Some(Message::from_bytes(bytes).expect("invalid message bytes read from storage")) + Some( + Message::from_bytes(bytes) + .expect("partition.journal.storage.read: invalid bytes for message"), + ) } + #[allow(dead_code)] async fn load_messages_from_storage( &self, start_op: u64, @@ -294,7 +293,7 @@ where let prev_op = header.op - 1; let headers = unsafe { &*self.headers.get() }; - headers.get(prev_op as usize) + headers.iter().find(|candidate| candidate.op == prev_op) } async fn append(&self, entry: Self::Entry) { @@ -344,7 +343,7 @@ where let messages = self.load_messages_from_storage(start_op, count).await; - let batch_set = Self::messages_to_batch_set(messages); + let batch_set = Self::messages_to_batch_set(&messages); let result = match query { MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), MessageLookup::Timestamp { timestamp, count } => { @@ -359,51 +358,3 @@ where } } } - -impl Journal for PartitionJournal { - type Header = MessageLookup; - type Entry = IggyMessagesBatchMut; - type HeaderRef<'a> = MessageLookup; - - fn header(&self, _idx: usize) -> Option> { - unreachable!("fn header: header lookup not supported for partition journal."); - } - - fn previous_header(&self, _header: &Self::Header) -> Option> { - unreachable!("fn previous_header: header lookup not supported for partition journal."); - } - - async fn append(&self, entry: Self::Entry) { - let batch_set = unsafe { &mut *self.batch_set.get() }; - batch_set.add_batch(entry); - } - - async fn entry(&self, header: &Self::Header) -> Option { - // Entry lookups go through SegmentedLog which uses JournalInfo - // to construct MessageLookup headers. 
The actual query is done - // via get() below, not through the Journal trait. - let _ = header; - unreachable!("fn entry: use SegmentedLog::get() instead for partition journal lookups."); - } -} - -impl PartitionJournal { - /// Query messages by offset or timestamp with count. - /// - /// This is called by `SegmentedLog` using `MessageLookup` headers - /// constructed from `JournalInfo`. - pub fn get(&self, header: &MessageLookup) -> Option { - let batch_set = unsafe { &*self.batch_set.get() }; - let result = match header { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(*offset, *count), - MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(*timestamp, *count) - } - }; - if result.is_empty() { - None - } else { - Some(result) - } - } -} diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index d997d72b7a..1f18e32ed8 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -23,7 +23,11 @@ mod journal; mod log; mod types; -use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet}; +use bytes::{Bytes, BytesMut}; +use iggy_common::{ + INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, + PooledBuffer, header::PrepareHeader, message::Message, +}; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; pub use types::{ @@ -31,6 +35,33 @@ pub use types::{ SendMessagesResult, }; +pub(crate) fn decode_send_messages_batch(body: Bytes) -> Option { + // TODO: This is very bad; IGGY-114 fixes this.
+ let mut body = body + .try_into_mut() + .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + + if body.len() < 4 { + return None; + } + + let count_bytes = body.split_to(4); + let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); + let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + + if body.len() < indexes_len { + return None; + } + + let indexes_bytes = body.split_to(indexes_len); + let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); + let messages = PooledBuffer::from(body); + + Some(IggyMessagesBatchMut::from_indexes_and_messages( + indexes, messages, + )) +} + /// Partition-level data plane operations. /// /// `send_messages` MUST only append to the partition journal (prepare phase), @@ -38,7 +69,7 @@ pub use types::{ pub trait Partition { fn append_messages( &mut self, - batch: IggyMessagesBatchMut, + message: Message, ) -> impl Future>; fn poll_messages( diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 316a54fe74..9f4d64b790 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -46,7 +46,7 @@ impl Storage for MemStorage { } async fn read(&self, offset: usize, len: usize) -> Self::Buffer { - let buffer = vec![0; len]; + let mut buffer = vec![0; len]; let data = self.data.borrow(); let end = offset + buffer.len(); if offset < data.len() && end <= data.len() { From c25f6d59b94058b3a37fb41c7434c823b4c7e173 Mon Sep 17 00:00:00 2001 From: numinex Date: Tue, 10 Mar 2026 10:05:41 +0100 Subject: [PATCH 4/6] clippy lint --- core/partitions/src/iggy_partition.rs | 2 +- core/partitions/src/journal.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index bf309a7c87..81a771fe7f 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -57,7 +57,7 @@ impl IggyPartition { let count = batch.count(); let body_len = 4 +
indexes.len() + batch.len(); let total_size = std::mem::size_of::() + body_len; - header.size = total_size as u32; + header.size = u32::try_from(total_size).expect("prepare_message_from_batch: batch size exceeds u32::MAX"); let message = Message::::new(total_size).transmute_header(|_old, new| { *new = header; diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 2139290d31..8bf9d2a30f 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -32,7 +32,9 @@ const ZERO_LEN: usize = 0; /// Lookup key for querying messages from the journal. #[derive(Debug, Clone, Copy)] pub enum MessageLookup { + #[allow(dead_code)] Offset { offset: u64, count: u32 }, + #[allow(dead_code)] Timestamp { timestamp: u64, count: u32 }, } @@ -152,7 +154,7 @@ impl PartitionJournalMemStorage { } impl PartitionJournal2Impl { - /// Drain all accumulated batches, matching the legacy PartitionJournal API. + /// Drain all accumulated batches, matching the legacy `PartitionJournal` API. pub fn commit(&self) -> IggyMessagesBatchSet { let entries = { let inner = unsafe { &*self.inner.get() }; From 778132da1820dfbeccdba0a1955ca1683305a4bd Mon Sep 17 00:00:00 2001 From: numinex Date: Tue, 10 Mar 2026 10:06:17 +0100 Subject: [PATCH 5/6] remove plan --- .../PARTITION_JOURNAL2_STORAGE_PLAN.md | 141 ------------------ 1 file changed, 141 deletions(-) delete mode 100644 core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md diff --git a/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md deleted file mode 100644 index ec42f4d7d6..0000000000 --- a/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md +++ /dev/null @@ -1,141 +0,0 @@ -# PartitionJournal2 Storage Proposal - -## Objective -Use `journal::Storage` as the actual backing store for serialized prepare entries (`Bytes`) and decode to `Message` on read. 
- -## Current Problem -- `PartitionJournal2Impl` currently stores entries directly in: - - `UnsafeCell>>` -- `Noop` storage is unused for real data. -- The old `Buffer = Entry` idea is too rigid for this path. -- `Storage::read(&self, offset, buffer)` still requires a fallback buffer argument. - -## Design Direction - -### 1. Use serialized buffer (`Bytes`) in storage -For `PartitionJournal2`, enforce: - -```rust -S: Storage -``` - -Journal entry remains: - -```rust -type Entry = Message; -``` - -Conversion boundary: -- write path: `Message -> Bytes` (serialize/store) -- read path: `Bytes -> Message` via `Message::from_bytes` - -### 2. Replace `Noop` with in-memory prepare-message storage -Introduce a dedicated storage: - -```rust -pub struct InMemoryPrepareStorage { - entries: UnsafeCell>, -} -``` - -Behavior: -- `write(bytes)` appends serialized message bytes to `entries`. -- `read(offset, ...)` treats `offset` as `op_number` and returns that op entry. - -This keeps storage raw and simple, while typed decoding happens at journal boundary. - -### 2.1 `offset` semantics for this journal: `offset == op_number` -For `PartitionJournal2`, define: -- `Storage::read(offset, ...)` where `offset` is VSR `PrepareHeader.op` (op number), not byte offset. -- Journal append path stores entries in op order, so op lookup is O(1)-ish via index map (or direct vec index if contiguous). - -Implementation detail: -- Maintain `op_to_index: HashMap` (or rely on contiguous `op` if guaranteed). -- On `append(entry)`, cache `op_to_index.insert(entry.header().op, vec_index)`. -- On `entry(header)`, call storage read using `header.op as usize` or map-resolved index. - -### 3. 
Make `PartitionJournal2Impl` storage-backed -Refactor `PartitionJournal2Impl` to own a storage instance: - -```rust -pub struct PartitionJournal2Impl> { - storage: S, - // metadata/indexes only - headers: UnsafeCell>, - message_offset_to_op: UnsafeCell>, - timestamp_to_op: UnsafeCell>, -} -``` - -Responsibilities split: -- `storage`: source of truth for serialized entry bytes -- `headers`: lightweight header cache to satisfy `header()` / `previous_header()` reference semantics -- maps: query acceleration for `get` - -### 4. Journal method behavior with storage -- `append(entry)`: - - decode send-messages info for maps - - serialize: `let bytes = entry.into_inner()` (or equivalent) - - `storage.write(bytes)` - - push `*entry.header()` into `headers` - - cache `op_number -> storage position` -- `entry(header)`: - - resolve by `op_number` (from `header.op`) - - fetch bytes from storage via read - - decode immediately: `Message::::from_bytes(bytes)` -- `header(idx)` and `previous_header(header)`: - - use `headers` vector (no full entry decode needed) - -### 5. `get` path stays batch-conversion based -`get` still performs: -- collect candidate bytes from storage -- decode into `Message` -- convert `Vec> -> IggyMessagesBatchSet` -- apply `MessageLookup` filter (`get_by_offset` / `get_by_timestamp`) - -The existing conversion helpers remain valid. - -## Storage Trait Consideration - -Current trait: - -```rust -fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future; -``` - -For typed object lookups this is awkward. Recommended adjustment: - -```rust -fn read(&self, offset: usize) -> impl Future>; -``` - -Why: -- avoids fake fallback buffer construction -- maps naturally to indexed storage -- still works for byte storage by returning `Option` - -If trait-wide change is too large right now, keep current signature temporarily and ignore/use the incoming `buffer` only as a fallback for out-of-range reads. 
- -For this phase, no trait change is required: just interpret the existing `offset` argument as `op_number` in `PartitionJournal2` storage. - -## Proposed File-Level Changes -- `core/partitions/src/journal.rs` - - add `InMemoryPrepareStorage` - - make `PartitionJournal2Impl` generic over storage (or concrete to `InMemoryPrepareStorage`) - - remove direct `inner.set: Vec>` as primary store - - keep lightweight header metadata cache - - keep/adjust current lookup maps -- `core/partitions/src/iggy_partition.rs` (only if/when wiring `PartitionJournal2` into partition log) - - replace `Noop` for this path with `InMemoryPrepareStorage` - -## Migration Sequence (Low Risk) -1. Add `InMemoryPrepareStorage` without removing existing fields. -2. Mirror writes to both old `inner.set` and storage. -3. Switch reads (`entry`, `get`) to storage-backed path. -4. Remove old `inner.set` once parity is confirmed. -5. Optionally evolve `Storage::read` signature in a separate PR. - -## Expected Outcome -- `Storage` is no longer a no-op for `PartitionJournal2`. -- Storage buffer is raw serialized data (`Bytes`), and decoding to `Message` happens at read boundary. -- The in-memory backend stays simple (`UnsafeCell>`) and aligned with your proposed flow. From 9e65721d203d06128d2bc866966cd6ab809e3237 Mon Sep 17 00:00:00 2001 From: numinex Date: Tue, 10 Mar 2026 10:11:38 +0100 Subject: [PATCH 6/6] rename types --- core/partitions/src/iggy_partition.rs | 4 ++-- core/partitions/src/journal.rs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 81a771fe7f..24f27add5e 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::journal::{PartitionJournal2Impl, PartitionJournalMemStorage}; +use crate::journal::{PartitionJournal, PartitionJournalMemStorage}; use crate::log::SegmentedLog; use crate::{AppendResult, Partition, decode_send_messages_batch}; use iggy_common::{ @@ -33,7 +33,7 @@ use tokio::sync::Mutex as TokioMutex; #[derive(Debug)] pub struct IggyPartition { pub log: - SegmentedLog, PartitionJournalMemStorage>, + SegmentedLog, PartitionJournalMemStorage>, /// Committed offset — advanced only after quorum ack. pub offset: Arc, /// Dirty offset — advanced on every prepare (before commit). diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 8bf9d2a30f..8b569537af 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -47,7 +47,7 @@ impl std::ops::Deref for MessageLookup { } #[allow(dead_code)] -pub trait PartitionJournal2: Journal +pub trait QueryableJournal: Journal where S: Storage, { @@ -96,7 +96,7 @@ impl Storage for PartitionJournalMemStorage { } } -pub struct PartitionJournal2Impl +pub struct PartitionJournal where S: Storage, { @@ -106,7 +106,7 @@ where inner: UnsafeCell>, } -impl Default for PartitionJournal2Impl +impl Default for PartitionJournal where S: Storage + Default, { @@ -122,7 +122,7 @@ where } } -impl std::fmt::Debug for PartitionJournal2Impl +impl std::fmt::Debug for PartitionJournal where S: Storage, { @@ -153,7 +153,7 @@ impl PartitionJournalMemStorage { } } -impl PartitionJournal2Impl { +impl PartitionJournal { /// Drain all accumulated batches, matching the legacy `PartitionJournal` API. 
pub fn commit(&self) -> IggyMessagesBatchSet { let entries = { @@ -184,7 +184,7 @@ impl PartitionJournal2Impl { } } -impl PartitionJournal2Impl +impl PartitionJournal where S: Storage, { @@ -274,7 +274,7 @@ where } } -impl Journal for PartitionJournal2Impl +impl Journal for PartitionJournal where S: Storage, { @@ -330,7 +330,7 @@ where } } -impl PartitionJournal2 for PartitionJournal2Impl +impl QueryableJournal for PartitionJournal where S: Storage, {