diff --git a/Cargo.lock b/Cargo.lock index cfef678c7..bb342479f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3407,6 +3407,7 @@ dependencies = [ "magicblock-metrics", "magicblock-processor", "magicblock-program", + "magicblock-replicator", "magicblock-task-scheduler", "magicblock-validator-admin", "num_cpus", @@ -3587,6 +3588,7 @@ name = "magicblock-core" version = "0.8.3" dependencies = [ "bincode", + "bytes", "flume", "magicblock-magic-program-api", "serde", @@ -3817,6 +3819,7 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", + "tokio-util", "tracing", "url", ] @@ -9136,6 +9139,7 @@ dependencies = [ "solana-signature", "solana-signer", "solana-transaction", + "solana-transaction-error", "solana-transaction-status-client-types", "tempfile", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 1734e9942..df9a31014 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,6 +111,7 @@ magicblock-magic-program-api = { path = "./magicblock-magic-program-api" } magicblock-metrics = { path = "./magicblock-metrics" } magicblock-processor = { path = "./magicblock-processor" } magicblock-program = { path = "./programs/magicblock" } +magicblock-replicator = { path = "./magicblock-replicator" } magicblock-rpc-client = { path = "./magicblock-rpc-client" } magicblock-table-mania = { path = "./magicblock-table-mania" } magicblock-task-scheduler = { path = "./magicblock-task-scheduler" } diff --git a/magicblock-aperture/src/encoder.rs b/magicblock-aperture/src/encoder.rs index b89f036aa..10b91bc0e 100644 --- a/magicblock-aperture/src/encoder.rs +++ b/magicblock-aperture/src/encoder.rs @@ -3,10 +3,7 @@ use std::fmt::Debug; use hyper::body::Bytes; use json::Serialize; use magicblock_core::{ - link::{ - accounts::LockedAccount, - transactions::{TransactionResult, TransactionStatus}, - }, + link::{accounts::LockedAccount, transactions::TransactionStatus}, Slot, }; use solana_account::ReadableAccount; @@ -14,7 +11,7 @@ use solana_account_decoder::{ encode_ui_account, UiAccountEncoding, UiDataSliceConfig, }; use solana_pubkey::Pubkey; -use solana_transaction_error::TransactionError; +use solana_transaction_error::{TransactionError, TransactionResult}; use crate::{ requests::{params::SerdeSignature, payload::NotificationPayload}, @@ -110,7 +107,7 @@ impl Encoder for ProgramAccountEncoder { pub(crate) struct TransactionResultEncoder; impl Encoder for TransactionResultEncoder { - type Data = TransactionResult; + type Data = TransactionResult<()>; fn encode( &self, diff --git a/magicblock-aperture/src/requests/http/mod.rs b/magicblock-aperture/src/requests/http/mod.rs index 6afc7b55d..fdd883fbb 100644 --- a/magicblock-aperture/src/requests/http/mod.rs +++ b/magicblock-aperture/src/requests/http/mod.rs @@ -213,7 +213,10 @@ impl HttpDispatcher { } let txn = transaction.sanitize(sigverify)?; - Ok(WithEncoded { txn, encoded }) + Ok(WithEncoded { + txn, + encoded: encoded.into(), + }) } /// Ensures all accounts required for a transaction are present in the `AccountsDb`. diff --git a/magicblock-aperture/src/state/transactions.rs b/magicblock-aperture/src/state/transactions.rs index 6360899f5..bd5869387 100644 --- a/magicblock-aperture/src/state/transactions.rs +++ b/magicblock-aperture/src/state/transactions.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use magicblock_core::{link::transactions::TransactionResult, Slot}; +use magicblock_core::Slot; use solana_signature::Signature; +use solana_transaction_error::TransactionResult; use super::ExpiringCache; @@ -18,5 +19,5 @@ pub(crate) struct SignatureResult { /// The slot in which the transaction was processed. pub slot: Slot, /// The result of the transaction (e.g., success or an error). - pub result: TransactionResult, + pub result: TransactionResult<()>, } diff --git a/magicblock-api/Cargo.toml b/magicblock-api/Cargo.toml index 4ddde99a9..fe7e53913 100644 --- a/magicblock-api/Cargo.toml +++ b/magicblock-api/Cargo.toml @@ -28,6 +28,7 @@ magicblock-magic-program-api = { workspace = true } magicblock-metrics = { workspace = true } magicblock-processor = { workspace = true } magicblock-program = { workspace = true } +magicblock-replicator = { workspace = true } magicblock-task-scheduler = { workspace = true } magicblock-validator-admin = { workspace = true } diff --git a/magicblock-api/src/errors.rs b/magicblock-api/src/errors.rs index 0f61136ab..86c2c8834 100644 --- a/magicblock-api/src/errors.rs +++ b/magicblock-api/src/errors.rs @@ -107,6 +107,9 @@ pub enum ApiError { FailedToSanitizeTransaction( #[from] solana_transaction_error::TransactionError, ), + + #[error("Replication service failed: {0}")] + Replication(#[from] magicblock_replicator::Error), } impl From for ApiError { diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index b9cda97f0..f68c6a0a1 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -65,6 +65,7 @@ use magicblock_program::{ validator::{self, validator_authority}, TransactionScheduler as ActionTransactionScheduler, }; +use magicblock_replicator::{nats::Broker, ReplicationService}; use magicblock_task_scheduler::{SchedulerDatabase, TaskSchedulerService}; use magicblock_validator_admin::claim_fees::ClaimFeesTask; use mdp::state::{ @@ -122,6 +123,7 @@ pub struct MagicValidator { ledger_truncator: LedgerTruncator, slot_ticker: Option>, committor_service: Option>, + replication_service: Option, scheduled_commits_processor: Option>, chainlink: Arc, rpc_handle: thread::JoinHandle<()>, @@ -132,6 +134,8 @@ pub struct MagicValidator { claim_fees_task: ClaimFeesTask, task_scheduler: Option, transaction_execution: thread::JoinHandle<()>, + replication_handle: + Option>>, mode_tx: Sender, is_standalone: bool, } @@ -174,8 +178,59 @@ impl MagicValidator { let latest_block = ledger.latest_block().load(); let step_start = Instant::now(); - let accountsdb = + let mut accountsdb = AccountsDb::new(&config.accountsdb, &config.storage, last_slot)?; + + // Mode switch channel for transitioning from StartingUp to Primary + // or Replica mode after ledger replay + let (mode_tx, mode_rx) = channel(1); + let is_standalone = matches!( + config.validator.replication_mode, + ReplicationMode::Standalone + ); + + // Connect to replication broker if configured. + // Returns (broker, is_fresh_start) where is_fresh_start indicates + // whether accountsdb was empty and may need a snapshot. + let broker = + if let Some(url) = config.validator.replication_mode.remote() { + let mut broker = Broker::connect(url).await?; + let is_fresh_start = accountsdb.slot() == 0; + // Fetch snapshot from primary if starting fresh + if is_fresh_start { + if let Some(snapshot) = broker.get_snapshot().await? { + accountsdb.insert_external_snapshot( + snapshot.slot, + &snapshot.data, + )?; + } + } + Some((broker, is_fresh_start)) + } else { + None + }; + let accountsdb = Arc::new(accountsdb); + let (mut dispatch, validator_channels) = link(); + + let replication_service = + if let Some((broker, is_fresh_start)) = broker { + let messages_rx = dispatch.replication_messages.take().expect( + "replication channel should always exist after init", + ); + ReplicationService::new( + broker, + mode_tx.clone(), + accountsdb.clone(), + ledger.clone(), + dispatch.transaction_scheduler.clone(), + messages_rx, + token.clone(), + is_fresh_start, + ) + .await? + } else { + None + }; log_timing("startup", "accountsdb_init", step_start); for (pubkey, account) in genesis_config.accounts { if accountsdb.get_account(&pubkey).is_some() { @@ -198,8 +253,6 @@ impl MagicValidator { let faucet_keypair = funded_faucet(&accountsdb, ledger.ledger_path().as_path())?; - let accountsdb = Arc::new(accountsdb); - let step_start = Instant::now(); let metrics_service = magicblock_metrics::try_start_metrics_service( config.metrics.address.0, @@ -217,8 +270,6 @@ impl MagicValidator { ); log_timing("startup", "system_metrics_ticker_start", step_start); - let (mut dispatch, validator_channels) = link(); - let step_start = Instant::now(); let committor_service = Self::init_committor_service(&config).await?; log_timing("startup", "committor_service_init", step_start); @@ -262,13 +313,6 @@ impl MagicValidator { validator::init_validator_authority(identity_keypair); let base_fee = config.validator.basefee; - // Mode switch channel for transitioning from StartingUp to Primary - // or Replica mode after ledger replay - let (mode_tx, mode_rx) = channel(1); - let is_standalone = matches!( - config.validator.replication_mode, - ReplicationMode::Standalone - ); let txn_scheduler_state = TransactionSchedulerState { accountsdb: accountsdb.clone(), ledger: ledger.clone(), @@ -277,6 +321,7 @@ impl MagicValidator { account_update_tx: validator_channels.account_update, environment: build_svm_env(&accountsdb, latest_block.blockhash, 0), tasks_tx: validator_channels.tasks_service, + replication_tx: validator_channels.replication_messages, is_auto_airdrop_lamports_enabled: config .chainlink .auto_airdrop_lamports @@ -374,6 +419,7 @@ impl MagicValidator { // NOTE: set during [Self::start] slot_ticker: None, committor_service, + replication_service, scheduled_commits_processor, chainlink, token, @@ -386,6 +432,7 @@ impl MagicValidator { block_udpate_tx: validator_channels.block_update, task_scheduler: Some(task_scheduler), transaction_execution, + replication_handle: None, mode_tx, is_standalone, }) @@ -830,17 +877,18 @@ impl MagicValidator { // the correct coordination mode: // - Standalone validators transition to Primary mode // - StandBy/ReplicatOnly validators transition to Replica mode - let target = if self.is_standalone { - SchedulerMode::Primary - } else { - SchedulerMode::Replica - }; - self.mode_tx.try_send(target).map_err(|e| { - ApiError::FailedToSendModeSwitch(format!( - "Failed to send target mode {target:?} to scheduler: \ - {e}" - )) - })?; + if self.is_standalone { + self.mode_tx + .send(SchedulerMode::Primary) + .await + .map_err(|e| { + ApiError::FailedToSendModeSwitch(format!( + "Failed to send primary mode to scheduler: {e}" + )) + })?; + } else if let Some(replicator) = self.replication_service.take() { + self.replication_handle.replace(replicator.spawn()); + } // Now we are ready to start all services and are ready to accept transactions if let Some(frequency) = self @@ -975,6 +1023,11 @@ impl MagicValidator { log_timing("shutdown", "ledger_truncator_join", step_start); let step_start = Instant::now(); let _ = self.transaction_execution.join(); + if let Some(handle) = self.replication_handle { + if let Ok(Err(error)) = handle.join() { + error!(%error, "replication service experienced catastrophic failure"); + } + } log_timing("shutdown", "transaction_execution_join", step_start); log_timing("shutdown", "stop_total", stop_start); diff --git a/magicblock-config/src/config/validator.rs b/magicblock-config/src/config/validator.rs index 26febbb6f..dbcf1e74b 100644 --- a/magicblock-config/src/config/validator.rs +++ b/magicblock-config/src/config/validator.rs @@ -41,3 +41,14 @@ impl Default for ValidatorConfig { } } } + +impl ReplicationMode { + /// Returns the remote URL if this node participates in replication. + /// Returns `None` for `Standalone` mode. + pub fn remote(&self) -> Option { + match self { + Self::Standalone => None, + Self::StandBy(u) | Self::ReplicatOnly(u) => Some(u.clone()), + } + } +} diff --git a/magicblock-core/Cargo.toml b/magicblock-core/Cargo.toml index 358204ede..ca8dc1230 100644 --- a/magicblock-core/Cargo.toml +++ b/magicblock-core/Cargo.toml @@ -12,6 +12,7 @@ edition.workspace = true tokio = { workspace = true, features = ["sync"] } flume = { workspace = true } bincode = { workspace = true } +bytes = { workspace = true } serde = { workspace = true, features = ["derive"] } solana-account = { workspace = true } diff --git a/magicblock-core/src/lib.rs b/magicblock-core/src/lib.rs index cbe515ada..3bb588051 100644 --- a/magicblock-core/src/lib.rs +++ b/magicblock-core/src/lib.rs @@ -1,4 +1,6 @@ pub type Slot = u64; +/// Ordinal position of a transaction within a slot. +pub type TransactionIndex = u32; /// A macro that panics when running a debug build and logs the panic message /// instead when running in release mode. diff --git a/magicblock-core/src/link.rs b/magicblock-core/src/link.rs index 1950dd424..b5609032c 100644 --- a/magicblock-core/src/link.rs +++ b/magicblock-core/src/link.rs @@ -1,13 +1,16 @@ use accounts::{AccountUpdateRx, AccountUpdateTx}; use blocks::{BlockUpdateRx, BlockUpdateTx}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Receiver, Sender}; use transactions::{ ScheduledTasksRx, ScheduledTasksTx, TransactionSchedulerHandle, TransactionStatusRx, TransactionStatusTx, TransactionToProcessRx, }; +use crate::link::replication::Message; + pub mod accounts; pub mod blocks; +pub mod replication; pub mod transactions; /// The bounded capacity for MPSC channels that require backpressure. @@ -29,6 +32,8 @@ pub struct DispatchEndpoints { pub block_update: BlockUpdateRx, /// Receives scheduled (crank) tasks from transactions executor. pub tasks_service: Option, + /// Receives replication events from the transaction scheduler. + pub replication_messages: Option>, } /// A collection of channel endpoints for the **validator's internal core**. @@ -47,6 +52,8 @@ pub struct ValidatorChannelEndpoints { pub block_update: BlockUpdateTx, /// Sends scheduled (crank) tasks to tasks service from transactions executor. pub tasks_service: ScheduledTasksTx, + /// Sends replication events to the replication service. + pub replication_messages: Sender, } /// Creates and connects the full set of communication channels between the dispatch @@ -66,6 +73,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { // Bounded channels for command queues where applying backpressure is important. let (txn_to_process_tx, txn_to_process_rx) = mpsc::channel(LINK_CAPACITY); + let (replication_tx, replication_rx) = mpsc::channel(LINK_CAPACITY); // Bundle the respective channel ends for the dispatch side. let dispatch = DispatchEndpoints { @@ -74,6 +82,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { account_update: account_update_rx, block_update: block_update_rx, tasks_service: Some(tasks_rx), + replication_messages: Some(replication_rx), }; // Bundle the corresponding channel ends for the validator's internal core. @@ -83,6 +92,7 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { account_update: account_update_tx, block_update: block_update_tx, tasks_service: tasks_tx, + replication_messages: replication_tx, }; (dispatch, validator) diff --git a/magicblock-replicator/src/proto.rs b/magicblock-core/src/link/replication.rs similarity index 79% rename from magicblock-replicator/src/proto.rs rename to magicblock-core/src/link/replication.rs index 92a113eab..031b29d3e 100644 --- a/magicblock-replicator/src/proto.rs +++ b/magicblock-core/src/link/replication.rs @@ -1,19 +1,11 @@ //! Protocol message types for replication. -//! -//! # Wire Format -//! -//! The enum variant index serves as an implicit type tag. -use async_nats::Subject; -use magicblock_core::Slot; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use solana_hash::Hash; use solana_transaction::versioned::VersionedTransaction; -use crate::nats::Subjects; - -/// Ordinal position of a transaction within a slot. -pub type TransactionIndex = u32; +use crate::{Slot, TransactionIndex}; /// Index for block boundary markers (TransactionIndex::MAX - 1). /// Used to identify Block messages in slot/index comparisons. @@ -37,15 +29,9 @@ pub enum Message { } impl Message { - pub(crate) fn subject(&self) -> Subject { - match self { - Self::Transaction(_) => Subjects::transaction(), - Self::Block(_) => Subjects::block(), - Self::SuperBlock(_) => Subjects::superblock(), - } - } - - pub(crate) fn slot_and_index(&self) -> (Slot, TransactionIndex) { + /// Returns the (slot, index) position of this message. + /// Block and SuperBlock messages use sentinel index values. + pub fn slot_and_index(&self) -> (Slot, TransactionIndex) { match self { Self::Transaction(tx) => (tx.slot, tx.index), Self::Block(block) => (block.slot, BLOCK_INDEX), @@ -62,7 +48,7 @@ pub struct Transaction { /// Ordinal position within the slot. pub index: TransactionIndex, /// Bincode-encoded `VersionedTransaction`. - pub payload: Vec, + pub payload: Bytes, } /// Slot boundary marker with blockhash. diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index 65750b7da..f3a7a3ea7 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use flume::{Receiver as MpmcReceiver, Sender as MpmcSender}; use magicblock_magic_program_api::args::TaskRequest; use serde::Serialize; @@ -9,7 +10,7 @@ use solana_transaction::{ Transaction, }; use solana_transaction_context::TransactionReturnData; -use solana_transaction_error::TransactionError; +use solana_transaction_error::{TransactionError, TransactionResult}; use solana_transaction_status_client_types::TransactionStatusMeta; use tokio::sync::{ mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, @@ -17,7 +18,7 @@ use tokio::sync::{ }; use super::blocks::BlockHash; -use crate::Slot; +use crate::{Slot, TransactionIndex}; /// The receiver end of the multi-producer, multi-consumer /// channel for communicating final transaction statuses. @@ -44,15 +45,13 @@ pub type ScheduledTasksTx = UnboundedSender; #[derive(Clone)] pub struct TransactionSchedulerHandle(pub(super) TransactionToProcessTx); -/// The standard result of a transaction execution, indicating success or a `TransactionError`. -pub type TransactionResult = solana_transaction_error::TransactionResult<()>; /// The sender half of a one-shot channel used to return the result of a transaction simulation. pub type TxnSimulationResultTx = oneshot::Sender; /// An optional sender half of a one-shot channel for returning a transaction execution result. /// `None` is used for "fire-and-forget" scheduling. -pub type TxnExecutionResultTx = Option>; +pub type TxnExecutionResultTx = Option>>; /// The sender half of a one-shot channel used to return the result of a transaction replay. -pub type TxnReplayResultTx = oneshot::Sender; +pub type TxnReplayResultTx = oneshot::Sender>; /// Contains the final, committed status of an executed /// transaction, including its result and metadata. @@ -62,7 +61,7 @@ pub struct TransactionStatus { pub slot: Slot, pub txn: SanitizedTransaction, pub meta: TransactionStatusMeta, - pub index: u32, + pub index: TransactionIndex, } /// An internal message that bundles a sanitized transaction with its requested processing mode. @@ -72,7 +71,7 @@ pub struct ProcessableTransaction { pub mode: TransactionProcessingMode, /// Pre-encoded bincode bytes for the transaction. /// Used by the replicator to avoid redundant serialization. - pub encoded: Option>, + pub encoded: Option, } /// Specifies the position and persistence behavior for replaying a transaction. @@ -84,7 +83,7 @@ pub struct ReplayPosition { /// The slot in which the transaction was originally included. pub slot: Slot, /// The transaction's index within that slot (0-based). - pub index: u32, + pub index: TransactionIndex, /// Whether to persist the replay to the ledger and broadcast status. /// - `true`: Record to ledger + broadcast (for replay from primary/replicator) /// - `false`: No recording, no broadcast (for local ledger replay during startup) @@ -112,7 +111,7 @@ pub enum TransactionProcessingMode { /// Contains extra information not available in a standard /// execution, like compute units and return data. pub struct TransactionSimulationResult { - pub result: TransactionResult, + pub result: TransactionResult<()>, pub logs: Option>, pub units_consumed: u64, pub return_data: Option, @@ -150,7 +149,7 @@ pub trait SanitizeableTransaction { fn sanitize_with_encoded( self, verify: bool, - ) -> Result<(SanitizedTransaction, Option>), TransactionError> + ) -> TransactionResult<(SanitizedTransaction, Option)> where Self: Sized, { @@ -163,7 +162,7 @@ pub trait SanitizeableTransaction { /// Use for internally-constructed transactions that need encoded bytes. pub struct WithEncoded { pub txn: T, - pub encoded: Vec, + pub encoded: Bytes, } impl SanitizeableTransaction for WithEncoded { @@ -177,7 +176,7 @@ impl SanitizeableTransaction for WithEncoded { fn sanitize_with_encoded( self, verify: bool, - ) -> Result<(SanitizedTransaction, Option>), TransactionError> { + ) -> TransactionResult<(SanitizedTransaction, Option)> { let txn = self.txn.sanitize(verify)?; Ok((txn, Some(self.encoded))) } @@ -190,7 +189,8 @@ where T: Serialize, { let encoded = bincode::serialize(&txn) - .map_err(|_| TransactionError::SanitizeFailure)?; + .map_err(|_| TransactionError::SanitizeFailure)? + .into(); Ok(WithEncoded { txn, encoded }) } @@ -238,7 +238,7 @@ impl TransactionSchedulerHandle { pub async fn schedule( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let mode = TransactionProcessingMode::Execution(None); let txn = ProcessableTransaction { @@ -258,7 +258,7 @@ impl TransactionSchedulerHandle { pub async fn execute( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let mode = |tx| TransactionProcessingMode::Execution(Some(tx)); self.send(txn, mode).await? } @@ -267,7 +267,7 @@ impl TransactionSchedulerHandle { pub async fn simulate( &self, txn: impl SanitizeableTransaction, - ) -> Result { + ) -> TransactionResult { let mode = TransactionProcessingMode::Simulation; self.send(txn, mode).await } @@ -285,7 +285,7 @@ impl TransactionSchedulerHandle { &self, position: ReplayPosition, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let mode = TransactionProcessingMode::Replay(position); let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let txn = ProcessableTransaction { @@ -305,7 +305,7 @@ impl TransactionSchedulerHandle { &self, txn: impl SanitizeableTransaction, mode: fn(oneshot::Sender) -> TransactionProcessingMode, - ) -> Result { + ) -> TransactionResult { let (transaction, encoded) = txn.sanitize_with_encoded(true)?; let (tx, rx) = oneshot::channel(); let mode = mode(tx); diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index 5520a06db..b82864bb2 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -284,6 +284,7 @@ impl super::TransactionExecutor { let versioned = transaction.to_versioned_transaction(); bincode::serialize(&versioned) .map_err(|e| Box::new(e) as Box)? + .into() } }; diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index 43420bdc5..30ed819e2 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -314,6 +314,11 @@ impl ExecutionCoordinator { ); !mode_mismatch } + + /// Check whether the node is acting as an event source for replication + pub(super) fn should_replicate(&self) -> bool { + matches!(self.mode, CoordinationMode::Primary(_)) + } } /// Transaction wrapped with a monotonic ID for FIFO queue ordering. diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index 69cd80d5d..40ebdbea6 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -16,9 +16,12 @@ use coordinator::{ExecutionCoordinator, TransactionWithId}; use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::{ - link::transactions::{ - ProcessableTransaction, SchedulerMode, TransactionProcessingMode, - TransactionToProcessRx, + link::{ + replication::{self, Message}, + transactions::{ + ProcessableTransaction, SchedulerMode, TransactionProcessingMode, + TransactionToProcessRx, + }, }, Slot, }; @@ -64,7 +67,11 @@ pub struct TransactionScheduler { shutdown: CancellationToken, /// Receives mode transition commands (Primary or Replica) at runtime. mode_rx: Receiver, + /// A sink for the events (transactions, blocks etc) that need to be replicated + replication_tx: Sender, + /// Current Slot that scheduler is operating on (clock value) slot: Slot, + /// Current transaction index included into the block under assembly index: u32, } @@ -107,6 +114,7 @@ impl TransactionScheduler { accountsdb: state.accountsdb, shutdown: state.shutdown, mode_rx: state.mode_rx, + replication_tx: state.replication_tx, slot: state.ledger.latest_block().load().slot, index: 0, } @@ -138,7 +146,9 @@ impl TransactionScheduler { tokio::select! { biased; Ok(()) = block_produced.recv() => self.transition_to_new_slot(), - Some(executor) = self.ready_rx.recv() => self.handle_ready_executor(executor), + Some(executor) = self.ready_rx.recv() => { + self.handle_ready_executor(executor).await; + } Some(mode) = self.mode_rx.recv() => { match mode { SchedulerMode::Primary => { @@ -150,7 +160,7 @@ impl TransactionScheduler { } } Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { - self.handle_new_transaction(txn); + self.handle_new_transaction(txn).await; } _ = self.shutdown.cancelled() => break, else => break, @@ -163,12 +173,12 @@ impl TransactionScheduler { info!("Scheduler terminated"); } - fn handle_ready_executor(&mut self, executor: ExecutorId) { + async fn handle_ready_executor(&mut self, executor: ExecutorId) { self.coordinator.unlock_accounts(executor); - self.reschedule_blocked_transactions(executor); + self.reschedule_blocked_transactions(executor).await; } - fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + async fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { if !self.coordinator.is_transaction_allowed(&txn.mode) { warn!("Dropping transaction due to mode incompatibility"); return; @@ -179,10 +189,11 @@ impl TransactionScheduler { let executor = self.coordinator.get_ready_executor().expect( "unreachable: is_ready() guard ensures an executor is available", ); - self.schedule_transaction(executor, TransactionWithId::new(txn)); + self.schedule_transaction(executor, TransactionWithId::new(txn)) + .await; } - fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { + async fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { let mut executor = Some(blocker); while let Some(exec) = executor.take() { // Try to get next transaction blocked by this executor @@ -193,7 +204,7 @@ impl TransactionScheduler { break; }; - let blocked = self.schedule_transaction(exec, txn); + let blocked = self.schedule_transaction(exec, txn).await; // If blocked by the same executor we're draining, stop to avoid infinite loop if blocked.is_some_and(|b| b == blocker) { @@ -204,7 +215,7 @@ impl TransactionScheduler { } } - fn schedule_transaction( + async fn schedule_transaction( &mut self, executor: ExecutorId, txn: TransactionWithId, @@ -222,11 +233,31 @@ impl TransactionScheduler { (self.slot, index) }; + let msg = txn + .encoded + .as_ref() + .cloned() + .filter(|_| { + matches!(txn.mode, TransactionProcessingMode::Execution(_)) + && self.coordinator.should_replicate() + }) + .map(|payload| { + Message::Transaction(replication::Transaction { + index, + slot, + payload, + }) + }); let txn = IndexedTransaction { slot, index, txn }; - let _ = self.executors[executor as usize].try_send(txn).inspect_err( - |e| error!(executor, error = ?e, "Executor channel send failed"), - ); + let sent = self.executors[executor as usize].try_send(txn).inspect_err( + |error| error!(executor, %error, "Executor channel send failed"), + ).is_ok(); + if let Some(msg) = msg.filter(|_| sent) { + let _ = self.replication_tx.send(msg).await.inspect_err( + |error| error!(executor, %error, "Replication channel send failed"), + ); + } None } diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 03b9a01af..0d3f5319a 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -9,6 +9,7 @@ use std::sync::{Arc, OnceLock, RwLock}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::link::{ accounts::AccountUpdateTx, + replication::Message, transactions::{ ScheduledTasksTx, SchedulerMode, TransactionStatusTx, TransactionToProcessRx, @@ -32,7 +33,7 @@ use solana_program_runtime::{ }; use solana_pubkey::Pubkey; use solana_svm::transaction_processor::TransactionProcessingEnvironment; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio_util::sync::CancellationToken; use crate::{executor::SimpleForkGraph, syscalls::SyscallMatmulI8}; @@ -52,6 +53,7 @@ pub struct TransactionSchedulerState { pub account_update_tx: AccountUpdateTx, pub transaction_status_tx: TransactionStatusTx, pub tasks_tx: ScheduledTasksTx, + pub replication_tx: Sender, // === Configuration === pub is_auto_airdrop_lamports_enabled: bool, diff --git a/magicblock-replicator/Cargo.toml b/magicblock-replicator/Cargo.toml index 12457e7fc..4d5328fd8 100644 --- a/magicblock-replicator/Cargo.toml +++ b/magicblock-replicator/Cargo.toml @@ -26,6 +26,7 @@ tokio = { workspace = true, features = [ "io-util", "fs", ] } +tokio-util = { workspace = true } serde = { workspace = true, features = ["derive"] } solana-hash = { workspace = true, features = ["serde"] } solana-transaction = { workspace = true, features = ["serde"] } diff --git a/magicblock-replicator/src/lib.rs b/magicblock-replicator/src/lib.rs index 9a616b463..243658a2b 100644 --- a/magicblock-replicator/src/lib.rs +++ b/magicblock-replicator/src/lib.rs @@ -14,7 +14,6 @@ pub mod error; pub mod nats; -pub mod proto; pub mod service; pub mod watcher; @@ -22,4 +21,4 @@ pub mod watcher; mod tests; pub use error::{Error, Result}; -pub use proto::{Message, TransactionIndex}; +pub use service::Service as ReplicationService; diff --git a/magicblock-replicator/src/nats/consumer.rs b/magicblock-replicator/src/nats/consumer.rs index 47a424f2c..02c05b1c0 100644 --- a/magicblock-replicator/src/nats/consumer.rs +++ b/magicblock-replicator/src/nats/consumer.rs @@ -4,6 +4,7 @@ use async_nats::jetstream::consumer::{ pull::{Config as PullConfig, Stream as MessageStream}, AckPolicy, DeliverPolicy, PullConsumer, }; +use tokio_util::sync::CancellationToken; use tracing::warn; use super::cfg; @@ -58,18 +59,27 @@ impl Consumer { /// /// Use this in a `tokio::select!` loop to process messages as they arrive. /// Messages are fetched in batches for efficiency. - pub async fn messages(&self) -> MessageStream { + pub async fn messages( + &self, + cancel: &CancellationToken, + ) -> Option { loop { - let result = self + let messages = self .inner .stream() .max_messages_per_batch(cfg::BATCH_SIZE) - .messages() - .await; - match result { - Ok(s) => break s, - Err(error) => { - warn!(%error, "failed to create message stream") + .messages(); + tokio::select! { + result = messages => { + match result { + Ok(s) => break Some(s), + Err(error) => { + warn!(%error, "failed to create message stream") + } + } + } + _ = cancel.cancelled() => { + break None; } } } diff --git a/magicblock-replicator/src/nats/lock_watcher.rs b/magicblock-replicator/src/nats/lock_watcher.rs index 303a84926..7f58fb726 100644 --- a/magicblock-replicator/src/nats/lock_watcher.rs +++ b/magicblock-replicator/src/nats/lock_watcher.rs @@ -2,6 +2,7 @@ use async_nats::jetstream::kv::{Operation, Watch}; use futures::StreamExt; +use tokio_util::sync::CancellationToken; use tracing::warn; use super::cfg; @@ -17,8 +18,14 @@ pub struct LockWatcher { impl LockWatcher { /// Creates a new lock watcher. - pub(crate) async fn new(broker: &Broker) -> Self { + pub(crate) async fn new( + broker: &Broker, + cancel: &CancellationToken, + ) -> Option { let watch = loop { + if cancel.is_cancelled() { + return None; + } let store = match broker.ctx.get_key_value(cfg::PRODUCER_LOCK).await { Ok(s) => s, @@ -35,7 +42,7 @@ impl LockWatcher { } } }; - Self { watch } + Some(Self { watch }) } /// Waits for the lock to be deleted or expire. diff --git a/magicblock-replicator/src/nats/mod.rs b/magicblock-replicator/src/nats/mod.rs index 70cc0f511..0c9386a76 100644 --- a/magicblock-replicator/src/nats/mod.rs +++ b/magicblock-replicator/src/nats/mod.rs @@ -18,6 +18,7 @@ use async_nats::Subject; pub use broker::Broker; pub use consumer::Consumer; pub use lock_watcher::LockWatcher; +use magicblock_core::link::replication::Message; pub use producer::Producer; pub use snapshot::Snapshot; @@ -97,4 +98,12 @@ impl Subjects { pub fn superblock() -> Subject { Self::from(Self::SUPERBLOCK) } + + pub(crate) fn from_message(msg: &Message) -> Subject { + match msg { + Message::Transaction(_) => Subjects::transaction(), + Message::Block(_) => Subjects::block(), + Message::SuperBlock(_) => Subjects::superblock(), + } + } } diff --git a/magicblock-replicator/src/service/context.rs b/magicblock-replicator/src/service/context.rs index fcd16d0f7..0f267a997 100644 --- a/magicblock-replicator/src/service/context.rs +++ b/magicblock-replicator/src/service/context.rs @@ -5,22 +5,25 @@ use std::sync::Arc; use machineid_rs::IdBuilder; use magicblock_accounts_db::AccountsDb; use magicblock_core::{ - link::transactions::{SchedulerMode, TransactionSchedulerHandle}, - Slot, + link::{ + replication::{Block, Message, SuperBlock}, + transactions::{SchedulerMode, TransactionSchedulerHandle}, + }, + Slot, TransactionIndex, }; use magicblock_ledger::Ledger; use tokio::{ fs::File, sync::mpsc::{Receiver, Sender}, }; +use tokio_util::sync::CancellationToken; use tracing::info; use super::{Primary, Standby, CONSUMER_RETRY_DELAY}; use crate::{ nats::{Broker, Consumer, LockWatcher, Producer}, - proto::{self, TransactionIndex}, watcher::SnapshotWatcher, - Error, Message, Result, + Error, Result, }; /// Shared state for both primary and standby roles. @@ -29,6 +32,8 @@ pub struct ReplicationContext { pub id: String, /// NATS broker. pub broker: Broker, + /// Global shutdown signal + pub cancel: CancellationToken, /// Scheduler mode channel. pub mode_tx: Sender, /// Accounts database. @@ -39,6 +44,7 @@ pub struct ReplicationContext { pub scheduler: TransactionSchedulerHandle, /// Current position. pub slot: Slot, + /// Position of the last transaction within slot pub index: TransactionIndex, } @@ -50,6 +56,7 @@ impl ReplicationContext { accountsdb: Arc, ledger: Arc, scheduler: TransactionSchedulerHandle, + cancel: CancellationToken, ) -> Result { let id = IdBuilder::new(machineid_rs::Encryption::SHA256) .add_component(machineid_rs::HWIDComponent::SystemID) @@ -64,6 +71,7 @@ impl ReplicationContext { Ok(Self { id, broker, + cancel, mode_tx, accountsdb, ledger, @@ -80,14 +88,14 @@ impl ReplicationContext { } /// Writes block to ledger. - pub async fn write_block(&self, block: &proto::Block) -> Result<()> { + pub async fn write_block(&self, block: &Block) -> Result<()> { self.ledger .write_block(block.slot, block.timestamp, block.hash)?; Ok(()) } /// Verifies superblock checksum. - pub fn verify_checksum(&self, sb: &proto::SuperBlock) -> Result<()> { + pub fn verify_checksum(&self, sb: &SuperBlock) -> Result<()> { let _lock = self.accountsdb.lock_database(); // SAFETY: Lock acquired above ensures no concurrent modifications // during checksum computation. @@ -132,16 +140,25 @@ impl ReplicationContext { self.broker.put_snapshot(slot, file).await } - /// Creates consumer with retry. - pub async fn create_consumer(&self, reset: bool) -> Consumer { + /// Creates consumer with retry, respecting shutdown signal. + /// Returns `None` if shutdown is triggered during creation. + pub async fn create_consumer(&self, reset: bool) -> Option { loop { - match self.broker.create_consumer(&self.id, reset).await { - Ok(c) => return c, - Err(e) => { - tracing::warn!(%e, "consumer creation failed, retrying"); - tokio::time::sleep(CONSUMER_RETRY_DELAY).await; + tokio::select! { + result = self.broker.create_consumer(&self.id, reset) => { + match result { + Ok(c) => return Some(c), + Err(e) => { + tracing::warn!(%e, "consumer creation failed, retrying"); + } + } + } + _ = self.cancel.cancelled() => { + tracing::info!("shutdown during consumer creation"); + return None; } } + tokio::time::sleep(CONSUMER_RETRY_DELAY).await; } } @@ -157,6 +174,7 @@ impl ReplicationContext { } /// Transitions to standby role. + /// Returns `None` if shutdown is triggered during consumer creation. /// reset parameter controls where in the stream the consumption starts: /// true - the last known position that we know /// false - the last known position that message broker tracks for us @@ -164,10 +182,20 @@ impl ReplicationContext { self, messages: Receiver, reset: bool, - ) -> Result { - let consumer = Box::new(self.create_consumer(reset).await); - let watcher = LockWatcher::new(&self.broker).await; + ) -> Result> { + let Some(consumer) = self.create_consumer(reset).await else { + return Ok(None); + }; + let Some(watcher) = LockWatcher::new(&self.broker, &self.cancel).await + else { + return Ok(None); + }; self.enter_replica_mode().await; - Ok(Standby::new(self, consumer, messages, watcher)) + Ok(Some(Standby::new( + self, + Box::new(consumer), + messages, + watcher, + ))) } } diff --git a/magicblock-replicator/src/service/mod.rs b/magicblock-replicator/src/service/mod.rs index 4c89ae5ad..a9c497bb8 100644 --- a/magicblock-replicator/src/service/mod.rs +++ b/magicblock-replicator/src/service/mod.rs @@ -27,8 +27,9 @@ use std::{sync::Arc, thread::JoinHandle, time::Duration}; pub use context::ReplicationContext; use magicblock_accounts_db::AccountsDb; -use magicblock_core::link::transactions::{ - SchedulerMode, TransactionSchedulerHandle, +use magicblock_core::link::{ + replication::Message, + transactions::{SchedulerMode, TransactionSchedulerHandle}, }; use magicblock_ledger::Ledger; pub use primary::Primary; @@ -37,8 +38,9 @@ use tokio::{ runtime::Builder, sync::mpsc::{Receiver, Sender}, }; +use tokio_util::sync::CancellationToken; -use crate::{nats::Broker, Message, Result}; +use crate::{nats::Broker, Result}; // ============================================================================= // Constants @@ -60,6 +62,7 @@ pub enum Service { impl Service { /// Creates service, attempting primary role first. + #[allow(clippy::too_many_arguments)] pub async fn new( broker: Broker, mode_tx: Sender, @@ -67,21 +70,26 @@ impl Service { ledger: Arc, scheduler: TransactionSchedulerHandle, messages: Receiver, + cancel: CancellationToken, reset: bool, - ) -> crate::Result { + ) -> crate::Result> { let ctx = ReplicationContext::new( - broker, mode_tx, accountsdb, ledger, scheduler, + broker, mode_tx, accountsdb, ledger, scheduler, cancel, ) .await?; // Try to become primary. match ctx.try_acquire_producer().await? { - Some(producer) => { - Ok(Self::Primary(ctx.into_primary(producer, messages).await?)) - } + Some(producer) => Ok(Some(Self::Primary( + ctx.into_primary(producer, messages).await?, + ))), None => { - let standby = ctx.into_standby(messages, reset).await?; - Ok(Self::Standby(standby)) + let Some(standby) = ctx.into_standby(messages, reset).await? + else { + // Shutdown during consumer creation + return Ok(None); + }; + Ok(Some(Self::Standby(standby))) } } } @@ -90,13 +98,13 @@ impl Service { pub async fn run(mut self) -> Result<()> { loop { self = match self { - Service::Primary(p) => Service::Standby(p.run().await?), - Service::Standby(s) => match s.run().await { - Ok(p) => Service::Primary(p), - Err(error) => { - tracing::error!(%error, "unrecoverable replication failure"); - return Err(error); - } + Service::Primary(p) => match p.run().await? { + Some(s) => Service::Standby(s), + None => return Ok(()), + }, + Service::Standby(s) => match s.run().await? { + Some(p) => Service::Primary(p), + None => return Ok(()), }, }; } diff --git a/magicblock-replicator/src/service/primary.rs b/magicblock-replicator/src/service/primary.rs index c2b00a161..5733fe9b5 100644 --- a/magicblock-replicator/src/service/primary.rs +++ b/magicblock-replicator/src/service/primary.rs @@ -1,11 +1,15 @@ //! Primary node: publishes events and holds leader lock. +use magicblock_core::link::replication::Message; use tokio::sync::mpsc::Receiver; use tracing::{error, info, instrument, warn}; use super::{ReplicationContext, LOCK_REFRESH_INTERVAL}; use crate::{ - nats::Producer, service::Standby, watcher::SnapshotWatcher, Message, Result, + nats::{Producer, Subjects}, + service::Standby, + watcher::SnapshotWatcher, + Result, }; /// Primary node: publishes events and holds leader lock. @@ -32,22 +36,15 @@ impl Primary { } } - /// Runs until leadership lost, returns standby on demotion. + /// Runs until leadership lost or shutdown. + /// Returns `Some(Standby)` on demotion, `None` on shutdown. #[instrument(skip(self))] - pub async fn run(mut self) -> Result { + pub async fn run(mut self) -> Result> { let mut lock_tick = tokio::time::interval(LOCK_REFRESH_INTERVAL); loop { tokio::select! { - Some(msg) = self.messages.recv() => { - if let Err(error) = self.publish(msg).await { - // publish should not easily fail, if that happens, it means - // the message broker has become unrecoverably unreacheable - warn!(%error, "failed to publish the message"); - return self.ctx.into_standby(self.messages, true).await; - } - } - + biased; _ = lock_tick.tick() => { let held = match self.producer.refresh().await { Ok(h) => h, @@ -61,12 +58,23 @@ impl Primary { return self.ctx.into_standby(self.messages, true).await; } } - + Some(msg) = self.messages.recv() => { + if let Err(error) = self.publish(msg).await { + // publish should not easily fail, if that happens, it means + // the message broker has become unrecoverably unreacheable + warn!(%error, "failed to publish the message"); + return self.ctx.into_standby(self.messages, true).await; + } + } Some((file, slot)) = self.snapshots.recv() => { if let Err(e) = self.ctx.upload_snapshot(file, slot).await { warn!(%e, "snapshot upload failed"); } } + _ = self.ctx.cancel.cancelled() => { + info!("shutdown received, terminating primary mode"); + return Ok(None); + } } } } @@ -79,7 +87,7 @@ impl Primary { return Ok(()); } }; - let subject = msg.subject(); + let subject = Subjects::from_message(&msg); let (slot, index) = msg.slot_and_index(); let ack = matches!(msg, Message::SuperBlock(_)); diff --git a/magicblock-replicator/src/service/standby.rs b/magicblock-replicator/src/service/standby.rs index 9bd834d1b..77bdebb21 100644 --- a/magicblock-replicator/src/service/standby.rs +++ b/magicblock-replicator/src/service/standby.rs @@ -4,9 +4,9 @@ use std::time::{Duration, Instant}; use async_nats::Message as NatsMessage; use futures::StreamExt; -use magicblock_core::{ - link::transactions::{ReplayPosition, WithEncoded}, - Slot, +use magicblock_core::link::{ + replication::{Message, Transaction}, + transactions::{ReplayPosition, WithEncoded}, }; use solana_transaction::versioned::VersionedTransaction; use tokio::sync::mpsc::Receiver; @@ -15,9 +15,8 @@ use tracing::{error, info, warn}; use super::{ReplicationContext, LEADER_TIMEOUT}; use crate::{ nats::{Consumer, LockWatcher}, - proto::TransactionIndex, service::Primary, - Message, Result, + Result, }; /// Standby node: consumes events and watches for leader failure. @@ -46,17 +45,33 @@ impl Standby { } } - /// Runs until leadership acquired, returns primary on promotion. - pub async fn run(mut self) -> Result { + /// Runs until leadership acquired or shutdown. + /// Returns `Some(Primary)` on promotion, `None` on shutdown. + pub async fn run(mut self) -> Result> { let mut timeout_check = tokio::time::interval(Duration::from_secs(1)); - let mut stream = self.consumer.messages().await; + let Some(mut stream) = self.consumer.messages(&self.ctx.cancel).await + else { + return Ok(None); + }; loop { tokio::select! { + biased; + _ = self.watcher.wait_for_expiry() => { + info!("leader lock expired, attempting takeover"); + if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { + info!("acquired leadership, promoting"); + return self.ctx.into_primary(producer, self.messages).await.map(Some); + } + } result = stream.next() => { let Some(result) = result else { - stream = self.consumer.messages().await; - continue; + if let Some(s) = self.consumer.messages(&self.ctx.cancel).await { + stream = s; + continue; + } else { + return Ok(None); + }; }; match result { Ok(msg) => { @@ -66,21 +81,16 @@ impl Standby { Err(e) => warn!(%e, "message consumption stream error"), } } - - _ = self.watcher.wait_for_expiry() => { - info!("leader lock expired, attempting takeover"); - if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { - info!("acquired leadership, promoting"); - return self.ctx.into_primary(producer, self.messages).await; - } - } - _ = timeout_check.tick(), if self.last_activity.elapsed() > LEADER_TIMEOUT => { if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { info!("acquired leadership via timeout, promoting"); - return self.ctx.into_primary(producer, self.messages).await; + return self.ctx.into_primary(producer, self.messages).await.map(Some); } } + _ = self.ctx.cancel.cancelled() => { + info!("shutdown received, terminating standby mode"); + return Ok(None); + } } } } @@ -102,8 +112,8 @@ impl Standby { } let result = match message { - Message::Transaction(tx) => { - self.replay_tx(tx.slot, tx.index, tx.payload).await + Message::Transaction(txn) => { + self.replay_tx(txn).await } Message::Block(block) => self.ctx.write_block(&block).await, Message::SuperBlock(sb) => { @@ -120,20 +130,16 @@ impl Standby { self.ctx.update_position(slot, index); } - async fn replay_tx( - &self, - slot: Slot, - index: TransactionIndex, - encoded: Vec, - ) -> Result<()> { + async fn replay_tx(&self, msg: Transaction) -> Result<()> { let pos = ReplayPosition { - slot, - index, + slot: msg.slot, + index: msg.index, persist: true, }; - let tx: VersionedTransaction = bincode::deserialize(&encoded)?; - let tx = WithEncoded { txn: tx, encoded }; - self.ctx.scheduler.replay(pos, tx).await?; + let encoded = msg.payload; + let txn: VersionedTransaction = bincode::deserialize(&encoded)?; + let txn = WithEncoded { txn, encoded }; + self.ctx.scheduler.replay(pos, txn).await?; Ok(()) } } diff --git a/test-kit/Cargo.toml b/test-kit/Cargo.toml index 925e3456a..fa00e9359 100644 --- a/test-kit/Cargo.toml +++ b/test-kit/Cargo.toml @@ -22,6 +22,7 @@ solana-rpc-client = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } solana-transaction = { workspace = true } +solana-transaction-error = { workspace = true } solana-transaction-status-client-types = { workspace = true } tempfile = { workspace = true } diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index eaedfa801..5c4f6d0c7 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -15,8 +15,7 @@ use magicblock_core::{ link, transactions::{ ReplayPosition, SanitizeableTransaction, SchedulerMode, - TransactionResult, TransactionSchedulerHandle, - TransactionSimulationResult, + TransactionSchedulerHandle, TransactionSimulationResult, }, DispatchEndpoints, }, @@ -37,6 +36,7 @@ use solana_program::{ use solana_signature::Signature; pub use solana_signer::Signer; use solana_transaction::Transaction; +use solana_transaction_error::TransactionResult; use solana_transaction_status_client_types::TransactionStatusMeta; use tempfile::TempDir; use tokio::sync::mpsc::Sender; @@ -167,6 +167,7 @@ impl ExecutionTestEnv { transaction_status_tx: validator_channels.transaction_status, txn_to_process_rx: validator_channels.transaction_to_process, tasks_tx: validator_channels.tasks_service, + replication_tx: validator_channels.replication_messages, environment, is_auto_airdrop_lamports_enabled: false, shutdown: Default::default(), @@ -330,7 +331,7 @@ impl ExecutionTestEnv { pub async fn execute_transaction( &self, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { self.transaction_scheduler.execute(txn).await.inspect_err( |err| error!(error = ?err, "Transaction execution failed"), ) @@ -372,7 +373,7 @@ impl ExecutionTestEnv { &self, persist: bool, txn: impl SanitizeableTransaction, - ) -> TransactionResult { + ) -> TransactionResult<()> { let position = ReplayPosition { slot: 0, index: 0,