Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/journal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ pub trait Storage {
type Buffer;

fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
// TODO: Get rid of the `len` usize, we need to do changes in `Simulator` in order to support that.
// Maybe we should go back to passing in the `Buffer` again, but I am not sure how to handle it in the `Partitions Journal`, since we use in-memory impl
// which extracts the buffer out of the `Vec<Message>` and we don't need to allocate a new buffer.
fn read(&self, offset: usize, len: usize) -> impl Future<Output = Self::Buffer>;
}

pub trait JournalHandle {
Expand Down
51 changes: 46 additions & 5 deletions core/partitions/src/iggy_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::journal::{Noop, PartitionJournal};
use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
use crate::log::SegmentedLog;
use crate::{AppendResult, Partition};
use crate::{AppendResult, Partition, decode_send_messages_batch};
use iggy_common::{
ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut,
IggyTimestamp, PartitionStats,
header::{Operation, PrepareHeader},
message::Message,
};
use journal::Journal as _;
use std::sync::Arc;
Expand All @@ -30,7 +32,8 @@ use tokio::sync::Mutex as TokioMutex;
// This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`.
#[derive(Debug)]
pub struct IggyPartition {
pub log: SegmentedLog<PartitionJournal, Noop>,
pub log:
SegmentedLog<PartitionJournal<PartitionJournalMemStorage>, PartitionJournalMemStorage>,
/// Committed offset — advanced only after quorum ack.
pub offset: Arc<AtomicU64>,
/// Dirty offset — advanced on every prepare (before commit).
Expand All @@ -46,6 +49,35 @@ pub struct IggyPartition {
}

impl IggyPartition {
fn prepare_message_from_batch(
mut header: PrepareHeader,
batch: &IggyMessagesBatchMut,
) -> Message<PrepareHeader> {
let indexes = batch.indexes();
let count = batch.count();
let body_len = 4 + indexes.len() + batch.len();
let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
header.size = u32::try_from(total_size).expect("prepare_message_from_batch: batch size exceeds u32::MAX");

let message = Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
*new = header;
});

let mut bytes = message
.into_inner()
.try_into_mut()
.expect("prepare_message_from_batch: expected unique bytes buffer");
let header_size = std::mem::size_of::<PrepareHeader>();
bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes());
let mut position = header_size + 4;
bytes[position..position + indexes.len()].copy_from_slice(indexes);
position += indexes.len();
bytes[position..position + batch.len()].copy_from_slice(batch);

Message::<PrepareHeader>::from_bytes(bytes.freeze())
.expect("prepare_message_from_batch: invalid prepared message bytes")
}

pub fn new(stats: Arc<PartitionStats>) -> Self {
Self {
log: SegmentedLog::default(),
Expand All @@ -65,8 +97,16 @@ impl IggyPartition {
impl Partition for IggyPartition {
async fn append_messages(
&mut self,
mut batch: IggyMessagesBatchMut,
message: Message<PrepareHeader>,
) -> Result<AppendResult, IggyError> {
let header = *message.header();
if header.operation != Operation::SendMessages {
return Err(IggyError::CannotAppendMessage);
}

let mut batch = decode_send_messages_batch(message.body_bytes())
.ok_or(IggyError::CannotAppendMessage)?;

if batch.count() == 0 {
return Ok(AppendResult::new(0, 0, 0));
}
Expand Down Expand Up @@ -116,7 +156,8 @@ impl Partition for IggyPartition {
journal.info.end_timestamp = ts;
}

journal.inner.append(batch).await;
let message = Self::prepare_message_from_batch(header, &batch);
journal.inner.append(message).await;

Ok(AppendResult::new(
dirty_offset,
Expand Down
54 changes: 16 additions & 38 deletions core/partitions/src/iggy_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use consensus::{
};
use iggy_common::header::Command2;
use iggy_common::{
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer,
Segment, SegmentStorage,
IggyByteSize, PartitionStats, Segment, SegmentStorage,
header::{
ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader,
},
Expand Down Expand Up @@ -355,13 +354,13 @@ where
}

async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) {
let header = message.header();
let header = *message.header();
let namespace = IggyNamespace::from_raw(header.namespace);
let consensus = self
.consensus()
.expect("on_replicate: consensus not initialized");

let current_op = match replicate_preflight(consensus, header) {
let current_op = match replicate_preflight(consensus, &header) {
Ok(current_op) => current_op,
Err(reason) => {
warn!(
Expand All @@ -372,7 +371,7 @@ where
}
};

let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
if is_old_prepare {
warn!("received old prepare, not replicating");
} else {
Expand All @@ -387,9 +386,9 @@ where
// TODO: Figure out the flow of the partition operations.
// In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard.
// I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now!
self.apply_replicated_operation(&namespace, &message).await;
self.apply_replicated_operation(&namespace, message).await;

self.send_prepare_ok(header).await;
self.send_prepare_ok(&header).await;

if consensus.is_follower() {
self.commit_journal(namespace);
Expand Down Expand Up @@ -539,37 +538,21 @@ where
.register_namespace(ns);
}

// TODO: Move this elsewhere, also do not reallocate, we do reallocationg now becauise we use PooledBuffer for the batch body
// but `Bytes` for `Message` payload.
fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
assert!(body.len() >= 4, "prepare body too small for batch header");
let count = u32::from_le_bytes(body[0..4].try_into().unwrap());
let indexes_len = count as usize * INDEX_SIZE;
let indexes_end = 4 + indexes_len;
assert!(
body.len() >= indexes_end,
"prepare body too small for {count} indexes",
);

let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
let messages = PooledBuffer::from(&body[indexes_end..]);
IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
}

async fn apply_replicated_operation(
&self,
namespace: &IggyNamespace,
message: &Message<PrepareHeader>,
message: Message<PrepareHeader>,
) {
let consensus = self
.consensus()
.expect("apply_replicated_operation: consensus not initialized");
let header = message.header();
let header = *message.header();

// TODO: WE have to distinguish between an `message` recv by leader and follower.
// In the follower path, we have to skip the `prepare_for_persistance` path, just append to journal.
match header.operation {
Operation::SendMessages => {
let body = message.body_bytes();
self.append_send_messages_to_journal(namespace, body.as_ref())
self.append_send_messages_to_journal(namespace, message)
.await;
debug!(
replica = consensus.replica(),
Expand Down Expand Up @@ -598,28 +581,23 @@ where
}
}

async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) {
let batch = Self::batch_from_body(body);
self.append_messages_to_journal(namespace, batch).await;
}

/// Append a batch to a partition's journal with offset assignment.
/// Append a prepare message to a partition's journal with offset assignment.
///
/// Updates `segment.current_position` (logical position for indexing) but
/// not `segment.end_offset` or `segment.end_timestamp` (committed state).
/// Those are updated during commit.
///
/// Uses `dirty_offset` for offset assignment so that multiple prepares
/// can be pipelined before any commit.
async fn append_messages_to_journal(
async fn append_send_messages_to_journal(
&self,
namespace: &IggyNamespace,
batch: IggyMessagesBatchMut,
message: Message<PrepareHeader>,
) {
let partition = self
.get_mut_by_ns(namespace)
.expect("append_messages_to_journal: partition not found for namespace");
let _ = partition.append_messages(batch).await;
.expect("append_send_messages_to_journal: partition not found for namespace");
let _ = partition.append_messages(message).await;
}

/// Replicate a prepare message to the next replica in the chain.
Expand Down
Loading
Loading