diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index a9bab75a7..a18195792 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -469,7 +469,8 @@ dependencies = [ "bytes", "chrono", "cookie", - "datadog-fips", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=28f796bf767fff56caf08153ade5cd80c8e8f705)", + "datadog-logs-agent", "datadog-protos", "ddsketch-agent", "dogstatsd", @@ -749,6 +750,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "datadog-fips" +version = "0.1.0" +source = "git+https://github.com/DataDog/serverless-components?rev=8cb5daa7679df7d0f814aad016b592288e91cea5#8cb5daa7679df7d0f814aad016b592288e91cea5" +dependencies = [ + "reqwest", + "tracing", +] + +[[package]] +name = "datadog-logs-agent" +version = "0.1.0" +source = "git+https://github.com/DataDog/serverless-components?rev=8cb5daa7679df7d0f814aad016b592288e91cea5#8cb5daa7679df7d0f814aad016b592288e91cea5" +dependencies = [ + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=8cb5daa7679df7d0f814aad016b592288e91cea5)", + "futures", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", + "zstd", +] + [[package]] name = "datadog-protos" version = "0.1.0" diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index e6611236e..bc9dfbf69 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -78,6 +78,7 @@ libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "28f796bf767fff56caf08153ade5cd80c8e8f705", default-features = false } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "28f796bf767fff56caf08153ade5cd80c8e8f705", default-features = false } +datadog-logs-agent = { git = "https://github.com/DataDog/serverless-components", rev = "8cb5daa7679df7d0f814aad016b592288e91cea5", default-features = false } libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3c2ef31cb..819eda7ff 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -45,13 +45,7 @@ use bottlecap::{ listener::Listener as LifecycleListener, }, logger, - logs::{ - agent::LogsAgent, - aggregator_service::{ - AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, - }, - flusher::LogsFlusher, - }, + logs::agent::LogsAgent, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, proxy::{interceptor, should_start_proxy}, secrets::decrypt, @@ -79,6 +73,10 @@ use bottlecap::{ }, }; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; +use datadog_logs_agent::{ + AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, + Destination, LogFlusher, LogFlusherConfig, LogsAdditionalEndpoint, +}; use decrypt::resolve_secrets; use dogstatsd::{ aggregator::{ @@ -306,7 +304,8 @@ async fn extension_loop_active( event_bus_tx.clone(), aws_config.is_managed_instance_mode(), &shared_client, - ); + ) + .await; let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd( tags_provider.clone(), @@ -1027,7 +1026,7 @@ fn setup_tag_provider( )) } -fn start_logs_agent( +async fn start_logs_agent( config: &Arc, api_key_factory: Arc, tags_provider: &Arc, @@ -1036,11 +1035,11 @@ fn start_logs_agent( client: &Client, ) -> ( Sender, - LogsFlusher, + LogFlusher, CancellationToken, LogsAggregatorHandle, ) { - let (aggregator_service, aggregator_handle) = LogsAggregatorService::default(); + let (aggregator_service, aggregator_handle) = LogsAggregatorService::new(); // Start service in background tokio::spawn(async move { aggregator_service.run().await; @@ -1062,12 +1061,37 @@ fn start_logs_agent( drop(agent); }); - let flusher = LogsFlusher::new( - api_key_factory, - aggregator_handle.clone(), - config.clone(), - client.clone(), - ); + let api_key = api_key_factory.get_api_key().await.unwrap_or_default(); + + let mode = if config.observability_pipelines_worker_logs_enabled { + Destination::ObservabilityPipelinesWorker { + url: config.observability_pipelines_worker_logs_url.clone(), + } + } else { + Destination::Datadog + }; + + let additional_endpoints: Vec = config + .logs_config_additional_endpoints + .iter() + .map(|ep| LogsAdditionalEndpoint { + api_key: ep.api_key.clone(), + url: format!("https://{}:{}/api/v2/logs", ep.host, ep.port), + is_reliable: ep.is_reliable, + }) + .collect(); + + let flusher_config = LogFlusherConfig { + api_key, + site: config.site.clone(), + mode, + additional_endpoints, + use_compression: config.logs_config_use_compression, + compression_level: config.logs_config_compression_level, + flush_timeout: std::time::Duration::from_secs(config.flush_timeout), + }; + + let flusher = LogFlusher::new(flusher_config, client.clone(), aggregator_handle.clone()); (tx, flusher, cancel_token, aggregator_handle) } diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index f48d2c8d2..33041a376 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -711,6 +711,7 @@ impl ConfigSource for EnvConfigSource { #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] mod tests { + #![allow(clippy::result_large_err)] use std::time::Duration; use super::*; diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 23fe110a3..0c9b2a49c 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -811,6 +811,7 @@ pub fn deserialize_optional_duration_from_seconds_ignore_zero<'de, D: Deserializ #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] pub mod tests { + #![allow(clippy::result_large_err)] use libdd_trace_obfuscation::replacer::parse_rules_from_string; use super::*; diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index 0934d9663..57bbbfd7d 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -742,6 +742,7 @@ impl ConfigSource for YamlConfigSource { #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] mod tests { + #![allow(clippy::result_large_err)] use std::path::Path; use std::time::Duration; diff --git a/bottlecap/src/flushing/handles.rs b/bottlecap/src/flushing/handles.rs index 61376ac4b..de36f07d8 100644 --- a/bottlecap/src/flushing/handles.rs +++ b/bottlecap/src/flushing/handles.rs @@ -27,7 +27,7 @@ pub struct MetricsRetryBatch { pub struct FlushHandles { /// Handles for trace flush operations. Returns failed traces for retry. pub trace_flush_handles: Vec>>, - /// Handles for log flush operations. Returns failed request builders for retry. + /// Handles for log flush operations. Returns builders for transient failures to retry next invocation. pub log_flush_handles: Vec>>, /// Handles for metrics flush operations. Returns batch info for retry. pub metric_flush_handles: Vec>, diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index bd9c66882..f7c07a9bb 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -8,8 +8,9 @@ use dogstatsd::{ aggregator::AggregatorHandle as MetricsAggregatorHandle, flusher::Flusher as MetricsFlusher, }; +use datadog_logs_agent::LogFlusher; + use crate::flushing::handles::{FlushHandles, MetricsRetryBatch}; -use crate::logs::flusher::LogsFlusher; use crate::traces::{ proxy_flusher::Flusher as ProxyFlusher, stats_flusher::StatsFlusher, trace_flusher::TraceFlusher, @@ -23,7 +24,7 @@ use crate::traces::{ /// - Performing blocking flushes (spawn + await) pub struct FlushingService { // Flushers - logs_flusher: LogsFlusher, + logs_flusher: LogFlusher, trace_flusher: Arc, stats_flusher: Arc, proxy_flusher: Arc, @@ -40,7 +41,7 @@ impl FlushingService { /// Creates a new `FlushingService` with the given flushers. #[must_use] pub fn new( - logs_flusher: LogsFlusher, + logs_flusher: LogFlusher, trace_flusher: Arc, stats_flusher: Arc, proxy_flusher: Arc, @@ -76,7 +77,7 @@ impl FlushingService { let lf = self.logs_flusher.clone(); self.handles .log_flush_handles - .push(tokio::spawn(async move { lf.flush(None).await })); + .push(tokio::spawn(async move { lf.flush(vec![]).await })); // Spawn traces flush let tf = self.trace_flusher.clone(); @@ -206,7 +207,7 @@ impl FlushingService { match item.try_clone() { Some(item_clone) => { joinset.spawn(async move { - lf.flush(Some(item_clone)).await; + lf.flush(vec![item_clone]).await; }); } None => { @@ -325,7 +326,7 @@ impl FlushingService { .collect(); tokio::join!( - self.logs_flusher.flush(None), + self.logs_flusher.flush(vec![]), futures::future::join_all(metrics_futures), self.trace_flusher.flush(None), self.stats_flusher.flush(force_stats, None), diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 71e5a721e..44503c996 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -6,9 +6,10 @@ use tracing::debug; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::logs::{aggregator_service::AggregatorHandle, processor::LogsProcessor}; +use crate::logs::processor::LogsProcessor; use crate::tags; use crate::{LAMBDA_RUNTIME_SLUG, config}; +use datadog_logs_agent::AggregatorHandle; const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100); diff --git a/bottlecap/src/logs/aggregator.rs b/bottlecap/src/logs/aggregator.rs deleted file mode 100644 index 6d4f8287a..000000000 --- a/bottlecap/src/logs/aggregator.rs +++ /dev/null @@ -1,227 +0,0 @@ -use std::collections::VecDeque; -use tracing::warn; - -use crate::logs::constants; - -/// Takes in log payloads and aggregates them into larger batches to be flushed to Datadog. -#[derive(Debug, Clone)] -pub struct Aggregator { - messages: VecDeque, - max_batch_entries_size: usize, - max_content_size_bytes: usize, - max_log_size_bytes: usize, - buffer: Vec, -} - -impl Default for Aggregator { - fn default() -> Self { - Aggregator { - messages: VecDeque::new(), - max_batch_entries_size: constants::MAX_BATCH_ENTRIES_SIZE, - max_content_size_bytes: constants::MAX_CONTENT_SIZE_BYTES, - max_log_size_bytes: constants::MAX_LOG_SIZE_BYTES, - buffer: Vec::with_capacity(constants::MAX_CONTENT_SIZE_BYTES), - } - } -} - -impl Aggregator { - #[allow(dead_code)] - #[allow(clippy::must_use_candidate)] - pub fn new( - max_batch_entries_size: usize, - max_content_size_bytes: usize, - max_log_size_bytes: usize, - ) -> Self { - Aggregator { - messages: VecDeque::new(), - max_batch_entries_size, - max_content_size_bytes, - max_log_size_bytes, - buffer: Vec::with_capacity(max_content_size_bytes), - } - } - - /// Takes in a batch of log payloads. - pub fn add_batch(&mut self, logs: Vec) { - for log in logs { - self.messages.push_back(log); - } - } - - /// Returns a batch of log payloads, subject to the max content size. - pub fn get_batch(&mut self) -> Vec { - self.buffer.extend(b"["); - - // Fill the batch with logs from the messages - for _ in 0..self.max_batch_entries_size { - if let Some(log) = self.messages.pop_front() { - // Check if the buffer will be full after adding the log - if self.buffer.len() + log.len() > self.max_content_size_bytes { - // Put the log back in the queue - self.messages.push_front(log); - break; - } - - if log.len() > self.max_log_size_bytes { - warn!( - "Log size exceeds the 1MB limit: {}, will be truncated by the backend.", - log.len() - ); - } - - self.buffer.extend(log.as_bytes()); - self.buffer.extend(b","); - } else { - break; - } - } - // Make sure we added at least one element - if self.buffer.len() > 1 { - // Remove the last comma and close bracket - self.buffer.pop(); - self.buffer.extend(b"]"); - } else { - // No elements, remove opening bracket - self.buffer.pop(); - } - - std::mem::take(&mut self.buffer) - } -} - -#[cfg(test)] -#[allow(clippy::unwrap_used)] -mod tests { - use super::*; - - use crate::logs::lambda::{IntakeLog, Lambda, Message}; - - #[test] - fn test_add_batch() { - let mut aggregator = Aggregator::default(); - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - let serialized_log = serde_json::to_string(&log).unwrap(); - aggregator.add_batch(vec![serialized_log.clone()]); - assert_eq!(aggregator.messages.len(), 1); - assert_eq!(aggregator.messages[0], serialized_log); - } - - #[test] - fn test_get_batch() { - let mut aggregator = Aggregator::default(); - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - let serialized_log = serde_json::to_string(&log).unwrap(); - aggregator.add_batch(vec![serialized_log.clone()]); - assert_eq!(aggregator.messages.len(), 1); - let batch = aggregator.get_batch(); - let serialized_batch = format!("[{}]", serde_json::to_string(&log).unwrap()); - assert_eq!(batch, serialized_batch.as_bytes()); - } - - #[test] - fn test_get_batch_full_entries() { - let mut aggregator = Aggregator::new(2, 1_024, 1_024); - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - // Add 3 logs - let serialized_log = serde_json::to_string(&log).unwrap(); - aggregator.add_batch(vec![ - serialized_log.clone(), - serialized_log.clone(), - serialized_log.clone(), - ]); - - // The batch should only contain the first 2 logs - let first_batch = aggregator.get_batch(); - let serialized_log = serde_json::to_string(&log).unwrap(); - let serialized_batch = format!("[{serialized_log},{serialized_log}]"); - assert_eq!(first_batch, serialized_batch.as_bytes()); - assert_eq!(aggregator.messages.len(), 1); - - // The second batch should only contain the last log - let second_batch = aggregator.get_batch(); - let serialized_batch = format!("[{serialized_log}]"); - assert_eq!(second_batch, serialized_batch.as_bytes()); - assert_eq!(aggregator.messages.len(), 0); - } - - #[test] - fn test_get_batch_full_payload() { - let mut aggregator = Aggregator::new(2, 256, 1_024); - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - // Add 2 logs - let serialized_log = serde_json::to_string(&log).unwrap(); - aggregator.add_batch(vec![serialized_log.clone()]); - - // This log will exceed the max content size - let mut big_log = log.clone(); - big_log.message.message = "a".repeat(256); - let serialized_big_log = serde_json::to_string(&log).unwrap(); - aggregator.add_batch(vec![serialized_big_log.clone()]); - - let first_batch = aggregator.get_batch(); - let serialized_log = serde_json::to_string(&log).unwrap(); - let serialized_batch = format!("[{serialized_log}]"); - assert_eq!(first_batch, serialized_batch.as_bytes()); - - // I really doubt someone would make a log that is 5MB long, - // so we never send it, but we still keep it in the queue. - assert_eq!(aggregator.messages.len(), 1); - } -} diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs deleted file mode 100644 index d281bfb19..000000000 --- a/bottlecap/src/logs/aggregator_service.rs +++ /dev/null @@ -1,150 +0,0 @@ -use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error}; - -use crate::logs::{aggregator::Aggregator, constants}; - -#[derive(Debug)] -pub enum AggregatorCommand { - InsertBatch(Vec), - GetBatches(oneshot::Sender>>), - Shutdown, -} - -#[derive(Clone, Debug)] -pub struct AggregatorHandle { - tx: mpsc::UnboundedSender, -} - -impl AggregatorHandle { - pub fn insert_batch( - &self, - logs: Vec, - ) -> Result<(), mpsc::error::SendError> { - self.tx.send(AggregatorCommand::InsertBatch(logs)) - } - - pub async fn get_batches(&self) -> Result>, String> { - let (response_tx, response_rx) = oneshot::channel(); - self.tx - .send(AggregatorCommand::GetBatches(response_tx)) - .map_err(|e| format!("Failed to send flush command: {e}"))?; - - response_rx - .await - .map_err(|e| format!("Failed to receive flush response: {e}")) - } - - pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { - self.tx.send(AggregatorCommand::Shutdown) - } -} - -pub struct AggregatorService { - aggregator: Aggregator, - rx: mpsc::UnboundedReceiver, -} - -impl AggregatorService { - #[must_use] - #[allow(clippy::should_implement_trait)] - pub fn default() -> (Self, AggregatorHandle) { - Self::new( - constants::MAX_BATCH_ENTRIES_SIZE, - constants::MAX_CONTENT_SIZE_BYTES, - constants::MAX_LOG_SIZE_BYTES, - ) - } - - #[must_use] - pub fn new( - max_batch_entries_size: usize, - max_content_size_bytes: usize, - max_log_size_bytes: usize, - ) -> (Self, AggregatorHandle) { - let (tx, rx) = mpsc::unbounded_channel(); - let aggregator = Aggregator::new( - max_batch_entries_size, - max_content_size_bytes, - max_log_size_bytes, - ); - - let service = Self { aggregator, rx }; - let handle = AggregatorHandle { tx }; - - (service, handle) - } - - pub async fn run(mut self) { - debug!("Logs aggregator service started"); - - while let Some(command) = self.rx.recv().await { - match command { - AggregatorCommand::InsertBatch(logs) => { - self.aggregator.add_batch(logs); - } - AggregatorCommand::GetBatches(response_tx) => { - let mut batches = Vec::new(); - let mut current_batch = self.aggregator.get_batch(); - while !current_batch.is_empty() { - batches.push(current_batch); - current_batch = self.aggregator.get_batch(); - } - if response_tx.send(batches).is_err() { - error!("Failed to send logs flush response - receiver dropped"); - } - } - AggregatorCommand::Shutdown => { - debug!("Logs aggregator service shutting down"); - break; - } - } - } - } -} - -#[cfg(test)] -#[allow(clippy::unwrap_used)] -mod tests { - use super::*; - use crate::logs::lambda::{IntakeLog, Lambda, Message}; - - #[tokio::test] - async fn test_aggregator_service_insert_and_flush() { - let (service, handle) = AggregatorService::default(); - - let service_handle = tokio::spawn(async move { - service.run().await; - }); - - let log = IntakeLog { - message: Message { - message: "test".to_string(), - lambda: Lambda { - arn: "arn".to_string(), - request_id: Some("request_id".to_string()), - }, - timestamp: 0, - status: "status".to_string(), - }, - hostname: "hostname".to_string(), - service: "service".to_string(), - tags: "tags".to_string(), - source: "source".to_string(), - }; - let serialized_log = serde_json::to_string(&log).unwrap(); - - handle.insert_batch(vec![serialized_log.clone()]).unwrap(); - - let batches = handle.get_batches().await.unwrap(); - assert_eq!(batches.len(), 1); - let serialized_batch = format!("[{serialized_log}]"); - assert_eq!(batches[0], serialized_batch.as_bytes()); - - handle - .shutdown() - .expect("Failed to shutdown aggregator service"); - service_handle - .await - .expect("Aggregator service task failed"); - } -} diff --git a/bottlecap/src/logs/constants.rs b/bottlecap/src/logs/constants.rs deleted file mode 100644 index d6b49498d..000000000 --- a/bottlecap/src/logs/constants.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// Maximum content size per payload uncompressed in bytes, -/// that the Datadog Logs API accepts. -pub const MAX_CONTENT_SIZE_BYTES: usize = 5 * 1_024 * 1_024; - -/// Maximum size in bytes of a single log entry, before it -/// gets truncated. -pub const MAX_LOG_SIZE_BYTES: usize = 1_024 * 1_024; - -/// Maximum logs array size accepted. -pub const MAX_BATCH_ENTRIES_SIZE: usize = 1000; diff --git a/bottlecap/src/logs/flusher.rs b/bottlecap/src/logs/flusher.rs deleted file mode 100644 index d82fceb42..000000000 --- a/bottlecap/src/logs/flusher.rs +++ /dev/null @@ -1,311 +0,0 @@ -use crate::FLUSH_RETRY_COUNT; -use crate::config; -use crate::logs::aggregator_service::AggregatorHandle; -use dogstatsd::api_key::ApiKeyFactory; -use futures::future::join_all; -use hyper::StatusCode; -use reqwest::header::HeaderMap; -use std::error::Error; -use std::time::Instant; -use std::{io::Write, sync::Arc}; -use thiserror::Error as ThisError; -use tokio::{sync::OnceCell, task::JoinSet}; -use tracing::{debug, error}; -use zstd::stream::write::Encoder; - -#[derive(ThisError, Debug)] -#[error("{message}")] -pub struct FailedRequestError { - pub request: reqwest::RequestBuilder, - pub message: String, -} - -#[derive(Debug, Clone)] -pub struct Flusher { - client: reqwest::Client, - endpoint: String, - config: Arc, - api_key_factory: Arc, - headers: OnceCell, -} - -impl Flusher { - #[must_use] - pub fn new( - api_key_factory: Arc, - endpoint: String, - config: Arc, - client: reqwest::Client, - ) -> Self { - Flusher { - client, - endpoint, - config, - api_key_factory, - headers: OnceCell::new(), - } - } - - pub async fn flush(&self, batches: Option>>>) -> Vec { - let Some(api_key) = self.api_key_factory.get_api_key().await else { - error!("LOGS | Skipping flushing: Failed to resolve API key"); - return vec![]; - }; - - let mut set = JoinSet::new(); - - if let Some(logs_batches) = batches { - for batch in logs_batches.iter() { - if batch.is_empty() { - continue; - } - let req = self.create_request(batch.clone(), api_key.as_str()).await; - set.spawn(async move { Self::send(req).await }); - } - } - - let mut failed_requests = Vec::new(); - for result in set.join_all().await { - if let Err(e) = result { - debug!("LOGS | Failed to join task: {}", e); - continue; - } - - // At this point we know the task completed successfully, - // but the send operation itself may have failed - if let Err(e) = result { - if let Some(failed_req_err) = e.downcast_ref::() { - // Clone the request from our custom error - failed_requests.push( - failed_req_err - .request - .try_clone() - .expect("should be able to clone request"), - ); - debug!("LOGS | Failed to send request after retries, will retry later"); - } else { - debug!("LOGS | Failed to send request: {}", e); - } - } - } - failed_requests - } - - async fn create_request(&self, data: Vec, api_key: &str) -> reqwest::RequestBuilder { - let url = if self.config.observability_pipelines_worker_logs_enabled { - self.endpoint.clone() - } else { - format!("{}/api/v2/logs", self.endpoint) - }; - let headers = self.get_headers(api_key).await; - self.client - .post(&url) - .timeout(std::time::Duration::from_secs(self.config.flush_timeout)) - .headers(headers.clone()) - .body(data) - } - - async fn send(req: reqwest::RequestBuilder) -> Result<(), Box> { - let mut attempts = 0; - - loop { - let time = Instant::now(); - attempts += 1; - let Some(cloned_req) = req.try_clone() else { - return Err(Box::new(std::io::Error::other("can't clone"))); - }; - let resp = cloned_req.send().await; - let elapsed = time.elapsed(); - - match resp { - Ok(resp) => { - let status = resp.status(); - _ = resp.text().await; - if status == StatusCode::FORBIDDEN { - // Access denied. Stop retrying. - error!( - "LOGS | Request was denied by Datadog: Access denied. Please verify that your API key is valid." - ); - return Ok(()); - } - if status.is_success() { - return Ok(()); - } - } - Err(e) => { - if attempts >= FLUSH_RETRY_COUNT { - // After 3 failed attempts, return the original request for later retry - // Create a custom error that can be downcast to get the RequestBuilder - error!( - "LOGS | Failed to send request after {} ms and {} attempts: {:?}", - elapsed.as_millis(), - attempts, - e - ); - return Err(Box::new(FailedRequestError { - request: req, - message: format!("LOGS | Failed after {attempts} attempts: {e}"), - })); - } - } - } - } - } - - async fn get_headers(&self, api_key: &str) -> &HeaderMap { - self.headers - .get_or_init(move || async move { - let mut headers = HeaderMap::new(); - headers.insert( - "DD-API-KEY", - api_key.parse().expect("failed to parse header"), - ); - if !self.config.observability_pipelines_worker_logs_enabled { - headers.insert( - "DD-PROTOCOL", - "agent-json".parse().expect("failed to parse header"), - ); - } - headers.insert( - "Content-Type", - "application/json".parse().expect("failed to parse header"), - ); - - if self.config.logs_config_use_compression - && !self.config.observability_pipelines_worker_logs_enabled - { - headers.insert( - "Content-Encoding", - "zstd".parse().expect("failed to parse header"), - ); - } - headers - }) - .await - } -} - -#[allow(clippy::module_name_repetitions)] -#[derive(Clone)] -pub struct LogsFlusher { - config: Arc, - pub flushers: Vec, - aggregator_handle: AggregatorHandle, -} - -impl LogsFlusher { - pub fn new( - api_key_factory: Arc, - aggregator_handle: AggregatorHandle, - config: Arc, - client: reqwest::Client, - ) -> Self { - let mut flushers = Vec::new(); - - let endpoint = if config.observability_pipelines_worker_logs_enabled { - if config.observability_pipelines_worker_logs_url.is_empty() { - error!("LOGS | Observability Pipelines Worker URL is empty"); - } - config.observability_pipelines_worker_logs_url.clone() - } else { - config.logs_config_logs_dd_url.clone() - }; - - // Create primary flusher - flushers.push(Flusher::new( - Arc::clone(&api_key_factory), - endpoint, - config.clone(), - client.clone(), - )); - - // Create flushers for additional endpoints - for endpoint in &config.logs_config_additional_endpoints { - let endpoint_url = format!("https://{}:{}", endpoint.host, endpoint.port); - let additional_api_key_factory = - Arc::new(ApiKeyFactory::new(endpoint.api_key.clone().as_str())); - flushers.push(Flusher::new( - additional_api_key_factory, - endpoint_url, - config.clone(), - client.clone(), - )); - } - - LogsFlusher { - config, - flushers, - aggregator_handle, - } - } - - pub async fn flush( - &self, - retry_request: Option, - ) -> Vec { - let mut failed_requests = Vec::new(); - - // If retry_request is provided, only process that request - if let Some(req) = retry_request { - if let Some(req_clone) = req.try_clone() - && let Err(e) = Flusher::send(req_clone).await - && let Some(failed_req_err) = e.downcast_ref::() - { - failed_requests.push( - failed_req_err - .request - .try_clone() - .expect("should be able to clone request"), - ); - } - } else { - let logs_batches = Arc::new({ - match self.aggregator_handle.get_batches().await { - Ok(batches) => batches - .into_iter() - .map(|batch| self.compress(batch)) - .collect(), - Err(e) => { - debug!("Failed to flush from aggregator: {}", e); - Vec::new() - } - } - }); - - // Send batches to each flusher - let futures = self.flushers.iter().map(|flusher| { - let batches = Arc::clone(&logs_batches); - let flusher = flusher.clone(); - async move { flusher.flush(Some(batches)).await } - }); - - let results = join_all(futures).await; - for failed in results { - failed_requests.extend(failed); - } - } - failed_requests - } - - fn compress(&self, data: Vec) -> Vec { - if !self.config.logs_config_use_compression - || self.config.observability_pipelines_worker_logs_enabled - { - return data; - } - - match self.encode(&data) { - Ok(compressed_data) => compressed_data, - Err(e) => { - debug!("LOGS | Failed to compress data: {}", e); - data - } - } - } - - fn encode(&self, data: &[u8]) -> Result, Box> { - let mut encoder = Encoder::new(Vec::new(), self.config.logs_config_compression_level)?; - encoder.write_all(data)?; - encoder.finish().map_err(|e| Box::new(e) as Box) - } -} diff --git a/bottlecap/src/logs/lambda/mod.rs b/bottlecap/src/logs/lambda/mod.rs index e196c1ece..ea5fcff82 100644 --- a/bottlecap/src/logs/lambda/mod.rs +++ b/bottlecap/src/logs/lambda/mod.rs @@ -2,21 +2,6 @@ use serde::Serialize; pub mod processor; -/// -/// Intake Log for AWS Lambda Telemetry Events. -/// -#[derive(Clone, Debug, PartialEq, Serialize)] -pub struct IntakeLog { - /// Setting it as a struct, allowing us to override fields. - pub message: Message, - pub hostname: String, - pub service: String, - #[serde(rename(serialize = "ddtags"))] - pub tags: String, - #[serde(rename(serialize = "ddsource"))] - pub source: String, -} - /// /// Message for AWS Lambda logs. /// diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 8325dde32..46780418e 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -11,11 +11,27 @@ use crate::event_bus::Event; use crate::extension::telemetry::events::ReportMetrics; use crate::extension::telemetry::events::{Status, TelemetryEvent, TelemetryRecord}; use crate::lifecycle::invocation::context::Context as InvocationContext; -use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; -use crate::logs::lambda::{IntakeLog, Message}; +use crate::logs::lambda::Message; +use datadog_logs_agent::{AggregatorHandle, IntakeEntry}; + +const OOM_ERRORS: [&str; 7] = [ + "fatal error: runtime: out of memory", // Go + "java.lang.OutOfMemoryError", // Java + "JavaScript heap out of memory", // Node + "Runtime exited with error: signal: killed", // Node + "MemoryError", // Python + "failed to allocate memory (NoMemoryError)", // Ruby + "OutOfMemoryException", // .NET +]; + +fn is_oom_error(error_msg: &str) -> bool { + OOM_ERRORS + .iter() + .any(|&oom_str| error_msg.contains(oom_str)) +} #[allow(clippy::module_name_repetitions)] #[derive(Clone, Debug)] @@ -28,9 +44,9 @@ pub struct LambdaProcessor { // Current Invocation Context invocation_context: InvocationContext, // Logs which don't have a `request_id` - orphan_logs: Vec, + orphan_logs: Vec, // Logs which are ready to be aggregated - ready_logs: Vec, + ready_logs: Vec, // Main event bus event_bus: Sender, // Logs enabled @@ -39,22 +55,6 @@ pub struct LambdaProcessor { is_managed_instance_mode: bool, } -const OOM_ERRORS: [&str; 7] = [ - "fatal error: runtime: out of memory", // Go - "java.lang.OutOfMemoryError", // Java - "JavaScript heap out of memory", // Node - "Runtime exited with error: signal: killed", // Node - "MemoryError", // Python - "failed to allocate memory (NoMemoryError)", // Ruby - "OutOfMemoryException", // .NET -]; - -fn is_oom_error(error_msg: &str) -> bool { - OOM_ERRORS - .iter() - .any(|&oom_str| error_msg.contains(oom_str)) -} - /// Maps AWS/common log level strings to Datadog log status values. /// Case-insensitive and accepts both short and long forms /// (e.g. "WARN"/"WARNING", "INFO"/"INFORMATION", "ERR"/"ERROR"). @@ -71,7 +71,7 @@ fn map_log_level_to_status(level: &str) -> Option<&'static str> { } } -impl Processor for LambdaProcessor {} +impl Processor for LambdaProcessor {} impl LambdaProcessor { #[must_use] @@ -354,10 +354,13 @@ impl LambdaProcessor { } } - fn get_intake_log(&mut self, mut lambda_message: Message) -> Result> { + fn get_log_entry( + &mut self, + mut lambda_message: Message, + ) -> Result> { // Assign request_id from message or context if available lambda_message.lambda.request_id = match lambda_message.lambda.request_id { - Some(request_id) => Some(request_id.clone()), + Some(request_id) => Some(request_id), None => { // If there is no request_id available in the current invocation context, // then set to None, same goes if we are in a Managed Instance – as concurrent @@ -374,58 +377,71 @@ impl LambdaProcessor { let parsed_json = serde_json::from_str::(lambda_message.message.as_str()); - let log = if let Ok(serde_json::Value::Object(mut json_obj)) = parsed_json { - let mut tags = self.tags.clone(); - let final_message = Self::extract_tags_and_get_message( - &mut json_obj, - &mut tags, - lambda_message.message.clone(), - ); - - // Extract log level from JSON (AWS JSON log format / Powertools). - // Try "level" first (standard), then fall back to "status" (Datadog convention). - let status = json_obj - .get("level") - .or_else(|| json_obj.get("status")) - .and_then(|v| v.as_str()) - .and_then(map_log_level_to_status) - .map_or( - lambda_message.status.clone(), - std::string::ToString::to_string, + let (final_message, tags, status) = + if let Ok(serde_json::Value::Object(mut json_obj)) = parsed_json { + let mut tags = self.tags.clone(); + let msg = Self::extract_tags_and_get_message( + &mut json_obj, + &mut tags, + lambda_message.message.clone(), ); - IntakeLog { - hostname: self.function_arn.clone(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: self.service.clone(), - tags, - message: Message { - message: final_message, - lambda: lambda_message.lambda, - timestamp: lambda_message.timestamp, - status, - }, - } - } else { - // Not JSON or not an object - use message as-is - IntakeLog { - hostname: self.function_arn.clone(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: self.service.clone(), - tags: self.tags.clone(), - message: lambda_message, - } + // Extract log level from JSON (AWS JSON log format / Powertools). + // Try "level" first (standard), then fall back to "status" (Datadog convention). + let status = json_obj + .get("level") + .or_else(|| json_obj.get("status")) + .and_then(|v| v.as_str()) + .and_then(map_log_level_to_status) + .map_or( + lambda_message.status.clone(), + std::string::ToString::to_string, + ); + + (msg, tags, status) + } else { + ( + lambda_message.message, + self.tags.clone(), + lambda_message.status, + ) + }; + + let mut attrs = serde_json::Map::new(); + attrs.insert( + "lambda".to_string(), + serde_json::json!({ + "arn": lambda_message.lambda.arn, + "request_id": lambda_message.lambda.request_id, + }), + ); + + let entry = IntakeEntry { + message: final_message, + timestamp: lambda_message.timestamp, + hostname: Some(self.function_arn.clone()), + service: Some(self.service.clone()), + ddsource: Some(LAMBDA_RUNTIME_SLUG.to_string()), + ddtags: Some(tags), + status: Some(status), + attributes: attrs, }; - if log.message.lambda.request_id.is_some() || self.is_managed_instance_mode { + let has_request_id = entry + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .is_some_and(|v| !v.is_null()); + + if has_request_id || self.is_managed_instance_mode { // In On-Demand mode, ship logs with request_id. // In Managed Instance mode, ship logs without request_id immediately as well. // These are inter-invocation/sandbox logs that should be aggregated without // waiting to be attached to the next invocation. - Ok(log) + Ok(entry) } else { // In On-Demand mode, if no request_id is available, queue as orphan log - self.orphan_logs.push(log); + self.orphan_logs.push(entry); Err("No request_id available, queueing for later".into()) } } @@ -459,23 +475,20 @@ impl LambdaProcessor { original_message } - async fn make_log(&mut self, event: TelemetryEvent) -> Result> { + async fn make_log(&mut self, event: TelemetryEvent) -> Result> { match self.get_message(event).await { - Ok(lambda_message) => self.get_intake_log(lambda_message), + Ok(lambda_message) => self.get_log_entry(lambda_message), // TODO: Check what to do when we can't process the event Err(e) => Err(e), } } - /// Processes a log, applies filtering rules, serializes it, and queues it for aggregation - fn process_and_queue_log(&mut self, mut log: IntakeLog) { - let should_send_log = self.logs_enabled - && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); - if should_send_log && let Ok(serialized_log) = serde_json::to_string(&log) { - // explicitly drop log so we don't accidentally re-use it and push - // duplicate logs to the aggregator - drop(log); - self.ready_logs.push(serialized_log); + /// Processes a log, applies filtering rules, and queues it for aggregation + fn process_and_queue_log(&mut self, mut log: IntakeEntry) { + let should_send_log = + self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message); + if should_send_log { + self.ready_logs.push(log); } } @@ -486,8 +499,14 @@ impl LambdaProcessor { // Process orphan logs, since we have a `request_id` now let orphan_logs = std::mem::take(&mut self.orphan_logs); for mut orphan_log in orphan_logs { - orphan_log.message.lambda.request_id = - Some(self.invocation_context.request_id.clone()); + if let Some(lambda_val) = orphan_log.attributes.get_mut("lambda") + && let Some(obj) = lambda_val.as_object_mut() + { + obj.insert( + "request_id".to_string(), + serde_json::Value::String(self.invocation_context.request_id.clone()), + ); + } self.process_and_queue_log(orphan_log); } } @@ -514,8 +533,8 @@ mod tests { InitPhase, InitType, ManagedInstanceReportMetrics, OnDemandReportMetrics, ReportMetrics, RuntimeDoneMetrics, Status, }; - use crate::logs::aggregator_service::AggregatorService; use crate::logs::lambda::Lambda; + use datadog_logs_agent::{AggregatorService, IntakeEntry}; macro_rules! get_message_tests { ($($name:ident: $value:expr,)*) => { @@ -875,23 +894,37 @@ mod tests { }; let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - assert!(intake_log.tags.contains("test:tags")); + assert_eq!(intake_log.ddsource.as_deref(), Some(LAMBDA_RUNTIME_SLUG)); + assert_eq!(intake_log.hostname.as_deref(), Some("test-arn")); + assert_eq!(intake_log.service.as_deref(), Some("test-service")); + assert!( + intake_log + .ddtags + .as_deref() + .unwrap_or("") + .contains("test:tags") + ); assert_eq!( intake_log.message, - Message { - message: "START RequestId: test-request-id Version: test".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, + "START RequestId: test-request-id Version: test" + ); + assert_eq!( + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") + ); + assert_eq!( + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("arn")) + .and_then(|v| v.as_str()), + Some("test-arn") ); } @@ -921,7 +954,7 @@ mod tests { let lambda_message = processor.get_message(event.clone()).await.unwrap(); assert_eq!(lambda_message.lambda.request_id, None); - let intake_log = processor.get_intake_log(lambda_message).unwrap_err(); + let intake_log = processor.get_log_entry(lambda_message).unwrap_err(); assert_eq!( intake_log.to_string(), "No request_id available, queueing for later" @@ -956,7 +989,7 @@ mod tests { }; let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - processor.get_intake_log(start_lambda_message).unwrap(); + processor.get_log_entry(start_lambda_message).unwrap(); // This could be any event that doesn't have a `request_id` let event = TelemetryEvent { @@ -965,10 +998,14 @@ mod tests { }; let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); assert_eq!( - intake_log.message.lambda.request_id, - Some("test-request-id".to_string()) + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") ); } @@ -1000,15 +1037,24 @@ mod tests { let lambda_message = processor.get_message(event.clone()).await.unwrap(); assert_eq!(lambda_message.lambda.request_id, None); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.lambda.request_id, None); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert!( + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .is_none_or(serde_json::Value::is_null) + ); assert_eq!(processor.orphan_logs.len(), 0); - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - assert_eq!(intake_log.message.message, "test-function".to_string()); - assert_eq!(intake_log.tags, tags_provider.get_tags_string()); + assert_eq!(intake_log.ddsource.as_deref(), Some(LAMBDA_RUNTIME_SLUG)); + assert_eq!(intake_log.hostname.as_deref(), Some("test-arn")); + assert_eq!(intake_log.service.as_deref(), Some("test-service")); + assert_eq!(intake_log.message, "test-function"); + assert_eq!( + intake_log.ddtags.as_deref(), + Some(tags_provider.get_tags_string().as_str()) + ); } // process @@ -1027,7 +1073,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let (aggregator_service, aggregator_handle) = AggregatorService::new(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; @@ -1053,23 +1099,36 @@ mod tests { let batches = aggregator_handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); - let log = IntakeLog { - message: Message { - message: "START RequestId: test-request-id Version: test".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let serialized_log = format!("[{}]", serde_json::to_string(&log).unwrap()); - assert_eq!(batches[0], serialized_log.as_bytes()); + let entries: Vec = serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!(entries.len(), 1); + let entry = &entries[0]; + assert_eq!( + entry.message, + "START RequestId: test-request-id Version: test" + ); + assert_eq!(entry.hostname.as_deref(), Some("test-arn")); + assert_eq!(entry.ddsource.as_deref(), Some(LAMBDA_RUNTIME_SLUG)); + assert_eq!(entry.service.as_deref(), Some("test-service")); + assert_eq!( + entry.ddtags.as_deref(), + Some(tags_provider.get_tags_string().as_str()) + ); + assert_eq!( + entry + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") + ); + assert_eq!( + entry + .attributes + .get("lambda") + .and_then(|l| l.get("arn")) + .and_then(|v| v.as_str()), + Some("test-arn") + ); aggregator_handle .shutdown() @@ -1096,7 +1155,7 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let (aggregator_service, aggregator_handle) = AggregatorService::new(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); @@ -1145,7 +1204,7 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let (aggregator_service, aggregator_handle) = AggregatorService::new(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); @@ -1188,7 +1247,7 @@ mod tests { let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let (aggregator_service, aggregator_handle) = AggregatorService::new(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); @@ -1227,42 +1286,31 @@ mod tests { let batches = aggregator_handle.get_batches().await.unwrap(); assert_eq!(batches.len(), 1); - let start_log = IntakeLog { - message: Message { - message: "START RequestId: test-request-id Version: test".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let function_log = IntakeLog { - message: Message { - message: "test-function".to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - let serialized_log = format!( - "[{},{}]", - serde_json::to_string(&start_log).unwrap(), - serde_json::to_string(&function_log).unwrap() + let entries: Vec = serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!(entries.len(), 2); + let start_entry = &entries[0]; + assert_eq!( + start_entry.message, + "START RequestId: test-request-id Version: test" + ); + assert_eq!( + start_entry + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") + ); + let function_entry = &entries[1]; + assert_eq!(function_entry.message, "test-function"); + assert_eq!( + function_entry + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") ); - assert_eq!(batches[0], serialized_log.as_bytes()); aggregator_handle .shutdown() @@ -1303,36 +1351,47 @@ mod tests { }; let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - processor.get_intake_log(start_lambda_message).unwrap(); + processor.get_log_entry(start_lambda_message).unwrap(); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details": "my-structured-message","ddtags":"added_tag1:added_value1,added_tag2:added_value2"}}"#.to_string())), }; let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - assert!(intake_log.tags.contains("added_tag1:added_value1")); - let function_log = IntakeLog { - message: Message { - message: r#"{"custom_details":"my-structured-message"}"#.to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string() - + ",added_tag1:added_value1,added_tag2:added_value2", - }; - assert_eq!(intake_log, function_log); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + + assert_eq!(intake_log.ddsource.as_deref(), Some(LAMBDA_RUNTIME_SLUG)); + assert_eq!(intake_log.hostname.as_deref(), Some("test-arn")); + assert_eq!(intake_log.service.as_deref(), Some("test-service")); + assert!( + intake_log + .ddtags + .as_deref() + .unwrap_or("") + .contains("added_tag1:added_value1") + ); + assert_eq!( + intake_log.message, + r#"{"custom_details":"my-structured-message"}"# + ); + assert_eq!( + intake_log.ddtags.as_deref(), + Some( + format!( + "{},added_tag1:added_value1,added_tag2:added_value2", + tags_provider.get_tags_string() + ) + .as_str() + ) + ); + assert_eq!( + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") + ); } #[tokio::test] async fn test_process_logs_structured_no_ddtags() { @@ -1365,34 +1424,34 @@ mod tests { }; let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap(); - processor.get_intake_log(start_lambda_message).unwrap(); + processor.get_log_entry(start_lambda_message).unwrap(); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), record: TelemetryRecord::Function(Value::String(r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string())), }; let lambda_message = processor.get_message(event.clone()).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - - assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string()); - assert_eq!(intake_log.hostname, "test-arn".to_string()); - assert_eq!(intake_log.service, "test-service".to_string()); - let function_log = IntakeLog { - message: Message { - message: r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"#.to_string(), - lambda: Lambda { - arn: "test-arn".to_string(), - request_id: Some("test-request-id".to_string()), - }, - timestamp: 1_673_061_827_000, - status: "info".to_string(), - }, - hostname: "test-arn".to_string(), - source: LAMBDA_RUNTIME_SLUG.to_string(), - service: "test-service".to_string(), - tags: tags_provider.get_tags_string(), - }; - assert_eq!(intake_log, function_log); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + + assert_eq!(intake_log.ddsource.as_deref(), Some(LAMBDA_RUNTIME_SLUG)); + assert_eq!(intake_log.hostname.as_deref(), Some("test-arn")); + assert_eq!(intake_log.service.as_deref(), Some("test-service")); + assert_eq!( + intake_log.message, + r#"{"message":{"custom_details":"my-structured-message"},"my_other_details":"included"}"# + ); + assert_eq!( + intake_log.ddtags.as_deref(), + Some(tags_provider.get_tags_string().as_str()) + ); + assert_eq!( + intake_log + .attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .and_then(|v| v.as_str()), + Some("test-request-id") + ); } #[tokio::test] @@ -1422,7 +1481,7 @@ mod tests { )); let (tx, _rx) = tokio::sync::mpsc::channel(2); - let (aggregator_service, aggregator_handle) = AggregatorService::default(); + let (aggregator_service, aggregator_handle) = AggregatorService::new(); let service_handle = tokio::spawn(async move { aggregator_service.run().await; }); @@ -1536,7 +1595,12 @@ mod tests { )), }; let log1 = processor.make_log(function_event).await.unwrap(); - assert_eq!(log1.message.lambda.request_id, None); + assert!( + log1.attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .is_none_or(serde_json::Value::is_null) + ); assert_eq!(processor.orphan_logs.len(), 0); // Now send a PlatformStart event with a request_id to set the context @@ -1560,7 +1624,12 @@ mod tests { let log2 = processor.make_log(another_function_event).await.unwrap(); // In managed instance mode, logs should not inherit request_id from context - assert_eq!(log2.message.lambda.request_id, None); + assert!( + log2.attributes + .get("lambda") + .and_then(|l| l.get("request_id")) + .is_none_or(serde_json::Value::is_null) + ); assert_eq!(processor.orphan_logs.len(), 0); } @@ -1730,7 +1799,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // Test WARN level let event = TelemetryEvent { @@ -1740,8 +1809,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "warn"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "warn"); // Test ERROR level let event = TelemetryEvent { @@ -1751,8 +1820,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "error"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "error"); // Test FATAL level let event = TelemetryEvent { @@ -1762,8 +1831,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "critical"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "critical"); // Test DEBUG level let event = TelemetryEvent { @@ -1773,8 +1842,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "debug"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "debug"); // Test INFO level (should remain "info") let event = TelemetryEvent { @@ -1784,8 +1853,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); } #[tokio::test] @@ -1815,7 +1884,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // JSON without level field let event = TelemetryEvent { @@ -1825,8 +1894,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); } #[tokio::test] @@ -1856,7 +1925,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // JSON with unrecognized level let event = TelemetryEvent { @@ -1866,8 +1935,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); } #[tokio::test] @@ -1906,8 +1975,8 @@ mod tests { assert_eq!(lambda_message.status, "error"); // The intake log should preserve the "error" status (message is not JSON) - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "error"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "error"); } #[tokio::test] @@ -1938,14 +2007,9 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "error"); - assert!( - intake_log - .message - .message - .contains("DD_EXTENSION | ERROR |") - ); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "error"); + assert!(intake_log.message.contains("DD_EXTENSION | ERROR |")); // DEBUG level let event = TelemetryEvent { @@ -1956,8 +2020,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "debug"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "debug"); } #[tokio::test] @@ -1987,7 +2051,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // level as a number let event = TelemetryEvent { @@ -1997,8 +2061,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); // level as null let event = TelemetryEvent { @@ -2008,8 +2072,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); // level as a boolean let event = TelemetryEvent { @@ -2019,8 +2083,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "info"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "info"); } #[tokio::test] @@ -2050,7 +2114,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // JSON log with both ddtags and level let event = TelemetryEvent { @@ -2061,15 +2125,27 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); // Level should be extracted - assert_eq!(intake_log.message.status, "warn"); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "warn"); // Tags should be extracted and appended - assert!(intake_log.tags.contains("env:staging")); - assert!(intake_log.tags.contains("team:backend")); + assert!( + intake_log + .ddtags + .as_deref() + .unwrap_or("") + .contains("env:staging") + ); + assert!( + intake_log + .ddtags + .as_deref() + .unwrap_or("") + .contains("team:backend") + ); // ddtags should be removed from the message - assert!(!intake_log.message.message.contains("ddtags")); + assert!(!intake_log.message.contains("ddtags")); } #[tokio::test] @@ -2099,7 +2175,7 @@ mod tests { }, }; let start_msg = processor.get_message(start_event).await.unwrap(); - processor.get_intake_log(start_msg).unwrap(); + processor.get_log_entry(start_msg).unwrap(); // JSON log with "status" field instead of "level" (Datadog convention) let event = TelemetryEvent { @@ -2109,8 +2185,8 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "error"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "error"); // "level" takes priority over "status" when both are present let event = TelemetryEvent { @@ -2120,7 +2196,7 @@ mod tests { )), }; let lambda_message = processor.get_message(event).await.unwrap(); - let intake_log = processor.get_intake_log(lambda_message).unwrap(); - assert_eq!(intake_log.message.status, "warn"); + let intake_log = processor.get_log_entry(lambda_message).unwrap(); + assert_eq!(intake_log.status.as_deref().unwrap_or("info"), "warn"); } } diff --git a/bottlecap/src/logs/mod.rs b/bottlecap/src/logs/mod.rs index 3c40f2983..405b8faf0 100644 --- a/bottlecap/src/logs/mod.rs +++ b/bottlecap/src/logs/mod.rs @@ -1,7 +1,3 @@ pub mod agent; -pub mod aggregator; -pub mod aggregator_service; -pub mod constants; -pub mod flusher; pub mod lambda; pub mod processor; diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index cff6df5f0..1d8d60304 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -7,9 +7,9 @@ use crate::LAMBDA_RUNTIME_SLUG; use crate::config::{self, processing_rule}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; -use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::lambda::processor::LambdaProcessor; use crate::tags; +use datadog_logs_agent::AggregatorHandle; impl LogsProcessor { #[must_use] diff --git a/bottlecap/src/otlp/mod.rs b/bottlecap/src/otlp/mod.rs index 4e0f4aed7..6f7c3f5f7 100644 --- a/bottlecap/src/otlp/mod.rs +++ b/bottlecap/src/otlp/mod.rs @@ -16,6 +16,7 @@ pub fn should_enable_otlp_agent(config: &Arc) -> bool { #[cfg(test)] mod tests { + #![allow(clippy::result_large_err)] use super::*; use std::path::Path; diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 4b6367c9a..06f649010 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -2,12 +2,13 @@ use bottlecap::LAMBDA_RUNTIME_SLUG; use bottlecap::config::Config; use bottlecap::event_bus::EventBus; use bottlecap::extension::telemetry::events::TelemetryEvent; -use bottlecap::logs::{agent::LogsAgent, flusher::LogsFlusher}; +use bottlecap::logs::agent::LogsAgent; use bottlecap::tags::provider::Provider; -use dogstatsd::api_key::ApiKeyFactory; +use datadog_logs_agent::{AggregatorService, Destination, LogFlusher, LogFlusherConfig}; use httpmock::prelude::*; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; mod common; @@ -19,9 +20,11 @@ async fn test_logs() { }; let dd_api_key = "my_test_key"; - // protobuf is using hashmap, can't set a btreemap to have sorted keys. Using multiple regexp since - // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported - let regexp_message = r#"[{"message":{"message":"START RequestId: 459921b5-681c-4a96-beb0-81e0aa586026 Version: $LATEST","lambda":{"arn":"test-arn","request_id":"459921b5-681c-4a96-beb0-81e0aa586026"},"timestamp":1666361103165,"status":"info"},"hostname":"test-arn","service":"","#; + // New flat LogEntry format: top-level message, hostname, lambda attrs flattened + let regexp_message = + r#""message":"START RequestId: 459921b5-681c-4a96-beb0-81e0aa586026 Version: $LATEST""#; + let regexp_request_id = r#""request_id":"459921b5-681c-4a96-beb0-81e0aa586026""#; + let regexp_hostname = r#""hostname":"test-arn""#; let regexp_compute_state = r#"_dd.compute_stats:1"#; let regexp_arch = format!(r#"architecture:{}"#, arch); let regexp_function_arn = r#"function_arn:test-arn"#; @@ -34,6 +37,8 @@ async fn test_logs() { .header("DD-API-KEY", dd_api_key) .header("Content-Type", "application/json") .body_contains(regexp_message) + .body_contains(regexp_request_id) + .body_contains(regexp_hostname) .body_contains(regexp_compute_state) .body_contains(regexp_arch) .body_contains(regexp_function_arn) @@ -56,8 +61,7 @@ async fn test_logs() { let (_, bus_tx) = EventBus::run(); - let (logs_aggr_service, logs_aggr_handle) = - bottlecap::logs::aggregator_service::AggregatorService::default(); + let (logs_aggr_service, logs_aggr_handle) = AggregatorService::new(); tokio::spawn(async move { logs_aggr_service.run().await; }); @@ -69,10 +73,20 @@ async fn test_logs() { logs_aggr_handle.clone(), false, ); - let api_key_factory = Arc::new(ApiKeyFactory::new(dd_api_key)); + let client = bottlecap::http::get_client(&Arc::clone(&arc_conf)); - let logs_flusher = - LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone(), client); + let flusher_config = LogFlusherConfig { + api_key: dd_api_key.to_string(), + site: "datadoghq.com".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/api/v2/logs", server.url("")), + }, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + let logs_flusher = LogFlusher::new(flusher_config, client, logs_aggr_handle); let telemetry_events: Vec = serde_json::from_str( r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) @@ -89,7 +103,7 @@ async fn test_logs() { logs_agent.sync_consume().await; - let _ = logs_flusher.flush(None).await; + let _ = logs_flusher.flush(vec![]).await; hello_mock.assert(); }