diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 89eefe2bed7..1332a7c1edf 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7121,6 +7121,8 @@ dependencies = [ "hyper-util", "itertools 0.14.0", "once_cell", + "opentelemetry", + "opentelemetry_sdk", "pin-project", "pnet", "prometheus", diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 899e289182a..0d5aea15273 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -521,7 +521,7 @@ mod tests { .unwrap(); // At this point the actor was started and even processed a message entirely. let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); let processed = mailbox .send_message_with_backpressure_counter( @@ -548,7 +548,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); mailbox .send_message_with_backpressure_counter( @@ -580,7 +580,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let start = Instant::now(); mailbox .ask_with_backpressure_counter(Duration::from_millis(1), None) diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index f16e7db4ce5..f20d2f800a5 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -17,13 +17,15 @@ use std::sync::Arc; use std::{env, fmt}; use anyhow::Context; +use opentelemetry::metrics::MeterProvider; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{ - LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, + LogExporter, MetricExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, }; use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; use opentelemetry_sdk::{Resource, trace}; @@ -81,6 +83,26 @@ impl OtlpProtocol { } .context("failed to initialize OTLP traces exporter") } + + fn metric_exporter(&self, temporality: Temporality) -> anyhow::Result { + match self { + OtlpProtocol::Grpc => MetricExporter::builder() + .with_tonic() + .with_temporality(temporality) + .build(), + OtlpProtocol::HttpProtobuf => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + .context("failed to initialize OTLP metrics exporter") + } } impl FromStr for OtlpProtocol { @@ -104,11 +126,60 @@ impl FromStr for OtlpProtocol { } } +struct OtlpMetricsTemporality(Temporality); + +impl FromStr for OtlpMetricsTemporality { + type Err = anyhow::Error; + + fn from_str(temporality_str: &str) -> anyhow::Result { + const TEMPORALITY_DELTA: &str = "delta"; + const TEMPORALITY_LOWMEMORY: &str = "lowmemory"; + const TEMPORALITY_CUMULATIVE: &str = "cumulative"; + + match temporality_str { + TEMPORALITY_DELTA => Ok(OtlpMetricsTemporality(Temporality::Delta)), + TEMPORALITY_LOWMEMORY => Ok(OtlpMetricsTemporality(Temporality::LowMemory)), + TEMPORALITY_CUMULATIVE => Ok(OtlpMetricsTemporality(Temporality::Cumulative)), + other => anyhow::bail!( + "unsupported OTLP metrics temporality `{other}`, supported values are \ + `{TEMPORALITY_DELTA}`, `{TEMPORALITY_LOWMEMORY}` and `{TEMPORALITY_CUMULATIVE}`" + ), + } + } +} + +impl From for Temporality { + fn from(t: OtlpMetricsTemporality) -> Self { + t.0 + } +} + +pub struct TelemetryProviders { + tracer_provider: SdkTracerProvider, + logger_provider: SdkLoggerProvider, + meter_provider: SdkMeterProvider, +} + +impl TelemetryProviders { + pub fn shutdown(self) -> anyhow::Result<()> { + self.tracer_provider + .shutdown() + .context("failed to shutdown OpenTelemetry tracer provider")?; + self.logger_provider + .shutdown() + .context("failed to shutdown OpenTelemetry logger provider")?; + self.meter_provider + .shutdown() + .context("failed to shutdown OpenTelemetry meter provider")?; + Ok(()) + } +} + #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; /// Load the default logging filter from the environment. The filter can later -/// be updated using the result callback of [setup_logging_and_tracing]. +/// be updated using the result callback of [init_telemetry_providers]. fn startup_env_filter(level: Level) -> anyhow::Result { let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) @@ -119,14 +190,11 @@ fn startup_env_filter(level: Level) -> anyhow::Result { type ReloadLayer = tracing_subscriber::reload::Layer; -pub fn setup_logging_and_tracing( +pub fn init_telemetry_providers( level: Level, ansi_colors: bool, build_info: &BuildInfo, -) -> anyhow::Result<( - EnvFilterReloadFn, - Option<(SdkTracerProvider, SdkLoggerProvider)>, -)> { +) -> anyhow::Result<(EnvFilterReloadFn, Option)> { #[cfg(feature = "tokio-console")] { if get_bool_from_env(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY, false) { @@ -204,12 +272,37 @@ pub fn setup_logging_and_tracing( .with_batch_exporter(log_exporter) .build(); - let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + let metrics_protocol_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", false); + let metrics_protocol = metrics_protocol_opt + .as_deref() + .map(OtlpProtocol::from_str) + .transpose()? + .unwrap_or(global_protocol); + let temporality_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE", false); + let temporality = temporality_opt + .as_deref() + .map(OtlpMetricsTemporality::from_str) + .transpose()? + .map(Temporality::from) + .unwrap_or(Temporality::Cumulative); + let metric_exporter = metrics_protocol.metric_exporter(temporality)?; + + let meter_provider = SdkMeterProvider::builder() + .with_resource(resource.clone()) + .with_periodic_exporter(metric_exporter) + .build(); + + let meter = meter_provider.meter("quickwit"); + quickwit_common::metrics::install_otel_meter(meter); + + let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_span_processor(span_processor) .with_resource(resource) .build(); - let tracer = tracing_provider.tracer("quickwit"); + let tracer = tracer_provider.tracer("quickwit"); let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); // Bridge between tracing logs and otel tracing events @@ -220,7 +313,11 @@ pub fn setup_logging_and_tracing( .with(logs_otel_layer) .try_init() .context("failed to register tracing subscriber")?; - Some((tracing_provider, logger_provider)) + Some(TelemetryProviders { + tracer_provider, + logger_provider, + meter_provider, + }) } else { registry .try_init() diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 4a1f9ce036e..77064f2be63 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -22,7 +22,7 @@ use quickwit_cli::checklist::RED_COLOR; use quickwit_cli::cli::{CliCommand, build_cli}; #[cfg(feature = "jemalloc")] use quickwit_cli::jemalloc::start_jemalloc_metrics_loop; -use quickwit_cli::logger::setup_logging_and_tracing; +use quickwit_cli::logger::init_telemetry_providers; use quickwit_cli::{busy_detector, install_default_crypto_ring_provider}; use quickwit_common::runtimes::scrape_tokio_runtime_metrics; use quickwit_serve::BuildInfo; @@ -98,8 +98,8 @@ async fn main_impl() -> anyhow::Result<()> { start_jemalloc_metrics_loop(); let build_info = BuildInfo::get(); - let (env_filter_reload_fn, tracer_provider_opt) = - setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; + let (env_filter_reload_fn, telemetry_providers) = + init_telemetry_providers(command.default_log_level(), ansi_colors, build_info)?; let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await { error!(error=%command_error, "command failed"); @@ -113,13 +113,8 @@ async fn main_impl() -> anyhow::Result<()> { 0 }; - if let Some((trace_provider, logs_provider)) = tracer_provider_opt { - trace_provider - .shutdown() - .context("failed to shutdown OpenTelemetry tracer provider")?; - logs_provider - .shutdown() - .context("failed to shutdown OpenTelemetry logs provider")?; + if let Some(providers) = telemetry_providers { + providers.shutdown()?; } std::process::exit(return_code) diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index ee5fc303644..a49fed6c5d4 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -29,6 +29,7 @@ hyper = { workspace = true } hyper-util = { workspace = true, optional = true } itertools = { workspace = true } once_cell = { workspace = true } +opentelemetry = { workspace = true } pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } @@ -63,6 +64,7 @@ jemalloc-profiled = [ [dev-dependencies] hyper-util = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["testing"] } proptest = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index 69c2091c237..c4e5e5e1d1d 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -34,10 +34,9 @@ use async_speed_limit::limiter::Consume; use bytesize::ByteSize; use once_cell::sync::Lazy; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::io::AsyncWrite; -use crate::metrics::{IntCounterVec, new_counter_vec}; +use crate::metrics::{IntCounter, IntCounterVec, new_counter_vec}; use crate::{KillSwitch, Progress, ProtectedZoneGuard}; // Max 1MB at a time. @@ -99,7 +98,7 @@ pub struct IoControls { impl Default for IoControls { fn default() -> Self { let default_bytes_counter = - IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap(); + IntCounter::new("default_write_num_bytes", "Default write counter.", "", &[]); IoControls { throughput_limiter_opt: None, progress: Progress::default(), diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 193def5e01a..19c5d9be3b5 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -13,29 +13,198 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; -use std::sync::{LazyLock, OnceLock}; +use std::sync::{Arc, LazyLock, OnceLock}; +use std::time::Instant; -use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder}; -pub use prometheus::{ - Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter, - IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec, - exponential_buckets, linear_buckets, -}; +use opentelemetry::KeyValue; +use opentelemetry::metrics::Meter; +use prometheus::{HistogramOpts, Opts, TextEncoder}; +pub use prometheus::{exponential_buckets, linear_buckets}; + +static OTEL_METER: OnceLock = OnceLock::new(); +const METRICS_NAMESPACE: &str = "quickwit"; + +pub fn install_otel_meter(meter: Meter) { + OTEL_METER + .set(meter) + .expect("OTel meter should only be installed once"); +} + +fn otel_meter() -> Option<&'static Meter> { + OTEL_METER.get() +} + +fn build_otel_attributes(const_labels: &[(&str, &str)]) -> Vec { + const_labels + .iter() + .map(|(k, v)| KeyValue::new(k.to_string(), v.to_string())) + .collect() +} + +fn build_prometheus_labels(const_labels: &[(&str, &str)]) -> HashMap { + const_labels + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() +} + +struct OtelState { + build_instrument: Box T + Send + Sync>, + instrument: OnceLock, +} + +impl OtelState { + fn new(build_instrument: impl Fn(&Meter) -> T + Send + Sync + 'static) -> Self { + OtelState { + build_instrument: Box::new(build_instrument), + instrument: OnceLock::new(), + } + } + + fn bind(&self, meter: &Meter) -> Option<&T> { + Some( + self.instrument + .get_or_init(|| (self.build_instrument)(meter)), + ) + } + + fn get(&self) -> Option<&T> { + self.instrument.get() + } +} #[derive(Clone)] -pub struct HistogramVec { - underlying: PrometheusHistogramVec, +struct OtelMetric { + state: Option>>, + attributes: Vec, } -impl HistogramVec { - pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { - self.underlying.with_label_values(&label_values) +impl OtelMetric { + fn new(state: Option>>, attributes: Vec) -> Self { + Self { state, attributes } + } + + fn with_attributes(&self, names: &[String], values: [&str; N]) -> Self { + if self.state.is_none() { + return Self::default(); + } + let mut attributes = self.attributes.clone(); + for (name, value) in names.iter().zip(values.iter()) { + attributes.push(KeyValue::new(name.clone(), value.to_string())); + } + Self { + state: self.state.clone(), + attributes, + } + } + + /// Invokes a recording operation (e.g. `counter.add`) on the bound OTel + /// instrument, passing in this metric's attributes. Lazily binds the + /// instrument on first use after the meter becomes available. No-ops when + /// OTel is disabled or the meter has not been installed yet. + fn with_instrument(&self, f: impl FnOnce(&T, &[KeyValue])) { + let Some(instrument) = self.get_or_bind_instrument() else { + return; + }; + f(instrument, &self.attributes); + } + + fn get_or_bind_instrument(&self) -> Option<&T> { + let state = self.state.as_ref()?; + if let Some(instrument) = state.get() { + return Some(instrument); + } + let meter = otel_meter()?; + state.bind(meter) + } +} + +impl Default for OtelMetric { + fn default() -> Self { + Self { + state: None, + attributes: Vec::new(), + } + } +} + +impl OtelMetric> { + fn add(&self, value: u64) { + self.with_instrument(|counter, attributes| counter.add(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: i64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|histogram, attributes| histogram.record(value, attributes)); + } +} + +#[derive(Clone)] +pub struct IntCounter { + prometheus: prometheus::IntCounter, + otel: OtelMetric>, +} + +impl std::fmt::Debug for IntCounter { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("IntCounter") + .field("value", &self.prometheus.get()) + .finish() + } +} + +impl IntCounter { + pub fn new( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], + ) -> IntCounter { + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + IntCounter { + prometheus: prom, + otel: OtelMetric::default(), + } + } + + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.add(1); + } + + pub fn inc_by(&self, v: u64) { + self.prometheus.inc_by(v); + self.otel.add(v); + } + + pub fn get(&self) -> u64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntCounterVec { - underlying: PrometheusIntCounterVec, + prometheus: prometheus::IntCounterVec, + otel: OtelMetric>, + label_names: Vec, } impl IntCounterVec { @@ -46,62 +215,268 @@ impl IntCounterVec { const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) .expect("failed to create counter vec"); - IntCounterVec { underlying } + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new(None, build_otel_attributes(const_labels)), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter { - self.underlying.with_label_values(&label_values) + IntCounter { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } + } +} + +/// For relative operations (`inc`, `dec`, `add`, `sub`), the OTel value is derived by reading the +/// Prometheus gauge after mutation, since OTel gauges do not support relative updates. This is not +/// atomic: under concurrent updates, the OTel side may briefly record a stale value. This is +/// acceptable for now because gauges are inherently point-in-time approximations, and the next +/// update self-corrects. +/// +/// TODO: for strict correctness, manage a single `AtomicI64` as the source of truth and feed its +/// value into both Prometheus and OTel. +#[derive(Clone)] +pub struct IntGauge { + prometheus: prometheus::IntGauge, + otel: OtelMetric>, +} + +impl IntGauge { + pub fn set(&self, v: i64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.record(self.prometheus.get()); + } + + pub fn dec(&self) { + self.prometheus.dec(); + self.otel.record(self.prometheus.get()); + } + + pub fn add(&self, delta: i64) { + self.prometheus.add(delta); + self.otel.record(self.prometheus.get()); + } + + pub fn sub(&self, delta: i64) { + self.prometheus.sub(delta); + self.otel.record(self.prometheus.get()); + } + + pub fn get(&self) -> i64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntGaugeVec { - underlying: PrometheusIntGaugeVec, + prometheus: prometheus::IntGaugeVec, + otel: OtelMetric>, + label_names: Vec, } impl IntGaugeVec { pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { - self.underlying.with_label_values(&label_values) + IntGauge { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } + } +} + +#[derive(Clone)] +pub struct Gauge { + prometheus: prometheus::Gauge, + otel: OtelMetric>, +} + +impl Gauge { + pub fn set(&self, v: f64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn get(&self) -> f64 { + self.prometheus.get() + } +} + +#[derive(Clone)] +pub struct Histogram { + prometheus: prometheus::Histogram, + otel: OtelMetric>, +} + +impl Histogram { + pub fn observe(&self, v: f64) { + self.prometheus.observe(v); + self.otel.record(v); + } + + pub fn start_timer(&self) -> HistogramTimer { + HistogramTimer { + histogram: Some(self.clone()), + start: Instant::now(), + } + } +} + +pub struct HistogramTimer { + histogram: Option, + start: Instant, +} + +impl HistogramTimer { + pub fn observe_duration(mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +impl Drop for HistogramTimer { + fn drop(&mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +#[derive(Clone)] +pub struct HistogramVec { + prometheus: prometheus::HistogramVec, + otel: OtelMetric>, + label_names: Vec, +} + +impl HistogramVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { + Histogram { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } } } pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'static str, String>) { - let mut counter_opts = Opts::new(name, help).namespace("quickwit"); + let mut counter_opts = Opts::new(name, help).namespace(METRICS_NAMESPACE); for (k, v) in kvs { counter_opts = counter_opts.const_label(k, v); } - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); + let counter = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); counter.inc(); prometheus::register(Box::new(counter)).expect("failed to register counter"); } +fn new_otel_state( + name: &str, + subsystem: &str, + help: &str, + build_instrument: impl Fn(&Meter, &str, &str) -> T + Send + Sync + 'static, +) -> Arc> +where + T: Send + Sync + 'static, +{ + let name = if subsystem.is_empty() { + format!("{METRICS_NAMESPACE}_{name}") + } else { + format!("{METRICS_NAMESPACE}_{subsystem}_{name}") + }; + let description = help.to_string(); + Arc::new(OtelState::new(move |meter| { + build_instrument(meter, &name, &description) + })) +} + +fn new_counter_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .u64_counter(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_int_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .i64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_float_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .f64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_histogram_otel_state( + name: &str, + subsystem: &str, + help: &str, + boundaries: Vec, +) -> Arc>> { + new_otel_state(name, subsystem, help, move |meter, name, description| { + meter + .f64_histogram(name.to_string()) + .with_description(description.to_string()) + .with_boundaries(boundaries.clone()) + .build() + }) +} + pub fn new_counter( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], ) -> IntCounter { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); - prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); - counter + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter"); + + IntCounter { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_counter_vec( @@ -111,82 +486,114 @@ pub fn new_counter_vec( const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let int_counter_vec = IntCounterVec::new(name, help, subsystem, const_labels, label_names); - let collector = Box::new(int_counter_vec.underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); - int_counter_vec + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) + .expect("failed to create counter vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter vec"); + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } -pub fn new_float_gauge( +pub fn new_gauge( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> Gauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> IntGauge { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register float gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge"); + + IntGauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } -pub fn new_gauge( +pub fn new_gauge_vec( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> IntGauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); + label_names: [&str; N], +) -> IntGaugeVec { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge vec"); + + IntGaugeVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } -pub fn new_gauge_vec( +pub fn new_float_gauge( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], - label_names: [&str; N], -) -> IntGaugeVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> Gauge { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = - PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register float gauge"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); - - IntGaugeVec { underlying } + Gauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_float_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_histogram(name: &str, help: &str, subsystem: &str, buckets: Vec) -> Histogram { let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .buckets(buckets); - let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram"); - prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram"); - histogram + .buckets(buckets.clone()); + let prom = + prometheus::Histogram::with_opts(histogram_opts).expect("failed to create histogram"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram"); + + Histogram { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + Vec::new(), + ), + } } pub fn new_histogram_vec( @@ -197,22 +604,28 @@ pub fn new_histogram_vec( label_names: [&str; N], buckets: Vec, ) -> HistogramVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels) - .buckets(buckets); - let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)) + .buckets(buckets.clone()); + let prom = prometheus::HistogramVec::new(histogram_opts, &label_names) .expect("failed to create histogram vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram vec"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register histogram vec"); - - HistogramVec { underlying } + HistogramVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } pub struct GaugeGuard<'a> { @@ -291,8 +704,6 @@ impl Drop for OwnedGaugeGuard { pub fn metrics_text_payload() -> Result { let metric_families = prometheus::gather(); - // Arbitrary non-zero size in order to skip a bunch of - // buffer growth-reallocations when encoding metrics. let mut buffer = String::with_capacity(1024); let encoder = TextEncoder::new(); match encoder.encode_utf8(&metric_families, &mut buffer) { @@ -451,3 +862,384 @@ pub fn index_label(index_id: &str) -> &str { } pub static MEMORY_METRICS: LazyLock = LazyLock::new(MemoryMetrics::default); + +#[cfg(test)] +mod tests { + use std::sync::OnceLock; + + use opentelemetry::metrics::MeterProvider; + use opentelemetry_sdk::metrics::data::{ + AggregatedMetrics, HistogramDataPoint, MetricData, ResourceMetrics, + }; + use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider}; + use serial_test::serial; + + use super::*; + + static TEST_OTEL_EXPORTER: OnceLock = OnceLock::new(); + static TEST_OTEL_PROVIDER: OnceLock = OnceLock::new(); + + fn ensure_test_otel_provider() -> (&'static InMemoryMetricExporter, &'static SdkMeterProvider) { + let exporter = TEST_OTEL_EXPORTER.get_or_init(InMemoryMetricExporter::default); + let provider = TEST_OTEL_PROVIDER.get_or_init(|| { + let reader = PeriodicReader::builder(exporter.clone()).build(); + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + install_otel_meter(provider.meter("quickwit-tests")); + provider + }); + (exporter, provider) + } + + fn find_metric_data<'a>( + metrics: &'a [ResourceMetrics], + metric_name: &str, + ) -> Option<&'a AggregatedMetrics> { + metrics + .iter() + .flat_map(|resource_metrics| resource_metrics.scope_metrics()) + .flat_map(|scope_metrics| scope_metrics.metrics()) + .find(|metric| metric.name() == metric_name) + .map(|metric| metric.data()) + } + + fn flush_and_read_metric( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + read: impl FnOnce(&AggregatedMetrics) -> T, + ) -> T { + provider.force_flush().unwrap(); + let exported_metrics = exporter.get_finished_metrics().unwrap(); + let data = find_metric_data(&exported_metrics, metric_name) + .unwrap_or_else(|| panic!("metric '{metric_name}' should be exported")); + read(data) + } + + fn flush_and_get_counter_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> u64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + sum_data + .data_points() + .next() + .expect("should have one data point") + .value() + }) + } + + fn flush_and_get_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> i64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + fn flush_and_get_histogram_data_point( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> HistogramDataPoint { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + histogram_data + .data_points() + .next() + .expect("should have one data point") + .clone() + }) + } + + fn flush_and_get_float_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> f64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected f64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + #[test] + #[serial] + fn test_counter() { + let (exporter, provider) = ensure_test_otel_provider(); + + // inc + let counter = new_counter("test_ctr_inc", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc(); + assert_eq!(counter.get(), 1); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc"); + assert_eq!(otel_value, 1); + + // inc_by + let counter = new_counter("test_ctr_inc_by", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc_by(5); + assert_eq!(counter.get(), 5); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc_by"); + assert_eq!(otel_value, 5); + } + + #[test] + #[serial] + fn test_gauge() { + let (exporter, provider) = ensure_test_otel_provider(); + + // set + let gauge = new_gauge("test_gauge_set", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.set(10); + assert_eq!(gauge.get(), 10); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_set"); + assert_eq!(otel_value, 10); + + // inc + let gauge = new_gauge("test_gauge_inc", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.inc(); + assert_eq!(gauge.get(), 1); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_inc"); + assert_eq!(otel_value, 1); + + // dec + let gauge = new_gauge("test_gauge_dec", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.dec(); + assert_eq!(gauge.get(), -1); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_dec"); + assert_eq!(otel_value, -1); + + // add + let gauge = new_gauge("test_gauge_add", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.add(15); + assert_eq!(gauge.get(), 15); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_add"); + assert_eq!(otel_value, 15); + + // sub + let gauge = new_gauge("test_gauge_sub", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.sub(3); + assert_eq!(gauge.get(), -3); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_sub"); + assert_eq!(otel_value, -3); + } + + #[test] + #[serial] + fn test_float_gauge_set() { + let (exporter, provider) = ensure_test_otel_provider(); + let gauge = new_float_gauge("test_float_gauge", "test", "test", &[]); + assert_eq!(gauge.get(), 0.0); + gauge.set(1.23); + assert_eq!(gauge.get(), 1.23); + + let otel_value = + flush_and_get_float_gauge_value(exporter, provider, "quickwit_test_test_float_gauge"); + assert_eq!(otel_value, 1.23); + } + + #[test] + #[serial] + fn test_histogram_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_obs", "test", "test", vec![1.0, 5.0, 10.0]); + histogram.observe(2.5); + histogram.observe(7.0); + + let dp = + flush_and_get_histogram_data_point(exporter, provider, "quickwit_test_test_hist_obs"); + assert_eq!(dp.count(), 2); + assert_eq!(dp.max().unwrap(), 7.0); + assert_eq!(dp.min().unwrap(), 2.5); + assert_eq!(dp.bounds().collect::>(), vec![1.0, 5.0, 10.0]); + } + + #[test] + #[serial] + fn test_histogram_vec_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram_vec = new_histogram_vec( + "test_hist_vec_obs", + "test", + "test", + &[], + ["method"], + vec![0.5, 1.5, 3.0], + ); + histogram_vec.with_label_values(["GET"]).observe(1.0); + + flush_and_read_metric( + exporter, + provider, + "quickwit_test_test_hist_vec_obs", + |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + let data_point = histogram_data + .data_points() + .find(|point| { + point + .attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "GET") + }) + .expect("should contain the labelled data point"); + assert_eq!(data_point.count(), 1); + assert_eq!(data_point.min().unwrap(), 1.0); + }, + ); + } + + #[test] + #[serial] + fn test_histogram_timer_drop_observes() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_drop", "test", "test", vec![1.0, 5.0, 10.0]); + { + let _timer = histogram.start_timer(); + } + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_drop", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_histogram_timer_observe_duration() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_obs", "test", "test", vec![1.0, 5.0, 10.0]); + let timer = histogram.start_timer(); + timer.observe_duration(); + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_obs", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_counter_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_counter_vec("test_cvec", "test", "test", &[], ["method"]); + let post_counter = vec.with_label_values(["POST"]); + post_counter.inc_by(3); + assert_eq!(post_counter.get(), 3); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_cvec", |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + let post_value = sum_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "POST") + }) + .expect("should contain POST data point") + .value(); + assert_eq!(post_value, 3); + }); + } + + #[test] + #[serial] + fn test_gauge_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_gauge_vec("test_gvec", "test", "test", &[], ["pool"]); + let indexing = vec.with_label_values(["indexing"]); + indexing.set(10); + assert_eq!(indexing.get(), 10); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_gvec", |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + let indexing_value = gauge_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "pool" && kv.value.as_str() == "indexing") + }) + .expect("should contain pool=indexing data point") + .value(); + assert_eq!(indexing_value, 10); + }); + } + + #[test] + fn test_gauge_guard_add_sub_drop() { + let gauge = new_gauge("test_guard", "test", "test", &[]); + { + let mut guard = GaugeGuard::from_gauge(&gauge); + guard.add(5); + assert_eq!(gauge.get(), 5); + guard.sub(2); + assert_eq!(gauge.get(), 3); + } + // After drop, the delta (3) is subtracted. + assert_eq!(gauge.get(), 0); + } + + #[test] + fn test_owned_gauge_guard_add_sub_drop() { + let gauge = new_gauge("test_owned_guard", "test", "test", &[]); + { + let mut guard = OwnedGaugeGuard::from_gauge(gauge.clone()); + guard.add(5); + assert_eq!(gauge.get(), 5); + guard.sub(2); + assert_eq!(gauge.get(), 3); + } + assert_eq!(gauge.get(), 0); + } + + #[test] + fn test_metrics_text_payload_contains_registered_metrics() { + let counter = new_counter("test_payload_ctr", "test", "test", &[]); + counter.inc_by(42); + let payload = metrics_text_payload().unwrap(); + assert!(payload.contains("quickwit_test_test_payload_ctr")); + assert!(payload.contains("42")); + } +} diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index ee93ab7aba9..b34c3b477c7 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -17,11 +17,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use once_cell::sync::OnceCell; -use prometheus::{Gauge, IntCounter, IntGauge}; use tokio::runtime::Runtime; use tokio_metrics::{RuntimeMetrics, RuntimeMonitor}; -use crate::metrics::{new_counter, new_float_gauge, new_gauge}; +use crate::metrics::{Gauge, IntCounter, IntGauge, new_counter, new_float_gauge, new_gauge}; static RUNTIMES: OnceCell> = OnceCell::new(); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index e0fc126b465..3740a7f0527 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -18,12 +18,11 @@ use std::pin::Pin; use bytesize::ByteSize; use futures::{Stream, StreamExt, TryStreamExt, stream}; -use prometheus::IntGauge; use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; -use crate::metrics::GaugeGuard; +use crate::metrics::{GaugeGuard, IntGauge}; use crate::tower::RpcName; pub type BoxStream = Pin + Send + Unpin + 'static>>; diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index 18201196cf9..17fcfc3415d 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -17,11 +17,10 @@ use std::sync::Arc; use futures::{Future, TryFutureExt}; use once_cell::sync::Lazy; -use prometheus::IntGauge; use tokio::sync::oneshot; use tracing::error; -use crate::metrics::{GaugeGuard, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; +use crate::metrics::{GaugeGuard, IntGauge, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; /// An executor backed by a thread pool to run CPU-intensive tasks. /// diff --git a/quickwit/quickwit-common/src/tower/circuit_breaker.rs b/quickwit/quickwit-common/src/tower/circuit_breaker.rs index 09ada07e187..3d7f2d272cf 100644 --- a/quickwit/quickwit-common/src/tower/circuit_breaker.rs +++ b/quickwit/quickwit-common/src/tower/circuit_breaker.rs @@ -19,10 +19,11 @@ use std::task::{Context, Poll}; use std::time::Duration; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::time::Instant; use tower::{Layer, Service}; +use crate::metrics::IntCounter; + /// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html). /// /// It counts the errors emitted by the inner service, and if the number of errors exceeds a certain @@ -49,7 +50,7 @@ pub struct CircuitBreakerLayer { time_window: Duration, timeout: Duration, evaluator: Evaluator, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, } pub trait CircuitBreakerEvaluator: Clone { @@ -61,7 +62,7 @@ pub trait CircuitBreakerEvaluator: Clone { self, max_num_errors_per_secs: u32, timeout: Duration, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, ) -> CircuitBreakerLayer { CircuitBreakerLayer { max_error_count_per_time_window: max_num_errors_per_secs, @@ -301,8 +302,12 @@ mod tests { const TIMEOUT: Duration = Duration::from_millis(500); - let int_counter: prometheus::IntCounter = - IntCounter::new("circuit_break_total_test", "test circuit breaker counter").unwrap(); + let int_counter = crate::metrics::new_counter( + "circuit_break_total_test", + "test circuit breaker counter", + "test", + &[], + ); let mut service = ServiceBuilder::new() .layer(TestCircuitBreakerEvaluator.make_layer(10, TIMEOUT, int_counter)) .service_fn(|_| async {