diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs index f1b4081dd2..55a6b14fbd 100644 --- a/core/journal/src/lib.rs +++ b/core/journal/src/lib.rs @@ -39,7 +39,10 @@ pub trait Storage { type Buffer; fn write(&self, buf: Self::Buffer) -> impl Future; - fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future; + // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` in order to support that. + // Maybe we should go back to passing in the `Buffer` again, but I am not sure how to handle it in the `Partitions Journal`, since we use in-memory impl + // which extracts the buffer out of the `Vec` and we don't need to allocate a new buffer. + fn read(&self, offset: usize, len: usize) -> impl Future; } pub trait JournalHandle { diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 49c820230d..24f27add5e 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::journal::{Noop, PartitionJournal}; +use crate::journal::{PartitionJournal, PartitionJournalMemStorage}; use crate::log::SegmentedLog; -use crate::{AppendResult, Partition}; +use crate::{AppendResult, Partition, decode_send_messages_batch}; use iggy_common::{ ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut, IggyTimestamp, PartitionStats, + header::{Operation, PrepareHeader}, + message::Message, }; use journal::Journal as _; use std::sync::Arc; @@ -30,7 +32,8 @@ use tokio::sync::Mutex as TokioMutex; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. #[derive(Debug)] pub struct IggyPartition { - pub log: SegmentedLog, + pub log: + SegmentedLog, PartitionJournalMemStorage>, /// Committed offset — advanced only after quorum ack. pub offset: Arc, /// Dirty offset — advanced on every prepare (before commit). @@ -46,6 +49,35 @@ pub struct IggyPartition { } impl IggyPartition { + fn prepare_message_from_batch( + mut header: PrepareHeader, + batch: &IggyMessagesBatchMut, + ) -> Message { + let indexes = batch.indexes(); + let count = batch.count(); + let body_len = 4 + indexes.len() + batch.len(); + let total_size = std::mem::size_of::() + body_len; + header.size = u32::try_from(total_size).expect("prepare_message_from_batch: batch size exceeds u32::MAX"); + + let message = Message::::new(total_size).transmute_header(|_old, new| { + *new = header; + }); + + let mut bytes = message + .into_inner() + .try_into_mut() + .expect("prepare_message_from_batch: expected unique bytes buffer"); + let header_size = std::mem::size_of::(); + bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes()); + let mut position = header_size + 4; + bytes[position..position + indexes.len()].copy_from_slice(indexes); + position += indexes.len(); + bytes[position..position + batch.len()].copy_from_slice(batch); + + Message::::from_bytes(bytes.freeze()) + .expect("prepare_message_from_batch: invalid prepared message bytes") + } + pub fn new(stats: Arc) -> Self { Self { log: SegmentedLog::default(), @@ -65,8 +97,16 @@ impl IggyPartition { impl Partition for IggyPartition { async fn append_messages( &mut self, - mut batch: IggyMessagesBatchMut, + message: Message, ) -> Result { + let header = *message.header(); + if header.operation != Operation::SendMessages { + return Err(IggyError::CannotAppendMessage); + } + + let mut batch = decode_send_messages_batch(message.body_bytes()) + .ok_or(IggyError::CannotAppendMessage)?; + if batch.count() == 0 { return Ok(AppendResult::new(0, 0, 0)); } @@ -116,7 +156,8 @@ impl Partition for IggyPartition { journal.info.end_timestamp = ts; } - journal.inner.append(batch).await; + let message = Self::prepare_message_from_batch(header, &batch); + journal.inner.append(message).await; Ok(AppendResult::new( dirty_offset, diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index c3b1e0bc63..fda970d445 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -30,8 +30,7 @@ use consensus::{ }; use iggy_common::header::Command2; use iggy_common::{ - INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, - Segment, SegmentStorage, + IggyByteSize, PartitionStats, Segment, SegmentStorage, header::{ ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader, }, @@ -355,13 +354,13 @@ where } async fn on_replicate(&self, message: as Consensus>::Message) { - let header = message.header(); + let header = *message.header(); let namespace = IggyNamespace::from_raw(header.namespace); let consensus = self .consensus() .expect("on_replicate: consensus not initialized"); - let current_op = match replicate_preflight(consensus, header) { + let current_op = match replicate_preflight(consensus, &header) { Ok(current_op) => current_op, Err(reason) => { warn!( @@ -372,7 +371,7 @@ where } }; - let is_old_prepare = fence_old_prepare_by_commit(consensus, header); + let is_old_prepare = fence_old_prepare_by_commit(consensus, &header); if is_old_prepare { warn!("received old prepare, not replicating"); } else { @@ -387,9 +386,9 @@ where // TODO: Figure out the flow of the partition operations. // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! - self.apply_replicated_operation(&namespace, &message).await; + self.apply_replicated_operation(&namespace, message).await; - self.send_prepare_ok(header).await; + self.send_prepare_ok(&header).await; if consensus.is_follower() { self.commit_journal(namespace); @@ -539,37 +538,21 @@ where .register_namespace(ns); } - // TODO: Move this elsewhere, also do not reallocate, we do reallocationg now becauise we use PooledBuffer for the batch body - // but `Bytes` for `Message` payload. - fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut { - assert!(body.len() >= 4, "prepare body too small for batch header"); - let count = u32::from_le_bytes(body[0..4].try_into().unwrap()); - let indexes_len = count as usize * INDEX_SIZE; - let indexes_end = 4 + indexes_len; - assert!( - body.len() >= indexes_end, - "prepare body too small for {count} indexes", - ); - - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0); - let messages = PooledBuffer::from(&body[indexes_end..]); - IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages) - } - async fn apply_replicated_operation( &self, namespace: &IggyNamespace, - message: &Message, + message: Message, ) { let consensus = self .consensus() .expect("apply_replicated_operation: consensus not initialized"); - let header = message.header(); + let header = *message.header(); + // TODO: WE have to distinguish between an `message` recv by leader and follower. + // In the follower path, we have to skip the `prepare_for_persistance` path, just append to journal. match header.operation { Operation::SendMessages => { - let body = message.body_bytes(); - self.append_send_messages_to_journal(namespace, body.as_ref()) + self.append_send_messages_to_journal(namespace, message) .await; debug!( replica = consensus.replica(), @@ -598,12 +581,7 @@ where } } - async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) { - let batch = Self::batch_from_body(body); - self.append_messages_to_journal(namespace, batch).await; - } - - /// Append a batch to a partition's journal with offset assignment. + /// Append a prepare message to a partition's journal with offset assignment. /// /// Updates `segment.current_position` (logical position for indexing) but /// not `segment.end_offset` or `segment.end_timestamp` (committed state). @@ -611,15 +589,15 @@ where /// /// Uses `dirty_offset` for offset assignment so that multiple prepares /// can be pipelined before any commit. - async fn append_messages_to_journal( + async fn append_send_messages_to_journal( &self, namespace: &IggyNamespace, - batch: IggyMessagesBatchMut, + message: Message, ) { let partition = self .get_mut_by_ns(namespace) - .expect("append_messages_to_journal: partition not found for namespace"); - let _ = partition.append_messages(batch).await; + .expect("append_send_messages_to_journal: partition not found for namespace"); + let _ = partition.append_messages(message).await; } /// Replicate a prepare message to the next replica in the chain. diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index ab281e940d..8b569537af 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,29 +15,26 @@ // specific language governing permissions and limitations // under the License. -use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet}; +use bytes::Bytes; +use iggy_common::{ + IggyMessagesBatchMut, IggyMessagesBatchSet, + header::{Operation, PrepareHeader}, + message::Message, +}; use journal::{Journal, Storage}; -use std::cell::UnsafeCell; +use std::{ + cell::UnsafeCell, + collections::{BTreeMap, HashMap}, +}; -// TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. -/// No-op storage backend for the in-memory partition journal. -#[derive(Debug)] -pub struct Noop; - -impl Storage for Noop { - type Buffer = (); - - async fn write(&self, _buf: ()) -> usize { - 0 - } - - async fn read(&self, _offset: usize, _buffer: ()) {} -} +const ZERO_LEN: usize = 0; /// Lookup key for querying messages from the journal. #[derive(Debug, Clone, Copy)] pub enum MessageLookup { + #[allow(dead_code)] Offset { offset: u64, count: u32 }, + #[allow(dead_code)] Timestamp { timestamp: u64, count: u32 }, } @@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup { } } -/// In-memory journal that accumulates message batches as an `IggyMessagesBatchSet`. -/// -/// This is a pure storage layer — it holds batches and supports lookups via -/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives -/// outside the journal in the `SegmentedLog`'s `JournalInfo`. -/// -/// Uses `UnsafeCell` for interior mutability, matching the single-threaded -/// per-shard execution model. -pub struct PartitionJournal { - batch_set: UnsafeCell, +#[allow(dead_code)] +pub trait QueryableJournal: Journal +where + S: Storage, +{ + type Query; + + fn get(&self, query: &Self::Query) -> impl Future>; +} + +#[derive(Debug, Default)] +pub struct PartitionJournalMemStorage { + entries: UnsafeCell>, + op_to_index: UnsafeCell>, +} + +impl Storage for PartitionJournalMemStorage { + type Buffer = Bytes; + + async fn write(&self, buf: Self::Buffer) -> usize { + let op = Message::::from_bytes(buf.clone()) + .ok() + .map(|message| message.header().op); + + let entries = unsafe { &mut *self.entries.get() }; + let index = entries.len(); + entries.push(buf.clone()); + + if let Some(op) = op { + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.insert(op, index); + } + + buf.len() + } + + async fn read(&self, offset: usize, _len: usize) -> Self::Buffer { + let op = offset as u64; + let Some(index) = ({ + let op_to_index = unsafe { &*self.op_to_index.get() }; + op_to_index.get(&op).copied() + }) else { + return Bytes::new(); + }; + + let entries = unsafe { &*self.entries.get() }; + entries.get(index).cloned().unwrap_or_default() + } +} + +pub struct PartitionJournal +where + S: Storage, +{ + message_offset_to_op: UnsafeCell>, + timestamp_to_op: UnsafeCell>, + headers: UnsafeCell>, + inner: UnsafeCell>, } -impl PartitionJournal { - pub fn new() -> Self { +impl Default for PartitionJournal +where + S: Storage + Default, +{ + fn default() -> Self { Self { - batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()), + message_offset_to_op: UnsafeCell::new(BTreeMap::new()), + timestamp_to_op: UnsafeCell::new(BTreeMap::new()), + headers: UnsafeCell::new(Vec::new()), + inner: UnsafeCell::new(JournalInner { + storage: S::default(), + }), } } +} + +impl std::fmt::Debug for PartitionJournal +where + S: Storage, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionJournal2Impl").finish() + } +} + +struct JournalInner +where + S: Storage, +{ + storage: S, +} + +impl PartitionJournalMemStorage { + fn drain(&self) -> Vec { + let entries = unsafe { &mut *self.entries.get() }; + let drained = std::mem::take(entries); + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.clear(); + drained + } - /// Drain all accumulated batches, returning the batch set. + fn is_empty(&self) -> bool { + let entries = unsafe { &*self.entries.get() }; + entries.is_empty() + } +} + +impl PartitionJournal { + /// Drain all accumulated batches, matching the legacy `PartitionJournal` API. pub fn commit(&self) -> IggyMessagesBatchSet { - let batch_set = unsafe { &mut *self.batch_set.get() }; - std::mem::take(batch_set) + let entries = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.drain() + }; + + let mut messages = Vec::with_capacity(entries.len()); + for bytes in entries { + if let Ok(message) = Message::from_bytes(bytes) { + messages.push(message); + } + } + + let headers = unsafe { &mut *self.headers.get() }; + headers.clear(); + let offsets = unsafe { &mut *self.message_offset_to_op.get() }; + offsets.clear(); + let timestamps = unsafe { &mut *self.timestamp_to_op.get() }; + timestamps.clear(); + + Self::messages_to_batch_set(&messages) } pub fn is_empty(&self) -> bool { - let batch_set = unsafe { &*self.batch_set.get() }; - batch_set.is_empty() + let inner = unsafe { &*self.inner.get() }; + inner.storage.is_empty() } } -impl Default for PartitionJournal { - fn default() -> Self { - Self::new() +impl PartitionJournal +where + S: Storage, +{ + fn message_to_batch(message: &Message) -> Option { + if message.header().operation != Operation::SendMessages { + return None; + } + + crate::decode_send_messages_batch(message.body_bytes()) } -} -impl std::fmt::Debug for PartitionJournal { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PartitionJournal").finish() + fn messages_to_batch_set(messages: &[Message]) -> IggyMessagesBatchSet { + let mut batch_set = IggyMessagesBatchSet::empty(); + + for message in messages { + if let Some(batch) = Self::message_to_batch(message) { + batch_set.add_batch(batch); + } + } + + batch_set + } + + #[allow(dead_code)] + fn candidate_start_op(&self, query: &MessageLookup) -> Option { + match query { + MessageLookup::Offset { offset, .. } => { + let offsets = unsafe { &*self.message_offset_to_op.get() }; + offsets + .range(..=*offset) + .next_back() + .or_else(|| offsets.range(*offset..).next()) + .map(|(_, op)| *op) + } + MessageLookup::Timestamp { timestamp, .. } => { + let timestamps = unsafe { &*self.timestamp_to_op.get() }; + timestamps + .range(..=*timestamp) + .next_back() + .or_else(|| timestamps.range(*timestamp..).next()) + .map(|(_, op)| *op) + } + } + } + + async fn message_by_op(&self, op: u64) -> Option> { + let offset = usize::try_from(op).ok()?; + let bytes = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.read(offset, ZERO_LEN).await + }; + + Some( + Message::from_bytes(bytes) + .expect("partition.journal.storage.read: invalid bytes for message"), + ) + } + + #[allow(dead_code)] + async fn load_messages_from_storage( + &self, + start_op: u64, + count: u32, + ) -> Vec> { + if count == 0 { + return Vec::new(); + } + + let mut messages = Vec::new(); + let mut loaded_messages = 0u32; + let mut op = start_op; + + while loaded_messages < count { + let Some(message) = self.message_by_op(op).await else { + break; + }; + + if let Some(batch) = Self::message_to_batch(&message) { + loaded_messages = loaded_messages.saturating_add(batch.count()); + messages.push(message); + } + + op += 1; + } + + messages } } -impl Journal for PartitionJournal { - type Header = MessageLookup; - type Entry = IggyMessagesBatchMut; - type HeaderRef<'a> = MessageLookup; +impl Journal for PartitionJournal +where + S: Storage, +{ + type Header = PrepareHeader; + type Entry = Message; + #[rustfmt::skip] // Scuffed formatter. + type HeaderRef<'a> = &'a Self::Header where S: 'a; - fn header(&self, _idx: usize) -> Option> { - unreachable!("fn header: header lookup not supported for partition journal."); + fn header(&self, idx: usize) -> Option> { + let headers = unsafe { &mut *self.headers.get() }; + headers.get(idx) } - fn previous_header(&self, _header: &Self::Header) -> Option> { - unreachable!("fn previous_header: header lookup not supported for partition journal."); + fn previous_header(&self, header: &Self::Header) -> Option> { + if header.op == 0 { + return None; + } + + let prev_op = header.op - 1; + let headers = unsafe { &*self.headers.get() }; + headers.iter().find(|candidate| candidate.op == prev_op) } async fn append(&self, entry: Self::Entry) { - let batch_set = unsafe { &mut *self.batch_set.get() }; - batch_set.add_batch(entry); + let first_offset_and_timestamp = Self::message_to_batch(&entry) + .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?))); + + let header = *entry.header(); + let op = header.op; + + { + let headers = unsafe { &mut *self.headers.get() }; + headers.push(header); + }; + + let bytes = entry.into_inner(); + { + let inner = unsafe { &*self.inner.get() }; + let _ = inner.storage.write(bytes).await; + } + + if let Some((offset, timestamp)) = first_offset_and_timestamp { + let offsets = unsafe { &mut *self.message_offset_to_op.get() }; + offsets.insert(offset, op); + + let timestamps = unsafe { &mut *self.timestamp_to_op.get() }; + timestamps.insert(timestamp, op); + } } async fn entry(&self, header: &Self::Header) -> Option { - // Entry lookups go through SegmentedLog which uses JournalInfo - // to construct MessageLookup headers. The actual query is done - // via get() below, not through the Journal trait. - let _ = header; - unreachable!("fn entry: use SegmentedLog::get() instead for partition journal lookups."); + self.message_by_op(header.op).await } } -impl PartitionJournal { - /// Query messages by offset or timestamp with count. - /// - /// This is called by `SegmentedLog` using `MessageLookup` headers - /// constructed from `JournalInfo`. - pub fn get(&self, header: &MessageLookup) -> Option { - let batch_set = unsafe { &*self.batch_set.get() }; - let result = match header { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(*offset, *count), +impl QueryableJournal for PartitionJournal +where + S: Storage, +{ + type Query = MessageLookup; + + async fn get(&self, query: &Self::Query) -> Option { + let query = *query; + let start_op = self.candidate_start_op(&query)?; + let count = match query { + MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count, + }; + + let messages = self.load_messages_from_storage(start_op, count).await; + + let batch_set = Self::messages_to_batch_set(&messages); + let result = match query { + MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(*timestamp, *count) + batch_set.get_by_timestamp(timestamp, count) } }; + if result.is_empty() { None } else { diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index d997d72b7a..1f18e32ed8 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -23,7 +23,11 @@ mod journal; mod log; mod types; -use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet}; +use bytes::{Bytes, BytesMut}; +use iggy_common::{ + INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, + PooledBuffer, header::PrepareHeader, message::Message, +}; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; pub use types::{ @@ -31,6 +35,33 @@ pub use types::{ SendMessagesResult, }; +pub(crate) fn decode_send_messages_batch(body: Bytes) -> Option { + // TODO: This very is bad, IGGY-114 Fixes this. + let mut body = body + .try_into_mut() + .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + + if body.len() < 4 { + return None; + } + + let count_bytes = body.split_to(4); + let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); + let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + + if body.len() < indexes_len { + return None; + } + + let indexes_bytes = body.split_to(indexes_len); + let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); + let messages = PooledBuffer::from(body); + + Some(IggyMessagesBatchMut::from_indexes_and_messages( + indexes, messages, + )) +} + /// Partition-level data plane operations. /// /// `send_messages` MUST only append to the partition journal (prepare phase), @@ -38,7 +69,7 @@ pub use types::{ pub trait Partition { fn append_messages( &mut self, - batch: IggyMessagesBatchMut, + message: Message, ) -> impl Future>; fn poll_messages( diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 12e78bb051..9f4d64b790 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -45,7 +45,8 @@ impl Storage for MemStorage { len } - async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> Self::Buffer { + async fn read(&self, offset: usize, len: usize) -> Self::Buffer { + let mut buffer = vec![0; len]; let data = self.data.borrow(); let end = offset + buffer.len(); if offset < data.len() && end <= data.len() { @@ -106,8 +107,7 @@ impl>> Journal for SimJournal { let header = headers.get(&header.op)?; let offset = *offsets.get(&header.op)?; - let buffer = vec![0; header.size as usize]; - let buffer = self.storage.read(offset, buffer).await; + let buffer = self.storage.read(offset, header.size as usize).await; let message = Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid"); Some(message)