diff --git a/Cargo.lock b/Cargo.lock index 544797123..8f7d1c7ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2962,6 +2962,7 @@ dependencies = [ "magicblock-chainlink", "magicblock-committor-service", "magicblock-core", + "magicblock-metrics", "magicblock-program", "solana-hash", "solana-pubkey", @@ -3180,6 +3181,7 @@ dependencies = [ "lazy_static", "lru 0.16.2", "magicblock-committor-program", + "magicblock-core", "magicblock-delegation-program", "magicblock-metrics", "magicblock-program", @@ -3187,6 +3189,7 @@ dependencies = [ "magicblock-table-mania", "rand 0.9.2", "rusqlite", + "serde_json", "solana-account", "solana-account-decoder", "solana-address-lookup-table-interface", @@ -3243,6 +3246,7 @@ version = "0.8.2" dependencies = [ "flume", "magicblock-magic-program-api", + "serde", "solana-account", "solana-account-decoder", "solana-hash", @@ -3408,6 +3412,7 @@ dependencies = [ "parking_lot", "rand 0.9.2", "serde", + "serial_test", "solana-account", "solana-account-info", "solana-clock", diff --git a/magicblock-accounts/Cargo.toml b/magicblock-accounts/Cargo.toml index 617931422..aaf0879d4 100644 --- a/magicblock-accounts/Cargo.toml +++ b/magicblock-accounts/Cargo.toml @@ -16,6 +16,7 @@ magicblock-accounts-db = { workspace = true } magicblock-chainlink = { workspace = true } magicblock-committor-service = { workspace = true } magicblock-core = { workspace = true } +magicblock-metrics = { workspace = true } magicblock-program = { workspace = true } solana-hash = { workspace = true } solana-pubkey = { workspace = true } diff --git a/magicblock-accounts/src/scheduled_commits_processor.rs b/magicblock-accounts/src/scheduled_commits_processor.rs index 88c4ed4ed..619abe562 100644 --- a/magicblock-accounts/src/scheduled_commits_processor.rs +++ b/magicblock-accounts/src/scheduled_commits_processor.rs @@ -19,6 +19,7 @@ use magicblock_committor_service::{ intent_executor::ExecutionOutput, BaseIntentCommittor, CommittorService, }; use magicblock_core::link::transactions::TransactionSchedulerHandle; +use magicblock_metrics::metrics; use magicblock_program::{ magic_scheduled_base_intent::ScheduledIntentBundle, register_scheduled_commit_sent, SentCommit, TransactionScheduler, @@ -275,6 +276,7 @@ impl ScheduledCommitsProcessor for ScheduledCommitsProcessorImpl { if intent_bundles.is_empty() { return Ok(()); } + metrics::inc_committor_intents_count_by(intent_bundles.len() as u64); // Add metas for intent we schedule let pubkeys_being_undelegated = { diff --git a/magicblock-api/src/lib.rs b/magicblock-api/src/lib.rs index a82f4bc97..853019317 100644 --- a/magicblock-api/src/lib.rs +++ b/magicblock-api/src/lib.rs @@ -3,6 +3,7 @@ pub mod errors; mod fund_account; mod genesis_utils; pub mod ledger; +mod magic_sys_adapter; pub mod magic_validator; mod slot; mod tickers; diff --git a/magicblock-api/src/magic_sys_adapter.rs b/magicblock-api/src/magic_sys_adapter.rs new file mode 100644 index 000000000..9515eea16 --- /dev/null +++ b/magicblock-api/src/magic_sys_adapter.rs @@ -0,0 +1,101 @@ +use std::{collections::HashMap, error::Error, sync::Arc, time::Duration}; + +use magicblock_committor_service::CommittorService; +use magicblock_core::{intent::CommittedAccount, traits::MagicSys}; +use magicblock_ledger::Ledger; +use magicblock_metrics::metrics; +use solana_instruction::error::InstructionError; +use solana_pubkey::Pubkey; +use tracing::{enabled, error, trace, Level}; + +#[derive(Clone)] +pub struct MagicSysAdapter { + ledger: Arc, + committor_service: Option>, +} + +impl MagicSysAdapter { + /// Returned when the sync channel is disconnected (sender dropped). + const RECV_ERR: u32 = 0xE000_0000; + /// Returned when waiting for the nonce fetch times out. + const TIMEOUT_ERR: u32 = 0xE000_0001; + /// Returned when the fetch of current commit nonces fails. + const FETCH_ERR: u32 = 0xE000_0002; + /// Returned when no committor service is configured. + const NO_COMMITTOR_ERR: u32 = 0xE000_0003; + + const FETCH_TIMEOUT: Duration = Duration::from_secs(30); + + pub fn new( + ledger: Arc, + committor_service: Option>, + ) -> Self { + Self { + ledger, + committor_service, + } + } +} + +impl MagicSys for MagicSysAdapter { + fn persist(&self, id: u64, data: Vec) -> Result<(), Box> { + trace!(id, data_len = data.len(), "Persisting data"); + self.ledger.write_account_mod_data(id, &data.into())?; + Ok(()) + } + + fn load(&self, id: u64) -> Result>, Box> { + let data = self.ledger.read_account_mod_data(id)?.map(|x| x.data); + if enabled!(Level::TRACE) { + if let Some(data) = &data { + trace!(id, data_len = data.len(), "Loading data"); + } else { + trace!(id, found = false, "Loading data"); + } + } + Ok(data) + } + + fn fetch_current_commit_nonces( + &self, + commits: &[CommittedAccount], + ) -> Result, InstructionError> { + if commits.is_empty() { + return Ok(HashMap::new()); + } + let committor_service = + if let Some(committor_service) = &self.committor_service { + Ok(committor_service) + } else { + Err(InstructionError::Custom(Self::NO_COMMITTOR_ERR)) + }?; + + let min_context_slot = commits + .iter() + .map(|account| account.remote_slot) + .max() + .unwrap_or(0); + let pubkeys: Vec<_> = + commits.iter().map(|account| account.pubkey).collect(); + + let _timer = metrics::start_fetch_commit_nonces_wait_timer(); + let receiver = committor_service + .fetch_current_commit_nonces_sync(&pubkeys, min_context_slot); + receiver + .recv_timeout(Self::FETCH_TIMEOUT) + .map_err(|err| match err { + std::sync::mpsc::RecvTimeoutError::Timeout => { + error!("Timed out waiting for commit nonces from CommittorService"); + InstructionError::Custom(Self::TIMEOUT_ERR) + } + std::sync::mpsc::RecvTimeoutError::Disconnected => { + error!("CommittorService channel disconnected while waiting for commit nonces"); + InstructionError::Custom(Self::RECV_ERR) + } + })? + .inspect_err(|err| { + error!(error = ?err, "Failed to fetch current commit nonces") + }) + .map_err(|_| InstructionError::Custom(Self::FETCH_ERR)) + } +} diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 249513aa7..fd3d1e62c 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -57,7 +57,7 @@ use magicblock_processor::{ scheduler::{state::TransactionSchedulerState, TransactionScheduler}, }; use magicblock_program::{ - init_persister, + init_magic_sys, validator::{self, validator_authority}, TransactionScheduler as ActionTransactionScheduler, }; @@ -91,6 +91,7 @@ use crate::{ self, read_validator_keypair_from_ledger, validator_keypair_path, write_validator_keypair_to_ledger, }, + magic_sys_adapter::MagicSysAdapter, slot::advance_slot_and_update_ledger, tickers::{init_slot_ticker, init_system_metrics_ticker}, }; @@ -212,6 +213,11 @@ impl MagicValidator { let step_start = Instant::now(); let committor_service = Self::init_committor_service(&config).await?; log_timing("startup", "committor_service_init", step_start); + init_magic_sys(Arc::new(MagicSysAdapter::new( + ledger.clone(), + committor_service.clone(), + ))); + let step_start = Instant::now(); let chainlink = Arc::new( Self::init_chainlink( @@ -477,7 +483,6 @@ impl MagicValidator { ) -> ApiResult<(Arc, Slot)> { let (ledger, last_slot) = ledger::init(storage, ledger_config)?; let ledger_shared = Arc::new(ledger); - init_persister(ledger_shared.clone()); Ok((ledger_shared, last_slot)) } diff --git a/magicblock-committor-service/Cargo.toml b/magicblock-committor-service/Cargo.toml index 4aa583f8d..de5433181 100644 --- a/magicblock-committor-service/Cargo.toml +++ b/magicblock-committor-service/Cargo.toml @@ -22,6 +22,7 @@ lru = { workspace = true } magicblock-committor-program = { workspace = true, features = [ "no-entrypoint", ] } +magicblock-core = { workspace = true } magicblock-delegation-program = { workspace = true, features = [ "no-entrypoint", ] } @@ -56,6 +57,7 @@ tokio-util = { workspace = true } [dev-dependencies] solana-signature = { workspace = true, features = ["rand"] } +serde_json = { workspace = true } lazy_static = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } diff --git a/magicblock-committor-service/src/committor_processor.rs b/magicblock-committor-service/src/committor_processor.rs index 342c25a85..2e3e1335c 100644 --- a/magicblock-committor-service/src/committor_processor.rs +++ b/magicblock-committor-service/src/committor_processor.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, path::Path, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + path::Path, + sync::Arc, +}; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; use magicblock_rpc_client::MagicblockRpcClient; @@ -16,6 +20,10 @@ use crate::{ intent_execution_manager::{ db::DummyDB, BroadcastedIntentExecutionResult, IntentExecutionManager, }, + intent_executor::task_info_fetcher::{ + CacheTaskInfoFetcher, RpcTaskInfoFetcher, TaskInfoFetcher, + TaskInfoFetcherResult, + }, persist::{ CommitStatusRow, IntentPersister, IntentPersisterImpl, MessageSignatures, @@ -28,6 +36,7 @@ pub(crate) struct CommittorProcessor { pub(crate) authority: Keypair, persister: IntentPersisterImpl, commits_scheduler: IntentExecutionManager, + task_info_fetcher: Arc>, } impl CommittorProcessor { @@ -58,9 +67,13 @@ impl CommittorProcessor { let persister = IntentPersisterImpl::try_new(persist_file)?; // Create commit scheduler + let task_info_fetcher = Arc::new(CacheTaskInfoFetcher::new( + RpcTaskInfoFetcher::new(magic_block_rpc_client.clone()), + )); let commits_scheduler = IntentExecutionManager::new( magic_block_rpc_client.clone(), DummyDB::new(), + task_info_fetcher.clone(), Some(persister.clone()), table_mania.clone(), chain_config.compute_budget_config.clone(), @@ -72,6 +85,7 @@ impl CommittorProcessor { table_mania, commits_scheduler, persister, + task_info_fetcher, }) } @@ -149,4 +163,15 @@ impl CommittorProcessor { ) -> broadcast::Receiver { self.commits_scheduler.subscribe_for_results() } + + /// Fetches current commit nonces + pub async fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + self.task_info_fetcher + .fetch_current_commit_nonces(pubkeys, min_context_slot) + .await + } } diff --git a/magicblock-committor-service/src/error.rs b/magicblock-committor-service/src/error.rs index 266c1388c..fa13e49b8 100644 --- a/magicblock-committor-service/src/error.rs +++ b/magicblock-committor-service/src/error.rs @@ -3,7 +3,10 @@ use solana_signature::Signature; use solana_transaction_error::TransactionError; use thiserror::Error; -use crate::intent_execution_manager::IntentExecutionManagerError; +use crate::{ + intent_execution_manager::IntentExecutionManagerError, + intent_executor::task_info_fetcher::TaskInfoFetcherError, +}; pub type CommittorServiceResult = Result; @@ -26,6 +29,9 @@ pub enum CommittorServiceError { #[error("IntentExecutionManagerError: {0} ({0:?})")] IntentExecutionManagerError(#[from] IntentExecutionManagerError), + #[error("TaskInfoFetcherError: {0} ({0:?})")] + TaskInfoFetcherError(#[from] TaskInfoFetcherError), + #[error( "Failed send and confirm transaction to {0} on chain: {1} ({1:?})" )] diff --git a/magicblock-committor-service/src/intent_execution_manager.rs b/magicblock-committor-service/src/intent_execution_manager.rs index bb13063c6..d53918f13 100644 --- a/magicblock-committor-service/src/intent_execution_manager.rs +++ b/magicblock-committor-service/src/intent_execution_manager.rs @@ -17,7 +17,7 @@ use crate::{ }, intent_executor::{ intent_executor_factory::IntentExecutorFactoryImpl, - task_info_fetcher::CacheTaskInfoFetcher, + task_info_fetcher::{CacheTaskInfoFetcher, RpcTaskInfoFetcher}, }, persist::IntentPersister, ComputeBudgetConfig, @@ -33,19 +33,18 @@ impl IntentExecutionManager { pub fn new( rpc_client: MagicblockRpcClient, db: D, + task_info_fetcher: Arc>, intent_persister: Option

, table_mania: TableMania, compute_budget_config: ComputeBudgetConfig, ) -> Self { let db = Arc::new(db); - let commit_id_tracker = - Arc::new(CacheTaskInfoFetcher::new(rpc_client.clone())); let executor_factory = IntentExecutorFactoryImpl { rpc_client, table_mania, compute_budget_config, - commit_id_tracker, + task_info_fetcher, }; let (sender, receiver) = mpsc::channel(1000); diff --git a/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs b/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs index 57f497131..580acebf6 100644 --- a/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs +++ b/magicblock-committor-service/src/intent_execution_manager/intent_scheduler.rs @@ -637,7 +637,7 @@ mod edge_cases_test { #[cfg(test)] mod complete_error_test { - use magicblock_program::magic_scheduled_base_intent::CommittedAccount; + use magicblock_core::intent::CommittedAccount; use solana_account::Account; use solana_pubkey::pubkey; @@ -854,8 +854,9 @@ pub(crate) fn create_test_intent( pubkeys: &[Pubkey], is_undelegate: bool, ) -> ScheduledIntentBundle { + use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle, + CommitAndUndelegate, CommitType, MagicIntentBundle, ScheduledIntentBundle, UndelegateType, }; use solana_account::Account; @@ -903,8 +904,9 @@ pub(crate) fn create_test_intent_bundle( commit_pubkeys: &[Pubkey], commit_and_undelegate_pubkeys: &[Pubkey], ) -> ScheduledIntentBundle { + use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle, + CommitAndUndelegate, CommitType, MagicIntentBundle, ScheduledIntentBundle, UndelegateType, }; use solana_account::Account; diff --git a/magicblock-committor-service/src/intent_executor/intent_executor_factory.rs b/magicblock-committor-service/src/intent_executor/intent_executor_factory.rs index 24ac3d33c..6f61e389c 100644 --- a/magicblock-committor-service/src/intent_executor/intent_executor_factory.rs +++ b/magicblock-committor-service/src/intent_executor/intent_executor_factory.rs @@ -5,8 +5,8 @@ use magicblock_table_mania::TableMania; use crate::{ intent_executor::{ - task_info_fetcher::CacheTaskInfoFetcher, IntentExecutor, - IntentExecutorImpl, + task_info_fetcher::{CacheTaskInfoFetcher, RpcTaskInfoFetcher}, + IntentExecutor, IntentExecutorImpl, }, transaction_preparator::TransactionPreparatorImpl, ComputeBudgetConfig, @@ -23,12 +23,12 @@ pub struct IntentExecutorFactoryImpl { pub rpc_client: MagicblockRpcClient, pub table_mania: TableMania, pub compute_budget_config: ComputeBudgetConfig, - pub commit_id_tracker: Arc, + pub task_info_fetcher: Arc>, } impl IntentExecutorFactory for IntentExecutorFactoryImpl { type Executor = - IntentExecutorImpl; + IntentExecutorImpl; fn create_instance(&self) -> Self::Executor { let transaction_preparator = TransactionPreparatorImpl::new( @@ -36,10 +36,10 @@ impl IntentExecutorFactory for IntentExecutorFactoryImpl { self.table_mania.clone(), self.compute_budget_config.clone(), ); - IntentExecutorImpl::::new( + IntentExecutorImpl::::new( self.rpc_client.clone(), transaction_preparator, - self.commit_id_tracker.clone(), + self.task_info_fetcher.clone(), ) } } diff --git a/magicblock-committor-service/src/intent_executor/mod.rs b/magicblock-committor-service/src/intent_executor/mod.rs index 997cfbc9e..893a8af30 100644 --- a/magicblock-committor-service/src/intent_executor/mod.rs +++ b/magicblock-committor-service/src/intent_executor/mod.rs @@ -38,7 +38,7 @@ use crate::{ TransactionStrategyExecutionError, }, single_stage_executor::SingleStageExecutor, - task_info_fetcher::{ResetType, TaskInfoFetcher}, + task_info_fetcher::{CacheTaskInfoFetcher, ResetType, TaskInfoFetcher}, two_stage_executor::TwoStageExecutor, }, persist::{CommitStatus, CommitStatusSignatures, IntentPersister}, @@ -107,7 +107,7 @@ pub struct IntentExecutorImpl { authority: Keypair, rpc_client: MagicblockRpcClient, transaction_preparator: T, - task_info_fetcher: Arc, + task_info_fetcher: Arc>, /// Junk that needs to be cleaned up pub junk: Vec, @@ -123,7 +123,7 @@ where pub fn new( rpc_client: MagicblockRpcClient, transaction_preparator: T, - task_info_fetcher: Arc, + task_info_fetcher: Arc>, ) -> Self { let authority = validator_authority(); Self { @@ -333,7 +333,7 @@ where .reset(ResetType::Specific(committed_pubkeys)); let commit_ids = self .task_info_fetcher - .fetch_next_commit_ids(committed_pubkeys, min_context_slot) + .fetch_next_commit_nonces(committed_pubkeys, min_context_slot) .await .map_err(TaskBuilderError::CommitTasksBuildError)?; diff --git a/magicblock-committor-service/src/intent_executor/task_info_fetcher.rs b/magicblock-committor-service/src/intent_executor/task_info_fetcher.rs index 415d12437..a53211514 100644 --- a/magicblock-committor-service/src/intent_executor/task_info_fetcher.rs +++ b/magicblock-committor-service/src/intent_executor/task_info_fetcher.rs @@ -1,5 +1,9 @@ use std::{ - collections::HashMap, num::NonZeroUsize, sync::Mutex, time::Duration, + collections::HashMap, + mem, + num::NonZeroUsize, + sync::{Arc, Mutex}, + time::Duration, }; use async_trait::async_trait; @@ -18,6 +22,7 @@ use solana_rpc_client_api::{ request::RpcError, }; use solana_signature::Signature; +use tokio::sync::{Mutex as TMutex, MutexGuard}; use tracing::{error, info, warn}; const NUM_FETCH_RETRIES: NonZeroUsize = NonZeroUsize::new(5).unwrap(); @@ -25,9 +30,17 @@ const MUTEX_POISONED_MSG: &str = "CacheTaskInfoFetcher mutex poisoned!"; #[async_trait] pub trait TaskInfoFetcher: Send + Sync + 'static { - /// Fetches correct next ids for pubkeys - /// Those ids can be used as correct commit_id during Commit - async fn fetch_next_commit_ids( + /// Fetches correct commit nonces for pubkeys + /// Those nonces can be used as correct commit_nonce during Commit + async fn fetch_next_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult>; + + /// Fetches current commit nonces for pubkeys + /// Missing nonces will be fetched from chain + async fn fetch_current_commit_nonces( &self, pubkeys: &[Pubkey], min_context_slot: u64, @@ -40,12 +53,6 @@ pub trait TaskInfoFetcher: Send + Sync + 'static { min_context_slot: u64, ) -> TaskInfoFetcherResult>; - /// Peeks current commit ids for pubkeys - fn peek_commit_id(&self, pubkey: &Pubkey) -> Option; - - /// Resets cache for some or all accounts - fn reset(&self, reset_type: ResetType); - async fn get_base_accounts( &self, pubkeys: &[Pubkey], @@ -53,24 +60,18 @@ pub trait TaskInfoFetcher: Send + Sync + 'static { ) -> TaskInfoFetcherResult>; } -pub enum ResetType<'a> { - All, - Specific(&'a [Pubkey]), -} +// --------------------------------------------------------------------------- +// RpcTaskInfoFetcher +// --------------------------------------------------------------------------- -pub struct CacheTaskInfoFetcher { +/// Pure RPC implementation of [`TaskInfoFetcher`] — no caching. +pub struct RpcTaskInfoFetcher { rpc_client: MagicblockRpcClient, - cache: Mutex>, } -impl CacheTaskInfoFetcher { +impl RpcTaskInfoFetcher { pub fn new(rpc_client: MagicblockRpcClient) -> Self { - const CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); - - Self { - rpc_client, - cache: Mutex::new(LruCache::new(CACHE_SIZE)), - } + Self { rpc_client } } /// Fetches [`DelegationMetadata`]s with some num of retries @@ -151,7 +152,6 @@ impl CacheTaskInfoFetcher { break Err(err); } TaskInfoFetcherError::MinContextSlotNotReachedError(_, _) => { - // Get some extra sleep info!( min_context_slot, attempt = i, @@ -178,7 +178,6 @@ impl CacheTaskInfoFetcher { pubkeys: &[Pubkey], min_context_slot: u64, ) -> TaskInfoFetcherResult> { - // Early return if no pubkeys to process if pubkeys.is_empty() { return Ok(Vec::new()); } @@ -224,12 +223,9 @@ impl CacheTaskInfoFetcher { } } -/// TaskInfoFetcher implementation that also caches most used 1000 keys #[async_trait] -impl TaskInfoFetcher for CacheTaskInfoFetcher { - /// Returns next ids for requested pubkeys - /// If key isn't in cache, it will be requested - async fn fetch_next_commit_ids( +impl TaskInfoFetcher for RpcTaskInfoFetcher { + async fn fetch_next_commit_nonces( &self, pubkeys: &[Pubkey], min_context_slot: u64, @@ -237,67 +233,36 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher { if pubkeys.is_empty() { return Ok(HashMap::new()); } - - let mut result = HashMap::new(); - let mut to_request = Vec::new(); - // Lock cache and extract whatever ids we can - { - let mut cache = self.cache.lock().expect(MUTEX_POISONED_MSG); - for pubkey in pubkeys { - // in case already inserted - if result.contains_key(pubkey) { - continue; - } - - if let Some(id) = cache.get(pubkey) { - result.insert(*pubkey, *id + 1); - } else { - to_request.push(*pubkey); - } - } - } - - // If all in cache - great! return - if to_request.is_empty() { - let mut cache = self.cache.lock().expect(MUTEX_POISONED_MSG); - result.iter().for_each(|(pubkey, id)| { - cache.push(*pubkey, *id); - }); - - return Ok(result); - } - - // Remove duplicates - to_request.sort(); - to_request.dedup(); - - let remaining_ids = Self::fetch_metadata_with_retries( + let nonces = Self::fetch_metadata_with_retries( &self.rpc_client, - &to_request, + pubkeys, min_context_slot, NUM_FETCH_RETRIES, ) .await? .into_iter() - .map(|metadata| metadata.last_update_nonce); + .map(|m| m.last_update_nonce + 1); + Ok(pubkeys.iter().copied().zip(nonces).collect()) + } - // We don't care if anything changed in between with cache - just update and return our ids. - { - let mut cache = self.cache.lock().expect(MUTEX_POISONED_MSG); - // Avoid changes to LRU until all data is ready - atomic update - result.iter().for_each(|(pubkey, id)| { - cache.push(*pubkey, *id); - }); - to_request - .iter() - .zip(remaining_ids) - .for_each(|(pubkey, id)| { - result.insert(*pubkey, id + 1); - cache.push(*pubkey, id + 1); - }); + async fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + if pubkeys.is_empty() { + return Ok(HashMap::new()); } - - Ok(result) + let nonces = Self::fetch_metadata_with_retries( + &self.rpc_client, + pubkeys, + min_context_slot, + NUM_FETCH_RETRIES, + ) + .await? + .into_iter() + .map(|m| m.last_update_nonce); + Ok(pubkeys.iter().copied().zip(nonces).collect()) } async fn fetch_rent_reimbursements( @@ -305,7 +270,7 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher { pubkeys: &[Pubkey], min_context_slot: u64, ) -> TaskInfoFetcherResult> { - let rent_reimbursements = Self::fetch_metadata_with_retries( + Ok(Self::fetch_metadata_with_retries( &self.rpc_client, pubkeys, min_context_slot, @@ -313,31 +278,349 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher { ) .await? .into_iter() - .map(|metadata| metadata.rent_payer) - .collect(); + .map(|m| m.rent_payer) + .collect()) + } + + async fn get_base_accounts( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + let accounts = Self::fetch_accounts_with_retries( + &self.rpc_client, + pubkeys, + min_context_slot, + NUM_FETCH_RETRIES, + ) + .await?; + Ok(pubkeys.iter().copied().zip(accounts).collect()) + } +} + +/// Per-account async mutex protecting the cached nonce value. +type NonceLock = Arc>; - Ok(rent_reimbursements) +/// Split-map cache: `active` is the live LRU window; `retiring` holds locks +/// evicted from `active` that are still held by in-flight requests. +struct CacheInner { + active: LruCache, + retiring: HashMap, +} + +impl CacheInner { + fn new(capacity: NonZeroUsize) -> Self { + Self { + active: LruCache::new(capacity), + retiring: HashMap::new(), + } } +} - /// Returns current commit id without raising priority - fn peek_commit_id(&self, pubkey: &Pubkey) -> Option { - let cache = self.cache.lock().expect(MUTEX_POISONED_MSG); - cache.peek(pubkey).copied() +/// RAII guard returned by [`CacheTaskInfoFetcher::acquire_nonce_locks`]. +/// Holds clones of the reserved [`NonceLock`]s and cleans up retiring entries +/// on drop. +struct CacheInnerGuard<'a> { + inner: &'a Mutex, + nonce_locks: Vec<(Pubkey, NonceLock)>, +} + +impl<'a> CacheInnerGuard<'a> { + // Acquire per-account locks sequentially in sorted order (see sort above). + // join_all would poll all futures concurrently, allowing partial acquisition + // and producing the classic A→B / B→A deadlock across concurrent callers. + async fn lock<'s>(&'s self) -> Vec<(&'s Pubkey, MutexGuard<'s, u64>)> { + let mut output = Vec::with_capacity(self.nonce_locks.len()); + for (pubkey, lock) in self.nonce_locks.iter() { + let guard = lock.lock().await; + output.push((pubkey, guard)) + } + + output } +} + +impl<'a> Drop for CacheInnerGuard<'a> { + fn drop(&mut self) { + let mut inner = self.inner.lock().expect(MUTEX_POISONED_MSG); + let nonce_locks = mem::take(&mut self.nonce_locks); + for (pubkey, lock) in nonce_locks { + // Drop our clone first so strong_count reflects only other + // live holders when we check below + drop(lock); + let should_remove = inner + .retiring + .get(&pubkey) + .is_some_and(|l| Arc::strong_count(l) == 1); + if should_remove { + inner.retiring.remove(&pubkey); + } + } - /// Reset cache - fn reset(&self, reset_type: ResetType) { + metrics::set_task_info_fetcher_retiring_count( + inner.retiring.len() as i64 + ); + } +} + +/// [`TaskInfoFetcher`] that caches the most recently used nonces, delegating +/// cache misses and all non-nonce queries to an inner `T: TaskInfoFetcher`. +pub struct CacheTaskInfoFetcher { + inner: T, + cache: Mutex, +} + +impl CacheTaskInfoFetcher { + pub fn new(inner: T) -> Self { + const CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); + + Self { + inner, + cache: Mutex::new(CacheInner::new(CACHE_SIZE)), + } + } + + pub fn with_capacity(capacity: NonZeroUsize, inner: T) -> Self { + Self { + inner, + cache: Mutex::new(CacheInner::new(capacity)), + } + } + + /// Returns the cached nonce without promoting LRU order or incrementing. + pub async fn peek_commit_nonce(&self, pubkey: &Pubkey) -> Option { + let lock = { + let inner = self.cache.lock().expect(MUTEX_POISONED_MSG); + inner + .active + .peek(pubkey) + .or_else(|| inner.retiring.get(pubkey)) + .cloned() + }?; + + let locks_guard = CacheInnerGuard { + inner: &self.cache, + nonce_locks: vec![(*pubkey, lock)], + }; + let guards = locks_guard.lock().await; + let value = *guards[0].1; + + (value != u64::MAX).then_some(value) + } + + /// Resets cache for some or all accounts + pub fn reset(&self, reset_type: ResetType) { + let mut cache = self.cache.lock().expect(MUTEX_POISONED_MSG); match reset_type { ResetType::All => { - self.cache.lock().expect(MUTEX_POISONED_MSG).clear() + cache.active.clear(); + cache.retiring.clear(); } ResetType::Specific(pubkeys) => { - let mut cache = self.cache.lock().expect(MUTEX_POISONED_MSG); - pubkeys.iter().for_each(|pubkey| { - let _ = cache.pop(pubkey); - }); + for pubkey in pubkeys { + cache.active.pop(pubkey); + cache.retiring.remove(pubkey); + } + } + } + } + + /// Ensures a [`NonceLock`] exists in the cache for each pubkey and returns + /// a [`CacheInnerGuard`] holding clones of those locks. + fn acquire_nonce_locks(&self, pubkeys: &[Pubkey]) -> CacheInnerGuard<'_> { + // Sorted order is required: all callers acquire per-key locks in the same + // order, preventing the A→B / B→A circular-wait deadlock. + let mut pubkeys = pubkeys.to_vec(); + pubkeys.sort_unstable(); + pubkeys.dedup(); + + let mut nonce_locks = vec![]; + { + let mut inner = self.cache.lock().expect(MUTEX_POISONED_MSG); + for pubkey in pubkeys { + let (lock, evicted) = + if let Some(val) = inner.active.get(&pubkey) { + (val.clone(), None) + } else if let Some(val) = inner.retiring.remove(&pubkey) { + // This promotes retiring to active + let evicted = inner.active.push(pubkey, val.clone()); + (val, evicted) + } else { + let val = Arc::new(TMutex::new(u64::MAX)); + let evicted = inner.active.push(pubkey, val.clone()); + (val, evicted) + }; + + if let Some((evicted_pk, evicted_lock)) = evicted { + // If value isn't used by anyone then it can be dropped + if Arc::strong_count(&evicted_lock) > 1 { + // Value used in by another request + // We can't drop evicted lock in that case + // We move it to retiring, which will be cleaned up on exit + // Race condition scenario: + // 1. set of accs A evicted due to surge of requests - locks are dropped + // 2. request for set A still ongoing + // 3, another request with set A comes in, creating new locks in `CacheInner::active` + // 4. 2 simultaneous requestors receive same value + let old = + inner.retiring.insert(evicted_pk, evicted_lock); + if old.is_some() { + // Safety + // assume that is true: + // That means that value was active & retiring at the same time + // This is impossible as per logic above, contradiction. Q.E.D. + debug_assert!( + false, + "Just evicted value can't be in retiring" + ); + error!("Retiring map already contained lock with pubkey: {}", evicted_pk); + } + } + } + + nonce_locks.push((pubkey, lock)); + } + metrics::set_task_info_fetcher_retiring_count( + inner.retiring.len() as i64 + ); + } + + CacheInnerGuard { + inner: &self.cache, + nonce_locks, + } + } +} + +/// TaskInfoFetcher implementation that caches the most used 1000 nonces +#[async_trait] +impl TaskInfoFetcher for CacheTaskInfoFetcher { + /// Returns next ids for requested pubkeys + /// If key isn't in cache, it will be requested + async fn fetch_next_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + if pubkeys.is_empty() { + return Ok(HashMap::new()); + } + + // Acquire locks on requested nonces + let locks_guard = self.acquire_nonce_locks(pubkeys); + + // Acquire per-account locks sequentially in sorted order (see sort above). + // join_all would poll all futures concurrently, allowing partial acquisition + // and producing the classic A→B / B→A deadlock across concurrent callers. + let nonce_guards = locks_guard.lock().await; + let (mut existing, mut missing) = (vec![], vec![]); + for (pubkey, guard) in nonce_guards { + if *guard == u64::MAX { + missing.push((pubkey, guard)); + } else { + existing.push((pubkey, guard)) + } + } + + // If all in cache - great! return + if missing.is_empty() { + let mut result = HashMap::with_capacity(existing.len()); + for (pubkey, mut guard) in existing { + *guard += 1; + result.insert(*pubkey, *guard); + } + return Ok(result); + } + + // Fetch missing nonces in cache + let fetched_nonces = { + let missing_pubkeys: Vec<_> = + missing.iter().map(|(pubkey, _)| **pubkey).collect(); + self.inner + .fetch_current_commit_nonces(&missing_pubkeys, min_context_slot) + .await? + }; + + // We don't care if anything changed in between with cache - just update and return our ids. + let mut result = HashMap::with_capacity(existing.len()); + for (pubkey, mut guard) in existing { + *guard += 1; + result.insert(*pubkey, *guard); + } + for (pubkey, mut guard) in missing { + if let Some(&nonce) = fetched_nonces.get(pubkey) { + *guard = nonce + 1; + result.insert(*pubkey, *guard); + Ok(()) + } else { + Err(TaskInfoFetcherError::AccountNotFoundError(*pubkey)) + }?; + } + + Ok(result) + } + + async fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + if pubkeys.is_empty() { + return Ok(HashMap::new()); + } + + // Acquire locks on requested nonces + let locks_guard = self.acquire_nonce_locks(pubkeys); + + // Acquire per-account locks sequentially in sorted order (see sort above). + let nonce_guards = locks_guard.lock().await; + let mut missing = vec![]; + let mut result = HashMap::with_capacity(nonce_guards.len()); + for (pubkey, guard) in nonce_guards { + if *guard == u64::MAX { + missing.push((pubkey, guard)); + } else { + result.insert(*pubkey, *guard); } } + + if missing.is_empty() { + return Ok(result); + } + + // Fetch missing nonces in cache + let fetched_nonces = { + let missing_pubkeys: Vec<_> = + missing.iter().map(|(pubkey, _)| **pubkey).collect(); + self.inner + .fetch_current_commit_nonces(&missing_pubkeys, min_context_slot) + .await? + }; + + // Store the on-chain nonce as-is (no +1): recording current state, not + // reserving the next slot. A subsequent fetch_next_commit_nonces call will + // increment from here correctly. + for (pubkey, mut guard) in missing { + if let Some(&nonce) = fetched_nonces.get(pubkey) { + *guard = nonce; + result.insert(*pubkey, nonce); + Ok(()) + } else { + Err(TaskInfoFetcherError::AccountNotFoundError(*pubkey)) + }? + } + + Ok(result) + } + + async fn fetch_rent_reimbursements( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + self.inner + .fetch_rent_reimbursements(pubkeys, min_context_slot) + .await } async fn get_base_accounts( @@ -345,18 +628,17 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher { pubkeys: &[Pubkey], min_context_slot: u64, ) -> TaskInfoFetcherResult> { - let accounts = Self::fetch_accounts_with_retries( - &self.rpc_client, - pubkeys, - min_context_slot, - NUM_FETCH_RETRIES, - ) - .await?; - - Ok(pubkeys.iter().copied().zip(accounts).collect()) + self.inner + .get_base_accounts(pubkeys, min_context_slot) + .await } } +pub enum ResetType<'a> { + All, + Specific(&'a [Pubkey]), +} + #[derive(thiserror::Error, Debug)] pub enum TaskInfoFetcherError { #[error("Metadata not found for: {0}")] @@ -422,3 +704,356 @@ impl TaskInfoFetcherError { } pub type TaskInfoFetcherResult = Result; + +#[cfg(test)] +mod tests { + use std::{collections::VecDeque, sync::Arc, time::Duration}; + + use async_trait::async_trait; + use solana_account::Account; + use solana_pubkey::Pubkey; + + use super::*; + + #[tokio::test] + async fn cache_miss_then_hit() { + let pk = Pubkey::new_unique(); + let fetcher = FetcherBuilder::new(vec![10]).build(); + + let r1 = fetcher.fetch_next_commit_nonces(&[pk], 0).await.unwrap(); + assert_eq!(r1[&pk], 11); + + // Cache hit: no RPC (only 1 response queued), increments + let r2 = fetcher.fetch_next_commit_nonces(&[pk], 0).await.unwrap(); + assert_eq!(r2[&pk], 12); + } + + #[tokio::test] + async fn partial_cache_hit() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + // prime pk1 (nonce 5), then mixed call fetches only cold pk2 (nonce 20) + let fetcher = FetcherBuilder::new(vec![5, 20]).build(); + + fetcher.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); // pk1 = 6 + let r = fetcher + .fetch_next_commit_nonces(&[pk1, pk2], 0) + .await + .unwrap(); + assert_eq!(r[&pk1], 7); // cached, incremented + assert_eq!(r[&pk2], 21); // fetched from chain + } + + #[tokio::test] + async fn lru_eviction_forces_refetch() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + // pk1 initial, pk2 evicts pk1, pk1 re-fetch after eviction + let fetcher = FetcherBuilder::new(vec![1, 2, 10]).capacity(1).build(); + + fetcher.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); // pk1 cached = 2 + fetcher.fetch_next_commit_nonces(&[pk2], 0).await.unwrap(); // pk2 cached = 3, pk1 evicted + + assert!(fetcher.peek_commit_nonce(&pk1).await.is_none()); // evicted + + let r = fetcher.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); + assert_eq!(r[&pk1], 11); // re-fetched (10 + 1) + + // Sequential eviction: pk1's guard was dropped before pk2 evicted it, + // so Arc strong_count was 1 — never moved to retiring. + assert_eq!(fetcher.cache.lock().unwrap().retiring.len(), 0); + } + + // Phase 1: fetch phase1_keys one-by-one → barrier → outer verification → + // barrier. Phase 2: fetch phase2_keys one-by-one for `iters` passes, then + // fetch shared_b in chunks of 2. + async fn run_worker( + fetcher: Arc>, + barrier: Arc, + phase1_keys: Vec, + phase2_keys: Vec, + shared_b: Vec, + iters: usize, + ) { + for pk in &phase1_keys { + fetcher.fetch_next_commit_nonces(&[*pk], 0).await.unwrap(); + } + barrier.wait().await; // signal phase 1 done + barrier.wait().await; // wait for outer verification + for _ in 0..iters { + for pk in &phase2_keys { + fetcher.fetch_next_commit_nonces(&[*pk], 0).await.unwrap(); + } + } + for chunk in shared_b.chunks(2) { + fetcher.fetch_next_commit_nonces(chunk, 0).await.unwrap(); + } + } + + // Three concurrent workers operating in two phases, separated by a barrier. + #[tokio::test(flavor = "multi_thread")] + async fn three_concurrent_workers_two_phase() { + const ITERS: usize = 50; + const NUM_WORKERS: usize = 3; + const SHARED_A: usize = 10; + const SHARED_B: usize = 40; + const EXCLUSIVE: usize = 50; + const CAPACITY: usize = 30; + const PHASE2_KEYS: usize = EXCLUSIVE + SHARED_A; // 60 + + let shared_a: Vec = + (0..SHARED_A).map(|_| Pubkey::new_unique()).collect(); + let shared_b: Vec = + (0..SHARED_B).map(|_| Pubkey::new_unique()).collect(); + let excl: [Vec; NUM_WORKERS] = std::array::from_fn(|_| { + (0..EXCLUSIVE).map(|_| Pubkey::new_unique()).collect() + }); + let phase2_keys: [Vec; NUM_WORKERS] = + std::array::from_fn(|i| { + excl[i].iter().chain(shared_a.iter()).cloned().collect() + }); + + // Flat queue: each mock call pops exactly N entries (N cold keys). + // Upper bounds: + // phase 1 loop: SHARED_A × 1 + // phase 2 excl+sa: ITERS × PHASE2_KEYS × NUM_WORKERS × 1 + // phase 2 shared_b: (SHARED_B / chunk_size) × chunk_size × NUM_WORKERS + // = SHARED_B × NUM_WORKERS + let total = SHARED_A + + ITERS * PHASE2_KEYS * NUM_WORKERS + + SHARED_B * NUM_WORKERS; + + let fetcher = Arc::new( + FetcherBuilder::new(vec![0; total]) + .capacity(CAPACITY) + .rpc_delay(Duration::from_millis(2)) + .build(), + ); + + // Barrier resets automatically: round 1 syncs after phase 1, round 2 + // releases workers into phase 2 after outer verification. + let barrier = Arc::new(tokio::sync::Barrier::new(NUM_WORKERS + 1)); + + let handles: Vec<_> = (0..NUM_WORKERS) + .map(|i| { + tokio::spawn(run_worker( + fetcher.clone(), + barrier.clone(), + shared_a.clone(), + phase2_keys[i].clone(), + shared_b.clone(), + ITERS, + )) + }) + .collect(); + + barrier.wait().await; // all workers done with phase 1 + + // No eviction during phase 1 (10 keys < capacity 30). + // Per-key lock serialises 3 workers → exactly NUM_WORKERS increments each. + for pk in &shared_a { + assert_eq!( + fetcher.peek_commit_nonce(pk).await, + Some(NUM_WORKERS as u64) + ); + } + + barrier.wait().await; // release workers into phase 2 + + for h in handles { + h.await.unwrap(); + } + + // Workers fetched shared_b in chunks of 2 (40 / 2 = 20 calls each). + // Due to eviction (40 keys > capacity 30) not all may remain in cache. + // Any key still in cache must have nonce >= 1. + let mut found = 0usize; + for pk in &shared_b { + if let Some(n) = fetcher.peek_commit_nonce(pk).await { + assert!(n >= 1); + found += 1; + } + } + assert!( + found > 0, + "expected some shared_b keys in cache after workers" + ); + } + + #[tokio::test] + async fn fetch_current_no_increment() { + let pk = Pubkey::new_unique(); + let fetcher = FetcherBuilder::new(vec![10]).build(); + + let r1 = fetcher.fetch_current_commit_nonces(&[pk], 0).await.unwrap(); + assert_eq!(r1[&pk], 10); // stored as-is + + // Cache hit: still 10, fetch_current never increments + let r2 = fetcher.fetch_current_commit_nonces(&[pk], 0).await.unwrap(); + assert_eq!(r2[&pk], 10); + } + + #[tokio::test] + async fn reset_specific_only_clears_that_key() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + // pk1 initial, pk2 initial, pk1 after reset + let fetcher = FetcherBuilder::new(vec![1, 2, 50]).build(); + + fetcher.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); // pk1 cached = 2 + fetcher.fetch_next_commit_nonces(&[pk2], 0).await.unwrap(); // pk2 cached = 3 + fetcher.reset(ResetType::Specific(&[pk1])); + + assert!(fetcher.peek_commit_nonce(&pk1).await.is_none()); // cleared + assert_eq!(fetcher.peek_commit_nonce(&pk2).await, Some(3)); // still cached + + let r1 = fetcher.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); + assert_eq!(r1[&pk1], 51); // re-fetched (50 + 1) + } + + #[tokio::test] + async fn peek_awaits_inflight_fetch() { + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + // capacity=1: pk2 fetch evicts pk1 into retiring while Task A is still in-flight. + // pk1 nonce=7 (stored as 8), pk2 nonce=9 (stored as 10). + let fetcher = Arc::new( + FetcherBuilder::new(vec![7, 9]) + .rpc_delay(Duration::from_millis(50)) + .capacity(1) + .build(), + ); + + // Spawn Task A: slow fetch for pk1 acquires its nonce lock and sleeps. + let fetcher2 = fetcher.clone(); + let task_a = tokio::spawn(async move { + fetcher2.fetch_next_commit_nonces(&[pk1], 0).await.unwrap(); + }); + + // Let Task A acquire pk1's nonce lock and start the slow fetch. + tokio::task::yield_now().await; + + // Spawn Task B: inserts pk2 (capacity=1), evicting pk1 into retiring + // because Task A's guard still holds a clone of pk1's Arc. + let fetcher3 = fetcher.clone(); + let task_b = tokio::spawn(async move { + fetcher3.fetch_next_commit_nonces(&[pk2], 0).await.unwrap(); + }); + + // Let Task B run through acquire_nonce_locks (eviction happens here). + tokio::task::yield_now().await; + + // pk1 is in retiring: Task A is in-flight and holds its Arc clone. + assert_eq!(fetcher.cache.lock().unwrap().retiring.len(), 1); + + // peek finds pk1 in retiring, blocks on its in-flight lock, returns the value. + let peeked = fetcher.peek_commit_nonce(&pk1).await; + assert_eq!(peeked, Some(8)); // 7 + 1 + + task_a.await.unwrap(); + task_b.await.unwrap(); + + // All CacheInnerGuards dropped: retiring fully cleaned up. + assert_eq!(fetcher.cache.lock().unwrap().retiring.len(), 0); + } + + /// Fetcher mock + struct MockInfoFetcher { + nonces: Mutex>, + delay: Option, + } + + impl MockInfoFetcher { + fn new(nonces: Vec) -> Self { + Self { + nonces: Mutex::new(nonces.into()), + delay: None, + } + } + + fn with_delay(mut self, delay: Duration) -> Self { + self.delay = Some(delay); + self + } + } + + #[async_trait] + impl TaskInfoFetcher for MockInfoFetcher { + async fn fetch_next_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> TaskInfoFetcherResult> { + self.fetch_current_commit_nonces(pubkeys, min_context_slot) + .await + } + + async fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + _: u64, + ) -> TaskInfoFetcherResult> { + if let Some(delay) = self.delay { + tokio::time::sleep(delay).await; + } + let mut q = self.nonces.lock().unwrap(); + Ok(pubkeys + .iter() + .map(|pk| { + let nonce = + q.pop_front().expect("mock nonce queue exhausted"); + (*pk, nonce) + }) + .collect()) + } + + async fn fetch_rent_reimbursements( + &self, + _: &[Pubkey], + _: u64, + ) -> TaskInfoFetcherResult> { + unimplemented!() + } + + async fn get_base_accounts( + &self, + _: &[Pubkey], + _: u64, + ) -> TaskInfoFetcherResult> { + unimplemented!() + } + } + + struct FetcherBuilder { + inner: MockInfoFetcher, + capacity: Option, + } + + impl FetcherBuilder { + fn new(nonces: Vec) -> Self { + Self { + inner: MockInfoFetcher::new(nonces), + capacity: None, + } + } + + fn rpc_delay(mut self, d: Duration) -> Self { + self.inner = self.inner.with_delay(d); + self + } + + fn capacity(mut self, n: usize) -> Self { + self.capacity = Some(n.try_into().unwrap()); + self + } + + fn build(self) -> CacheTaskInfoFetcher { + match self.capacity { + Some(cap) => { + CacheTaskInfoFetcher::with_capacity(cap, self.inner) + } + None => CacheTaskInfoFetcher::new(self.inner), + } + } + } +} diff --git a/magicblock-committor-service/src/persist/commit_persister.rs b/magicblock-committor-service/src/persist/commit_persister.rs index c9399bff9..d16a621ae 100644 --- a/magicblock-committor-service/src/persist/commit_persister.rs +++ b/magicblock-committor-service/src/persist/commit_persister.rs @@ -3,9 +3,8 @@ use std::{ sync::{Arc, Mutex}, }; -use magicblock_program::magic_scheduled_base_intent::{ - CommittedAccount, ScheduledIntentBundle, -}; +use magicblock_core::intent::CommittedAccount; +use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; use solana_pubkey::Pubkey; use super::{ @@ -436,9 +435,9 @@ impl IntentPersister for Option { #[cfg(test)] mod tests { + use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - CommitAndUndelegate, CommitType, CommittedAccount, MagicIntentBundle, - UndelegateType, + CommitAndUndelegate, CommitType, MagicIntentBundle, UndelegateType, }; use solana_account::Account; use solana_hash::Hash; diff --git a/magicblock-committor-service/src/service.rs b/magicblock-committor-service/src/service.rs index 6101e41a0..62cb7de2d 100644 --- a/magicblock-committor-service/src/service.rs +++ b/magicblock-committor-service/src/service.rs @@ -1,4 +1,4 @@ -use std::{path::Path, sync::Arc, time::Instant}; +use std::{collections::HashMap, path::Path, sync::Arc, time::Instant}; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; use solana_keypair::Keypair; @@ -19,7 +19,7 @@ use tracing::*; use crate::{ committor_processor::CommittorProcessor, config::ChainConfig, - error::CommittorServiceResult, + error::{CommittorServiceError, CommittorServiceResult}, intent_execution_manager::BroadcastedIntentExecutionResult, persist::{CommitStatusRow, MessageSignatures}, pubkeys_provider::{provide_committee_pubkeys, provide_common_pubkeys}, @@ -83,6 +83,19 @@ pub enum CommittorMessage { broadcast::Receiver, >, }, + FetchCurrentCommitNonces { + respond_to: + oneshot::Sender>>, + pubkeys: Vec, + min_context_slot: u64, + }, + FetchCurrentCommitNoncesSync { + respond_to: std::sync::mpsc::Sender< + CommittorServiceResult>, + >, + pubkeys: Vec, + min_context_slot: u64, + }, } // ----------------- @@ -228,6 +241,40 @@ impl CommittorActor { error!(message_type = "SubscribeForResults", error = ?err, "Failed to send response"); } } + FetchCurrentCommitNonces { + respond_to, + pubkeys, + min_context_slot, + } => { + let processor = self.processor.clone(); + tokio::spawn(async move { + let result = processor + .fetch_current_commit_nonces(&pubkeys, min_context_slot) + .await; + if let Err(err) = respond_to + .send(result.map_err(CommittorServiceError::from)) + { + error!(message_type = "FetchCurrentCommitNonces", error = ?err, "Failed to send response"); + } + }); + } + FetchCurrentCommitNoncesSync { + respond_to, + pubkeys, + min_context_slot, + } => { + let processor = self.processor.clone(); + tokio::spawn(async move { + let result = processor + .fetch_current_commit_nonces(&pubkeys, min_context_slot) + .await; + if let Err(err) = respond_to + .send(result.map_err(CommittorServiceError::from)) + { + error!(message_type = "FetchCurrentCommitNoncesSync", error = ?err, "Failed to send response"); + } + }); + } } } @@ -329,6 +376,21 @@ impl CommittorService { rx } + pub fn fetch_current_commit_nonces_sync( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> std::sync::mpsc::Receiver>> + { + let (tx, rx) = std::sync::mpsc::channel(); + self.try_send(CommittorMessage::FetchCurrentCommitNoncesSync { + respond_to: tx, + pubkeys: pubkeys.to_vec(), + min_context_slot, + }); + rx + } + fn try_send(&self, msg: CommittorMessage) { if let Err(e) = self.sender.try_send(msg) { match e { @@ -424,6 +486,21 @@ impl BaseIntentCommittor for CommittorService { rx } + fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> oneshot::Receiver>> { + let (tx, rx) = oneshot::channel(); + self.try_send(CommittorMessage::FetchCurrentCommitNonces { + respond_to: tx, + pubkeys: pubkeys.to_vec(), + min_context_slot, + }); + + rx + } + fn stop(&self) { self.cancel_token.cancel(); } @@ -472,6 +549,12 @@ pub trait BaseIntentCommittor: Send + Sync + 'static { CommittorServiceResult, >; + fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> oneshot::Receiver>>; + /// Stops Committor service fn stop(&self); diff --git a/magicblock-committor-service/src/service_ext.rs b/magicblock-committor-service/src/service_ext.rs index 5a77d9880..024d0423e 100644 --- a/magicblock-committor-service/src/service_ext.rs +++ b/magicblock-committor-service/src/service_ext.rs @@ -207,6 +207,15 @@ impl BaseIntentCommittor for CommittorServiceExt { self.inner.get_transaction(signature) } + fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> oneshot::Receiver>> { + self.inner + .fetch_current_commit_nonces(pubkeys, min_context_slot) + } + fn stop(&self) { self.inner.stop(); } diff --git a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs b/magicblock-committor-service/src/stubs/changeset_committor_stub.rs index f12cb7806..1e1136ef8 100644 --- a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs +++ b/magicblock-committor-service/src/stubs/changeset_committor_stub.rs @@ -190,6 +190,22 @@ impl BaseIntentCommittor for ChangesetCommittorStub { rx } + fn fetch_current_commit_nonces( + &self, + pubkeys: &[Pubkey], + _min_context_slot: u64, + ) -> oneshot::Receiver>> { + let (tx, rx) = oneshot::channel(); + let nonces = pubkeys.iter().map(|p| (*p, 0u64)).collect(); + tx.send(Ok(nonces)).unwrap_or_else(|_| { + tracing::error!( + message_type = "FetchCurrentCommitNonces", + "Failed to send response" + ); + }); + rx + } + fn stop(&self) { self.cancellation_token.cancel(); } diff --git a/magicblock-committor-service/src/tasks/commit_task.rs b/magicblock-committor-service/src/tasks/commit_task.rs index 70d047537..d31c00f34 100644 --- a/magicblock-committor-service/src/tasks/commit_task.rs +++ b/magicblock-committor-service/src/tasks/commit_task.rs @@ -5,7 +5,7 @@ use dlp::{ AccountSizeClass, }; use magicblock_committor_program::Chunks; -use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use magicblock_core::intent::CommittedAccount; use solana_account::{Account, ReadableAccount}; use solana_instruction::Instruction; use solana_pubkey::Pubkey; diff --git a/magicblock-committor-service/src/tasks/mod.rs b/magicblock-committor-service/src/tasks/mod.rs index 2e763c604..50946641d 100644 --- a/magicblock-committor-service/src/tasks/mod.rs +++ b/magicblock-committor-service/src/tasks/mod.rs @@ -17,10 +17,9 @@ use magicblock_committor_program::{ }, pdas, ChangesetChunks, Chunks, }; +use magicblock_core::intent::CommittedAccount; use magicblock_metrics::metrics::LabelValue; -use magicblock_program::magic_scheduled_base_intent::{ - BaseAction, CommittedAccount, -}; +use magicblock_program::magic_scheduled_base_intent::BaseAction; use solana_account::Account; use solana_instruction::{AccountMeta, Instruction}; use solana_pubkey::Pubkey; @@ -517,6 +516,7 @@ impl CleanupTask { #[cfg(test)] mod serialization_safety_test { + use magicblock_core::intent::CommittedAccount; use magicblock_program::{ args::ShortAccountMeta, magic_scheduled_base_intent::ProgramArgs, }; diff --git a/magicblock-committor-service/src/tasks/task_builder.rs b/magicblock-committor-service/src/tasks/task_builder.rs index 2fb0124dc..749612d88 100644 --- a/magicblock-committor-service/src/tasks/task_builder.rs +++ b/magicblock-committor-service/src/tasks/task_builder.rs @@ -1,9 +1,9 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; +use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - BaseAction, CommitType, CommittedAccount, ScheduledIntentBundle, - UndelegateType, + BaseAction, CommitType, ScheduledIntentBundle, UndelegateType, }; use solana_account::Account; use solana_pubkey::Pubkey; @@ -106,7 +106,7 @@ impl TaskBuilderImpl { .collect::>(); task_info_fetcher - .fetch_next_commit_ids(&committed_pubkeys, min_context_slot) + .fetch_next_commit_nonces(&committed_pubkeys, min_context_slot) .await } diff --git a/magicblock-committor-service/src/tasks/task_strategist.rs b/magicblock-committor-service/src/tasks/task_strategist.rs index d7df5a14e..d625c106c 100644 --- a/magicblock-committor-service/src/tasks/task_strategist.rs +++ b/magicblock-committor-service/src/tasks/task_strategist.rs @@ -400,8 +400,9 @@ pub type TaskStrategistResult = Result; mod tests { use std::{collections::HashMap, sync::Arc}; + use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - BaseAction, CommittedAccount, ProgramArgs, + BaseAction, ProgramArgs, }; use solana_account::Account; use solana_program::system_program; @@ -412,7 +413,7 @@ mod tests { use crate::{ intent_execution_manager::intent_scheduler::create_test_intent, intent_executor::task_info_fetcher::{ - ResetType, TaskInfoFetcher, TaskInfoFetcherResult, + TaskInfoFetcher, TaskInfoFetcherResult, }, persist::IntentPersisterImpl, tasks::{ @@ -430,7 +431,15 @@ mod tests { #[async_trait::async_trait] impl TaskInfoFetcher for MockInfoFetcher { - async fn fetch_next_commit_ids( + async fn fetch_next_commit_nonces( + &self, + pubkeys: &[Pubkey], + _: u64, + ) -> TaskInfoFetcherResult> { + Ok(pubkeys.iter().map(|pubkey| (*pubkey, 0)).collect()) + } + + async fn fetch_current_commit_nonces( &self, pubkeys: &[Pubkey], _: u64, @@ -446,12 +455,6 @@ mod tests { Ok(pubkeys.iter().map(|_| Pubkey::new_unique()).collect()) } - fn peek_commit_id(&self, _pubkey: &Pubkey) -> Option { - Some(0) - } - - fn reset(&self, _: ResetType) {} - async fn get_base_accounts( &self, _pubkeys: &[Pubkey], diff --git a/magicblock-core/Cargo.toml b/magicblock-core/Cargo.toml index 308b441a1..ce7a883ac 100644 --- a/magicblock-core/Cargo.toml +++ b/magicblock-core/Cargo.toml @@ -12,6 +12,7 @@ edition.workspace = true tokio = { workspace = true, features = ["sync"] } flume = { workspace = true } +serde = { workspace = true, features = ["derive"] } solana-account = { workspace = true } solana-account-decoder = { workspace = true } solana-hash = { workspace = true } diff --git a/magicblock-core/src/intent.rs b/magicblock-core/src/intent.rs new file mode 100644 index 000000000..69a23a85e --- /dev/null +++ b/magicblock-core/src/intent.rs @@ -0,0 +1,59 @@ +use std::cell::RefCell; + +use serde::{Deserialize, Serialize}; +use solana_account::{Account, AccountSharedData}; +use solana_pubkey::Pubkey; + +use crate::token_programs::try_remap_ata_to_eata; + +pub type CommittedAccountRef<'a> = (Pubkey, &'a RefCell); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CommittedAccount { + pub pubkey: Pubkey, + pub account: Account, + pub remote_slot: u64, +} + +impl<'a> From> for CommittedAccount { + fn from(value: CommittedAccountRef<'a>) -> Self { + let account = value.1.borrow(); + let remote_slot = account.remote_slot(); + Self { + pubkey: value.0, + account: account.to_owned().into(), + remote_slot, + } + } +} + +impl CommittedAccount { + /// Build a CommittedAccount from an AccountSharedData reference, optionally + /// overriding the owner with `parent_program_id` and remapping ATA -> eATA + /// if applicable. + pub fn from_account_shared( + pubkey: Pubkey, + account_shared: &AccountSharedData, + parent_program_id: Option, + ) -> Self { + let remote_slot = account_shared.remote_slot(); + if let Some((eata_pubkey, eata)) = + try_remap_ata_to_eata(&pubkey, account_shared) + { + return CommittedAccount { + pubkey: eata_pubkey, + account: eata.into(), + remote_slot, + }; + } + + let mut account: Account = account_shared.to_owned().into(); + account.owner = parent_program_id.unwrap_or(account.owner); + + CommittedAccount { + pubkey, + account, + remote_slot, + } + } +} diff --git a/magicblock-core/src/lib.rs b/magicblock-core/src/lib.rs index f18f8a6f1..fcb7126f6 100644 --- a/magicblock-core/src/lib.rs +++ b/magicblock-core/src/lib.rs @@ -18,4 +18,5 @@ pub mod tls; pub mod token_programs; pub mod traits; +pub mod intent; pub mod logger; diff --git a/magicblock-core/src/traits.rs b/magicblock-core/src/traits.rs index bbd84d9d7..38f71f0c7 100644 --- a/magicblock-core/src/traits.rs +++ b/magicblock-core/src/traits.rs @@ -1,6 +1,18 @@ -use std::{error::Error, fmt}; +use std::{collections::HashMap, error::Error}; -pub trait PersistsAccountModData: Sync + Send + fmt::Display + 'static { +use solana_program::instruction::InstructionError; +use solana_pubkey::Pubkey; + +use crate::intent::CommittedAccount; + +/// Trait that provides access to system calls implemented outside of SVM, +/// accessible in magic-program. +pub trait MagicSys: Sync + Send + 'static { fn persist(&self, id: u64, data: Vec) -> Result<(), Box>; fn load(&self, id: u64) -> Result>, Box>; + + fn fetch_current_commit_nonces( + &self, + commits: &[CommittedAccount], + ) -> Result, InstructionError>; } diff --git a/magicblock-ledger/src/store/data_mod_persister.rs b/magicblock-ledger/src/store/data_mod_persister.rs deleted file mode 100644 index 53fc5d864..000000000 --- a/magicblock-ledger/src/store/data_mod_persister.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::error::Error; - -use magicblock_core::traits::PersistsAccountModData; -use tracing::*; - -use crate::Ledger; - -impl PersistsAccountModData for Ledger { - fn persist(&self, id: u64, data: Vec) -> Result<(), Box> { - trace!(id, data_len = data.len(), "Persisting data"); - self.write_account_mod_data(id, &data.into())?; - Ok(()) - } - - fn load(&self, id: u64) -> Result>, Box> { - let data = self.read_account_mod_data(id)?.map(|x| x.data); - if enabled!(Level::TRACE) { - if let Some(data) = &data { - trace!(id, data_len = data.len(), "Loading data"); - } else { - trace!(id, found = false, "Loading data"); - } - } - Ok(data) - } -} diff --git a/magicblock-ledger/src/store/mod.rs b/magicblock-ledger/src/store/mod.rs index 1280c2685..dcc5f0cff 100644 --- a/magicblock-ledger/src/store/mod.rs +++ b/magicblock-ledger/src/store/mod.rs @@ -1,3 +1,2 @@ pub mod api; -pub mod data_mod_persister; mod utils; diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index d340c28f9..f93c8d679 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -393,6 +393,11 @@ lazy_static::lazy_static! { "task_info_fetcher_a_count", "Get mupltiple account count" ).unwrap(); + static ref TASK_INFO_FETCHER_RETIRING_GAUGE: IntGauge = IntGauge::new( + "task_info_fetcher_retiring_gauge", + "Number of pubkeys currently in the retiring map of CacheTaskInfoFetcher" + ).unwrap(); + static ref TABLE_MANIA_A_COUNT: IntCounter = IntCounter::new( "table_mania_a_count", "Get mupltiple account count" ).unwrap(); @@ -423,6 +428,14 @@ lazy_static::lazy_static! { ), ).unwrap(); + static ref COMMITTOR_FETCH_COMMIT_NONCES_WAIT_TIME: Histogram = Histogram::with_opts( + HistogramOpts::new( + "committor_fetch_commit_nonces_wait_time_second", + "Time in seconds spent waiting for fetch_current_commit_nonces response" + ) + .buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 30.0]), + ).unwrap(); + // ----------------- // Pubsub Clients // ----------------- @@ -557,6 +570,7 @@ pub(crate) fn register() { register!(COMMITTOR_INTENT_CU_USAGE); register!(COMMITTOR_INTENT_TASK_PREPARATION_TIME); register!(COMMITTOR_INTENT_ALT_PREPARATION_TIME); + register!(COMMITTOR_FETCH_COMMIT_NONCES_WAIT_TIME); register!(ENSURE_ACCOUNTS_TIME); register!(RPC_REQUEST_HANDLING_TIME); register!(TRANSACTION_PROCESSING_TIME); @@ -577,6 +591,7 @@ pub(crate) fn register() { register!(MAX_LOCK_CONTENTION_QUEUE_SIZE); register!(REMOTE_ACCOUNT_PROVIDER_A_COUNT); register!(TASK_INFO_FETCHER_A_COUNT); + register!(TASK_INFO_FETCHER_RETIRING_GAUGE); register!(TABLE_MANIA_A_COUNT); register!(TABLE_MANIA_CLOSED_A_COUNT); register!(CONNECTED_PUBSUB_CLIENTS_GAUGE); @@ -759,6 +774,10 @@ pub fn observe_committor_intent_alt_preparation_time() -> HistogramTimer { COMMITTOR_INTENT_ALT_PREPARATION_TIME.start_timer() } +pub fn start_fetch_commit_nonces_wait_timer() -> HistogramTimer { + COMMITTOR_FETCH_COMMIT_NONCES_WAIT_TIME.start_timer() +} + pub fn inc_account_fetches_success(count: u64) { ACCOUNT_FETCHES_SUCCESS_COUNT.inc_by(count); } @@ -843,6 +862,10 @@ pub fn inc_task_info_fetcher_a_count() { TASK_INFO_FETCHER_A_COUNT.inc() } +pub fn set_task_info_fetcher_retiring_count(count: i64) { + TASK_INFO_FETCHER_RETIRING_GAUGE.set(count); +} + pub fn inc_table_mania_a_count() { TABLE_MANIA_A_COUNT.inc() } diff --git a/programs/magicblock/Cargo.toml b/programs/magicblock/Cargo.toml index 0ad4396b9..a24516966 100644 --- a/programs/magicblock/Cargo.toml +++ b/programs/magicblock/Cargo.toml @@ -39,6 +39,7 @@ thiserror = { workspace = true } test-kit = { workspace = true } assert_matches = { workspace = true } rand = { workspace = true } +serial_test = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } magicblock-chainlink = { workspace = true, features = ["dev-context"] } diff --git a/programs/magicblock/src/lib.rs b/programs/magicblock/src/lib.rs index 33888f27a..725ec69fd 100644 --- a/programs/magicblock/src/lib.rs +++ b/programs/magicblock/src/lib.rs @@ -1,6 +1,7 @@ mod ephemeral_accounts; pub mod errors; mod magic_context; +pub mod magic_sys; mod mutate_accounts; mod schedule_task; mod schedule_transactions; @@ -12,8 +13,8 @@ pub mod test_utils; mod utils; pub mod validator; +pub use magic_sys::init_magic_sys; pub use magicblock_magic_program_api::*; -pub use mutate_accounts::*; pub use schedule_transactions::{ process_scheduled_commit_sent, register_scheduled_commit_sent, transaction_scheduler::TransactionScheduler, SentCommit, diff --git a/programs/magicblock/src/magic_scheduled_base_intent.rs b/programs/magicblock/src/magic_scheduled_base_intent.rs index 08c63153e..f2f234b03 100644 --- a/programs/magicblock/src/magic_scheduled_base_intent.rs +++ b/programs/magicblock/src/magic_scheduled_base_intent.rs @@ -1,9 +1,8 @@ -use std::{cell::RefCell, collections::HashSet}; +use std::collections::HashSet; use magicblock_core::{ - token_programs::{ - try_remap_ata_to_eata, EATA_PROGRAM_ID, TOKEN_PROGRAM_ID, - }, + intent::{CommittedAccount, CommittedAccountRef}, + token_programs::{EATA_PROGRAM_ID, TOKEN_PROGRAM_ID}, Slot, }; use magicblock_magic_program_api::args::{ @@ -12,7 +11,7 @@ use magicblock_magic_program_api::args::{ UndelegateTypeArgs, }; use serde::{Deserialize, Serialize}; -use solana_account::{Account, AccountSharedData, ReadableAccount}; +use solana_account::ReadableAccount; use solana_hash::Hash; use solana_log_collector::ic_msg; use solana_program_runtime::{ @@ -605,57 +604,6 @@ impl BaseAction { } } -type CommittedAccountRef<'a> = (Pubkey, &'a RefCell); -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct CommittedAccount { - pub pubkey: Pubkey, - pub account: Account, - pub remote_slot: u64, -} - -impl<'a> From> for CommittedAccount { - fn from(value: CommittedAccountRef<'a>) -> Self { - let account = value.1.borrow(); - let remote_slot = account.remote_slot(); - Self { - pubkey: value.0, - account: account.to_owned().into(), - remote_slot, - } - } -} - -impl CommittedAccount { - /// Build a CommittedAccount from an AccountSharedData reference, optionally - /// overriding the owner with `parent_program_id` and remapping ATA -> eATA - /// if applicable. - pub fn from_account_shared( - pubkey: Pubkey, - account_shared: &AccountSharedData, - parent_program_id: Option, - ) -> Self { - let remote_slot = account_shared.remote_slot(); - if let Some((eata_pubkey, eata)) = - try_remap_ata_to_eata(&pubkey, account_shared) - { - return CommittedAccount { - pubkey: eata_pubkey, - account: eata.into(), - remote_slot, - }; - } - - let mut account: Account = account_shared.to_owned().into(); - account.owner = parent_program_id.unwrap_or(account.owner); - - CommittedAccount { - pubkey, - account, - remote_slot, - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum CommitType { /// Regular commit without actions diff --git a/programs/magicblock/src/magic_sys.rs b/programs/magicblock/src/magic_sys.rs new file mode 100644 index 000000000..0fed11e79 --- /dev/null +++ b/programs/magicblock/src/magic_sys.rs @@ -0,0 +1,64 @@ +use std::{ + collections::HashMap, + error::Error, + sync::{Arc, RwLock}, +}; + +use lazy_static::lazy_static; +use magicblock_core::{intent::CommittedAccount, traits::MagicSys}; +use solana_instruction::error::InstructionError; +use solana_pubkey::Pubkey; + +/// Maximum number of times an account may be committed before it must be +/// undelegated. A plain commit at or beyond this limit fails with [`COMMIT_LIMIT_ERR`]. +pub const COMMIT_LIMIT: u64 = 10; +/// [`InstructionError::Custom`] code returned when a commit is attempted on an +/// account that has reached [`COMMIT_LIMIT`]. +pub const COMMIT_LIMIT_ERR: u32 = 0xA000_0000; +pub(crate) const MISSING_COMMIT_NONCE_ERR: u32 = 0xA000_0001; + +lazy_static! { + static ref MAGIC_SYS: RwLock>> = RwLock::new(None); +} + +const MAGIC_SYS_POISONED_MSG: &str = "MAGIC_SYS poisoned"; +const MAGIC_SYS_UNSET_MSG: &str = "MagicSys needs to be set on startup"; + +pub fn init_magic_sys(magic_sys: Arc) { + MAGIC_SYS + .write() + .expect(MAGIC_SYS_POISONED_MSG) + .replace(magic_sys); +} + +pub(crate) fn load_data(id: u64) -> Result>, Box> { + MAGIC_SYS + .read() + .expect(MAGIC_SYS_POISONED_MSG) + .as_ref() + .ok_or(MAGIC_SYS_UNSET_MSG)? + .load(id) +} + +pub(crate) fn persist_data( + id: u64, + data: Vec, +) -> Result<(), Box> { + MAGIC_SYS + .read() + .expect(MAGIC_SYS_POISONED_MSG) + .as_ref() + .ok_or(MAGIC_SYS_UNSET_MSG)? + .persist(id, data) +} + +pub(crate) fn fetch_current_commit_nonces( + commits: &[CommittedAccount], +) -> Result, InstructionError> { + MAGIC_SYS + .read() + .expect(MAGIC_SYS_POISONED_MSG) + .as_ref() + .ok_or(InstructionError::UninitializedAccount)? + .fetch_current_commit_nonces(commits) +} diff --git a/programs/magicblock/src/mutate_accounts/account_mod_data.rs b/programs/magicblock/src/mutate_accounts/account_mod_data.rs index d313b0202..2df4cf6ae 100644 --- a/programs/magicblock/src/mutate_accounts/account_mod_data.rs +++ b/programs/magicblock/src/mutate_accounts/account_mod_data.rs @@ -2,16 +2,19 @@ use std::{ collections::HashMap, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Mutex, }, }; use lazy_static::lazy_static; -use magicblock_core::traits::PersistsAccountModData; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; -use crate::{errors::MagicBlockProgramError, validator}; +use crate::{ + errors::MagicBlockProgramError, + magic_sys::{load_data, persist_data}, + validator, +}; lazy_static! { /// In order to modify large data chunks we cannot include all the data as part of the @@ -20,12 +23,6 @@ lazy_static! { /// processed it resolved that data from the key that we provide in its place. static ref DATA_MODS: Mutex>> = Mutex::default(); - /// In order to support replaying transactions we need to persist the data that is - /// loaded from the [DATA_MODS] - /// During replay the [DATA_MODS] won't have the data for the particular id in which - /// case it is loaded via the persister instead. - static ref PERSISTER: RwLock>> = RwLock::new(None); - static ref DATA_MOD_ID: AtomicU64 = AtomicU64::new(0); static ref MAX_REPLAY_DATA_MOD_ID: Mutex> = Mutex::default(); @@ -75,43 +72,6 @@ pub(super) fn get_data(id: u64) -> Option> { DATA_MODS.lock().expect("DATA_MODS poisoned").remove(&id) } -pub fn init_persister(persister: Arc) { - PERSISTER - .write() - .expect("PERSISTER poisoned") - .replace(persister); -} - -pub fn persister_info() -> String { - PERSISTER - .read() - .expect("PERSISTER poisoned") - .as_ref() - .map(|p| p.to_string()) - .unwrap_or_else(|| "None".to_string()) -} - -fn load_data(id: u64) -> Result>, Box> { - PERSISTER - .read() - .expect("PERSISTER poisoned") - .as_ref() - .ok_or("AccountModPersister needs to be set on startup")? - .load(id) -} - -fn persist_data( - id: u64, - data: Vec, -) -> Result<(), Box> { - PERSISTER - .read() - .expect("PERSISTER poisoned") - .as_ref() - .ok_or("AccounModPersister needs to be set on startup")? - .persist(id, data) -} - /// The resolved data including an indication about how it was resolved. pub(super) enum ResolvedAccountModData { /// The data was resolved from memory while the validator was processing diff --git a/programs/magicblock/src/mutate_accounts/mod.rs b/programs/magicblock/src/mutate_accounts/mod.rs index 041dcc697..530cd1986 100644 --- a/programs/magicblock/src/mutate_accounts/mod.rs +++ b/programs/magicblock/src/mutate_accounts/mod.rs @@ -1,5 +1,4 @@ mod account_mod_data; mod process_mutate_accounts; pub(crate) use account_mod_data::*; -pub use account_mod_data::{init_persister, persister_info}; pub(crate) use process_mutate_accounts::process_mutate_accounts; diff --git a/programs/magicblock/src/mutate_accounts/process_mutate_accounts.rs b/programs/magicblock/src/mutate_accounts/process_mutate_accounts.rs index 17f29ccb7..3b93d7b2d 100644 --- a/programs/magicblock/src/mutate_accounts/process_mutate_accounts.rs +++ b/programs/magicblock/src/mutate_accounts/process_mutate_accounts.rs @@ -366,7 +366,7 @@ mod tests { map.insert(mod_key, AccountSharedData::new(100, 0, &mod_key)); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let modification = AccountModification { pubkey: mod_key, @@ -450,7 +450,7 @@ mod tests { map.insert(mod_key2, AccountSharedData::new(200, 0, &mod_key2)); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![ @@ -545,7 +545,7 @@ mod tests { map.insert(mod_key, delegated_account); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![AccountModification { @@ -588,7 +588,7 @@ mod tests { map.insert(mod_key, undelegating_account); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![AccountModification { @@ -656,7 +656,7 @@ mod tests { map.insert(mod_key4, AccountSharedData::new(400, 0, &mod_key4)); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![ @@ -796,7 +796,7 @@ mod tests { map.insert(mod_key, AccountSharedData::new(100, 0, &mod_key)); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![AccountModification { @@ -840,7 +840,7 @@ mod tests { map.insert(mod_key, account); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![AccountModification { @@ -881,7 +881,7 @@ mod tests { map.insert(mod_key, account); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::modify_accounts_instruction( vec![AccountModification { diff --git a/programs/magicblock/src/schedule_transactions/mod.rs b/programs/magicblock/src/schedule_transactions/mod.rs index 942a5991c..bfe18899f 100644 --- a/programs/magicblock/src/schedule_transactions/mod.rs +++ b/programs/magicblock/src/schedule_transactions/mod.rs @@ -6,6 +6,7 @@ mod process_schedule_intent_bundle; mod process_scheduled_commit_sent; pub(crate) mod transaction_scheduler; +use magicblock_core::intent::CommittedAccount; use magicblock_magic_program_api::MAGIC_CONTEXT_PUBKEY; pub(crate) use process_accept_scheduled_commits::*; pub(crate) use process_schedule_commit::*; @@ -17,7 +18,39 @@ use solana_instruction::error::InstructionError; use solana_log_collector::ic_msg; use solana_program_runtime::invoke_context::InvokeContext; -use crate::utils::accounts::get_instruction_pubkey_with_idx; +use crate::{ + magic_sys::{ + fetch_current_commit_nonces, COMMIT_LIMIT, COMMIT_LIMIT_ERR, + MISSING_COMMIT_NONCE_ERR, + }, + utils::accounts::get_instruction_pubkey_with_idx, +}; + +pub(crate) fn check_commit_limits( + commits: &[CommittedAccount], + invoke_context: &InvokeContext, +) -> Result<(), InstructionError> { + let mut nonces = fetch_current_commit_nonces(commits)?; + let mut limit_exceeded = false; + for account in commits { + let nonce = nonces + .remove(&account.pubkey) + .ok_or(InstructionError::Custom(MISSING_COMMIT_NONCE_ERR))?; + if nonce >= COMMIT_LIMIT { + ic_msg!( + invoke_context, + "ScheduleCommit ERR: commit limit exceeded for account {}, only undelegation is allowed", + account.pubkey + ); + limit_exceeded = true; + } + } + if limit_exceeded { + Err(InstructionError::Custom(COMMIT_LIMIT_ERR)) + } else { + Ok(()) + } +} pub fn check_magic_context_id( invoke_context: &InvokeContext, diff --git a/programs/magicblock/src/schedule_transactions/process_schedule_commit.rs b/programs/magicblock/src/schedule_transactions/process_schedule_commit.rs index 8817e5985..aaf26538e 100644 --- a/programs/magicblock/src/schedule_transactions/process_schedule_commit.rs +++ b/programs/magicblock/src/schedule_transactions/process_schedule_commit.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; +use magicblock_core::intent::CommittedAccount; // no direct token remap helpers needed here; handled in CommittedAccount builder use solana_account::{state_traits::StateMut, ReadableAccount}; use solana_instruction::error::InstructionError; @@ -10,10 +11,9 @@ use solana_pubkey::Pubkey; use crate::{ magic_scheduled_base_intent::{ validate_commit_schedule_permissions, CommitAndUndelegate, CommitType, - CommittedAccount, MagicBaseIntent, ScheduledIntentBundle, - UndelegateType, + MagicBaseIntent, ScheduledIntentBundle, UndelegateType, }, - schedule_transactions, + schedule_transactions::{self, check_commit_limits}, utils::{ account_actions::mark_account_as_undelegated, accounts::{ @@ -233,6 +233,13 @@ pub(crate) fn process_schedule_commit( } } + // NOTE: + // We validate commit nonces only for plain commits + // If accounts are undelegated we don't want to fail + if !opts.request_undelegation { + check_commit_limits(&committed_accounts, invoke_context)?; + } + // NOTE: this is only protected by all the above checks however if the // instruction fails for other reasons detected afterward then the commit // stays scheduled diff --git a/programs/magicblock/src/schedule_transactions/process_schedule_commit_tests.rs b/programs/magicblock/src/schedule_transactions/process_schedule_commit_tests.rs index 128e777b8..01f72650b 100644 --- a/programs/magicblock/src/schedule_transactions/process_schedule_commit_tests.rs +++ b/programs/magicblock/src/schedule_transactions/process_schedule_commit_tests.rs @@ -18,6 +18,7 @@ use solana_signer::Signer; use crate::{ magic_context::MagicContext, magic_scheduled_base_intent::ScheduledIntentBundle, + magic_sys::COMMIT_LIMIT, schedule_transactions::transaction_scheduler::TransactionScheduler, test_utils::{ensure_started_validator, process_instruction}, utils::DELEGATION_PROGRAM_ID, @@ -63,7 +64,7 @@ fn prepare_transaction_with_single_committee( map.insert(committee, committee_account); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let transaction_accounts: Vec<(Pubkey, AccountSharedData)> = vec![( clock::id(), @@ -122,7 +123,7 @@ fn prepare_transaction_with_three_committees( } map }; - ensure_started_validator(&mut accounts_data); + ensure_started_validator(&mut accounts_data, None); let transaction_accounts: Vec<(Pubkey, AccountSharedData)> = vec![( clock::id(), @@ -257,6 +258,7 @@ mod tests { // Reuse test helper to create proper SPL ATA account data use magicblock_chainlink::testing::eatas::create_ata_account; use magicblock_core::token_programs::{derive_ata, derive_eata}; + use serial_test::serial; use solana_seed_derivable::SeedDerivable; use test_kit::init_logger; @@ -274,6 +276,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_single_account_success() { init_logger!(); let payer = @@ -359,6 +362,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_single_account_and_request_undelegate_success() { init_logger!(); let payer = @@ -445,6 +449,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_remaps_delegated_ata_to_eata() { init_logger!(); @@ -526,6 +531,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_and_undelegate_remaps_delegated_ata_to_eata() { init_logger!(); @@ -610,6 +616,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_three_accounts_success() { init_logger!(); @@ -723,6 +730,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_three_accounts_and_request_undelegate_success() { let payer = Keypair::from_seed( b"three_accounts_and_request_undelegate_success", @@ -875,6 +883,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_no_pdas_provided_to_ix() { init_logger!(); @@ -910,6 +919,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_undelegate_with_readonly() { init_logger!(); @@ -953,6 +963,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_with_non_delegated_account() { init_logger!(); @@ -992,6 +1003,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_three_accounts_second_not_owned_by_program_and_not_signer( ) { init_logger!(); @@ -1040,6 +1052,7 @@ mod tests { } #[test] + #[serial] fn test_schedule_commit_with_confined_account() { init_logger!(); @@ -1083,6 +1096,80 @@ mod tests { } #[test] + #[serial] + fn test_schedule_commit_fails_when_commit_limit_exceeded() { + init_logger!(); + + let payer = + Keypair::from_seed(b"schedule_commit_limit_exceeded____").unwrap(); + let program = Pubkey::new_unique(); + let committee = Pubkey::new_unique(); + + let (mut account_data, mut transaction_accounts) = + prepare_transaction_with_single_committee( + &payer, program, committee, + ); + + // Override stub to return nonce at the commit limit + ensure_started_validator(&mut account_data, Some(COMMIT_LIMIT)); + + let ix = InstructionUtils::schedule_commit_instruction( + &payer.pubkey(), + vec![committee], + ); + extend_transaction_accounts_from_ix( + &ix, + &mut account_data, + &mut transaction_accounts, + ); + + process_instruction( + ix.data.as_slice(), + transaction_accounts, + ix.accounts, + Err(InstructionError::Custom(crate::magic_sys::COMMIT_LIMIT_ERR)), + ); + } + + #[test] + #[serial] + fn test_schedule_commit_and_undelegate_succeeds_when_commit_limit_exceeded() + { + init_logger!(); + + let payer = + Keypair::from_seed(b"undelegate_succeeds_limit_exceeded").unwrap(); + let program = Pubkey::new_unique(); + let committee = Pubkey::new_unique(); + + let (mut account_data, mut transaction_accounts) = + prepare_transaction_with_single_committee( + &payer, program, committee, + ); + + // Override stub to return nonce at the commit limit + ensure_started_validator(&mut account_data, Some(COMMIT_LIMIT)); + + let ix = InstructionUtils::schedule_commit_and_undelegate_instruction( + &payer.pubkey(), + vec![committee], + ); + extend_transaction_accounts_from_ix( + &ix, + &mut account_data, + &mut transaction_accounts, + ); + + process_instruction( + ix.data.as_slice(), + transaction_accounts, + ix.accounts, + Ok(()), + ); + } + + #[test] + #[serial] fn test_schedule_commit_three_accounts_one_confined() { init_logger!(); diff --git a/programs/magicblock/src/schedule_transactions/process_schedule_intent_bundle.rs b/programs/magicblock/src/schedule_transactions/process_schedule_intent_bundle.rs index 5b846c984..f2edd342c 100644 --- a/programs/magicblock/src/schedule_transactions/process_schedule_intent_bundle.rs +++ b/programs/magicblock/src/schedule_transactions/process_schedule_intent_bundle.rs @@ -12,7 +12,7 @@ use crate::{ magic_scheduled_base_intent::{ CommitType, ConstructionContext, ScheduledIntentBundle, }, - schedule_transactions::check_magic_context_id, + schedule_transactions::{check_commit_limits, check_magic_context_id}, utils::{ account_actions::mark_account_as_undelegated, accounts::{ @@ -157,6 +157,11 @@ pub(crate) fn process_schedule_intent_bundle( ); } + if let Some(commit_accounts) = scheduled_intent.get_commit_intent_accounts() + { + check_commit_limits(commit_accounts, invoke_context)?; + } + let action_sent_signature = scheduled_intent.sent_transaction.signatures[0]; context.add_scheduled_action(scheduled_intent); diff --git a/programs/magicblock/src/schedule_transactions/process_scheduled_commit_sent.rs b/programs/magicblock/src/schedule_transactions/process_scheduled_commit_sent.rs index 4b886930f..14053bf6b 100644 --- a/programs/magicblock/src/schedule_transactions/process_scheduled_commit_sent.rs +++ b/programs/magicblock/src/schedule_transactions/process_scheduled_commit_sent.rs @@ -305,7 +305,7 @@ mod tests { let mut account_data = HashMap::new(); - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let mut ix = InstructionUtils::scheduled_commit_sent_instruction( &crate::id(), @@ -342,7 +342,7 @@ mod tests { ); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::scheduled_commit_sent_instruction( &crate::id(), @@ -377,7 +377,7 @@ mod tests { ); map }; - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::scheduled_commit_sent_instruction( &fake_program.pubkey(), @@ -406,7 +406,7 @@ mod tests { let mut account_data = HashMap::new(); - ensure_started_validator(&mut account_data); + ensure_started_validator(&mut account_data, None); let ix = InstructionUtils::scheduled_commit_sent_instruction( &crate::id(), diff --git a/programs/magicblock/src/test_utils/mod.rs b/programs/magicblock/src/test_utils/mod.rs index 7929fc14a..0bf1b5a55 100644 --- a/programs/magicblock/src/test_utils/mod.rs +++ b/programs/magicblock/src/test_utils/mod.rs @@ -8,12 +8,13 @@ use std::{ }, }; -use magicblock_core::traits::PersistsAccountModData; +use magicblock_core::{intent::CommittedAccount, traits::MagicSys}; use magicblock_magic_program_api::{id, EPHEMERAL_VAULT_PUBKEY}; use solana_account::AccountSharedData; use solana_instruction::{error::InstructionError, AccountMeta}; use solana_log_collector::log::debug; use solana_program_runtime::invoke_context::mock_process_instruction; +use solana_pubkey::Pubkey; use solana_sdk_ids::system_program; use self::magicblock_processor::Entrypoint; @@ -24,7 +25,10 @@ pub const AUTHORITY_BALANCE: u64 = u64::MAX / 2; pub const COUNTER_PROGRAM_ID: Pubkey = Pubkey::from_str_const("2jQZbSfAfqT5nZHGrLpDG2vXuEGtTgZYnNy7AZEjMCYz"); -pub fn ensure_started_validator(map: &mut HashMap) { +pub fn ensure_started_validator( + map: &mut HashMap, + nonce: Option, +) { validator::generate_validator_authority_if_needed(); let validator_authority_id = validator::validator_authority_id(); map.entry(validator_authority_id).or_insert_with(|| { @@ -38,8 +42,8 @@ pub fn ensure_started_validator(map: &mut HashMap) { vault }); - let stub = Arc::new(PersisterStub::default()); - init_persister(stub); + let stub = Arc::new(MagicSysStub::with_nonce(nonce.unwrap_or(0))); + init_magic_sys(stub); validator::ensure_started_up(); } @@ -63,27 +67,38 @@ pub fn process_instruction( ) } -pub struct PersisterStub { +pub struct MagicSysStub { id: u64, + nonce: u64, } -impl Default for PersisterStub { +impl Default for MagicSysStub { fn default() -> Self { static ID: AtomicU64 = AtomicU64::new(0); Self { id: ID.fetch_add(1, Ordering::Relaxed), + nonce: 0, } } } -impl fmt::Display for PersisterStub { +impl MagicSysStub { + pub fn with_nonce(nonce: u64) -> Self { + Self { + nonce, + ..Self::default() + } + } +} + +impl fmt::Display for MagicSysStub { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "PersisterStub({})", self.id) + write!(f, "MagicSysStub({})", self.id) } } -impl PersistsAccountModData for PersisterStub { +impl MagicSys for MagicSysStub { fn persist(&self, id: u64, data: Vec) -> Result<(), Box> { debug!("Persisting data for id '{}' with len {}", id, data.len()); Ok(()) @@ -92,4 +107,11 @@ impl PersistsAccountModData for PersisterStub { fn load(&self, _id: u64) -> Result>, Box> { Err("Loading from ledger not supported in tests".into()) } + + fn fetch_current_commit_nonces( + &self, + commits: &[CommittedAccount], + ) -> Result, InstructionError> { + Ok(commits.iter().map(|c| (c.pubkey, self.nonce)).collect()) + } } diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 7516ad0e2..e793be94b 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -3313,6 +3313,7 @@ dependencies = [ "magicblock-chainlink", "magicblock-committor-service", "magicblock-core", + "magicblock-metrics", "magicblock-program", "solana-hash", "solana-pubkey", @@ -3521,6 +3522,7 @@ dependencies = [ "futures-util", "lru", "magicblock-committor-program", + "magicblock-core", "magicblock-delegation-program 1.1.3 (git+https://github.com/magicblock-labs/delegation-program.git?rev=2cb491032f372)", "magicblock-metrics", "magicblock-program", @@ -3577,6 +3579,7 @@ version = "0.8.2" dependencies = [ "flume", "magicblock-magic-program-api 0.8.2", + "serde", "solana-account", "solana-account-decoder", "solana-hash", @@ -5799,6 +5802,7 @@ dependencies = [ "futures", "magicblock-committor-program", "magicblock-committor-service", + "magicblock-core", "magicblock-delegation-program 1.1.3 (git+https://github.com/magicblock-labs/delegation-program.git?rev=2cb491032f372)", "magicblock-program", "magicblock-rpc-client", @@ -5825,6 +5829,7 @@ dependencies = [ "integration-test-tools", "magicblock-core", "magicblock-magic-program-api 0.8.2", + "magicblock-program", "program-schedulecommit", "rand 0.8.5", "schedulecommit-client", diff --git a/test-integration/schedulecommit/test-scenarios/Cargo.toml b/test-integration/schedulecommit/test-scenarios/Cargo.toml index f1913660d..ba385ac5b 100644 --- a/test-integration/schedulecommit/test-scenarios/Cargo.toml +++ b/test-integration/schedulecommit/test-scenarios/Cargo.toml @@ -10,6 +10,7 @@ tracing = { workspace = true } program-schedulecommit = { workspace = true, features = ["no-entrypoint"] } schedulecommit-client = { workspace = true } magicblock-core = { workspace = true } +magicblock-program = { workspace = true } magicblock_magic_program_api = { workspace = true } solana-program = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/test-integration/schedulecommit/test-scenarios/tests/03_commit_limit.rs b/test-integration/schedulecommit/test-scenarios/tests/03_commit_limit.rs new file mode 100644 index 000000000..c501adfc4 --- /dev/null +++ b/test-integration/schedulecommit/test-scenarios/tests/03_commit_limit.rs @@ -0,0 +1,205 @@ +use std::sync::OnceLock; + +use integration_test_tools::run_test; +use magicblock_program::magic_sys::{COMMIT_LIMIT, COMMIT_LIMIT_ERR}; +use program_schedulecommit::api::{ + schedule_commit_and_undelegate_cpi_instruction, + schedule_commit_cpi_instruction, UserSeeds, +}; +use schedulecommit_client::{ + verify, ScheduleCommitTestContext, ScheduleCommitTestContextFields, +}; +use solana_rpc_client::rpc_client::SerializableTransaction; +use solana_rpc_client_api::config::RpcSendTransactionConfig; +use solana_sdk::{ + instruction::InstructionError, signer::Signer, transaction::Transaction, +}; +use test_kit::init_logger; +use tracing::*; +use utils::{ + assert_account_was_undelegated_on_chain, assert_committee_was_committed, + assert_is_instruction_error, extract_transaction_error, + get_context_with_delegated_committees, +}; + +mod utils; + +// --------------------------------------------------------------------------- +// Shared prepared context +// +// One context holding NUM_TESTS committees. All accounts are committed +// COMMIT_LIMIT times together (one tx per round, all PDAs in each tx) so +// they sit at the boundary of failure. Each test then uses its own committee +// slot independently. +// +// OnceLock ensures preparation runs exactly once regardless of which test +// triggers it first; the other test blocks until the pool is ready. +// --------------------------------------------------------------------------- + +const NUM_TESTS: usize = 2; +const IDX_COMMIT_FAILS: usize = 0; +const IDX_UNDELEGATE_SUCCEEDS: usize = 1; + +static PREPARED: OnceLock = OnceLock::new(); + +fn get_prepared() -> &'static ScheduleCommitTestContext { + PREPARED.get_or_init(|| { + let ctx = get_context_with_delegated_committees( + NUM_TESTS, + UserSeeds::MagicScheduleCommit, + ); + + let ScheduleCommitTestContextFields { + payer_ephem: payer, + committees, + commitment, + ephem_client, + .. + } = ctx.fields(); + + let players: Vec<_> = + committees.iter().map(|(p, _)| p.pubkey()).collect(); + let pdas: Vec<_> = committees.iter().map(|(_, pda)| *pda).collect(); + + for n in 0..COMMIT_LIMIT { + let ix = schedule_commit_cpi_instruction( + payer.pubkey(), + magicblock_magic_program_api::id(), + magicblock_magic_program_api::MAGIC_CONTEXT_PUBKEY, + &players, + &pdas, + ); + let blockhash = ephem_client.get_latest_blockhash().unwrap(); + let tx = Transaction::new_signed_with_payer( + &[ix], + Some(&payer.pubkey()), + &[&payer], + blockhash, + ); + let sig = *tx.get_signature(); + ephem_client + .send_and_confirm_transaction_with_spinner_and_config( + &tx, + *commitment, + RpcSendTransactionConfig { + skip_preflight: true, + ..Default::default() + }, + ) + .unwrap_or_else(|e| { + panic!( + "prepare commit {}/{} failed: {:?}", + n + 1, + COMMIT_LIMIT, + e + ) + }); + info!("prepare commit {}/{}: {}", n + 1, COMMIT_LIMIT, sig); + verify::fetch_and_verify_commit_result_from_logs(&ctx, sig); + } + + ctx + }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[test] +fn test_schedule_commit_fails_at_commit_limit() { + run_test!({ + let ctx = get_prepared(); + let ScheduleCommitTestContextFields { + payer_ephem: payer, + committees, + commitment, + ephem_client, + .. + } = ctx.fields(); + + let committee = &committees[IDX_COMMIT_FAILS]; + let ix = schedule_commit_cpi_instruction( + payer.pubkey(), + magicblock_magic_program_api::id(), + magicblock_magic_program_api::MAGIC_CONTEXT_PUBKEY, + &[committee.0.pubkey()], + &[committee.1], + ); + let blockhash = ephem_client.get_latest_blockhash().unwrap(); + let tx = Transaction::new_signed_with_payer( + &[ix], + Some(&payer.pubkey()), + &[&payer], + blockhash, + ); + + let res = ephem_client + .send_and_confirm_transaction_with_spinner_and_config( + &tx, + *commitment, + RpcSendTransactionConfig { + skip_preflight: true, + ..Default::default() + }, + ); + + let (tx_result_err, tx_err) = extract_transaction_error(res); + assert_is_instruction_error( + tx_err.unwrap(), + &tx_result_err, + InstructionError::Custom(COMMIT_LIMIT_ERR), + ); + }); +} + +#[test] +fn test_schedule_commit_and_undelegate_succeeds_at_commit_limit() { + run_test!({ + let ctx = get_prepared(); + let ScheduleCommitTestContextFields { + payer_ephem: payer, + committees, + commitment, + ephem_client, + .. + } = ctx.fields(); + + let committee = &committees[IDX_UNDELEGATE_SUCCEEDS]; + let ix = schedule_commit_and_undelegate_cpi_instruction( + payer.pubkey(), + magicblock_magic_program_api::id(), + magicblock_magic_program_api::MAGIC_CONTEXT_PUBKEY, + &[committee.0.pubkey()], + &[committee.1], + ); + let blockhash = ephem_client.get_latest_blockhash().unwrap(); + let tx = Transaction::new_signed_with_payer( + &[ix], + Some(&payer.pubkey()), + &[&payer], + blockhash, + ); + + let sig = *tx.get_signature(); + ephem_client + .send_and_confirm_transaction_with_spinner_and_config( + &tx, + *commitment, + RpcSendTransactionConfig { + skip_preflight: true, + ..Default::default() + }, + ) + .unwrap_or_else(|e| panic!("undelegate at limit failed: {:?}", e)); + + // verify via logs using a single-committee context view + let res = verify::fetch_and_verify_commit_result_from_logs(ctx, sig); + assert_committee_was_committed(committee.1, &res, true); + assert_account_was_undelegated_on_chain( + ctx, + committee.1, + program_schedulecommit::id(), + ); + }); +} diff --git a/test-integration/schedulecommit/test-scenarios/tests/utils/mod.rs b/test-integration/schedulecommit/test-scenarios/tests/utils/mod.rs index 342f94ed2..ca63ce2b3 100644 --- a/test-integration/schedulecommit/test-scenarios/tests/utils/mod.rs +++ b/test-integration/schedulecommit/test-scenarios/tests/utils/mod.rs @@ -37,16 +37,14 @@ pub fn get_context_with_delegated_committees( // ----------------- // Asserts // ----------------- -#[allow(dead_code)] // used in 02_commit_and_undelegate.rs -pub fn assert_one_committee_was_committed( - ctx: &ScheduleCommitTestContext, +#[allow(dead_code)] +pub fn assert_committee_was_committed( + pda: Pubkey, res: &ScheduledCommitResult, is_single_stage: bool, ) where T: std::fmt::Debug + borsh::BorshDeserialize + PartialEq + Eq, { - let pda = ctx.committees[0].1; - assert_eq!(res.included.len(), 1, "includes 1 pda"); assert_eq!(res.excluded.len(), 0, "excludes 0 pdas"); @@ -64,6 +62,17 @@ pub fn assert_one_committee_was_committed( ); } +#[allow(dead_code)] // used in 02_commit_and_undelegate.rs +pub fn assert_one_committee_was_committed( + ctx: &ScheduleCommitTestContext, + res: &ScheduledCommitResult, + is_single_stage: bool, +) where + T: std::fmt::Debug + borsh::BorshDeserialize + PartialEq + Eq, +{ + assert_committee_was_committed(ctx.committees[0].1, res, is_single_stage); +} + #[allow(dead_code)] // used in 02_commit_and_undelegate.rs pub fn assert_two_committees_were_committed( ctx: &ScheduleCommitTestContext, diff --git a/test-integration/test-committor-service/Cargo.toml b/test-integration/test-committor-service/Cargo.toml index 4d6af1f58..e61a5c1d5 100644 --- a/test-integration/test-committor-service/Cargo.toml +++ b/test-integration/test-committor-service/Cargo.toml @@ -8,6 +8,7 @@ async-trait = { workspace = true } borsh = { workspace = true } tracing = { workspace = true } futures = { workspace = true } +magicblock-core = { workspace = true } magicblock-committor-program = { workspace = true, features = [ "no-entrypoint", ] } diff --git a/test-integration/test-committor-service/tests/common.rs b/test-integration/test-committor-service/tests/common.rs index 67d989d30..e69872e32 100644 --- a/test-integration/test-committor-service/tests/common.rs +++ b/test-integration/test-committor-service/tests/common.rs @@ -1,27 +1,16 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, }; -use async_trait::async_trait; use magicblock_committor_service::{ - intent_executor::{ - task_info_fetcher::{ - ResetType, TaskInfoFetcher, TaskInfoFetcherError, - TaskInfoFetcherResult, - }, - IntentExecutorImpl, - }, tasks::commit_task::{CommitDelivery, CommitTask}, transaction_preparator::{ delivery_preparator::DeliveryPreparator, TransactionPreparatorImpl, }, ComputeBudgetConfig, }; -use magicblock_program::magic_scheduled_base_intent::CommittedAccount; +use magicblock_core::intent::CommittedAccount; use magicblock_rpc_client::MagicblockRpcClient; use magicblock_table_mania::{GarbageCollectorConfig, TableMania}; use solana_account::Account; @@ -98,72 +87,6 @@ impl TestFixture { self.compute_budget_config.clone(), ) } - - #[allow(dead_code)] - pub fn create_intent_executor( - &self, - ) -> IntentExecutorImpl - { - let transaction_preparator = self.create_transaction_preparator(); - - IntentExecutorImpl::new( - self.rpc_client.clone(), - transaction_preparator, - self.create_task_info_fetcher(), - ) - } - - #[allow(dead_code)] - pub fn create_task_info_fetcher(&self) -> Arc { - Arc::new(MockTaskInfoFetcher(self.rpc_client.clone())) - } -} - -pub struct MockTaskInfoFetcher(MagicblockRpcClient); - -#[async_trait] -impl TaskInfoFetcher for MockTaskInfoFetcher { - async fn fetch_next_commit_ids( - &self, - pubkeys: &[Pubkey], - _: u64, - ) -> TaskInfoFetcherResult> { - Ok(pubkeys.iter().map(|pubkey| (*pubkey, 0)).collect()) - } - - async fn fetch_rent_reimbursements( - &self, - pubkeys: &[Pubkey], - _: u64, - ) -> TaskInfoFetcherResult> { - Ok(pubkeys.to_vec()) - } - - fn peek_commit_id(&self, _pubkey: &Pubkey) -> Option { - None - } - - fn reset(&self, _: ResetType) {} - - async fn get_base_accounts( - &self, - pubkeys: &[Pubkey], - _: u64, - ) -> TaskInfoFetcherResult> { - self.0 - .get_multiple_accounts(pubkeys, None) - .await - .map_err(|err| { - TaskInfoFetcherError::MagicBlockRpcClientError(Box::new(err)) - }) - .map(|accounts| { - pubkeys - .iter() - .zip(accounts) - .filter_map(|(key, value)| value.map(|value| (*key, value))) - .collect() - }) - } } #[allow(dead_code)] diff --git a/test-integration/test-committor-service/tests/test_intent_executor.rs b/test-integration/test-committor-service/tests/test_intent_executor.rs index fd53a00bf..b81a16056 100644 --- a/test-integration/test-committor-service/tests/test_intent_executor.rs +++ b/test-integration/test-committor-service/tests/test_intent_executor.rs @@ -17,7 +17,8 @@ use magicblock_committor_service::{ intent_executor::{ error::{IntentExecutorError, TransactionStrategyExecutionError}, task_info_fetcher::{ - CacheTaskInfoFetcher, TaskInfoFetcher, TaskInfoFetcherError, + CacheTaskInfoFetcher, RpcTaskInfoFetcher, TaskInfoFetcher, + TaskInfoFetcherError, }, ExecutionOutput, IntentExecutionResult, IntentExecutor, IntentExecutorImpl, @@ -29,11 +30,12 @@ use magicblock_committor_service::{ }, transaction_preparator::TransactionPreparatorImpl, }; +use magicblock_core::intent::CommittedAccount; use magicblock_program::{ args::ShortAccountMeta, magic_scheduled_base_intent::{ - BaseAction, CommitAndUndelegate, CommitType, CommittedAccount, - MagicBaseIntent, ProgramArgs, ScheduledIntentBundle, UndelegateType, + BaseAction, CommitAndUndelegate, CommitType, MagicBaseIntent, + ProgramArgs, ScheduledIntentBundle, UndelegateType, }, validator::validator_authority_id, }; @@ -74,9 +76,9 @@ const ACTOR_ESCROW_INDEX: u8 = 1; struct TestEnv { fixture: TestFixture, - task_info_fetcher: Arc, + task_info_fetcher: Arc>, intent_executor: - IntentExecutorImpl, + IntentExecutorImpl, pre_test_tablemania_state: HashMap, } @@ -88,8 +90,9 @@ impl TestEnv { .await; let transaction_preparator = fixture.create_transaction_preparator(); - let task_info_fetcher = - Arc::new(CacheTaskInfoFetcher::new(fixture.rpc_client.clone())); + let task_info_fetcher = Arc::new(CacheTaskInfoFetcher::new( + RpcTaskInfoFetcher::new(fixture.rpc_client.clone()), + )); let tm = &fixture.table_mania; let mut pre_test_tablemania_state = HashMap::new(); @@ -137,7 +140,7 @@ async fn test_commit_id_error_parsing() { // Invalidate ids before execution task_info_fetcher - .fetch_next_commit_ids( + .fetch_next_commit_nonces( &intent.get_undelegate_intent_pubkeys().unwrap(), remote_slot, ) @@ -444,7 +447,7 @@ async fn test_commit_id_error_recovery() { // Invalidate commit nonce cache let res = task_info_fetcher - .fetch_next_commit_ids(&[committed_account.pubkey], remote_slot) + .fetch_next_commit_nonces(&[committed_account.pubkey], remote_slot) .await; assert!(res.is_ok()); assert!(res.unwrap().contains_key(&committed_account.pubkey)); @@ -475,15 +478,14 @@ async fn test_commit_id_error_recovery() { // Cleanup succeeds assert!(intent_executor.cleanup().await.is_ok()); - let commit_ids_by_pk: HashMap<_, _> = [&committed_account] - .iter() - .map(|el| { - ( - el.pubkey, - task_info_fetcher.peek_commit_id(&el.pubkey).unwrap(), - ) - }) - .collect(); + let mut commit_ids_by_pk = HashMap::new(); + for el in [&committed_account].iter() { + let nonce = task_info_fetcher + .peek_commit_nonce(&el.pubkey) + .await + .unwrap(); + commit_ids_by_pk.insert(el.pubkey, nonce); + } verify( &fixture.table_mania, @@ -645,7 +647,7 @@ async fn test_commit_id_and_action_errors_recovery() { // Invalidate commit nonce cache let res = task_info_fetcher - .fetch_next_commit_ids(&[committed_account.pubkey], remote_slot) + .fetch_next_commit_nonces(&[committed_account.pubkey], remote_slot) .await; assert!(res.is_ok()); assert!(res.unwrap().contains_key(&committed_account.pubkey)); @@ -772,15 +774,14 @@ async fn test_cpi_limits_error_recovery() { // Cleanup after intent assert!(intent_executor.cleanup().await.is_ok()); - let commit_ids_by_pk: HashMap<_, _> = committed_accounts - .iter() - .map(|el| { - ( - el.pubkey, - task_info_fetcher.peek_commit_id(&el.pubkey).unwrap(), - ) - }) - .collect(); + let mut commit_ids_by_pk = HashMap::new(); + for el in committed_accounts.iter() { + let nonce = task_info_fetcher + .peek_commit_nonce(&el.pubkey) + .await + .unwrap(); + commit_ids_by_pk.insert(el.pubkey, nonce); + } verify( &fixture.table_mania, @@ -851,7 +852,7 @@ async fn test_commit_id_actions_cpi_limit_errors_recovery() { // Force CommitIDError by invalidating the commit-nonce cache before running let pubkeys: Vec<_> = committed_accounts.iter().map(|c| c.pubkey).collect(); let mut invalidated_keys = task_info_fetcher - .fetch_next_commit_ids(&pubkeys, Default::default()) + .fetch_next_commit_nonces(&pubkeys, Default::default()) .await .unwrap(); @@ -907,15 +908,14 @@ async fn test_commit_id_actions_cpi_limit_errors_recovery() { // Cleanup after intent assert!(intent_executor.cleanup().await.is_ok()); - let commit_ids_by_pk: HashMap<_, _> = committed_accounts - .iter() - .map(|el| { - ( - el.pubkey, - task_info_fetcher.peek_commit_id(&el.pubkey).unwrap(), - ) - }) - .collect(); + let mut commit_ids_by_pk = HashMap::new(); + for el in committed_accounts.iter() { + let nonce = task_info_fetcher + .peek_commit_nonce(&el.pubkey) + .await + .unwrap(); + commit_ids_by_pk.insert(el.pubkey, nonce); + } verify( &fixture.table_mania, fixture.rpc_client.get_inner(), @@ -1239,7 +1239,7 @@ fn create_scheduled_intent( async fn single_flow_transaction_strategy( authority: &Pubkey, - task_info_fetcher: &Arc, + task_info_fetcher: &Arc>, intent: &ScheduledIntentBundle, ) -> TransactionStrategy { let mut tasks = TaskBuilderImpl::commit_tasks( diff --git a/test-integration/test-committor-service/tests/test_ix_commit_local.rs b/test-integration/test-committor-service/tests/test_ix_commit_local.rs index 0a0417fce..f25bab459 100644 --- a/test-integration/test-committor-service/tests/test_ix_commit_local.rs +++ b/test-integration/test-committor-service/tests/test_ix_commit_local.rs @@ -12,9 +12,10 @@ use magicblock_committor_service::{ service_ext::{BaseIntentCommittorExt, CommittorServiceExt}, BaseIntentCommittor, CommittorService, ComputeBudgetConfig, }; +use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ - CommitAndUndelegate, CommitType, CommittedAccount, MagicBaseIntent, - MagicIntentBundle, ScheduledIntentBundle, UndelegateType, + CommitAndUndelegate, CommitType, MagicBaseIntent, MagicIntentBundle, + ScheduledIntentBundle, UndelegateType, }; use magicblock_rpc_client::MagicblockRpcClient; use program_flexi_counter::state::FlexiCounter;