Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ mod tests {
.unwrap();
// At this point the actor was started and even processed a message entirely.
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
IntCounter::new("test_counter", "help for test_counter", "", &[]);
let wait_duration = Duration::from_millis(1);
let processed = mailbox
.send_message_with_backpressure_counter(
Expand All @@ -548,7 +548,7 @@ mod tests {
.await
.unwrap();
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
IntCounter::new("test_counter", "help for test_counter", "", &[]);
let wait_duration = Duration::from_millis(1);
mailbox
.send_message_with_backpressure_counter(
Expand Down Expand Up @@ -580,7 +580,7 @@ mod tests {
.await
.unwrap();
let backpressure_micros_counter =
IntCounter::new("test_counter", "help for test_counter").unwrap();
IntCounter::new("test_counter", "help for test_counter", "", &[]);
let start = Instant::now();
mailbox
.ask_with_backpressure_counter(Duration::from_millis(1), None)
Expand Down
117 changes: 107 additions & 10 deletions quickwit/quickwit-cli/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ use std::sync::Arc;
use std::{env, fmt};

use anyhow::Context;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{
LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig,
LogExporter, MetricExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig,
};
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider};
use opentelemetry_sdk::{Resource, trace};
Expand Down Expand Up @@ -81,6 +83,26 @@ impl OtlpProtocol {
}
.context("failed to initialize OTLP traces exporter")
}

/// Builds the OTLP metrics exporter matching this protocol.
///
/// `temporality` is forwarded to the exporter builder. The gRPC variant uses
/// the tonic transport; both HTTP variants use the HTTP transport and differ
/// only in the wire protocol (binary protobuf vs. JSON).
///
/// # Errors
///
/// Returns the builder error wrapped with the context
/// "failed to initialize OTLP metrics exporter".
fn metric_exporter(&self, temporality: Temporality) -> anyhow::Result<MetricExporter> {
    // `None` selects the gRPC (tonic) transport; `Some(_)` selects HTTP with
    // the given wire protocol.
    let http_wire_protocol = match self {
        OtlpProtocol::Grpc => None,
        OtlpProtocol::HttpProtobuf => Some(OtlpWireProtocol::HttpBinary),
        OtlpProtocol::HttpJson => Some(OtlpWireProtocol::HttpJson),
    };
    let build_result = match http_wire_protocol {
        Some(wire_protocol) => MetricExporter::builder()
            .with_http()
            .with_temporality(temporality)
            .with_protocol(wire_protocol)
            .build(),
        None => MetricExporter::builder()
            .with_tonic()
            .with_temporality(temporality)
            .build(),
    };
    build_result.context("failed to initialize OTLP metrics exporter")
}
}

impl FromStr for OtlpProtocol {
Expand All @@ -104,11 +126,60 @@ impl FromStr for OtlpProtocol {
}
}

/// Newtype around the SDK [`Temporality`] so that it can be parsed from a
/// string through [`FromStr`].
struct OtlpMetricsTemporality(Temporality);

impl FromStr for OtlpMetricsTemporality {
    type Err = anyhow::Error;

    /// Parses an OTLP metrics temporality preference.
    ///
    /// Accepted values are `delta`, `lowmemory` and `cumulative`. Matching is
    /// ASCII case-insensitive, so `Delta` or `CUMULATIVE` are accepted as well.
    ///
    /// # Errors
    ///
    /// Returns an error naming the rejected input and listing the supported
    /// values when `temporality_str` is none of the above.
    fn from_str(temporality_str: &str) -> anyhow::Result<Self> {
        const TEMPORALITY_DELTA: &str = "delta";
        const TEMPORALITY_LOWMEMORY: &str = "lowmemory";
        const TEMPORALITY_CUMULATIVE: &str = "cumulative";

        // Normalize the case before matching; the error message below still
        // reports the value exactly as it was provided.
        let normalized = temporality_str.to_ascii_lowercase();
        let temporality = match normalized.as_str() {
            TEMPORALITY_DELTA => Temporality::Delta,
            TEMPORALITY_LOWMEMORY => Temporality::LowMemory,
            TEMPORALITY_CUMULATIVE => Temporality::Cumulative,
            _ => anyhow::bail!(
                "unsupported OTLP metrics temporality `{temporality_str}`, supported values are \
                 `{TEMPORALITY_DELTA}`, `{TEMPORALITY_LOWMEMORY}` and `{TEMPORALITY_CUMULATIVE}`"
            ),
        };
        Ok(OtlpMetricsTemporality(temporality))
    }
}

impl From<OtlpMetricsTemporality> for Temporality {
fn from(t: OtlpMetricsTemporality) -> Self {
t.0
}
}

/// Groups the OpenTelemetry SDK providers for traces, logs and metrics so
/// that they can be handed around and shut down together.
pub struct TelemetryProviders {
/// SDK provider for traces.
tracer_provider: SdkTracerProvider,
/// SDK provider for logs.
logger_provider: SdkLoggerProvider,
/// SDK provider for metrics.
meter_provider: SdkMeterProvider,
}

impl TelemetryProviders {
    /// Shuts down the tracer, logger and meter providers, in that order.
    ///
    /// # Errors
    ///
    /// Stops at the first provider whose shutdown fails and returns that
    /// error, annotated with which provider could not be shut down.
    pub fn shutdown(self) -> anyhow::Result<()> {
        // Borrow the fields: `shutdown` is called on each provider in turn
        // while `self` stays intact until the end of the function.
        let Self {
            tracer_provider,
            logger_provider,
            meter_provider,
        } = &self;

        tracer_provider
            .shutdown()
            .context("failed to shutdown OpenTelemetry tracer provider")?;
        logger_provider
            .shutdown()
            .context("failed to shutdown OpenTelemetry logger provider")?;
        meter_provider
            .shutdown()
            .context("failed to shutdown OpenTelemetry meter provider")?;
        Ok(())
    }
}

#[cfg(feature = "tokio-console")]
use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY;

/// Load the default logging filter from the environment. The filter can later
/// be updated using the result callback of [setup_logging_and_tracing].
/// be updated using the result callback of [init_telemetry_providers].
fn startup_env_filter(level: Level) -> anyhow::Result<EnvFilter> {
let env_filter = env::var("RUST_LOG")
.map(|_| EnvFilter::from_default_env())
Expand All @@ -119,14 +190,11 @@ fn startup_env_filter(level: Level) -> anyhow::Result<EnvFilter> {

type ReloadLayer = tracing_subscriber::reload::Layer<EnvFilter, tracing_subscriber::Registry>;

pub fn setup_logging_and_tracing(
pub fn init_telemetry_providers(
level: Level,
ansi_colors: bool,
build_info: &BuildInfo,
) -> anyhow::Result<(
EnvFilterReloadFn,
Option<(SdkTracerProvider, SdkLoggerProvider)>,
)> {
) -> anyhow::Result<(EnvFilterReloadFn, Option<TelemetryProviders>)> {
#[cfg(feature = "tokio-console")]
{
if get_bool_from_env(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY, false) {
Expand Down Expand Up @@ -204,12 +272,37 @@ pub fn setup_logging_and_tracing(
.with_batch_exporter(log_exporter)
.build();

let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
let metrics_protocol_opt =
get_from_env_opt::<String>("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", false);
let metrics_protocol = metrics_protocol_opt
.as_deref()
.map(OtlpProtocol::from_str)
.transpose()?
.unwrap_or(global_protocol);
let temporality_opt =
get_from_env_opt::<String>("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE", false);
let temporality = temporality_opt
.as_deref()
.map(OtlpMetricsTemporality::from_str)
.transpose()?
.map(Temporality::from)
.unwrap_or(Temporality::Cumulative);
let metric_exporter = metrics_protocol.metric_exporter(temporality)?;

let meter_provider = SdkMeterProvider::builder()
.with_resource(resource.clone())
.with_periodic_exporter(metric_exporter)
.build();

let meter = meter_provider.meter("quickwit");
quickwit_common::metrics::install_otel_meter(meter);

let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_span_processor(span_processor)
.with_resource(resource)
.build();

let tracer = tracing_provider.tracer("quickwit");
let tracer = tracer_provider.tracer("quickwit");
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);

// Bridge between tracing logs and otel tracing events
Expand All @@ -220,7 +313,11 @@ pub fn setup_logging_and_tracing(
.with(logs_otel_layer)
.try_init()
.context("failed to register tracing subscriber")?;
Some((tracing_provider, logger_provider))
Some(TelemetryProviders {
tracer_provider,
logger_provider,
meter_provider,
})
} else {
registry
.try_init()
Expand Down
15 changes: 5 additions & 10 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use quickwit_cli::checklist::RED_COLOR;
use quickwit_cli::cli::{CliCommand, build_cli};
#[cfg(feature = "jemalloc")]
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
use quickwit_cli::logger::setup_logging_and_tracing;
use quickwit_cli::logger::init_telemetry_providers;
use quickwit_cli::{busy_detector, install_default_crypto_ring_provider};
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
use quickwit_serve::BuildInfo;
Expand Down Expand Up @@ -98,8 +98,8 @@ async fn main_impl() -> anyhow::Result<()> {
start_jemalloc_metrics_loop();

let build_info = BuildInfo::get();
let (env_filter_reload_fn, tracer_provider_opt) =
setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?;
let (env_filter_reload_fn, telemetry_providers) =
init_telemetry_providers(command.default_log_level(), ansi_colors, build_info)?;

let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await {
error!(error=%command_error, "command failed");
Expand All @@ -113,13 +113,8 @@ async fn main_impl() -> anyhow::Result<()> {
0
};

if let Some((trace_provider, logs_provider)) = tracer_provider_opt {
trace_provider
.shutdown()
.context("failed to shutdown OpenTelemetry tracer provider")?;
logs_provider
.shutdown()
.context("failed to shutdown OpenTelemetry logs provider")?;
if let Some(providers) = telemetry_providers {
providers.shutdown()?;
}

std::process::exit(return_code)
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ hyper = { workspace = true }
hyper-util = { workspace = true, optional = true }
itertools = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
pin-project = { workspace = true }
pnet = { workspace = true }
prometheus = { workspace = true }
Expand Down Expand Up @@ -63,6 +64,7 @@ jemalloc-profiled = [

[dev-dependencies]
hyper-util = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["testing"] }
proptest = { workspace = true }
serde_json = { workspace = true }
serial_test = { workspace = true }
Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-common/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ use async_speed_limit::limiter::Consume;
use bytesize::ByteSize;
use once_cell::sync::Lazy;
use pin_project::pin_project;
use prometheus::IntCounter;
use tokio::io::AsyncWrite;

use crate::metrics::{IntCounterVec, new_counter_vec};
use crate::metrics::{IntCounter, IntCounterVec, new_counter_vec};
use crate::{KillSwitch, Progress, ProtectedZoneGuard};

// Max 1MB at a time.
Expand Down Expand Up @@ -99,7 +98,7 @@ pub struct IoControls {
impl Default for IoControls {
fn default() -> Self {
let default_bytes_counter =
IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
IntCounter::new("default_write_num_bytes", "Default write counter.", "", &[]);
IoControls {
throughput_limiter_opt: None,
progress: Progress::default(),
Expand Down
Loading
Loading