Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-accounts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ magicblock-accounts-db = { workspace = true }
magicblock-chainlink = { workspace = true }
magicblock-committor-service = { workspace = true }
magicblock-core = { workspace = true }
magicblock-metrics = { workspace = true }
magicblock-program = { workspace = true }
solana-hash = { workspace = true }
solana-pubkey = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions magicblock-accounts/src/scheduled_commits_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use magicblock_committor_service::{
intent_executor::ExecutionOutput, BaseIntentCommittor, CommittorService,
};
use magicblock_core::link::transactions::TransactionSchedulerHandle;
use magicblock_metrics::metrics;
use magicblock_program::{
magic_scheduled_base_intent::ScheduledIntentBundle,
register_scheduled_commit_sent, SentCommit, TransactionScheduler,
Expand Down Expand Up @@ -275,6 +276,7 @@ impl ScheduledCommitsProcessor for ScheduledCommitsProcessorImpl {
if intent_bundles.is_empty() {
return Ok(());
}
metrics::inc_committor_intents_count_by(intent_bundles.len() as u64);

// Add metas for intent we schedule
let pubkeys_being_undelegated = {
Expand Down
1 change: 1 addition & 0 deletions magicblock-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod errors;
mod fund_account;
mod genesis_utils;
pub mod ledger;
mod magic_sys_adapter;
pub mod magic_validator;
mod slot;
mod tickers;
101 changes: 101 additions & 0 deletions magicblock-api/src/magic_sys_adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};

use magicblock_committor_service::CommittorService;
use magicblock_core::{intent::CommittedAccount, traits::MagicSys};
use magicblock_ledger::Ledger;
use magicblock_metrics::metrics;
use solana_instruction::error::InstructionError;
use solana_pubkey::Pubkey;
use tracing::{enabled, error, trace, Level};

#[derive(Clone)]
pub struct MagicSysAdapter {
ledger: Arc<Ledger>,
committor_service: Option<Arc<CommittorService>>,
}

impl MagicSysAdapter {
/// Returned when the sync channel is disconnected (sender dropped).
const RECV_ERR: u32 = 0xE000_0000;
/// Returned when waiting for the nonce fetch times out.
const TIMEOUT_ERR: u32 = 0xE000_0001;
/// Returned when the fetch of current commit nonces fails.
const FETCH_ERR: u32 = 0xE000_0002;
/// Returned when no committor service is configured.
const NO_COMMITTOR_ERR: u32 = 0xE000_0003;

const FETCH_TIMEOUT: Duration = Duration::from_secs(30);

pub fn new(
ledger: Arc<Ledger>,
committor_service: Option<Arc<CommittorService>>,
) -> Self {
Self {
ledger,
committor_service,
}
}
}

impl MagicSys for MagicSysAdapter {
fn persist(&self, id: u64, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
trace!(id, data_len = data.len(), "Persisting data");
self.ledger.write_account_mod_data(id, &data.into())?;
Ok(())
}

fn load(&self, id: u64) -> Result<Option<Vec<u8>>, Box<dyn Error>> {
let data = self.ledger.read_account_mod_data(id)?.map(|x| x.data);
if enabled!(Level::TRACE) {
if let Some(data) = &data {
trace!(id, data_len = data.len(), "Loading data");
} else {
trace!(id, found = false, "Loading data");
}
}
Ok(data)
}

fn fetch_current_commit_nonces(
&self,
commits: &[CommittedAccount],
) -> Result<HashMap<Pubkey, u64>, InstructionError> {
if commits.is_empty() {
return Ok(HashMap::new());
}
let committor_service =
if let Some(committor_service) = &self.committor_service {
Ok(committor_service)
} else {
Err(InstructionError::Custom(Self::NO_COMMITTOR_ERR))
}?;

let min_context_slot = commits
.iter()
.map(|account| account.remote_slot)
.max()
.unwrap_or(0);
let pubkeys: Vec<_> =
commits.iter().map(|account| account.pubkey).collect();

let _timer = metrics::start_fetch_commit_nonces_wait_timer();
let receiver = committor_service
.fetch_current_commit_nonces_sync(&pubkeys, min_context_slot);
receiver
.recv_timeout(Self::FETCH_TIMEOUT)
.map_err(|err| match err {
std::sync::mpsc::RecvTimeoutError::Timeout => {
error!("Timed out waiting for commit nonces from CommittorService");
InstructionError::Custom(Self::TIMEOUT_ERR)
}
std::sync::mpsc::RecvTimeoutError::Disconnected => {
error!("CommittorService channel disconnected while waiting for commit nonces");
InstructionError::Custom(Self::RECV_ERR)
}
})?
.inspect_err(|err| {
error!(error = ?err, "Failed to fetch current commit nonces")
})
.map_err(|_| InstructionError::Custom(Self::FETCH_ERR))
}
}
9 changes: 7 additions & 2 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use magicblock_processor::{
scheduler::{state::TransactionSchedulerState, TransactionScheduler},
};
use magicblock_program::{
init_persister,
init_magic_sys,
validator::{self, validator_authority},
TransactionScheduler as ActionTransactionScheduler,
};
Expand Down Expand Up @@ -91,6 +91,7 @@ use crate::{
self, read_validator_keypair_from_ledger, validator_keypair_path,
write_validator_keypair_to_ledger,
},
magic_sys_adapter::MagicSysAdapter,
slot::advance_slot_and_update_ledger,
tickers::{init_slot_ticker, init_system_metrics_ticker},
};
Expand Down Expand Up @@ -212,6 +213,11 @@ impl MagicValidator {
let step_start = Instant::now();
let committor_service = Self::init_committor_service(&config).await?;
log_timing("startup", "committor_service_init", step_start);
init_magic_sys(Arc::new(MagicSysAdapter::new(
ledger.clone(),
committor_service.clone(),
)));

let step_start = Instant::now();
let chainlink = Arc::new(
Self::init_chainlink(
Expand Down Expand Up @@ -477,7 +483,6 @@ impl MagicValidator {
) -> ApiResult<(Arc<Ledger>, Slot)> {
let (ledger, last_slot) = ledger::init(storage, ledger_config)?;
let ledger_shared = Arc::new(ledger);
init_persister(ledger_shared.clone());
Ok((ledger_shared, last_slot))
}

Expand Down
2 changes: 2 additions & 0 deletions magicblock-committor-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lru = { workspace = true }
magicblock-committor-program = { workspace = true, features = [
"no-entrypoint",
] }
magicblock-core = { workspace = true }
magicblock-delegation-program = { workspace = true, features = [
"no-entrypoint",
] }
Expand Down Expand Up @@ -56,6 +57,7 @@ tokio-util = { workspace = true }

[dev-dependencies]
solana-signature = { workspace = true, features = ["rand"] }
serde_json = { workspace = true }
lazy_static = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
Expand Down
27 changes: 26 additions & 1 deletion magicblock-committor-service/src/committor_processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashSet, path::Path, sync::Arc};
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::Arc,
};

use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle;
use magicblock_rpc_client::MagicblockRpcClient;
Expand All @@ -16,6 +20,10 @@ use crate::{
intent_execution_manager::{
db::DummyDB, BroadcastedIntentExecutionResult, IntentExecutionManager,
},
intent_executor::task_info_fetcher::{
CacheTaskInfoFetcher, RpcTaskInfoFetcher, TaskInfoFetcher,
TaskInfoFetcherResult,
},
persist::{
CommitStatusRow, IntentPersister, IntentPersisterImpl,
MessageSignatures,
Expand All @@ -28,6 +36,7 @@ pub(crate) struct CommittorProcessor {
pub(crate) authority: Keypair,
persister: IntentPersisterImpl,
commits_scheduler: IntentExecutionManager<DummyDB>,
task_info_fetcher: Arc<CacheTaskInfoFetcher<RpcTaskInfoFetcher>>,
}

impl CommittorProcessor {
Expand Down Expand Up @@ -58,9 +67,13 @@ impl CommittorProcessor {
let persister = IntentPersisterImpl::try_new(persist_file)?;

// Create commit scheduler
let task_info_fetcher = Arc::new(CacheTaskInfoFetcher::new(
RpcTaskInfoFetcher::new(magic_block_rpc_client.clone()),
));
let commits_scheduler = IntentExecutionManager::new(
magic_block_rpc_client.clone(),
DummyDB::new(),
task_info_fetcher.clone(),
Some(persister.clone()),
table_mania.clone(),
chain_config.compute_budget_config.clone(),
Expand All @@ -72,6 +85,7 @@ impl CommittorProcessor {
table_mania,
commits_scheduler,
persister,
task_info_fetcher,
})
}

Expand Down Expand Up @@ -149,4 +163,15 @@ impl CommittorProcessor {
) -> broadcast::Receiver<BroadcastedIntentExecutionResult> {
self.commits_scheduler.subscribe_for_results()
}

/// Fetches current commit nonces
pub async fn fetch_current_commit_nonces(
&self,
pubkeys: &[Pubkey],
min_context_slot: u64,
) -> TaskInfoFetcherResult<HashMap<Pubkey, u64>> {
self.task_info_fetcher
.fetch_current_commit_nonces(pubkeys, min_context_slot)
.await
}
}
8 changes: 7 additions & 1 deletion magicblock-committor-service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use solana_signature::Signature;
use solana_transaction_error::TransactionError;
use thiserror::Error;

use crate::intent_execution_manager::IntentExecutionManagerError;
use crate::{
intent_execution_manager::IntentExecutionManagerError,
intent_executor::task_info_fetcher::TaskInfoFetcherError,
};

pub type CommittorServiceResult<T, E = CommittorServiceError> = Result<T, E>;

Expand All @@ -26,6 +29,9 @@ pub enum CommittorServiceError {
#[error("IntentExecutionManagerError: {0} ({0:?})")]
IntentExecutionManagerError(#[from] IntentExecutionManagerError),

#[error("TaskInfoFetcherError: {0} ({0:?})")]
TaskInfoFetcherError(#[from] TaskInfoFetcherError),

#[error(
"Failed send and confirm transaction to {0} on chain: {1} ({1:?})"
)]
Expand Down
7 changes: 3 additions & 4 deletions magicblock-committor-service/src/intent_execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
},
intent_executor::{
intent_executor_factory::IntentExecutorFactoryImpl,
task_info_fetcher::CacheTaskInfoFetcher,
task_info_fetcher::{CacheTaskInfoFetcher, RpcTaskInfoFetcher},
},
persist::IntentPersister,
ComputeBudgetConfig,
Expand All @@ -33,19 +33,18 @@ impl<D: DB> IntentExecutionManager<D> {
pub fn new<P: IntentPersister>(
rpc_client: MagicblockRpcClient,
db: D,
task_info_fetcher: Arc<CacheTaskInfoFetcher<RpcTaskInfoFetcher>>,
intent_persister: Option<P>,
table_mania: TableMania,
compute_budget_config: ComputeBudgetConfig,
) -> Self {
let db = Arc::new(db);

let commit_id_tracker =
Arc::new(CacheTaskInfoFetcher::new(rpc_client.clone()));
let executor_factory = IntentExecutorFactoryImpl {
rpc_client,
table_mania,
compute_budget_config,
commit_id_tracker,
task_info_fetcher,
};

let (sender, receiver) = mpsc::channel(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ mod edge_cases_test {

#[cfg(test)]
mod complete_error_test {
use magicblock_program::magic_scheduled_base_intent::CommittedAccount;
use magicblock_core::intent::CommittedAccount;
use solana_account::Account;
use solana_pubkey::pubkey;

Expand Down Expand Up @@ -854,8 +854,9 @@ pub(crate) fn create_test_intent(
pubkeys: &[Pubkey],
is_undelegate: bool,
) -> ScheduledIntentBundle {
use magicblock_core::intent::CommittedAccount;
use magicblock_program::magic_scheduled_base_intent::{
CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle,
CommitAndUndelegate, CommitType, MagicIntentBundle,
ScheduledIntentBundle, UndelegateType,
};
use solana_account::Account;
Expand Down Expand Up @@ -903,8 +904,9 @@ pub(crate) fn create_test_intent_bundle(
commit_pubkeys: &[Pubkey],
commit_and_undelegate_pubkeys: &[Pubkey],
) -> ScheduledIntentBundle {
use magicblock_core::intent::CommittedAccount;
use magicblock_program::magic_scheduled_base_intent::{
CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle,
CommitAndUndelegate, CommitType, MagicIntentBundle,
ScheduledIntentBundle, UndelegateType,
};
use solana_account::Account;
Expand Down
Loading