feat(metadata): Persistent WAL journal with recovery and compaction #2916

Open

krishvishal wants to merge 10 commits into apache:master from krishvishal:reboot-state

Conversation

@krishvishal
Contributor

Which issue does this PR close?

Closes #2915

Summary

  • Add FileStorage: file-backed Storage impl with positional reads, appends, truncate, fsync.
  • Add MetadataJournal: append-only WAL indexed by a ring buffer (op % SLOT_COUNT). Crash recovery scans forward and truncates partial tail entries. Compaction atomically rewrites the WAL keeping only entries above the snapshot watermark.
  • Add recover(): startup recovery pipeline that loads the latest snapshot, opens the WAL, and replays entries past the snapshot sequence number through the state machine.
  • Add checkpoint() on IggyMetadata: persists a snapshot then advances the journal watermark and compacts.
  • Update Journal / Storage traits: io::Result return types, set_snapshot_op, remaining_capacity, compact default methods.
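The ring-buffer index described above (`op % SLOT_COUNT`) can be sketched roughly as follows. This is an illustrative model only, not the PR's actual implementation; `SLOT_COUNT`, `SlotEntry`, and `RingIndex` are hypothetical names, and the real slot count is not shown in the PR.

```rust
// Illustrative sketch: each op number maps to a fixed slot (op % SLOT_COUNT),
// so a newer op landing on the same slot overwrites the older index record.
const SLOT_COUNT: u64 = 8; // hypothetical; the real constant is not shown in the PR

/// Index record pointing at an entry's location in the WAL file.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SlotEntry {
    op: u64,
    offset: u64,
    len: u32,
}

struct RingIndex {
    slots: Vec<Option<SlotEntry>>,
}

impl RingIndex {
    fn new() -> Self {
        Self { slots: vec![None; SLOT_COUNT as usize] }
    }

    /// Record where `op` was appended in the WAL.
    fn insert(&mut self, op: u64, offset: u64, len: u32) {
        let slot = (op % SLOT_COUNT) as usize;
        self.slots[slot] = Some(SlotEntry { op, offset, len });
    }

    /// Look up `op`, returning `None` if its slot was overwritten by a newer op.
    fn get(&self, op: u64) -> Option<SlotEntry> {
        let slot = (op % SLOT_COUNT) as usize;
        self.slots[slot].filter(|e| e.op == op)
    }
}
```

Note how a lookup checks the stored `op` against the requested one: slot reuse means an old op's record may silently be gone, which is also why compaction and the snapshot watermark matter.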

@codecov

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 64.51613% with 253 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.25%. Comparing base (3f651e0) to head (5a99456).

Files with missing lines Patch % Lines
core/metadata/src/impls/metadata.rs 17.77% 72 Missing and 2 partials ⚠️
core/journal/src/metadata_journal.rs 83.46% 43 Missing and 18 partials ⚠️
core/metadata/src/impls/recovery.rs 70.40% 31 Missing and 6 partials ⚠️
core/simulator/src/deps.rs 0.00% 21 Missing ⚠️
core/journal/src/file_storage.rs 70.58% 13 Missing and 7 partials ⚠️
core/metadata/src/stm/snapshot.rs 0.00% 14 Missing ⚠️
core/journal/src/lib.rs 0.00% 7 Missing ⚠️
core/partitions/src/journal.rs 0.00% 7 Missing ⚠️
core/simulator/src/replica.rs 0.00% 6 Missing ⚠️
core/partitions/src/iggy_partition.rs 0.00% 5 Missing ⚠️
... and 1 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2916      +/-   ##
============================================
- Coverage     70.27%   70.25%   -0.03%     
  Complexity      862      862              
============================================
  Files          1028     1032       +4     
  Lines         85279    85970     +691     
  Branches      62656    63356     +700     
============================================
+ Hits          59932    60398     +466     
- Misses        22833    23014     +181     
- Partials       2514     2558      +44     
Flag Coverage Δ
csharp 67.43% <ø> (-0.19%) ⬇️
go 36.37% <ø> (ø)
java 59.87% <ø> (ø)
node 91.37% <ø> (-0.15%) ⬇️
python 81.43% <ø> (ø)
rust 70.62% <64.51%> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
core/partitions/src/log.rs 0.00% <0.00%> (ø)
core/partitions/src/iggy_partition.rs 0.00% <0.00%> (ø)
core/simulator/src/replica.rs 0.00% <0.00%> (ø)
core/journal/src/lib.rs 0.00% <0.00%> (ø)
core/partitions/src/journal.rs 0.00% <0.00%> (ø)
core/metadata/src/stm/snapshot.rs 84.13% <0.00%> (-6.08%) ⬇️
core/journal/src/file_storage.rs 70.58% <70.58%> (ø)
core/simulator/src/deps.rs 0.00% <0.00%> (ø)
core/metadata/src/impls/recovery.rs 70.40% <70.40%> (ø)
core/journal/src/metadata_journal.rs 83.46% <83.46%> (ø)
... and 1 more

... and 11 files with indirect coverage changes


@krishvishal
Contributor Author

The `FillSnapshot` trait bound has become a bit viral. I'm thinking of ways to reduce that.

@atharvalade
Contributor

The `FillSnapshot` trait bound has become a bit viral. I'm thinking of ways to reduce that.

hmm, maybe you can try a blanket super-trait alias like `trait MetadataStm: StateMachine<...> + FillSnapshot<MetadataSnapshot> {}` with a blanket impl. It won't reduce the actual propagation, but it cuts the noise from 4 lines to 1 at each use site. Longer term, pulling checkpoint into a separate component that owns the FillSnapshot concern behind a callback would remove the bound from IggyMetadata's M parameter entirely.
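The super-trait-alias pattern suggested here looks roughly like this. The `StateMachine`, `FillSnapshot`, and `MetadataSnapshot` shapes below are placeholders for illustration, not the crate's real definitions.

```rust
// Placeholder types standing in for the crate's real ones.
struct MetadataSnapshot;

trait StateMachine {
    fn apply(&mut self, op: u64);
}

trait FillSnapshot<S> {
    fn fill(&self, snapshot: &mut S);
}

/// Alias trait: one bound at each use site instead of repeating both.
trait MetadataStm: StateMachine + FillSnapshot<MetadataSnapshot> {}

/// Blanket impl: any type satisfying both bounds is automatically a MetadataStm.
impl<T: StateMachine + FillSnapshot<MetadataSnapshot>> MetadataStm for T {}

/// A use site now needs only `M: MetadataStm` instead of both bounds.
fn checkpoint<M: MetadataStm>(stm: &mut M) -> MetadataSnapshot {
    stm.apply(1);
    let mut snap = MetadataSnapshot;
    stm.fill(&mut snap);
    snap
}
```

As noted, this shortens each use site but the blanket impl still propagates the same requirements; only extracting the `FillSnapshot` concern would remove them.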

pollution. Now the requirement is captured at the creation of IggyMetadata.

- Fix review comments
Contributor

@atharvalade left a comment

lgtm


/// File-backed storage implementing the `Storage` trait.
pub struct FileStorage {
    file: RefCell<compio::fs::File>,
Contributor

You cannot use RefCell together with components that have async methods.

Contributor Author

Done.

    path: PathBuf,
}

#[allow(clippy::future_not_send, clippy::await_holding_refcell_ref)]
Contributor

Let's not silence `await_holding_refcell_ref`; use an `UnsafeCell` instead of `RefCell`.

Contributor Author

Done.

Comment on lines +39 to +58

/// Advance the snapshot watermark so entries at or below `op` may be
/// evicted from the journal's in-memory index. The default is a no-op
/// for journals that do not require this watermark.
fn set_snapshot_op(&self, _op: u64) {}

/// Number of entries that can be appended before the journal would need
/// to evict un-snapshotted slots. Returns `None` for journals that don't persist to disk.
fn remaining_capacity(&self) -> Option<usize> {
    None
}

/// Remove snapshotted entries from the WAL to reclaim disk space.
/// The default is a no-op for journals that do not persist to disk.
///
/// # Errors
/// Returns an I/O error if compaction fails.
fn compact(&self) -> impl Future<Output = io::Result<()>> {
    async { Ok(()) }
}
Contributor

I think I'd prefer the journal to have some sort of drain method that allows extracting a range of items, similarly to how Vec::drain(begin..end) works. This way we don't hack extra APIs onto the interface just to cover an edge case; instead we create a general-purpose API that can be used to shrink the journal, and we handle the watermark outside of the Journal.

And yeah, a Stream iterator would be perfect, but since AsyncIterator is still unstable and is probably going to replace the Stream trait, we can return a non-async drain iterator and do the disk read for the entire range in one go.
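The suggested drain shape could look roughly like this, sketched over a purely in-memory journal so the non-async iterator stands out. All names here are illustrative, not the crate's API.

```rust
use std::ops::Range;

// Illustrative WAL entry: an op number plus its serialized payload.
#[derive(Clone, Debug, PartialEq)]
struct Entry {
    op: u64,
    payload: Vec<u8>,
}

// In-memory stand-in for the journal, to show the drain shape only.
struct InMemoryJournal {
    entries: Vec<Entry>,
}

impl InMemoryJournal {
    /// Remove all entries whose op falls in `range` and return them,
    /// mirroring the Vec::drain(begin..end) shape: one pass, then a
    /// plain (non-async) iterator over the removed entries.
    fn drain_ops(&mut self, range: Range<u64>) -> std::vec::IntoIter<Entry> {
        let (drained, kept): (Vec<_>, Vec<_>) =
            self.entries.drain(..).partition(|e| range.contains(&e.op));
        self.entries = kept;
        drained.into_iter()
    }
}
```

As the replies below note, this shape has costs for a WAL: the removed entries must be materialized even when the caller (checkpoint) does not need them, and ops need not form a contiguous range.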

Contributor Author

@krishvishal commented Mar 14, 2026

I agree that the drain API is much cleaner. One thing to consider: drain would read and deserialize all removed entries to return them to the caller, but the main consumer today (checkpoint) doesn't need the returned entries; it just wants them removed from the WAL. What do we do about the wasted deserialization cost?

Contributor Author

Also, the ops in the journal aren't necessarily in a contiguous range. There can be gaps from pipelined prepares arriving out of order, and slots are overwritten when a new op lands on the same index. So drain(begin..end) is semantically not suitable here.


fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
fn write(&self, buf: Self::Buffer) -> impl Future<Output = io::Result<usize>>;
Contributor

I was thinking: since I've added offset to the read method, shouldn't we rename this method to `fn write_at(&self, offset: usize, buf: Self::Buffer)` and create on top of that `fn append(&self, buf: Self::Buffer)`? What do you think? I am wondering which API would serve us better from the perspective of implementing an InMemory storage, both for server usage as well as the simulator.

Contributor Author

@krishvishal commented Mar 14, 2026

I think write_at/read_at APIs make more sense here, because for InMemory storage reads/writes are just index ops.
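The point about in-memory storage reducing to index ops can be sketched like this; `InMemoryStorage` and its method signatures are hypothetical simplifications (synchronous, slice-based) of the trait being discussed.

```rust
// Hypothetical in-memory storage: write_at/read_at are just slice indexing
// on a growable byte buffer, and append falls out of write_at for free.
struct InMemoryStorage {
    buf: Vec<u8>,
}

impl InMemoryStorage {
    /// Write `data` at `offset`, growing the buffer if needed.
    fn write_at(&mut self, offset: usize, data: &[u8]) -> usize {
        let end = offset + data.len();
        if end > self.buf.len() {
            self.buf.resize(end, 0);
        }
        self.buf[offset..end].copy_from_slice(data);
        data.len()
    }

    /// Read `len` bytes starting at `offset`.
    fn read_at(&self, offset: usize, len: usize) -> &[u8] {
        &self.buf[offset..offset + len]
    }

    /// append is simply write_at at the current end of the buffer.
    fn append(&mut self, data: &[u8]) -> usize {
        self.write_at(self.buf.len(), data)
    }
}
```

Layering `append` on `write_at` keeps one primitive per direction, which serves both the server file-backed impl and the simulator's in-memory one.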

Contributor Author

Done.

Comment on lines +69 to +82
let mut file = fs::File::create(&tmp_path)?;
file.write_all(&encoded)?;
file.sync_all()?;
drop(file);

fs::rename(&tmp_path, path)?;

// Fsync the parent directory to ensure the rename is durable.
if let Some(parent) = path.parent() {
    let dir = fs::File::open(parent)?;
    dir.sync_all()?;
}

Ok(())
Contributor

I think we should handle those errors inline here, or at least convert them to error types that represent the different stages of snapshot persistence, since depending on the stage we can be left in a broken state that is still recoverable. For example, if we fail in the rename stage, we can safely retry that operation, but if we fail at lines 70-71 we have to start from scratch, as write_all could partially write the buffer and then fail; the same goes for sync_all.

I also think we should move to O_SYNC and O_DSYNC for the metadata, but that can wait, as I am now working on adding O_DIRECT support for partitions, and this requires quite a few changes to the Buffer we use, since Bytes/BytesMut does not allow allocating aligned buffers.
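A stage-tagged error type along the lines suggested could look like this. `PersistError` and its variants are hypothetical names; the key idea is that a rename or directory-fsync failure leaves a fully synced temp file behind and can be retried, while a failure while writing or syncing the temp file means the snapshot must be rewritten from scratch.

```rust
use std::io;

/// Hypothetical error type distinguishing the stages of snapshot persistence.
#[derive(Debug)]
enum PersistError {
    /// write_all/sync_all on the temp file failed; the temp file may be
    /// partially written, so the snapshot must be rebuilt from scratch.
    WriteTemp(io::Error),
    /// The atomic rename failed; the fully synced temp file still exists,
    /// so the rename alone can safely be retried.
    Rename(io::Error),
    /// Fsync of the parent directory failed after the rename.
    SyncDir(io::Error),
}

impl PersistError {
    /// Whether the failed stage can be retried without rewriting the snapshot.
    fn is_retryable(&self) -> bool {
        matches!(self, PersistError::Rename(_) | PersistError::SyncDir(_))
    }
}
```

The caller can then branch on `is_retryable()` instead of treating every `io::Error` as fatal.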

Contributor Author

Done.

pub fn load(path: &Path) -> Result<Self, SnapshotError> {
    let data = std::fs::read(path)?;

    // TODO: when checksum is added we need to check
Contributor

create a linear issue for that.

Contributor Author

Done.

pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
/// Root data directory, used by checkpoint to persist snapshots.
Contributor

Maybe it's a good idea to store some sort of snapshot_coordinator struct there that would hide those details away?
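The snapshot_coordinator idea could be sketched as a small struct that owns the data directory and watermark bookkeeping, so IggyMetadata doesn't carry those fields directly. All names here (`SnapshotCoordinator`, its methods, the file-name scheme) are hypothetical.

```rust
use std::path::PathBuf;

/// Hypothetical coordinator owning snapshot-persistence details.
struct SnapshotCoordinator {
    /// Root data directory, used to persist snapshots.
    data_dir: PathBuf,
    /// Highest op covered by the last persisted snapshot.
    last_snapshot_op: u64,
}

impl SnapshotCoordinator {
    fn new(data_dir: PathBuf) -> Self {
        Self { data_dir, last_snapshot_op: 0 }
    }

    /// Path for the snapshot covering ops up to `op` (zero-padded so
    /// lexicographic order matches op order).
    fn snapshot_path(&self, op: u64) -> PathBuf {
        self.data_dir.join(format!("snapshot-{op:020}.bin"))
    }

    /// Advance the watermark after a snapshot was persisted successfully.
    fn mark_persisted(&mut self, op: u64) {
        self.last_snapshot_op = self.last_snapshot_op.max(op);
    }
}
```

With such a struct, checkpoint would call into the coordinator for persistence and watermark advancement, keeping IggyMetadata's fields focused on the state machine itself.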

/// # Errors
/// Returns `SnapshotError` if snapshotting, persistence, or compaction fails.
#[allow(clippy::future_not_send)]
pub async fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError>
Contributor

If we go with the comment on R143, then this would be part of the coordinator I mentioned there.


Development

Successfully merging this pull request may close these issues.

State loading from disk during replica bootup

3 participants