refactor(journal): refactor partition journal to utilize storage trait #2909

Open

numinnex wants to merge 6 commits into master from partitions_refactor
Conversation

@numinnex
Contributor

Refactor the PartitionJournal to use the Storage trait as backing storage, rather than storing data inline.

@codecov

codecov bot commented Mar 10, 2026

Codecov Report

❌ Patch coverage is 0% with 229 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.26%. Comparing base (fc39760) to head (9e65721).

Files with missing lines Patch % Lines
core/partitions/src/journal.rs 0.00% 160 Missing ⚠️
core/partitions/src/iggy_partition.rs 0.00% 34 Missing ⚠️
core/partitions/src/lib.rs 0.00% 20 Missing ⚠️
core/partitions/src/iggy_partitions.rs 0.00% 12 Missing ⚠️
core/simulator/src/deps.rs 0.00% 3 Missing ⚠️

❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2909      +/-   ##
============================================
- Coverage     70.39%   70.26%   -0.14%     
  Complexity      776      776              
============================================
  Files          1041     1041              
  Lines         86472    86644     +172     
  Branches      62742    62923     +181     
============================================
+ Hits          60873    60881       +8     
- Misses        23074    23232     +158     
- Partials       2525     2531       +6     
Flag Coverage Δ
csharp 67.43% <ø> (-0.17%) ⬇️
go 36.37% <ø> (ø)
java 56.26% <ø> (ø)
node 91.48% <ø> (-0.04%) ⬇️
python 81.43% <ø> (ø)
rust 70.46% <0.00%> (-0.19%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
core/simulator/src/deps.rs 0.00% <0.00%> (ø)
core/partitions/src/iggy_partitions.rs 0.00% <0.00%> (ø)
core/partitions/src/lib.rs 0.00% <0.00%> (ø)
core/partitions/src/iggy_partition.rs 0.00% <0.00%> (ø)
core/partitions/src/journal.rs 0.00% <0.00%> (ø)

... and 16 files with indirect coverage changes


@numinnex numinnex force-pushed the partitions_refactor branch from b8e0bbe to 9e65721 Compare March 10, 2026 14:22
Comment on lines +233 to +244
async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
    let offset = usize::try_from(op).ok()?;
    let bytes = {
        let inner = unsafe { &*self.inner.get() };
        inner.storage.read(offset, ZERO_LEN).await
    };

    Some(
        Message::from_bytes(bytes)
            .expect("partition.journal.storage.read: invalid bytes for message"),
    )
}
Contributor
message_by_op returns Option but panics for any op that isn't in storage.

PartitionJournalMemStorage::read returns Bytes::new() for unknown ops, Message::from_bytes on empty bytes returns Err, and the .expect() panics. This is directly reachable from load_messages_from_storage, which walks ops with op += 1. Since consensus ops are globally scoped, per-partition gaps are expected and will trigger this.

Contributor Author

Good catch, we shouldn't expect op to increase monotonically; as you've mentioned, ops are shared between partition messages and partition metadata operations, so the op_number values for a single partition could be scattered.
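A gap-tolerant shape for this path could look like the sketch below. MemStorage, Message, and the non-async message_by_op here are simplified stand-ins for the real core/partitions types (no UnsafeCell, no PrepareHeader); only the Option-propagation pattern is the point:

```rust
use std::collections::HashMap;

// Stand-in for the real Message type: parsing empty bytes fails,
// mirroring the behaviour described in the review comment.
struct Message(Vec<u8>);

impl Message {
    fn from_bytes(bytes: &[u8]) -> Result<Message, ()> {
        if bytes.is_empty() { Err(()) } else { Ok(Message(bytes.to_vec())) }
    }
}

// Stand-in for PartitionJournalMemStorage: unknown ops read as empty bytes.
struct MemStorage {
    entries: HashMap<u64, Vec<u8>>,
}

impl MemStorage {
    fn read(&self, op: u64) -> Vec<u8> {
        self.entries.get(&op).cloned().unwrap_or_default()
    }
}

// Propagate the parse failure as None instead of .expect()-ing, so
// per-partition gaps in the globally scoped op sequence are tolerated.
fn message_by_op(storage: &MemStorage, op: u64) -> Option<Message> {
    Message::from_bytes(&storage.read(op)).ok()
}
```

With this shape, a caller walking ops with op += 1 simply skips the ops that belong to other partitions instead of panicking.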

Comment on lines 158 to +178
 pub fn commit(&self) -> IggyMessagesBatchSet {
-    let batch_set = unsafe { &mut *self.batch_set.get() };
-    std::mem::take(batch_set)
+    let entries = {
+        let inner = unsafe { &*self.inner.get() };
+        inner.storage.drain()
+    };
+
+    let mut messages = Vec::with_capacity(entries.len());
+    for bytes in entries {
+        if let Ok(message) = Message::from_bytes(bytes) {
+            messages.push(message);
+        }
+    }
+
+    let headers = unsafe { &mut *self.headers.get() };
+    headers.clear();
+    let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+    offsets.clear();
+    let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+    timestamps.clear();
+
+    Self::messages_to_batch_set(&messages)
Contributor

commit() drains storage destructively before parsing. If any Message::from_bytes call returns Err, the entry is dropped. The old code had no parsing step on this path, so this failure mode is new and unique to this refactor.

Contributor Author

The goal of this API is to drain entries from the journal without validating (panicking if validation fails), because all of the validation happens before any entry enters the journal.

The point is valid that we shouldn't leave the journal in an inconsistent state on a failed parse, but the way we will achieve that is by crashing the server: the commit is essentially a read from the journal, so if we fail to parse a batch read from it, something really bad has happened.
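A minimal sketch of that crash-loudly commit path, with Message as an illustrative stand-in (not the real core/partitions type) and the UnsafeCell plumbing omitted:

```rust
// Stand-in for the real Message type: parsing empty bytes fails.
struct Message(Vec<u8>);

impl Message {
    fn from_bytes(bytes: &[u8]) -> Result<Message, ()> {
        if bytes.is_empty() { Err(()) } else { Ok(Message(bytes.to_vec())) }
    }
}

// Crash-on-corruption commit: every drained entry must parse, because
// validation already happened when the entry entered the journal. A parse
// failure here means the journal itself is corrupted, so we panic instead
// of silently dropping the entry.
fn commit(entries: Vec<Vec<u8>>) -> Vec<Message> {
    entries
        .into_iter()
        .map(|bytes| {
            Message::from_bytes(&bytes)
                .unwrap_or_else(|_| panic!("partition.journal.commit: corrupted entry"))
        })
        .collect()
}
```

The difference from the PR's current `if let Ok(...)` loop is that a corrupted entry takes the process down loudly rather than vanishing from the committed batch set.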


#[derive(Debug, Default)]
pub struct PartitionJournalMemStorage {
    entries: UnsafeCell<Vec<Bytes>>,
Contributor

I think entries can be made into an UnsafeCell<Vec<u8>>, with PartitionJournal storing a hashmap that maps op -> (offset, byte_length) into that Vec<u8>.

This would simplify the read and write impls of PartitionJournalStorage.

Contributor Author

Yeah, I agree. The only reason the op_to_index mapping even exists there is that we have to handle the commit path, which resets the storage offset, so our op_number != storage_offset. I will move that translation into the PartitionJournal rather than having it in storage.
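A minimal sketch of the proposed layout, assuming illustrative names (FlatStorage, Journal, and op_index are not from the PR) and plain ownership instead of UnsafeCell: raw bytes live in one Vec<u8>, and the journal owns the op -> (offset, len) index.

```rust
use std::collections::HashMap;

// Flat byte storage: write appends and returns the (offset, len) span,
// read slices it back out. No knowledge of ops lives here.
#[derive(Default)]
struct FlatStorage {
    bytes: Vec<u8>,
}

impl FlatStorage {
    fn write(&mut self, payload: &[u8]) -> (usize, usize) {
        let offset = self.bytes.len();
        self.bytes.extend_from_slice(payload);
        (offset, payload.len())
    }

    fn read(&self, offset: usize, len: usize) -> &[u8] {
        &self.bytes[offset..offset + len]
    }
}

// The journal keeps the op -> span translation, so op_number and
// storage offset can diverge freely (e.g. after a commit reset).
#[derive(Default)]
struct Journal {
    storage: FlatStorage,
    op_index: HashMap<u64, (usize, usize)>,
}

impl Journal {
    fn append(&mut self, op: u64, payload: &[u8]) {
        let span = self.storage.write(payload);
        self.op_index.insert(op, span);
    }

    fn get(&self, op: u64) -> Option<&[u8]> {
        let &(offset, len) = self.op_index.get(&op)?;
        Some(self.storage.read(offset, len))
    }
}
```

With the index owned by the journal, the storage trait's read and write reduce to plain byte-span operations, which is the simplification the comment suggests.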


3 participants