Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
/// Emits the current optimized/unoptimized stream counts as
/// metrics.
fn update_stream_metrics(&self) {
self.update_stream_metrics_with_extra(0);
}

fn update_stream_metrics_with_extra(&self, extra_streams: usize) {
metrics::set_grpc_optimized_streams_gauge(
&self.client_id,
self.optimized_old_handles.len(),
Expand All @@ -362,6 +366,12 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
&self.client_id,
self.unoptimized_old_handles.len(),
);
let total = self.optimized_old_handles.len()
+ self.unoptimized_old_handles.len()
+ usize::from(self.current_new_handle.is_some())
+ usize::from(self.program_sub.is_some())
+ extra_streams;
metrics::set_grpc_total_streams_gauge(&self.client_id, total);
}

/// Rebuild all account streams from `subscriptions`.
Expand All @@ -384,16 +394,20 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
// Remove all account streams from the map but keep them
// alive until the new optimized streams are created to
// avoid a gap without any active streams (race condition).
let _prev_current_new = self.stream_map.remove(&StreamKey::CurrentNew);
let _prev_unoptimized: Vec<_> = (0..self.unoptimized_old_handles.len())
let prev_current_new = self.stream_map.remove(&StreamKey::CurrentNew);
let prev_unoptimized: Vec<_> = (0..self.unoptimized_old_handles.len())
.filter_map(|i| {
self.stream_map.remove(&StreamKey::UnoptimizedOld(i))
})
.collect();
let _prev_optimized: Vec<_> = (0..self.optimized_old_handles.len())
let prev_optimized: Vec<_> = (0..self.optimized_old_handles.len())
.filter_map(|i| self.stream_map.remove(&StreamKey::OptimizedOld(i)))
.collect();

let prev_stream_count = usize::from(prev_current_new.is_some())
+ prev_unoptimized.len()
+ prev_optimized.len();

// Collect all active subscriptions and chunk them.
let all_pks: Vec<Pubkey> =
self.subscriptions.read().iter().copied().collect();
Expand Down Expand Up @@ -423,11 +437,19 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
self.current_new_subs.clear();
self.current_new_handle = None;

self.update_stream_metrics();
// Record the spike: new streams + previous streams still alive.
self.update_stream_metrics_with_extra(prev_stream_count);

self.optimized_since_last_check = true;

// Old streams are dropped here when _prev_* go out of scope,
// after the new optimized streams are already active.
// Old streams are dropped here, update metric to reflect
// the actual count without the previous streams.
drop(prev_current_new);
drop(prev_unoptimized);
drop(prev_optimized);

self.update_stream_metrics();

Ok(())
}

Expand Down Expand Up @@ -572,6 +594,7 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
.await?;
self.stream_map.insert(StreamKey::Program, stream);
self.program_sub = Some((subscribed_programs, handle));
self.update_stream_metrics_with_extra(0);
}

Ok(())
Expand All @@ -586,6 +609,7 @@ impl<S: StreamHandle, SF: StreamFactory<S>> StreamManager<S, SF> {
pub fn clear_program_subscriptions(&mut self) {
self.stream_map.remove(&StreamKey::Program);
self.program_sub = None;
self.update_stream_metrics_with_extra(0);
}

/// Build a `SubscribeRequest` for the given program IDs.
Expand Down
17 changes: 17 additions & 0 deletions magicblock-metrics/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,16 @@ lazy_static::lazy_static! {
&["client_id"],
)
.unwrap();

static ref GRPC_TOTAL_STREAMS_GAUGE: IntGaugeVec =
IntGaugeVec::new(
Opts::new(
"grpc_total_streams_gauge",
"Total number of GRPC streams including current stream and program stream",
),
&["client_id"],
)
.unwrap();
Comment thread
thlorenz marked this conversation as resolved.
}

pub(crate) fn register() {
Expand Down Expand Up @@ -600,6 +610,7 @@ pub(crate) fn register() {
register!(PUBSUB_CLIENT_CONNECTIONS_GAUGE);
register!(GRPC_OPTIMIZED_STREAMS_GAUGE);
register!(GRPC_UNOPTIMIZED_STREAMS_GAUGE);
register!(GRPC_TOTAL_STREAMS_GAUGE);
});
}

Expand Down Expand Up @@ -927,3 +938,9 @@ pub fn set_grpc_unoptimized_streams_gauge(client_id: &str, count: usize) {
.with_label_values(&[client_id])
.set(count as i64);
}

pub fn set_grpc_total_streams_gauge(client_id: &str, count: usize) {
GRPC_TOTAL_STREAMS_GAUGE
.with_label_values(&[client_id])
.set(count as i64);
}
Loading