refactor(journal): refactor partition journal to utilize storage trait (#2909)
Conversation
Codecov Report: ❌ The patch check failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files: @@ Coverage Diff @@
## master #2909 +/- ##
============================================
- Coverage 70.39% 70.26% -0.14%
Complexity 776 776
============================================
Files 1041 1041
Lines 86472 86644 +172
Branches 62742 62923 +181
============================================
+ Hits 60873 60881 +8
- Misses 23074 23232 +158
- Partials 2525 2531 +6
Flags with carried forward coverage won't be shown.
Force-pushed b8e0bbe to 9e65721
    async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
        let offset = usize::try_from(op).ok()?;
        let bytes = {
            let inner = unsafe { &*self.inner.get() };
            inner.storage.read(offset, ZERO_LEN).await
        };

        Some(
            Message::from_bytes(bytes)
                .expect("partition.journal.storage.read: invalid bytes for message"),
        )
    }
message_by_op returns Option but panics for any op that isn't in storage.
PartitionJournalMemStorage::read returns Bytes::new() for unknown ops, Message::from_bytes on empty bytes returns Err, and the .expect() panics. This is directly reachable from load_messages_from_storage, which walks ops with op += 1; consensus ops are globally scoped, so per-partition gaps are expected and will trigger this.
Good catch. We shouldn't expect ops to increase monotonically by one; as you mentioned, they are shared between partition messages and partition metadata operations, so the op numbers for a single partition can be scattered.
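The gap-tolerant lookup being discussed can be sketched in isolation. This is a minimal stand-in, not the actual iggy types: `Journal` and the byte map below are hypothetical substitutes for `PartitionJournal` and its storage, reduced so the "return `None` on a missing op instead of panicking" behavior is visible.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the journal: op -> serialized message bytes.
struct Journal {
    by_op: HashMap<u64, Vec<u8>>,
}

impl Journal {
    // Returns None for ops that were never written to this partition
    // (e.g. metadata ops, or ops belonging to other partitions),
    // instead of panicking on empty bytes.
    fn message_by_op(&self, op: u64) -> Option<&[u8]> {
        match self.by_op.get(&op) {
            Some(bytes) if !bytes.is_empty() => Some(bytes.as_slice()),
            _ => None, // gap in the globally scoped op sequence
        }
    }
}
```

With this shape, `load_messages_from_storage` walking `op += 1` simply skips gaps rather than hitting an `.expect()`.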
    pub fn commit(&self) -> IggyMessagesBatchSet {
-       let batch_set = unsafe { &mut *self.batch_set.get() };
-       std::mem::take(batch_set)
+       let entries = {
+           let inner = unsafe { &*self.inner.get() };
+           inner.storage.drain()
+       };
+
+       let mut messages = Vec::with_capacity(entries.len());
+       for bytes in entries {
+           if let Ok(message) = Message::from_bytes(bytes) {
+               messages.push(message);
+           }
+       }
+
+       let headers = unsafe { &mut *self.headers.get() };
+       headers.clear();
+       let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+       offsets.clear();
+       let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+       timestamps.clear();
+
+       Self::messages_to_batch_set(&messages)
commit() drains storage destructively before parsing. If any Message::from_bytes call returns Err, the entry is silently dropped. The old code had no parsing step on this path, so this failure mode is new and unique to this refactor.
The goal of this API is to drain entries from the journal without re-validating them, because all of the validation happens before any entry enters the journal.
The point is valid that in case of failed validation we must not leave the journal in an inconsistent state, but the way we will achieve that is by crashing the server: the commit is essentially a read from the journal, so if we fail to parse a batch there, something really bad has happened.
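The "crash loudly instead of dropping" approach described above can be sketched as follows. This is a simplified illustration, not the iggy implementation: `parse` stands in for `Message::from_bytes`, and entries are plain byte vectors.

```rust
// Hypothetical stand-in for Message::from_bytes: entries are expected to
// be valid UTF-8 because they were validated before entering the journal.
fn parse(bytes: &[u8]) -> Result<String, String> {
    std::str::from_utf8(bytes)
        .map(|s| s.to_string())
        .map_err(|e| e.to_string())
}

// Drain-and-parse commit path: a parse failure aborts with context
// rather than silently skipping the corrupted entry.
fn commit(entries: Vec<Vec<u8>>) -> Vec<String> {
    entries
        .iter()
        .enumerate()
        .map(|(i, bytes)| {
            parse(bytes).unwrap_or_else(|e| {
                // Entries were validated on write; a failure here means
                // the in-memory journal is corrupted, so crash the server.
                panic!("partition.journal.commit: entry {i} unparsable: {e}")
            })
        })
        .collect()
}
```

The `if let Ok(...)` in the diff above would become this `panic!`, so a corrupted entry can never be silently lost between drain and batch-set construction.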
    #[derive(Debug, Default)]
    pub struct PartitionJournalMemStorage {
        entries: UnsafeCell<Vec<Bytes>>,
I think entries can be made an UnsafeCell<Vec<u8>>, with PartitionJournal storing a hashmap that maps op -> (offset, byte_length) into that Vec<u8>.
This would simplify the read and write impls of PartitionJournalStorage.
Yeah, I agree. The only reason the op_to_index mapping even exists in storage is that we have to handle the commit path, which resets the storage offset, so our op_number != storage_offset. I will move that translation to the PartitionJournal rather than keeping it in storage.
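The suggested layout can be sketched as a flat byte buffer plus an op index owned by the journal. All names here (`FlatStorage`, `op_index`, `append`) are hypothetical simplifications of the proposal, with the `UnsafeCell` interior mutability omitted for clarity.

```rust
use std::collections::HashMap;

// Flat backing buffer: the suggested UnsafeCell<Vec<u8>>, minus the cell.
#[derive(Default)]
struct FlatStorage {
    buf: Vec<u8>,
}

impl FlatStorage {
    // Appends bytes and returns their (offset, len) span in the buffer.
    fn write(&mut self, bytes: &[u8]) -> (usize, usize) {
        let offset = self.buf.len();
        self.buf.extend_from_slice(bytes);
        (offset, bytes.len())
    }

    fn read(&self, offset: usize, len: usize) -> &[u8] {
        &self.buf[offset..offset + len]
    }
}

// The journal, not the storage, owns the op -> (offset, len) translation,
// which keeps read/write of the storage trait trivial.
#[derive(Default)]
struct PartitionJournal {
    storage: FlatStorage,
    op_index: HashMap<u64, (usize, usize)>,
}

impl PartitionJournal {
    fn append(&mut self, op: u64, bytes: &[u8]) {
        let span = self.storage.write(bytes);
        self.op_index.insert(op, span);
    }

    fn message_by_op(&self, op: u64) -> Option<&[u8]> {
        let (offset, len) = *self.op_index.get(&op)?;
        Some(self.storage.read(offset, len))
    }
}
```

Because op numbers key the index rather than byte offsets, scattered ops and the commit-time offset reset both become journal-side bookkeeping instead of storage concerns.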
Refactor the PartitionJournal to use the Storage trait as backing storage, rather than storing data inline.