
feat: initial attempt at replicator integration #1049

Draft
bmuddha wants to merge 3 commits into bmuddha/executor/block-tracker from bmuddha/replicator/integration

Conversation


@bmuddha bmuddha commented Mar 13, 2026

Summary

  • Integrate ReplicationService into the MagicValidator startup and shutdown lifecycle
  • Add graceful shutdown support via CancellationToken throughout the replication service
  • Move replication protocol types (Message, Transaction, Block, SuperBlock) from magicblock-replicator/proto.rs to magicblock-core/link/replication.rs for shared access

Compatibility

  • No breaking changes

Checklist

coderabbitai bot commented Mar 13, 2026

📝 Walkthrough

This pull request introduces replication support across the magicblock codebase. The changes add magicblock-replicator as a new workspace member and integrate replication functionality into the core validator and API layers. Key additions include a new ReplicationService field in MagicValidator, replication error handling in the API, a TransactionIndex type for transaction positioning, and Tokio-based message channels for replication communication. The replication service implements lifecycle management with CancellationToken support, allowing graceful shutdown and mode transitions between Primary and Standby states. Supporting configuration updates expose remote replication endpoints through the ReplicationMode enum.
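The lifecycle management described above can be sketched with a minimal stdlib analogue. This is a hypothetical stand-in: the PR itself uses tokio_util::sync::CancellationToken, while this sketch models the same cooperative-shutdown pattern with an atomic flag. ShutdownToken and run_worker are illustrative names, not the PR's API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for tokio_util::sync::CancellationToken:
// a cloneable flag that workers poll to decide when to exit.
#[derive(Clone)]
struct ShutdownToken(Arc<AtomicBool>);

impl ShutdownToken {
    fn new() -> Self {
        ShutdownToken(Arc::new(AtomicBool::new(false)))
    }
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// A worker loop that checks the token each iteration and exits cleanly,
// returning how many iterations it completed before shutdown.
fn run_worker(token: ShutdownToken) -> thread::JoinHandle<u64> {
    thread::spawn(move || {
        let mut iterations = 0u64;
        while !token.is_cancelled() {
            iterations += 1;
            thread::sleep(Duration::from_millis(1));
        }
        iterations
    })
}

fn main() {
    let token = ShutdownToken::new();
    let handle = run_worker(token.clone());
    thread::sleep(Duration::from_millis(10));
    token.cancel(); // request graceful shutdown
    let iterations = handle.join().expect("worker thread panicked");
    println!("worker exited cleanly after {iterations} iterations");
}
```

The same shape applies in the async setting: the service holds a cloned token, select!s on `token.cancelled()` alongside its message channels, and drains state before returning.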

Suggested reviewers

  • thlorenz
  • GabrielePicco
  • Dodecahedr0x


bmuddha commented Mar 13, 2026


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
magicblock-replicator/src/service/mod.rs (1)

117-124: ⚠️ Potential issue | 🟠 Major

Replace .expect() with proper error handling.

Per coding guidelines, using .expect() in production code under magicblock-*/** is a major issue.

🔧 Proposed fix
 pub fn spawn(self) -> JoinHandle<Result<()>> {
     std::thread::spawn(move || {
-        let runtime = Builder::new_current_thread()
+        let runtime = match Builder::new_current_thread()
             .thread_name("replication-service")
             .build()
-            .expect("Failed to build replication service runtime");
+        {
+            Ok(rt) => rt,
+            Err(e) => return Err(Error::Internal(format!(
+                "Failed to build replication service runtime: {e}"
+            ))),
+        };

         runtime.block_on(tokio::task::unconstrained(self.run()))
     })
 }

As per coding guidelines: "Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/service/mod.rs` around lines 117 - 124, The code
currently calls Builder::new_current_thread().build().expect(...) inside the
std::thread::spawn closure, which must be replaced with proper error handling;
change the call in the closure so
Builder::new_current_thread().thread_name("replication-service").build() is
matched (or ?-propagated) and handle Err by logging the error (using the crate
logger) and returning early from the thread instead of panicking, or propagate
the Result out of the surrounding function if its signature allows; ensure
runtime.block_on(tokio::task::unconstrained(self.run())) only runs when build()
succeeded and reference the same symbols (std::thread::spawn,
Builder::new_current_thread, runtime.block_on, self.run) when implementing the
match/log/propagate fix.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5128b437-af66-4911-89a7-41020679810e

📥 Commits

Reviewing files that changed from the base of the PR and between 9743ae6 and 12c71c6.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (16)
  • Cargo.toml
  • magicblock-api/Cargo.toml
  • magicblock-api/src/errors.rs
  • magicblock-api/src/magic_validator.rs
  • magicblock-config/src/config/validator.rs
  • magicblock-core/src/lib.rs
  • magicblock-core/src/link.rs
  • magicblock-core/src/link/replication.rs
  • magicblock-core/src/link/transactions.rs
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/nats/mod.rs
  • magicblock-replicator/src/service/context.rs
  • magicblock-replicator/src/service/mod.rs
  • magicblock-replicator/src/service/primary.rs
  • magicblock-replicator/src/service/standby.rs

Comment on lines +214 to +232

let replication_service =
    if let Some((broker, is_fresh_start)) = broker {
        let messages_rx = dispatch.replication_messages.take().expect(
            "replication channel should always exist after init",
        );
        ReplicationService::new(
            broker,
            mode_tx.clone(),
            accountsdb.clone(),
            ledger.clone(),
            dispatch.transaction_scheduler.clone(),
            messages_rx,
            token.clone(),
            is_fresh_start,
        )
        .await?
    } else {
        None
    };

⚠️ Potential issue | 🟠 Major

Replace .expect() with proper error handling.

Per coding guidelines, using .expect() in production code under magicblock-*/** is a major issue requiring proper error handling.

🔧 Proposed fix
 let replication_service =
     if let Some((broker, is_fresh_start)) = broker {
-        let messages_rx = dispatch.replication_messages.take().expect(
-            "replication channel should always exist after init",
-        );
+        let messages_rx = dispatch.replication_messages.take().ok_or_else(|| {
+            ApiError::FailedToStartReplicationService(
+                "replication channel missing after init".to_string(),
+            )
+        })?;
         ReplicationService::new(
             broker,
             mode_tx.clone(),
             accountsdb.clone(),
             ledger.clone(),
             dispatch.transaction_scheduler.clone(),
             messages_rx,
             token.clone(),
             is_fresh_start,
         )
         .await?
     } else {
         None
     };

Note: You may need to add a FailedToStartReplicationService variant to ApiError or use an existing appropriate variant. As per coding guidelines: "Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-api/src/magic_validator.rs` around lines 214 - 232, The code in
the replication_service block uses
dispatch.replication_messages.take().expect(...); replace this .expect() with
proper error handling: check whether dispatch.replication_messages.take()
returns Some(messages_rx) and if not return an
Err(ApiError::FailedToStartReplicationService { detail: "..."} ) or an
appropriate existing ApiError variant; then pass messages_rx into
ReplicationService::new(...).await? as before. Add a
FailedToStartReplicationService variant to ApiError if one does not exist (or
reuse a suitable variant) and ensure the error message includes context like
"replication channel missing after init" to aid debugging.

Comment on lines +45 to +54

impl ReplicationMode {
    /// Returns the remote URL if this node participates in replication.
    /// Returns `None` for `Standalone` mode.
    pub fn remote(&self) -> Option<Url> {
        match self {
            Self::Standalone => None,
            Self::StandBy(u) | Self::ReplicatOnly(u) => Some(u.clone()),
        }
    }
}

🧹 Nitpick | 🔵 Trivial

Add direct coverage for the mode-to-remote mapping.

This helper now decides whether magicblock-api/src/magic_validator.rs:185-230 even attempts broker startup, so Standalone, StandBy, and ReplicatOnly are worth pinning down with a small unit-test matrix.

Minimal coverage example
+#[cfg(test)]
+mod tests {
+    use super::ReplicationMode;
+    use url::Url;
+
+    #[test]
+    fn remote_returns_expected_value_for_each_mode() {
+        let url = Url::parse("nats://localhost:4222").unwrap();
+
+        assert_eq!(ReplicationMode::Standalone.remote(), None);
+        assert_eq!(
+            ReplicationMode::StandBy(url.clone()).remote(),
+            Some(url.clone())
+        );
+        assert_eq!(
+            ReplicationMode::ReplicatOnly(url.clone()).remote(),
+            Some(url)
+        );
+    }
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-config/src/config/validator.rs` around lines 45 - 54, Add a small
unit-test matrix that directly exercises ReplicationMode::remote to assert the
correct Option<Url> mapping for each variant: ensure Self::Standalone yields
None and that Self::StandBy(url) and Self::ReplicatOnly(url) return
Some(cloned_url) equal to the original; place tests near the impl (e.g., in
validator.rs tests module) and use a deterministic Url value to compare equality
so the behavior that gates broker startup in magic_validator.rs is pinned down.

Comment on lines +2 to +3
/// Ordinal position of a transaction within a slot.
pub type TransactionIndex = u32;

🧹 Nitpick | 🔵 Trivial

Clarify that TransactionIndex is not a true per-slot ordinal yet.

The new rustdoc reads like callers can rely on ordering within a slot, but current processor paths still emit 0 for transaction indexes. Tighten the comment so downstream replication code does not assume uniqueness/order that is not implemented yet.

Possible doc tweak
-/// Ordinal position of a transaction within a slot.
+/// Logical transaction position within a slot.
+///
+/// Note: current processor paths may still emit `0` here; true per-slot
+/// ordinals will be introduced with the planned ledger rewrite.
 pub type TransactionIndex = u32;

Based on learnings: In magicblock-processor, transaction indexes were always set to 0 even before the changes in PR #596. The proper transaction indexing within slots will be addressed during the planned ledger rewrite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-core/src/lib.rs` around lines 2 - 3, The doc comment for the type
alias TransactionIndex currently implies a true per-slot ordinal but the
processor still emits 0 for transaction indexes; update the rustdoc for
TransactionIndex to explicitly state it is intended as an ordinal within a slot
but is not yet implemented as a unique or ordered per-transaction index (current
processors may emit 0), and warn downstream code (e.g. replication consumers)
not to rely on uniqueness or ordering until the planned ledger/processor rewrite
implements proper indexing.

"io-util",
"fs",
] }
tokio-util = { workspace = true }
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf 'tokio-util dependency declarations:\n'
rg -n --glob 'Cargo.toml' 'tokio-util\s*='

printf '\nCancellationToken usages:\n'
rg -n --type rust '\bCancellationToken\b|tokio_util::sync::CancellationToken'

printf '\nExplicit tokio-util rt/full feature enablement:\n'
rg -n --glob 'Cargo.toml' 'tokio-util\s*=\s*\{[^}]*features\s*=\s*\[[^]]*"(rt|full)"'

Repository: magicblock-labs/magicblock-validator

Length of output: 8925


Declare the rt feature for tokio-util in this crate's manifest.

magicblock-replicator uses tokio_util::sync::CancellationToken extensively (in src/service/context.rs and src/service/mod.rs), but this API is gated behind the rt feature in tokio-util 0.7. The current declaration relies on another crate in the workspace (magicblock-validator-admin) to pull in the feature via Cargo unification, which is fragile and makes the dependency graph unclear.

Manifest fix
- tokio-util = { workspace = true }
+ tokio-util = { workspace = true, features = ["rt"] }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/Cargo.toml` at line 29, The crate's Cargo.toml should
explicitly enable the tokio-util runtime feature instead of relying on workspace
unification; update the dependency entry for tokio-util in this crate's
Cargo.toml (the existing line tokio-util = { workspace = true }) to include
features = ["rt"] while keeping workspace = true so the CancellationToken API
used in src/service/context.rs and src/service/mod.rs is available.

Comment on lines +102 to +108

pub(crate) fn from_message(msg: &Message) -> Subject {
    match msg {
        Message::Transaction(_) => Subjects::transaction(),
        Message::Block(_) => Subjects::block(),
        Message::SuperBlock(_) => Subjects::superblock(),
    }
}

🧹 Nitpick | 🔵 Trivial

Please lock this routing table down with a unit test.

A mismatch here will still produce a valid publish, just on the wrong JetStream subject. One assertion per Message variant would make this helper much safer to evolve.
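The suggested lock-down test can be sketched against a hypothetical mirror of the types: the real Message, Subject, and Subjects live in magicblock-replicator/src/nats/mod.rs, and the Vec<u8> payloads here are placeholders for the actual variant contents.

```rust
// Hypothetical mirror of the replicator's routing types, just to show the
// shape of the one-assertion-per-variant test the review asks for.
#[derive(Debug, PartialEq)]
enum Subject {
    Transaction,
    Block,
    SuperBlock,
}

enum Message {
    Transaction(Vec<u8>),
    Block(Vec<u8>),
    SuperBlock(Vec<u8>),
}

// Stand-in for the from_message helper under review: maps each message
// variant to the JetStream subject it should be published on.
fn from_message(msg: &Message) -> Subject {
    match msg {
        Message::Transaction(_) => Subject::Transaction,
        Message::Block(_) => Subject::Block,
        Message::SuperBlock(_) => Subject::SuperBlock,
    }
}

fn main() {
    // One assertion per variant; any drift in the mapping fails loudly.
    assert_eq!(from_message(&Message::Transaction(vec![])), Subject::Transaction);
    assert_eq!(from_message(&Message::Block(vec![])), Subject::Block);
    assert_eq!(from_message(&Message::SuperBlock(vec![])), Subject::SuperBlock);
    println!("routing table ok");
}
```

In the real crate the assertions would compare against Subjects::transaction(), Subjects::block(), and Subjects::superblock() inside a #[cfg(test)] module next to from_message.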

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/nats/mod.rs` around lines 102 - 108, Add a unit
test that locks down the routing table by asserting that from_message returns
the exact expected Subject for each Message variant: construct one
Message::Transaction, Message::Block and Message::SuperBlock (using the same
constructors/types used in the module), call from_message for each, and assert
equality against Subjects::transaction(), Subjects::block(), and
Subjects::superblock() respectively; place the test in the same module (or tests
mod) and name it clearly (e.g., test_from_message_routing_table) so future
changes to from_message will fail the test if any mapping drifts.
