From cd77bf2672d73e12f49dbd71f848fb4440c82d2b Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Tue, 31 Mar 2026 17:54:42 +0200 Subject: [PATCH 1/2] Support configurable OTLP exporter protocol for traces and logs --- LICENSE-3rdparty.csv | 2 + quickwit/Cargo.lock | 17 ++++ quickwit/Cargo.toml | 2 +- quickwit/quickwit-cli/src/logger.rs | 115 ++++++++++++++++++++++++++-- 4 files changed, 127 insertions(+), 9 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 1f51678111f..3104b943436 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -154,6 +154,7 @@ concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT," console,https://github.com/console-rs/console,MIT,The console Authors console-api,https://github.com/tokio-rs/console,MIT,"Eliza Weisman , Tokio Contributors " console-subscriber,https://github.com/tokio-rs/console,MIT,"Eliza Weisman , Tokio Contributors " +const-hex,https://github.com/danipopes/const-hex,MIT OR Apache-2.0,DaniPopes <57450786+DaniPopes@users.noreply.github.com> const-oid,https://github.com/RustCrypto/formats/tree/master/const-oid,Apache-2.0 OR MIT,RustCrypto Developers const-random,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck const-random-macro,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck @@ -545,6 +546,7 @@ proc-macro2-diagnostics,https://github.com/SergioBenitez/proc-macro2-diagnostics procfs,https://github.com/eminence/procfs,MIT OR Apache-2.0,Andrew Chin procfs-core,https://github.com/eminence/procfs,MIT OR Apache-2.0,Andrew Chin prometheus,https://github.com/tikv/rust-prometheus,Apache-2.0,"overvenus@gmail.com, siddontang@gmail.com, vistaswx@gmail.com" +proptest,https://github.com/proptest-rs/proptest,MIT OR Apache-2.0,Jason Lingle prost,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " prost-build,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " prost-derive,https://github.com/tokio-rs/prost,Apache-2.0,"Dan Burkert , Lucio Franco , Casper Meijn , Tokio Contributors " diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d099964a383..89eefe2bed7 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2071,6 +2071,18 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-hex" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531185e432bb31db1ecda541e9e7ab21468d4d844ad7505e0546a49b4945d49b" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "proptest", + "serde_core", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -5664,6 +5676,7 @@ dependencies = [ "opentelemetry_sdk", "prost 0.14.3", "reqwest", + "serde_json", "thiserror 2.0.18", "tokio", "tonic 0.14.5", @@ -5676,9 +5689,13 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ + "base64 0.22.1", + "const-hex", "opentelemetry", "opentelemetry_sdk", "prost 0.14.3", + "serde", + "serde_json", "tonic 0.14.5", "tonic-prost", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index d252652b27f..9242390d898 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -169,7 +169,7 @@ openssl-probe = "0.1" opentelemetry = "0.31" opentelemetry-appender-tracing = "0.31" opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] } +opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic", "http-json"] } ouroboros = "0.18" parquet = { version = "57", default-features = false, features = ["arrow", "zstd", "snap", "variant_experimental"] } percent-encoding = "2.3" diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index e1e60a14f93..7583b8ea6e5 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -19,6 +19,7 @@ use anyhow::Context; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{Protocol as OtlpWireProtocol, WithExportConfig}; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; @@ -39,6 +40,54 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; use crate::QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum OtlpProtocol { + Grpc, + HttpProtobuf, + HttpJson, +} + +fn parse_otlp_protocol(protocol_str: &str) -> anyhow::Result { + const OTLP_PROTOCOL_GRPC: &str = "grpc"; + const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; + const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; + + match protocol_str { + OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc), + OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf), + OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson), + other => anyhow::bail!( + "unsupported OTLP protocol `{other}`, supported values are `{OTLP_PROTOCOL_GRPC}`, \ + `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and `{OTLP_PROTOCOL_HTTP_JSON}`" + ), + } +} + +/// Resolves the OTLP protocol from candidates in priority order, defaulting to gRPC. +fn resolve_otlp_protocol(candidates: &[Option<&str>]) -> anyhow::Result { + match candidates.iter().flatten().next() { + Some(protocol_str) => parse_otlp_protocol(protocol_str), + None => Ok(OtlpProtocol::Grpc), + } +} + +macro_rules! build_otlp_exporter { + ($builder:expr, $protocol:expr) => { + match $protocol { + OtlpProtocol::Grpc => $builder.with_tonic().build(), + OtlpProtocol::HttpProtobuf => $builder + .with_http() + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => $builder + .with_http() + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + }; +} + #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; @@ -98,10 +147,16 @@ pub fn setup_logging_and_tracing( // Note on disabling ANSI characters: setting the ansi boolean on event format is insufficient. // It is thus set on layers, see https://github.com/tokio-rs/tracing/issues/1817 let provider_opt = if get_bool_from_env(QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY, false) { - let span_exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .build() - .context("failed to initialize OpenTelemetry OTLP exporter")?; + let global_protocol = env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok(); + let traces_protocol = resolve_otlp_protocol(&[ + env::var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL") + .ok() + .as_deref(), + global_protocol.as_deref(), + ])?; + let span_exporter = + build_otlp_exporter!(opentelemetry_otlp::SpanExporter::builder(), traces_protocol) + .context("failed to initialize OTLP traces exporter")?; let span_processor = trace::BatchSpanProcessor::builder(span_exporter) .with_batch_config( BatchConfigBuilder::default() @@ -117,10 +172,13 @@ pub fn setup_logging_and_tracing( .with_attribute(KeyValue::new("service.version", build_info.version.clone())) .build(); - let logs_exporter = opentelemetry_otlp::LogExporter::builder() - .with_tonic() - .build() - .context("failed to initialize OpenTelemetry OTLP logs")?; + let logs_protocol = resolve_otlp_protocol(&[ + env::var("OTEL_EXPORTER_OTLP_LOGS_PROTOCOL").ok().as_deref(), + global_protocol.as_deref(), + ])?; + let logs_exporter = + build_otlp_exporter!(opentelemetry_otlp::LogExporter::builder(), logs_protocol) + .context("failed to initialize OTLP logs exporter")?; let logger_provider = SdkLoggerProvider::builder() .with_resource(resource.clone()) @@ -363,3 +421,44 @@ pub(super) mod jemalloc_profiled { )) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resolve_otlp_protocol_defaults_to_grpc() { + let protocol = resolve_otlp_protocol(&[None, None]).unwrap(); + assert_eq!(protocol, OtlpProtocol::Grpc); + } + + #[test] + fn test_resolve_otlp_protocol_first_candidate_takes_priority() { + let protocol = resolve_otlp_protocol(&[Some("http/protobuf"), Some("grpc")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpProtobuf); + } + + #[test] + fn test_resolve_otlp_protocol_falls_back_to_later_candidate() { + let protocol = resolve_otlp_protocol(&[None, Some("http/protobuf")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpProtobuf); + } + + #[test] + fn test_resolve_otlp_protocol_grpc_explicit() { + let protocol = resolve_otlp_protocol(&[Some("grpc")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::Grpc); + } + + #[test] + fn test_resolve_otlp_protocol_http_json_explicit() { + let protocol = resolve_otlp_protocol(&[Some("http/json")]).unwrap(); + assert_eq!(protocol, OtlpProtocol::HttpJson); + } + + #[test] + fn test_resolve_otlp_protocol_rejects_unsupported_value() { + let result = resolve_otlp_protocol(&[Some("http/xml")]); + assert!(result.is_err()); + } +} From 5c76b24f751f991d70e4158dd4e80cf49412d6ba Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 2 Apr 2026 16:11:56 -0400 Subject: [PATCH 2/2] Refactor --- quickwit/quickwit-cli/Cargo.toml | 1 + quickwit/quickwit-cli/src/logger.rs | 163 ++++++++++++++-------------- quickwit/quickwit-config/Cargo.toml | 2 +- 3 files changed, 82 insertions(+), 84 deletions(-) diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml index 5d9dc955107..b40bf5585d3 100644 --- a/quickwit/quickwit-cli/Cargo.toml +++ b/quickwit/quickwit-cli/Cargo.toml @@ -79,6 +79,7 @@ quickwit-actors = { workspace = true, features = ["testsuite"] } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } +quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } [features] diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 6ce2276dbad..f16e7db4ce5 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; use std::sync::Arc; use std::{env, fmt}; @@ -19,12 +20,14 @@ use anyhow::Context; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; -use opentelemetry_otlp::{Protocol as OtlpWireProtocol, WithExportConfig}; +use opentelemetry_otlp::{ + LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, +}; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; use opentelemetry_sdk::{Resource, trace}; -use quickwit_common::{get_bool_from_env, get_from_env_opt}; +use quickwit_common::{get_bool_from_env, get_from_env, get_from_env_opt}; use quickwit_serve::{BuildInfo, EnvFilterReloadFn}; use time::format_description::BorrowedFormatItem; use tracing::{Event, Level, Subscriber}; @@ -48,44 +51,57 @@ enum OtlpProtocol { HttpJson, } -fn parse_otlp_protocol(protocol_str: &str) -> anyhow::Result { - const OTLP_PROTOCOL_GRPC: &str = "grpc"; - const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; - const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; - - match protocol_str { - OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc), - OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf), - OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson), - other => anyhow::bail!( - "unsupported OTLP protocol `{other}`, supported values are `{OTLP_PROTOCOL_GRPC}`, \ - `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and `{OTLP_PROTOCOL_HTTP_JSON}`" - ), - } -} - -/// Resolves the OTLP protocol from candidates in priority order, defaulting to gRPC. -fn resolve_otlp_protocol(candidates: &[Option<&str>]) -> anyhow::Result { - match candidates.iter().flatten().next() { - Some(protocol_str) => parse_otlp_protocol(protocol_str), - None => Ok(OtlpProtocol::Grpc), +impl OtlpProtocol { + fn log_exporter(&self) -> anyhow::Result { + match self { + OtlpProtocol::Grpc => LogExporter::builder().with_tonic().build(), + OtlpProtocol::HttpProtobuf => LogExporter::builder() + .with_http() + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => LogExporter::builder() + .with_http() + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + .context("failed to initialize OTLP logs exporter") } -} -macro_rules! build_otlp_exporter { - ($builder:expr, $protocol:expr) => { - match $protocol { - OtlpProtocol::Grpc => $builder.with_tonic().build(), - OtlpProtocol::HttpProtobuf => $builder + fn span_exporter(&self) -> anyhow::Result { + match self { + OtlpProtocol::Grpc => SpanExporter::builder().with_tonic().build(), + OtlpProtocol::HttpProtobuf => SpanExporter::builder() .with_http() .with_protocol(OtlpWireProtocol::HttpBinary) .build(), - OtlpProtocol::HttpJson => $builder + OtlpProtocol::HttpJson => SpanExporter::builder() .with_http() .with_protocol(OtlpWireProtocol::HttpJson) .build(), } - }; + .context("failed to initialize OTLP traces exporter") + } +} + +impl FromStr for OtlpProtocol { + type Err = anyhow::Error; + + fn from_str(protocol_str: &str) -> anyhow::Result { + const OTLP_PROTOCOL_GRPC: &str = "grpc"; + const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; + const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; + + match protocol_str { + OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc), + OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf), + OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson), + other => anyhow::bail!( + "unsupported OTLP protocol `{other}`, supported values are \ + `{OTLP_PROTOCOL_GRPC}`, `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and \ + `{OTLP_PROTOCOL_HTTP_JSON}`" + ), + } + } } #[cfg(feature = "tokio-console")] @@ -147,16 +163,19 @@ pub fn setup_logging_and_tracing( // Note on disabling ANSI characters: setting the ansi boolean on event format is insufficient. // It is thus set on layers, see https://github.com/tokio-rs/tracing/issues/1817 let provider_opt = if get_bool_from_env(QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY, false) { - let global_protocol = env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok(); - let traces_protocol = resolve_otlp_protocol(&[ - env::var("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL") - .ok() - .as_deref(), - global_protocol.as_deref(), - ])?; - let span_exporter = - build_otlp_exporter!(opentelemetry_otlp::SpanExporter::builder(), traces_protocol) - .context("failed to initialize OTLP traces exporter")?; + let global_protocol_str = + get_from_env("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc".to_string(), false); + let global_protocol = OtlpProtocol::from_str(&global_protocol_str)?; + + let traces_protocol_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL", false); + let traces_protocol = traces_protocol_opt + .as_deref() + .map(OtlpProtocol::from_str) + .transpose()? + .unwrap_or(global_protocol); + + let span_exporter = traces_protocol.span_exporter()?; let span_processor = trace::BatchSpanProcessor::builder(span_exporter) .with_batch_config( BatchConfigBuilder::default() @@ -172,17 +191,17 @@ pub fn setup_logging_and_tracing( .with_attribute(KeyValue::new("service.version", build_info.version.clone())) .build(); - let logs_protocol = resolve_otlp_protocol(&[ - env::var("OTEL_EXPORTER_OTLP_LOGS_PROTOCOL").ok().as_deref(), - global_protocol.as_deref(), - ])?; - let logs_exporter = - build_otlp_exporter!(opentelemetry_otlp::LogExporter::builder(), logs_protocol) - .context("failed to initialize OTLP logs exporter")?; - + let logs_protocol_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_LOGS_PROTOCOL", false); + let logs_protocol = logs_protocol_opt + .as_deref() + .map(OtlpProtocol::from_str) + .transpose()? + .unwrap_or(global_protocol); + let log_exporter = logs_protocol.log_exporter()?; let logger_provider = SdkLoggerProvider::builder() .with_resource(resource.clone()) - .with_batch_exporter(logs_exporter) + .with_batch_exporter(log_exporter) .build(); let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() @@ -489,39 +508,17 @@ mod tests { use super::*; #[test] - fn test_resolve_otlp_protocol_defaults_to_grpc() { - let protocol = resolve_otlp_protocol(&[None, None]).unwrap(); - assert_eq!(protocol, OtlpProtocol::Grpc); - } - - #[test] - fn test_resolve_otlp_protocol_first_candidate_takes_priority() { - let protocol = resolve_otlp_protocol(&[Some("http/protobuf"), Some("grpc")]).unwrap(); - assert_eq!(protocol, OtlpProtocol::HttpProtobuf); - } - - #[test] - fn test_resolve_otlp_protocol_falls_back_to_later_candidate() { - let protocol = resolve_otlp_protocol(&[None, Some("http/protobuf")]).unwrap(); - assert_eq!(protocol, OtlpProtocol::HttpProtobuf); - } - - #[test] - fn test_resolve_otlp_protocol_grpc_explicit() { - let protocol = resolve_otlp_protocol(&[Some("grpc")]).unwrap(); - assert_eq!(protocol, OtlpProtocol::Grpc); - } - - #[test] - fn test_resolve_otlp_protocol_http_json_explicit() { - let protocol = resolve_otlp_protocol(&[Some("http/json")]).unwrap(); - assert_eq!(protocol, OtlpProtocol::HttpJson); - } - - #[test] - fn test_resolve_otlp_protocol_rejects_unsupported_value() { - let result = resolve_otlp_protocol(&[Some("http/xml")]); - assert!(result.is_err()); + fn test_otlp_protocol_from_str() { + assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc); + assert_eq!( + OtlpProtocol::from_str("http/protobuf").unwrap(), + OtlpProtocol::HttpProtobuf + ); + assert_eq!( + OtlpProtocol::from_str("http/json").unwrap(), + OtlpProtocol::HttpJson + ); + assert!(OtlpProtocol::from_str("http/xml").is_err()); } /// A shared buffer writer for capturing log output in tests. diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml index a1877661490..59e98ad3aa1 100644 --- a/quickwit/quickwit-config/Cargo.toml +++ b/quickwit/quickwit-config/Cargo.toml @@ -42,8 +42,8 @@ quickwit-proto = { workspace = true } [dev-dependencies] tokio = { workspace = true } -quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-common = { workspace = true, features = ["testsuite"] } +quickwit-proto = { workspace = true, features = ["testsuite"] } [features] testsuite = []