Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ magicblock-magic-program-api = { path = "./magicblock-magic-program-api" }
magicblock-metrics = { path = "./magicblock-metrics" }
magicblock-processor = { path = "./magicblock-processor" }
magicblock-program = { path = "./programs/magicblock" }
magicblock-replicator = { path = "./magicblock-replicator" }
magicblock-rpc-client = { path = "./magicblock-rpc-client" }
magicblock-table-mania = { path = "./magicblock-table-mania" }
magicblock-task-scheduler = { path = "./magicblock-task-scheduler" }
Expand Down
9 changes: 3 additions & 6 deletions magicblock-aperture/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@ use std::fmt::Debug;
use hyper::body::Bytes;
use json::Serialize;
use magicblock_core::{
link::{
accounts::LockedAccount,
transactions::{TransactionResult, TransactionStatus},
},
link::{accounts::LockedAccount, transactions::TransactionStatus},
Slot,
};
use solana_account::ReadableAccount;
use solana_account_decoder::{
encode_ui_account, UiAccountEncoding, UiDataSliceConfig,
};
use solana_pubkey::Pubkey;
use solana_transaction_error::TransactionError;
use solana_transaction_error::{TransactionError, TransactionResult};

use crate::{
requests::{params::SerdeSignature, payload::NotificationPayload},
Expand Down Expand Up @@ -110,7 +107,7 @@ impl Encoder for ProgramAccountEncoder {
pub(crate) struct TransactionResultEncoder;

impl Encoder for TransactionResultEncoder {
type Data = TransactionResult;
type Data = TransactionResult<()>;

fn encode(
&self,
Expand Down
5 changes: 4 additions & 1 deletion magicblock-aperture/src/requests/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ impl HttpDispatcher {
}

let txn = transaction.sanitize(sigverify)?;
Ok(WithEncoded { txn, encoded })
Ok(WithEncoded {
txn,
encoded: encoded.into(),
})
}

/// Ensures all accounts required for a transaction are present in the `AccountsDb`.
Expand Down
5 changes: 3 additions & 2 deletions magicblock-aperture/src/state/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use magicblock_core::{link::transactions::TransactionResult, Slot};
use magicblock_core::Slot;
use solana_signature::Signature;
use solana_transaction_error::TransactionResult;

use super::ExpiringCache;

Expand All @@ -18,5 +19,5 @@ pub(crate) struct SignatureResult {
/// The slot in which the transaction was processed.
pub slot: Slot,
/// The result of the transaction (e.g., success or an error).
pub result: TransactionResult,
pub result: TransactionResult<()>,
}
1 change: 1 addition & 0 deletions magicblock-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ magicblock-magic-program-api = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-processor = { workspace = true }
magicblock-program = { workspace = true }
magicblock-replicator = { workspace = true }
magicblock-task-scheduler = { workspace = true }
magicblock-validator-admin = { workspace = true }

Expand Down
3 changes: 3 additions & 0 deletions magicblock-api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ pub enum ApiError {
FailedToSanitizeTransaction(
#[from] solana_transaction_error::TransactionError,
),

#[error("Replication service failed: {0}")]
Replication(#[from] magicblock_replicator::Error),
}

impl From<magicblock_accounts::errors::AccountsError> for ApiError {
Expand Down
99 changes: 76 additions & 23 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use magicblock_program::{
validator::{self, validator_authority},
TransactionScheduler as ActionTransactionScheduler,
};
use magicblock_replicator::{nats::Broker, ReplicationService};
use magicblock_task_scheduler::{SchedulerDatabase, TaskSchedulerService};
use magicblock_validator_admin::claim_fees::ClaimFeesTask;
use mdp::state::{
Expand Down Expand Up @@ -122,6 +123,7 @@ pub struct MagicValidator {
ledger_truncator: LedgerTruncator,
slot_ticker: Option<tokio::task::JoinHandle<()>>,
committor_service: Option<Arc<CommittorService>>,
replication_service: Option<ReplicationService>,
scheduled_commits_processor: Option<Arc<ScheduledCommitsProcessorImpl>>,
chainlink: Arc<ChainlinkImpl>,
rpc_handle: thread::JoinHandle<()>,
Expand All @@ -132,6 +134,8 @@ pub struct MagicValidator {
claim_fees_task: ClaimFeesTask,
task_scheduler: Option<TaskSchedulerService>,
transaction_execution: thread::JoinHandle<()>,
replication_handle:
Option<thread::JoinHandle<magicblock_replicator::Result<()>>>,
mode_tx: Sender<SchedulerMode>,
is_standalone: bool,
}
Expand Down Expand Up @@ -174,8 +178,59 @@ impl MagicValidator {

let latest_block = ledger.latest_block().load();
let step_start = Instant::now();
let accountsdb =
let mut accountsdb =
AccountsDb::new(&config.accountsdb, &config.storage, last_slot)?;

// Mode switch channel for transitioning from StartingUp to Primary
// or Replica mode after ledger replay
let (mode_tx, mode_rx) = channel(1);
let is_standalone = matches!(
config.validator.replication_mode,
ReplicationMode::Standalone
);

// Connect to replication broker if configured.
// Returns (broker, is_fresh_start) where is_fresh_start indicates
// whether accountsdb was empty and may need a snapshot.
let broker =
if let Some(url) = config.validator.replication_mode.remote() {
let mut broker = Broker::connect(url).await?;
let is_fresh_start = accountsdb.slot() == 0;
// Fetch snapshot from primary if starting fresh
if is_fresh_start {
if let Some(snapshot) = broker.get_snapshot().await? {
accountsdb.insert_external_snapshot(
snapshot.slot,
&snapshot.data,
)?;
}
}
Some((broker, is_fresh_start))
} else {
None
};
let accountsdb = Arc::new(accountsdb);
let (mut dispatch, validator_channels) = link();

let replication_service =
if let Some((broker, is_fresh_start)) = broker {
let messages_rx = dispatch.replication_messages.take().expect(
"replication channel should always exist after init",
);
ReplicationService::new(
broker,
mode_tx.clone(),
accountsdb.clone(),
ledger.clone(),
dispatch.transaction_scheduler.clone(),
messages_rx,
token.clone(),
is_fresh_start,
)
.await?
} else {
None
};
Comment on lines +215 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Replace .expect() with proper error handling.

Per coding guidelines, using .expect() in production code under magicblock-*/** is a major issue requiring proper error handling.

🔧 Proposed fix
 let replication_service =
     if let Some((broker, is_fresh_start)) = broker {
-        let messages_rx = dispatch.replication_messages.take().expect(
-            "replication channel should always exist after init",
-        );
+        let messages_rx = dispatch.replication_messages.take().ok_or_else(|| {
+            ApiError::FailedToStartReplicationService(
+                "replication channel missing after init".to_string(),
+            )
+        })?;
         ReplicationService::new(
             broker,
             mode_tx.clone(),
             accountsdb.clone(),
             ledger.clone(),
             dispatch.transaction_scheduler.clone(),
             messages_rx,
             token.clone(),
             is_fresh_start,
         )
         .await?
     } else {
         None
     };

Note: You may need to add a FailedToStartReplicationService variant to ApiError or use an existing appropriate variant. As per coding guidelines: "Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-api/src/magic_validator.rs` around lines 214 - 232, The code in
the replication_service block uses
dispatch.replication_messages.take().expect(...); replace this .expect() with
proper error handling: check whether dispatch.replication_messages.take()
returns Some(messages_rx) and if not return an
Err(ApiError::FailedToStartReplicationService { detail: "..."} ) or an
appropriate existing ApiError variant; then pass messages_rx into
ReplicationService::new(...).await? as before. Add a
FailedToStartReplicationService variant to ApiError if one does not exist (or
reuse a suitable variant) and ensure the error message includes context like
"replication channel missing after init" to aid debugging.

log_timing("startup", "accountsdb_init", step_start);
for (pubkey, account) in genesis_config.accounts {
if accountsdb.get_account(&pubkey).is_some() {
Expand All @@ -198,8 +253,6 @@ impl MagicValidator {
let faucet_keypair =
funded_faucet(&accountsdb, ledger.ledger_path().as_path())?;

let accountsdb = Arc::new(accountsdb);

let step_start = Instant::now();
let metrics_service = magicblock_metrics::try_start_metrics_service(
config.metrics.address.0,
Expand All @@ -217,8 +270,6 @@ impl MagicValidator {
);
log_timing("startup", "system_metrics_ticker_start", step_start);

let (mut dispatch, validator_channels) = link();

let step_start = Instant::now();
let committor_service = Self::init_committor_service(&config).await?;
log_timing("startup", "committor_service_init", step_start);
Expand Down Expand Up @@ -262,13 +313,6 @@ impl MagicValidator {
validator::init_validator_authority(identity_keypair);
let base_fee = config.validator.basefee;

// Mode switch channel for transitioning from StartingUp to Primary
// or Replica mode after ledger replay
let (mode_tx, mode_rx) = channel(1);
let is_standalone = matches!(
config.validator.replication_mode,
ReplicationMode::Standalone
);
let txn_scheduler_state = TransactionSchedulerState {
accountsdb: accountsdb.clone(),
ledger: ledger.clone(),
Expand All @@ -277,6 +321,7 @@ impl MagicValidator {
account_update_tx: validator_channels.account_update,
environment: build_svm_env(&accountsdb, latest_block.blockhash, 0),
tasks_tx: validator_channels.tasks_service,
replication_tx: validator_channels.replication_messages,
is_auto_airdrop_lamports_enabled: config
.chainlink
.auto_airdrop_lamports
Expand Down Expand Up @@ -374,6 +419,7 @@ impl MagicValidator {
// NOTE: set during [Self::start]
slot_ticker: None,
committor_service,
replication_service,
scheduled_commits_processor,
chainlink,
token,
Expand All @@ -386,6 +432,7 @@ impl MagicValidator {
block_udpate_tx: validator_channels.block_update,
task_scheduler: Some(task_scheduler),
transaction_execution,
replication_handle: None,
mode_tx,
is_standalone,
})
Expand Down Expand Up @@ -830,17 +877,18 @@ impl MagicValidator {
// the correct coordination mode:
// - Standalone validators transition to Primary mode
// - StandBy/ReplicatOnly validators transition to Replica mode
let target = if self.is_standalone {
SchedulerMode::Primary
} else {
SchedulerMode::Replica
};
self.mode_tx.try_send(target).map_err(|e| {
ApiError::FailedToSendModeSwitch(format!(
"Failed to send target mode {target:?} to scheduler: \
{e}"
))
})?;
if self.is_standalone {
self.mode_tx
.send(SchedulerMode::Primary)
.await
.map_err(|e| {
ApiError::FailedToSendModeSwitch(format!(
"Failed to send primary mode to scheduler: {e}"
))
})?;
} else if let Some(replicator) = self.replication_service.take() {
self.replication_handle.replace(replicator.spawn());
}

// Now we are ready to start all services and are ready to accept transactions
if let Some(frequency) = self
Expand Down Expand Up @@ -975,6 +1023,11 @@ impl MagicValidator {
log_timing("shutdown", "ledger_truncator_join", step_start);
let step_start = Instant::now();
let _ = self.transaction_execution.join();
if let Some(handle) = self.replication_handle {
if let Ok(Err(error)) = handle.join() {
error!(%error, "replication service experienced catastrophic failure");
}
}
log_timing("shutdown", "transaction_execution_join", step_start);

log_timing("shutdown", "stop_total", stop_start);
Expand Down
11 changes: 11 additions & 0 deletions magicblock-config/src/config/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ impl Default for ValidatorConfig {
}
}
}

impl ReplicationMode {
    /// Remote broker URL for nodes that take part in replication.
    ///
    /// `Standalone` nodes do not replicate and therefore yield `None`;
    /// `StandBy` and `ReplicatOnly` nodes return a clone of their
    /// configured remote endpoint.
    pub fn remote(&self) -> Option<Url> {
        if let Self::StandBy(url) | Self::ReplicatOnly(url) = self {
            Some(url.clone())
        } else {
            None
        }
    }
}
Comment on lines +45 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add direct coverage for the mode-to-remote mapping.

This helper now decides whether magicblock-api/src/magic_validator.rs:185-230 even attempts broker startup, so Standalone, StandBy, and ReplicatOnly are worth pinning down with a small unit-test matrix.

Minimal coverage example
+#[cfg(test)]
+mod tests {
+    use super::ReplicationMode;
+    use url::Url;
+
+    #[test]
+    fn remote_returns_expected_value_for_each_mode() {
+        let url = Url::parse("nats://localhost:4222").unwrap();
+
+        assert_eq!(ReplicationMode::Standalone.remote(), None);
+        assert_eq!(
+            ReplicationMode::StandBy(url.clone()).remote(),
+            Some(url.clone())
+        );
+        assert_eq!(
+            ReplicationMode::ReplicatOnly(url.clone()).remote(),
+            Some(url)
+        );
+    }
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
impl ReplicationMode {
/// Returns the remote URL if this node participates in replication.
/// Returns `None` for `Standalone` mode.
pub fn remote(&self) -> Option<Url> {
match self {
Self::Standalone => None,
Self::StandBy(u) | Self::ReplicatOnly(u) => Some(u.clone()),
}
}
}
impl ReplicationMode {
/// Returns the remote URL if this node participates in replication.
/// Returns `None` for `Standalone` mode.
pub fn remote(&self) -> Option<Url> {
match self {
Self::Standalone => None,
Self::StandBy(u) | Self::ReplicatOnly(u) => Some(u.clone()),
}
}
#[cfg(test)]
mod tests {
use super::ReplicationMode;
use url::Url;
#[test]
fn remote_returns_expected_value_for_each_mode() {
let url = Url::parse("nats://localhost:4222").unwrap();
assert_eq!(ReplicationMode::Standalone.remote(), None);
assert_eq!(
ReplicationMode::StandBy(url.clone()).remote(),
Some(url.clone())
);
assert_eq!(
ReplicationMode::ReplicatOnly(url.clone()).remote(),
Some(url)
);
}
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-config/src/config/validator.rs` around lines 45 - 54, Add a small
unit-test matrix that directly exercises ReplicationMode::remote to assert the
correct Option<Url> mapping for each variant: ensure Self::Standalone yields
None and that Self::StandBy(url) and Self::ReplicatOnly(url) return
Some(cloned_url) equal to the original; place tests near the impl (e.g., in
validator.rs tests module) and use a deterministic Url value to compare equality
so the behavior that gates broker startup in magic_validator.rs is pinned down.

1 change: 1 addition & 0 deletions magicblock-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ edition.workspace = true
tokio = { workspace = true, features = ["sync"] }
flume = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }

serde = { workspace = true, features = ["derive"] }
solana-account = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions magicblock-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/// Slot number identifying a position in the ledger.
pub type Slot = u64;
/// Ordinal position of a transaction within a slot.
/// NOTE(review): verify producers actually emit distinct per-slot ordinals
/// before relying on uniqueness or ordering of this value.
pub type TransactionIndex = u32;
Comment on lines +2 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Clarify that TransactionIndex is not a true per-slot ordinal yet.

The new rustdoc reads like callers can rely on ordering within a slot, but current processor paths still emit 0 for transaction indexes. Tighten the comment so downstream replication code does not assume uniqueness/order that is not implemented yet.

Possible doc tweak
-/// Ordinal position of a transaction within a slot.
+/// Logical transaction position within a slot.
+///
+/// Note: current processor paths may still emit `0` here; true per-slot
+/// ordinals will be introduced with the planned ledger rewrite.
 pub type TransactionIndex = u32;

Based on learnings: In magicblock-processor, transaction indexes were always set to 0 even before the changes in PR #596. The proper transaction indexing within slots will be addressed during the planned ledger rewrite.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Ordinal position of a transaction within a slot.
pub type TransactionIndex = u32;
/// Logical transaction position within a slot.
///
/// Note: current processor paths may still emit `0` here; true per-slot
/// ordinals will be introduced with the planned ledger rewrite.
pub type TransactionIndex = u32;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-core/src/lib.rs` around lines 2 - 3, The doc comment for the type
alias TransactionIndex currently implies a true per-slot ordinal but the
processor still emits 0 for transaction indexes; update the rustdoc for
TransactionIndex to explicitly state it is intended as an ordinal within a slot
but is not yet implemented as a unique or ordered per-transaction index (current
processors may emit 0), and warn downstream code (e.g. replication consumers)
not to rely on uniqueness or ordering until the planned ledger/processor rewrite
implements proper indexing.


/// A macro that panics when running a debug build and logs the panic message
/// instead when running in release mode.
Expand Down
12 changes: 11 additions & 1 deletion magicblock-core/src/link.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use accounts::{AccountUpdateRx, AccountUpdateTx};
use blocks::{BlockUpdateRx, BlockUpdateTx};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, Receiver, Sender};
use transactions::{
ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle,
TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx,
};

use crate::link::replication::Message;

pub mod accounts;
pub mod blocks;
pub mod replication;
pub mod transactions;

/// The bounded capacity for MPSC channels that require backpressure.
Expand All @@ -29,6 +32,8 @@ pub struct DispatchEndpoints {
pub block_update: BlockUpdateRx,
/// Receives scheduled (crank) tasks from transactions executor.
pub tasks_service: Option<ScheduledTasksRx>,
/// Receives replication events from the transaction scheduler.
pub replication_messages: Option<Receiver<Message>>,
}

/// A collection of channel endpoints for the **validator's internal core**.
Expand All @@ -47,6 +52,8 @@ pub struct ValidatorChannelEndpoints {
pub block_update: BlockUpdateTx,
/// Sends scheduled (crank) tasks to tasks service from transactions executor.
pub tasks_service: ScheduledTasksTx,
/// Sends replication events to the replication service.
pub replication_messages: Sender<Message>,
}

/// Creates and connects the full set of communication channels between the dispatch
Expand All @@ -66,6 +73,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {

// Bounded channels for command queues where applying backpressure is important.
let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY);
let (replication_tx, replication_rx) = mpsc::channel(LINK_CAPACITY);

// Bundle the respective channel ends for the dispatch side.
let dispatch = DispatchEndpoints {
Expand All @@ -74,6 +82,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
account_update: account_update_rx,
block_update: block_update_rx,
tasks_service: Some(tasks_rx),
replication_messages: Some(replication_rx),
};

// Bundle the corresponding channel ends for the validator's internal core.
Expand All @@ -83,6 +92,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) {
account_update: account_update_tx,
block_update: block_update_tx,
tasks_service: tasks_tx,
replication_messages: replication_tx,
};

(dispatch, validator)
Expand Down
Loading
Loading