feat: initial attempt at replicator integration #1049
bmuddha wants to merge 3 commits into bmuddha/executor/block-tracker
Conversation
📝 Walkthrough

This pull request introduces replication support across the magicblock codebase. The changes add magicblock-replicator as a new workspace member and integrate replication functionality into the core validator and API layers. Key additions include a new ReplicationService field in MagicValidator, replication error handling in the API, a TransactionIndex type for transaction positioning, and Tokio-based message channels for replication communication. The replication service implements lifecycle management with CancellationToken support, allowing graceful shutdown and mode transitions between Primary and Standby states. Supporting configuration updates expose remote replication endpoints through the ReplicationMode enum.
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-replicator/src/service/mod.rs (1)
117-124: ⚠️ Potential issue | 🟠 Major

Replace `.expect()` with proper error handling.

Per coding guidelines, using `.expect()` in production code under `magicblock-*/**` is a major issue.

🔧 Proposed fix

```diff
 pub fn spawn(self) -> JoinHandle<Result<()>> {
     std::thread::spawn(move || {
-        let runtime = Builder::new_current_thread()
+        let runtime = match Builder::new_current_thread()
             .thread_name("replication-service")
             .build()
-            .expect("Failed to build replication service runtime");
+        {
+            Ok(rt) => rt,
+            Err(e) => return Err(Error::Internal(format!(
+                "Failed to build replication service runtime: {e}"
+            ))),
+        };
         runtime.block_on(tokio::task::unconstrained(self.run()))
     })
 }
```

As per coding guidelines: "Treat any usage of `.unwrap()` or `.expect()` in production Rust code as a MAJOR issue."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/service/mod.rs` around lines 117 - 124, The code currently calls Builder::new_current_thread().build().expect(...) inside the std::thread::spawn closure, which must be replaced with proper error handling; change the call in the closure so Builder::new_current_thread().thread_name("replication-service").build() is matched (or ?-propagated) and handle Err by logging the error (using the crate logger) and returning early from the thread instead of panicking, or propagate the Result out of the surrounding function if its signature allows; ensure runtime.block_on(tokio::task::unconstrained(self.run())) only runs when build() succeeded and reference the same symbols (std::thread::spawn, Builder::new_current_thread, runtime.block_on, self.run) when implementing the match/log/propagate fix.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-api/src/magic_validator.rs`:
- Around line 214-232: The code in the replication_service block uses
dispatch.replication_messages.take().expect(...); replace this .expect() with
proper error handling: check whether dispatch.replication_messages.take()
returns Some(messages_rx) and if not return an
Err(ApiError::FailedToStartReplicationService { detail: "..."} ) or an
appropriate existing ApiError variant; then pass messages_rx into
ReplicationService::new(...).await? as before. Add a
FailedToStartReplicationService variant to ApiError if one does not exist (or
reuse a suitable variant) and ensure the error message includes context like
"replication channel missing after init" to aid debugging.
In `@magicblock-config/src/config/validator.rs`:
- Around line 45-54: Add a small unit-test matrix that directly exercises
ReplicationMode::remote to assert the correct Option<Url> mapping for each
variant: ensure Self::Standalone yields None and that Self::StandBy(url) and
Self::ReplicatOnly(url) return Some(cloned_url) equal to the original; place
tests near the impl (e.g., in validator.rs tests module) and use a deterministic
Url value to compare equality so the behavior that gates broker startup in
magic_validator.rs is pinned down.
In `@magicblock-core/src/lib.rs`:
- Around line 2-3: The doc comment for the type alias TransactionIndex currently
implies a true per-slot ordinal but the processor still emits 0 for transaction
indexes; update the rustdoc for TransactionIndex to explicitly state it is
intended as an ordinal within a slot but is not yet implemented as a unique or
ordered per-transaction index (current processors may emit 0), and warn
downstream code (e.g. replication consumers) not to rely on uniqueness or
ordering until the planned ledger/processor rewrite implements proper indexing.
In `@magicblock-replicator/Cargo.toml`:
- Line 29: The crate's Cargo.toml should explicitly enable the tokio-util
runtime feature instead of relying on workspace unification; update the
dependency entry for tokio-util in this crate's Cargo.toml (the existing line
tokio-util = { workspace = true }) to include features = ["rt"] while keeping
workspace = true so the CancellationToken API used in src/service/context.rs and
src/service/mod.rs is available.
In `@magicblock-replicator/src/nats/mod.rs`:
- Around line 102-108: Add a unit test that locks down the routing table by
asserting that from_message returns the exact expected Subject for each Message
variant: construct one Message::Transaction, Message::Block and
Message::SuperBlock (using the same constructors/types used in the module), call
from_message for each, and assert equality against Subjects::transaction(),
Subjects::block(), and Subjects::superblock() respectively; place the test in
the same module (or tests mod) and name it clearly (e.g.,
test_from_message_routing_table) so future changes to from_message will fail the
test if any mapping drifts.
---
Outside diff comments:
In `@magicblock-replicator/src/service/mod.rs`:
- Around line 117-124: The code currently calls
Builder::new_current_thread().build().expect(...) inside the std::thread::spawn
closure, which must be replaced with proper error handling; change the call in
the closure so
Builder::new_current_thread().thread_name("replication-service").build() is
matched (or ?-propagated) and handle Err by logging the error (using the crate
logger) and returning early from the thread instead of panicking, or propagate
the Result out of the surrounding function if its signature allows; ensure
runtime.block_on(tokio::task::unconstrained(self.run())) only runs when build()
succeeded and reference the same symbols (std::thread::spawn,
Builder::new_current_thread, runtime.block_on, self.run) when implementing the
match/log/propagate fix.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5128b437-af66-4911-89a7-41020679810e
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (16)
- Cargo.toml
- magicblock-api/Cargo.toml
- magicblock-api/src/errors.rs
- magicblock-api/src/magic_validator.rs
- magicblock-config/src/config/validator.rs
- magicblock-core/src/lib.rs
- magicblock-core/src/link.rs
- magicblock-core/src/link/replication.rs
- magicblock-core/src/link/transactions.rs
- magicblock-replicator/Cargo.toml
- magicblock-replicator/src/lib.rs
- magicblock-replicator/src/nats/mod.rs
- magicblock-replicator/src/service/context.rs
- magicblock-replicator/src/service/mod.rs
- magicblock-replicator/src/service/primary.rs
- magicblock-replicator/src/service/standby.rs
```rust
let replication_service =
    if let Some((broker, is_fresh_start)) = broker {
        let messages_rx = dispatch.replication_messages.take().expect(
            "replication channel should always exist after init",
        );
        ReplicationService::new(
            broker,
            mode_tx.clone(),
            accountsdb.clone(),
            ledger.clone(),
            dispatch.transaction_scheduler.clone(),
            messages_rx,
            token.clone(),
            is_fresh_start,
        )
        .await?
    } else {
        None
    };
```
Replace .expect() with proper error handling.
Per coding guidelines, using .expect() in production code under magicblock-*/** is a major issue requiring proper error handling.
🔧 Proposed fix

```diff
 let replication_service =
     if let Some((broker, is_fresh_start)) = broker {
-        let messages_rx = dispatch.replication_messages.take().expect(
-            "replication channel should always exist after init",
-        );
+        let messages_rx = dispatch.replication_messages.take().ok_or_else(|| {
+            ApiError::FailedToStartReplicationService(
+                "replication channel missing after init".to_string(),
+            )
+        })?;
         ReplicationService::new(
             broker,
             mode_tx.clone(),
             accountsdb.clone(),
             ledger.clone(),
             dispatch.transaction_scheduler.clone(),
             messages_rx,
             token.clone(),
             is_fresh_start,
         )
         .await?
     } else {
         None
     };
```

Note: You may need to add a FailedToStartReplicationService variant to ApiError or use an existing appropriate variant. As per coding guidelines: "Treat any usage of `.unwrap()` or `.expect()` in production Rust code as a MAJOR issue."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-api/src/magic_validator.rs` around lines 214 - 232, The code in
the replication_service block uses
dispatch.replication_messages.take().expect(...); replace this .expect() with
proper error handling: check whether dispatch.replication_messages.take()
returns Some(messages_rx) and if not return an
Err(ApiError::FailedToStartReplicationService { detail: "..."} ) or an
appropriate existing ApiError variant; then pass messages_rx into
ReplicationService::new(...).await? as before. Add a
FailedToStartReplicationService variant to ApiError if one does not exist (or
reuse a suitable variant) and ensure the error message includes context like
"replication channel missing after init" to aid debugging.
```rust
impl ReplicationMode {
    /// Returns the remote URL if this node participates in replication.
    /// Returns `None` for `Standalone` mode.
    pub fn remote(&self) -> Option<Url> {
        match self {
            Self::Standalone => None,
            Self::StandBy(u) | Self::ReplicatOnly(u) => Some(u.clone()),
        }
    }
}
```
🧹 Nitpick | 🔵 Trivial
Add direct coverage for the mode-to-remote mapping.
This helper now decides whether magicblock-api/src/magic_validator.rs:185-230 even attempts broker startup, so Standalone, StandBy, and ReplicatOnly are worth pinning down with a small unit-test matrix.
Minimal coverage example

```rust
#[cfg(test)]
mod tests {
    use super::ReplicationMode;
    use url::Url;

    #[test]
    fn remote_returns_expected_value_for_each_mode() {
        let url = Url::parse("nats://localhost:4222").unwrap();

        assert_eq!(ReplicationMode::Standalone.remote(), None);
        assert_eq!(
            ReplicationMode::StandBy(url.clone()).remote(),
            Some(url.clone())
        );
        assert_eq!(
            ReplicationMode::ReplicatOnly(url.clone()).remote(),
            Some(url)
        );
    }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-config/src/config/validator.rs` around lines 45 - 54, Add a small
unit-test matrix that directly exercises ReplicationMode::remote to assert the
correct Option<Url> mapping for each variant: ensure Self::Standalone yields
None and that Self::StandBy(url) and Self::ReplicatOnly(url) return
Some(cloned_url) equal to the original; place tests near the impl (e.g., in
validator.rs tests module) and use a deterministic Url value to compare equality
so the behavior that gates broker startup in magic_validator.rs is pinned down.
```rust
/// Ordinal position of a transaction within a slot.
pub type TransactionIndex = u32;
```
🧹 Nitpick | 🔵 Trivial
Clarify that TransactionIndex is not a true per-slot ordinal yet.
The new rustdoc reads like callers can rely on ordering within a slot, but current processor paths still emit 0 for transaction indexes. Tighten the comment so downstream replication code does not assume uniqueness/order that is not implemented yet.
Possible doc tweak

```diff
-/// Ordinal position of a transaction within a slot.
+/// Logical transaction position within a slot.
+///
+/// Note: current processor paths may still emit `0` here; true per-slot
+/// ordinals will be introduced with the planned ledger rewrite.
 pub type TransactionIndex = u32;
```

Based on learnings: In magicblock-processor, transaction indexes were always set to 0 even before the changes in PR #596. The proper transaction indexing within slots will be addressed during the planned ledger rewrite.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-core/src/lib.rs` around lines 2 - 3, The doc comment for the type
alias TransactionIndex currently implies a true per-slot ordinal but the
processor still emits 0 for transaction indexes; update the rustdoc for
TransactionIndex to explicitly state it is intended as an ordinal within a slot
but is not yet implemented as a unique or ordered per-transaction index (current
processors may emit 0), and warn downstream code (e.g. replication consumers)
not to rely on uniqueness or ordering until the planned ledger/processor rewrite
implements proper indexing.
```toml
    "io-util",
    "fs",
] }
tokio-util = { workspace = true }
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
set -euo pipefail
printf 'tokio-util dependency declarations:\n'
rg -n --glob 'Cargo.toml' 'tokio-util\s*='
printf '\nCancellationToken usages:\n'
rg -n --type rust '\bCancellationToken\b|tokio_util::sync::CancellationToken'
printf '\nExplicit tokio-util rt/full feature enablement:\n'
rg -n --glob 'Cargo.toml' 'tokio-util\s*=\s*\{[^}]*features\s*=\s*\[[^]]*"(rt|full)"'
```

Repository: magicblock-labs/magicblock-validator

Length of output: 8925
Declare the rt feature for tokio-util in this crate's manifest.
magicblock-replicator uses tokio_util::sync::CancellationToken extensively (in src/service/context.rs and src/service/mod.rs), but this API is gated behind the rt feature in tokio-util 0.7. The current declaration relies on another crate in the workspace (magicblock-validator-admin) to pull in the feature via Cargo unification, which is fragile and makes the dependency graph unclear.
Manifest fix

```diff
-tokio-util = { workspace = true }
+tokio-util = { workspace = true, features = ["rt"] }
```
+ tokio-util = { workspace = true, features = ["rt"] }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tokio-util = { workspace = true } | |
| tokio-util = { workspace = true, features = ["rt"] } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-replicator/Cargo.toml` at line 29, The crate's Cargo.toml should
explicitly enable the tokio-util runtime feature instead of relying on workspace
unification; update the dependency entry for tokio-util in this crate's
Cargo.toml (the existing line tokio-util = { workspace = true }) to include
features = ["rt"] while keeping workspace = true so the CancellationToken API
used in src/service/context.rs and src/service/mod.rs is available.
```rust
pub(crate) fn from_message(msg: &Message) -> Subject {
    match msg {
        Message::Transaction(_) => Subjects::transaction(),
        Message::Block(_) => Subjects::block(),
        Message::SuperBlock(_) => Subjects::superblock(),
    }
}
```
🧹 Nitpick | 🔵 Trivial
Please lock this routing table down with a unit test.
A mismatch here will still produce a valid publish, just on the wrong JetStream subject. One assertion per Message variant would make this helper much safer to evolve.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-replicator/src/nats/mod.rs` around lines 102 - 108, Add a unit
test that locks down the routing table by asserting that from_message returns
the exact expected Subject for each Message variant: construct one
Message::Transaction, Message::Block and Message::SuperBlock (using the same
constructors/types used in the module), call from_message for each, and assert
equality against Subjects::transaction(), Subjects::block(), and
Subjects::superblock() respectively; place the test in the same module (or tests
mod) and name it clearly (e.g., test_from_message_routing_table) so future
changes to from_message will fail the test if any mapping drifts.

Summary
shared access
Compatibility
Checklist