feat(metadata): Persistent WAL journal with recovery and compaction #2916

krishvishal wants to merge 10 commits into apache:master
Conversation
Force-pushed from 8162f81 to f31d9f2
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

```
@@             Coverage Diff             @@
##             master    #2916     +/-   ##
============================================
- Coverage     70.27%   70.25%    -0.03%
  Complexity      862      862
============================================
  Files          1028     1032        +4
  Lines         85279    85970      +691
  Branches      62656    63356      +700
============================================
+ Hits          59932    60398      +466
- Misses        22833    23014      +181
- Partials       2514     2558       +44
```
hmm maybe you can try a blanket super-trait alias like …

…pollution. Now the requirement is captured at the creation of `IggyMetaData`.

- Fix review comments
core/journal/src/file_storage.rs (Outdated)

```rust
/// File-backed storage implementing the `Storage` trait.
pub struct FileStorage {
    file: RefCell<compio::fs::File>,
```

You cannot use `RefCell` together with components that have async methods.
core/journal/src/file_storage.rs (Outdated)

```rust
    path: PathBuf,
}

#[allow(clippy::future_not_send, clippy::await_holding_refcell_ref)]
```

Let's not silence `await_holding_refcell_ref`; use an `UnsafeCell` instead of `RefCell`.
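A minimal sketch of the `UnsafeCell` alternative, assuming a single-threaded, thread-per-core runtime where at most one task touches the file at a time (the `u32` field stands in for `compio::fs::File`; all names here are hypothetical):

```rust
use std::cell::UnsafeCell;

// UnsafeCell drops RefCell's runtime borrow flag entirely; soundness
// becomes the caller's obligation instead.
struct FileStorage {
    file: UnsafeCell<u32>, // stand-in for compio::fs::File
}

impl FileStorage {
    fn new(file: u32) -> Self {
        Self { file: UnsafeCell::new(file) }
    }

    /// SAFETY: the caller must guarantee no other reference to the file
    /// exists while the returned one is alive (one task per shard).
    #[allow(clippy::mut_from_ref)]
    unsafe fn file_mut(&self) -> &mut u32 {
        unsafe { &mut *self.file.get() }
    }
}
```

Unlike the `RefCell` version, nothing is checked at runtime, so the single-task-per-shard invariant must be upheld by the surrounding architecture.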
```rust
/// Advance the snapshot watermark so entries at or below `op` may be
/// evicted from the journal's in-memory index. The default is a no-op
/// for journals that do not require this watermark.
fn set_snapshot_op(&self, _op: u64) {}

/// Number of entries that can be appended before the journal would need
/// to evict un-snapshotted slots. Returns `None` for journals that don't persist to disk.
fn remaining_capacity(&self) -> Option<usize> {
    None
}

/// Remove snapshotted entries from the WAL to reclaim disk space.
/// The default is a no-op for journals that do not persist to disk.
///
/// # Errors
/// Returns an I/O error if compaction fails.
fn compact(&self) -> impl Future<Output = io::Result<()>> {
    async { Ok(()) }
}
```
I think I'd prefer the journal to have some sort of drain method that allows "extracting" a range of items, similarly to how `Vec::drain(begin..end)` works. This way we don't hack ad-hoc APIs onto the interface just to cover an edge case; instead we create a general-purpose API that can be used to shrink the journal, and we handle the watermark outside of the Journal.

And yeah, a `Stream` iterator would be perfect, but since `AsyncIterator` is still unstable and is probably going to replace the `Stream` trait, we can return a non-async drain iterator and do the disk read for the entire range in one go.

I agree that the drain API is much cleaner. One thing to consider: drain would read and deserialize all removed entries to return them to the caller, but the main consumer today (checkpoint) doesn't need the returned entries; it just wants them removed from the WAL. What do we do about the wasted deserialization cost?

Also, the ops in the journal aren't necessarily in a contiguous range. There can be gaps from pipelined prepares arriving out of order, and slots are overwritten when a new op lands on the same index. So `drain(begin..end)` is semantically not suitable here.
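For reference, the `Vec::drain` semantics the reviewer is proposing to mirror: a single call that both shrinks the collection in place and yields the removed items to the caller.

```rust
// Vec::drain(begin..end) removes the range in place and returns an
// iterator over the removed items - the shape proposed for the journal.
fn drain_demo() -> (Vec<u64>, Vec<u64>) {
    let mut ops: Vec<u64> = (1..=6).collect();
    let removed: Vec<u64> = ops.drain(0..3).collect();
    (removed, ops)
}
```

Note that this is precisely where the deserialization-cost objection bites: `drain` materializes the removed items even when the caller (checkpoint) only wants them gone.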
core/journal/src/lib.rs (Outdated)

```rust
fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
fn write(&self, buf: Self::Buffer) -> impl Future<Output = io::Result<usize>>;
```

I was thinking: since I've added an offset to the read method, shouldn't we rename this method to `write_at` - `fn write_at(&self, offset: usize, buf: Self::Buffer)` - and create `fn append(&self, buf: Self::Buffer)` on top of that? What do you think? I am wondering which API would serve us better from the perspective of implementing an InMemory storage, both for server usage as well as the simulator.

I think `write_at`/`read_at` APIs make more sense here, because for InMemory storage reads/writes are just index ops.
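A hypothetical sketch of the InMemory storage both reviewers have in mind (the struct and method names are assumptions, not the PR's actual API): with `write_at`/`read_at`, every operation reduces to a plain index operation on a `Vec<u8>`, and `append` layers on top of `write_at`. The methods are synchronous, so no borrow is ever held across an await.

```rust
use std::cell::RefCell;

// In-memory storage sketch: positional I/O is just slice indexing.
struct InMemoryStorage {
    buf: RefCell<Vec<u8>>,
}

impl InMemoryStorage {
    fn new() -> Self {
        Self { buf: RefCell::new(Vec::new()) }
    }

    // Write `data` at `offset`, growing the buffer with zeros if needed.
    fn write_at(&self, offset: usize, data: &[u8]) -> usize {
        let mut buf = self.buf.borrow_mut();
        if buf.len() < offset + data.len() {
            buf.resize(offset + data.len(), 0);
        }
        buf[offset..offset + data.len()].copy_from_slice(data);
        data.len()
    }

    // Append = write_at(current end), exactly as proposed in review.
    fn append(&self, data: &[u8]) -> usize {
        let offset = self.buf.borrow().len();
        self.write_at(offset, data)
    }

    fn read_at(&self, offset: usize, len: usize) -> Vec<u8> {
        self.buf.borrow()[offset..offset + len].to_vec()
    }
}
```

The same pair of primitives works unchanged in a deterministic simulator, which is the reviewer's stated motivation for preferring them.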
core/metadata/src/impls/metadata.rs (Outdated)

```rust
let mut file = fs::File::create(&tmp_path)?;
file.write_all(&encoded)?;
file.sync_all()?;
drop(file);

fs::rename(&tmp_path, path)?;

// Fsync the parent directory to ensure the rename is durable.
if let Some(parent) = path.parent() {
    let dir = fs::File::open(parent)?;
    dir.sync_all()?;
}

Ok(())
```

I think we should handle those errors inline here, or at least convert them to error types that represent the different stages of snapshot persistence, as depending on the stage we can be left in a broken state that is still recoverable. For example, if we were to fail in the rename stage, we can safely retry that operation; but if we were to fail on lines 70-71, we have to start from scratch, as `write_all` could partially write the buffer and then fail, and the same goes for `sync_all`.

I also think we should move to `O_SYNC` and `O_DSYNC` for the metadata, but that can wait, as I am now working on adding `O_DIRECT` support for partitions, and that requires quite a few changes to the Buffer that we use, since `Bytes`/`BytesMut` does not allow allocating aligned buffers.
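One way to capture the staged-failure distinction the reviewer describes is a dedicated error type (a sketch only; the name and variants are assumptions, not the PR's code): each variant records which stage failed, so the caller knows whether a retry is safe or the snapshot must be rebuilt from scratch.

```rust
use std::io;

// Hypothetical staged error for snapshot persistence.
#[derive(Debug)]
enum SnapshotPersistError {
    /// write_all/sync_all may have left a partial tmp file:
    /// not retryable, restart the snapshot from scratch.
    WriteTmp(io::Error),
    /// The tmp file is complete and durable; the rename can be retried.
    Rename(io::Error),
    /// The rename succeeded; only the directory fsync needs retrying.
    DirSync(io::Error),
}

impl SnapshotPersistError {
    fn is_retryable(&self) -> bool {
        !matches!(self, SnapshotPersistError::WriteTmp(_))
    }
}
```

A retry loop in `checkpoint()` could then branch on `is_retryable()` instead of treating every `io::Error` as fatal.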
```rust
pub fn load(path: &Path) -> Result<Self, SnapshotError> {
    let data = std::fs::read(path)?;

    // TODO: when checksum is added we need to check
```

Create a Linear issue for that.
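A minimal sketch of what the checksum TODO could look like (purely illustrative: a real implementation would use a stable CRC; `DefaultHasher` is used here only because it is in the standard library, and its output is not stable across Rust versions or processes):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// Compute a digest over the snapshot bytes.
fn checksum(data: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    hasher.write(data);
    hasher.finish()
}

// Verify a stored digest before decoding; a mismatch means the
// snapshot file is corrupt or truncated and must be rejected.
fn verify_checksum(data: &[u8], expected: u64) -> bool {
    checksum(data) == expected
}
```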
```rust
pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
/// Root data directory, used by checkpoint to persist snapshots.
```

Maybe it's a good idea to store some sort of `snapshot_coordinator` struct there that would hide those details away?
```rust
/// # Errors
/// Returns `SnapshotError` if snapshotting, persistence, or compaction fails.
#[allow(clippy::future_not_send)]
pub async fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError>
```

If we go with the comment on line R143, then this would be part of the coordinator I mentioned on R143.
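A sketch of the coordinator idea from the two comments above (the struct and its methods are hypothetical, not code from the PR): it owns the data directory and the snapshot watermark, so `checkpoint()` would no longer take a `data_dir` parameter and those fields disappear from the shard struct.

```rust
use std::path::PathBuf;

// Hypothetical SnapshotCoordinator bundling snapshot bookkeeping.
struct SnapshotCoordinator {
    data_dir: PathBuf,
    last_snapshot_op: u64,
}

impl SnapshotCoordinator {
    fn new(data_dir: impl Into<PathBuf>) -> Self {
        Self { data_dir: data_dir.into(), last_snapshot_op: 0 }
    }

    /// Record a snapshot taken at `op` and return the path it would be
    /// persisted under (naming scheme is an assumption).
    fn advance(&mut self, op: u64) -> PathBuf {
        self.last_snapshot_op = op;
        self.data_dir.join(format!("snapshot-{op:016x}"))
    }

    fn last_snapshot_op(&self) -> u64 {
        self.last_snapshot_op
    }
}
```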
Which issue does this PR close?
Closes #2915
Summary
- `FileStorage`: file-backed `Storage` impl with positional reads, appends, truncate, fsync.
- `MetadataJournal`: append-only WAL indexed by a ring buffer (`op % SLOT_COUNT`). Crash recovery scans forward and truncates partial tail entries. Compaction atomically rewrites the WAL, keeping only entries above the snapshot watermark.
- `recover()`: startup recovery pipeline that loads the latest snapshot, opens the WAL, and replays entries past the snapshot sequence number through the state machine.
- `checkpoint()` on `IggyMetadata`: persists a snapshot, then advances the journal watermark and compacts.
- `Journal`/`Storage` traits: `io::Result` return types; `set_snapshot_op`, `remaining_capacity`, and `compact` default methods.
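The ring-buffer indexing described in the summary can be sketched as follows (the `SLOT_COUNT` value and function names are hypothetical): an entry for `op` lives in slot `op % SLOT_COUNT`, so a newer op landing on the same index overwrites the older one — which is also why the journal's live ops need not form a contiguous range, as noted in the drain discussion above.

```rust
// Hypothetical ring-buffer size; the real constant lives in the PR.
const SLOT_COUNT: u64 = 8;

// Map an op number onto its slot in the in-memory index.
fn slot(op: u64) -> usize {
    (op % SLOT_COUNT) as usize
}

// A newer op landing on the same index overwrites the older entry.
fn insert(slots: &mut [Option<u64>], op: u64) {
    slots[slot(op)] = Some(op);
}
```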