Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use magicblock_config::config::GrpcConfig;
use magicblock_metrics::metrics;
use solana_pubkey::Pubkey;
use tokio_stream::StreamMap;
use tracing::warn;
use tracing::{trace, warn};

use super::{
write_with_retry, LaserResult, LaserStream, LaserStreamWithHandle,
Expand Down Expand Up @@ -141,6 +141,12 @@ pub struct StreamManager<S: StreamHandle, SF: StreamFactory<S>> {
/// Consumed by [Self::take_optimized_flag] so the actor can
/// reset its time-based optimization interval.
optimized_since_last_check: bool,

/// Defensive guard against re-entrant [Self::optimize] calls.
/// Today re-entry is impossible (`&mut self` + single
/// `tokio::select!` loop), but the flag protects against future
/// refactors that might introduce concurrency.
optimizing: bool,
}

#[allow(unused)]
Expand All @@ -164,6 +170,7 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
chain_slot,
client_id,
optimized_since_last_check: false,
optimizing: false,
};
mgr.update_stream_metrics();
mgr
Expand Down Expand Up @@ -305,6 +312,7 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
self.stream_map.remove(&StreamKey::OptimizedOld(i));
}
self.optimized_old_handles.clear();
self.optimizing = false;
self.update_stream_metrics();
}

Expand Down Expand Up @@ -390,6 +398,25 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
/// Rebuilds the account subscriptions into optimized streams by
/// delegating to [`Self::optimize_inner`].
///
/// Guarded by the [`Self::optimizing`] flag: a call arriving while a
/// previous pass is still in flight is logged at `trace` level and
/// skipped, returning `Ok(())` without touching any streams.
///
/// NOTE(review): the flag is cleared only after `optimize_inner`
/// completes. If this future is dropped mid-`await` (e.g. a
/// `tokio::select!` branch cancellation), `optimizing` stays `true`
/// and later passes are suppressed until another code path resets
/// it — confirm callers always poll this future to completion.
pub async fn optimize(
    &mut self,
    commitment: &CommitmentLevel,
) -> RemoteAccountProviderResult<()> {
    if self.optimizing {
        trace!(
            client_id = self.client_id,
            "optimize() called while already optimizing, skipping"
        );
        return Ok(());
    }
    self.optimizing = true;
    // Capture the result instead of using `?` so the guard flag is
    // reset on both the Ok and Err paths before returning.
    let result = self.optimize_inner(commitment).await;
    self.optimizing = false;
    result
}

async fn optimize_inner(
&mut self,
commitment: &CommitmentLevel,
) -> RemoteAccountProviderResult<()> {
// Remove all account streams from the map but keep them
// alive until the new optimized streams are created to
Expand Down Expand Up @@ -437,7 +464,8 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
self.current_new_subs.clear();
self.current_new_handle = None;

// Record the spike: new streams + previous streams still alive.
// Record the spike: new streams + previous streams still
// alive.
self.update_stream_metrics_with_extra(prev_stream_count);

self.optimized_since_last_check = true;
Expand Down
Loading