diff --git a/.gitlab/datasources/test-suites.yaml b/.gitlab/datasources/test-suites.yaml index be065c284..6ea591354 100644 --- a/.gitlab/datasources/test-suites.yaml +++ b/.gitlab/datasources/test-suites.yaml @@ -3,3 +3,4 @@ test_suites: - name: otlp - name: snapstart - name: lmi + - name: durable diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index a9bab75a7..65177900a 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -383,7 +383,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "log", "prettyplease", "proc-macro2", @@ -1652,15 +1652,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2607,9 +2598,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3c2ef31cb..09b259943 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -51,6 +51,7 @@ use bottlecap::{ AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService, }, flusher::LogsFlusher, + lambda::DurableContextUpdate, }, otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, proxy::{interceptor, should_start_proxy}, @@ -298,15 +299,20 @@ async fn extension_loop_active( // and shares the connection pool. let shared_client = bottlecap::http::get_client(config); - let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) = - start_logs_agent( - config, - Arc::clone(&api_key_factory), - &tags_provider, - event_bus_tx.clone(), - aws_config.is_managed_instance_mode(), - &shared_client, - ); + let ( + logs_agent_channel, + logs_flusher, + logs_agent_cancel_token, + logs_aggregator_handle, + durable_context_tx, + ) = start_logs_agent( + config, + Arc::clone(&api_key_factory), + &tags_provider, + event_bus_tx.clone(), + aws_config.is_managed_instance_mode(), + &shared_client, + ); let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd( tags_provider.clone(), @@ -325,6 +331,7 @@ async fn extension_loop_active( Arc::clone(&aws_config), metrics_aggregator_handle.clone(), Arc::clone(&propagator), + durable_context_tx, ); tokio::spawn(async move { invocation_processor_service.run().await; @@ -1039,6 +1046,7 @@ fn start_logs_agent( LogsFlusher, CancellationToken, LogsAggregatorHandle, + Sender, ) { let (aggregator_service, aggregator_handle) = LogsAggregatorService::default(); // Start service in background @@ -1046,7 +1054,7 @@ fn start_logs_agent( aggregator_service.run().await; }); - let (mut agent, tx) = LogsAgent::new( + let (mut agent, tx, durable_context_tx) = LogsAgent::new( Arc::clone(tags_provider), Arc::clone(config), event_bus, @@ -1068,7 +1076,13 @@ fn start_logs_agent( config.clone(), client.clone(), ); - (tx, flusher, cancel_token, aggregator_handle) + ( + tx, + flusher, + cancel_token, + aggregator_handle, + durable_context_tx, + ) } #[allow(clippy::type_complexity)] diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 7e274d3ce..03eb5d490 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -9,7 +9,9 @@ use libdd_trace_protobuf::pb::Span; use libdd_trace_utils::tracer_header_tags; use serde_json::Value; use tokio::time::Instant; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; + +use tokio::sync::mpsc; use crate::{ config::{self, aws::AwsConfig}, @@ -24,6 +26,7 @@ use crate::{ span_inferrer::{self, SpanInferrer}, triggers::get_default_service_name, }, + logs::lambda::DurableContextUpdate, metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics}, proc::{ self, CPUData, NetworkData, @@ -88,6 +91,10 @@ pub struct Processor { /// Tracks whether if first invocation after init has been received in Managed Instance mode. /// Used to determine if we should search for the empty context on an invocation. awaiting_first_invocation: bool, + /// Sender used to forward durable execution context extracted from `aws.lambda` spans to the + /// logs agent. Decouples the trace agent from the logs agent: the trace agent sends spans + /// to the lifecycle processor, which extracts durable context and relays it here. + durable_context_tx: mpsc::Sender, } impl Processor { @@ -98,6 +105,7 @@ impl Processor { aws_config: Arc, metrics_aggregator: dogstatsd::aggregator::AggregatorHandle, propagator: Arc, + durable_context_tx: mpsc::Sender, ) -> Self { let resource = tags_provider .get_canonical_resource_name() @@ -127,6 +135,7 @@ impl Processor { dynamic_tags: HashMap::new(), active_invocations: 0, awaiting_first_invocation: false, + durable_context_tx, } } @@ -1342,6 +1351,23 @@ impl Processor { self.context_buffer.add_tracer_span(request_id, span); } } + + /// Forwards durable execution context extracted from an `aws.lambda` span to the logs + /// pipeline so it can release held logs and tag them with durable execution metadata. + pub fn forward_durable_context( + &mut self, + request_id: &str, + execution_id: &str, + execution_name: &str, + ) { + if let Err(e) = self.durable_context_tx.try_send(DurableContextUpdate { + request_id: request_id.to_owned(), + execution_id: execution_id.to_owned(), + execution_name: execution_name.to_owned(), + }) { + error!("Invocation Processor | Failed to forward durable context to logs agent: {e}"); + } + } } #[cfg(test)] @@ -1387,7 +1413,15 @@ mod tests { tokio::spawn(service.run()); let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); - Processor::new(tags_provider, config, aws_config, handle, propagator) + let (durable_context_tx, _) = tokio::sync::mpsc::channel(1); + Processor::new( + tags_provider, + config, + aws_config, + handle, + propagator, + durable_context_tx, + ) } #[test] @@ -1924,7 +1958,15 @@ mod tests { let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); - let processor = Processor::new(tags_provider, config, aws_config, handle, propagator); + let (durable_context_tx, _) = tokio::sync::mpsc::channel(1); + let processor = Processor::new( + tags_provider, + config, + aws_config, + handle, + propagator, + durable_context_tx, + ); assert!( processor.is_managed_instance_mode(), diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 13f41cf01..a7d533279 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -17,6 +17,7 @@ use crate::{ context::{Context, ReparentingInfo}, processor::Processor, }, + logs::lambda::DurableContextUpdate, tags::provider, traces::{ context::SpanContext, propagation::DatadogCompositePropagator, @@ -111,6 +112,11 @@ pub enum ProcessorCommand { AddTracerSpan { span: Box, }, + ForwardDurableContext { + request_id: String, + execution_id: String, + execution_name: String, + }, OnOutOfMemoryError { timestamp: i64, }, @@ -380,6 +386,21 @@ impl InvocationProcessorHandle { .await } + pub async fn forward_durable_context( + &self, + request_id: String, + execution_id: String, + execution_name: String, + ) -> Result<(), mpsc::error::SendError> { + self.sender + .send(ProcessorCommand::ForwardDurableContext { + request_id, + execution_id, + execution_name, + }) + .await + } + pub async fn on_out_of_memory_error( &self, timestamp: i64, @@ -430,6 +451,7 @@ impl InvocationProcessorService { aws_config: Arc, metrics_aggregator_handle: AggregatorHandle, propagator: Arc, + durable_context_tx: mpsc::Sender, ) -> (InvocationProcessorHandle, Self) { let (sender, receiver) = mpsc::channel(1000); @@ -439,6 +461,7 @@ impl InvocationProcessorService { aws_config, metrics_aggregator_handle, propagator, + durable_context_tx, ); let handle = InvocationProcessorHandle { sender }; @@ -588,6 +611,17 @@ impl InvocationProcessorService { ProcessorCommand::AddTracerSpan { span } => { self.processor.add_tracer_span(&span); } + ProcessorCommand::ForwardDurableContext { + request_id, + execution_id, + execution_name, + } => { + self.processor.forward_durable_context( + &request_id, + &execution_id, + &execution_name, + ); + } ProcessorCommand::OnOutOfMemoryError { timestamp } => { self.processor.on_out_of_memory_error(timestamp); } diff --git a/bottlecap/src/logs/agent.rs b/bottlecap/src/logs/agent.rs index 71e5a721e..3fcd2d586 100644 --- a/bottlecap/src/logs/agent.rs +++ b/bottlecap/src/logs/agent.rs @@ -12,9 +12,12 @@ use crate::{LAMBDA_RUNTIME_SLUG, config}; const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100); +use crate::logs::lambda::DurableContextUpdate; + #[allow(clippy::module_name_repetitions)] pub struct LogsAgent { rx: mpsc::Receiver, + durable_context_rx: mpsc::Receiver, processor: LogsProcessor, aggregator_handle: AggregatorHandle, cancel_token: CancellationToken, @@ -28,7 +31,7 @@ impl LogsAgent { event_bus: Sender, aggregator_handle: AggregatorHandle, is_managed_instance_mode: bool, - ) -> (Self, Sender) { + ) -> (Self, Sender, Sender) { let processor = LogsProcessor::new( Arc::clone(&datadog_config), tags_provider, @@ -38,16 +41,18 @@ impl LogsAgent { ); let (tx, rx) = mpsc::channel::(1000); + let (durable_context_tx, durable_context_rx) = mpsc::channel::(500); let cancel_token = CancellationToken::new(); let agent = Self { rx, + durable_context_rx, processor, aggregator_handle, cancel_token, }; - (agent, tx) + (agent, tx, durable_context_tx) } pub async fn spin(&mut self) { @@ -56,6 +61,9 @@ impl LogsAgent { Some(event) = self.rx.recv() => { self.processor.process(event, &self.aggregator_handle).await; } + Some(update) = self.durable_context_rx.recv() => { + self.processor.process_durable_context_update(update, &self.aggregator_handle); + } () = self.cancel_token.cancelled() => { debug!("LOGS_AGENT | Received shutdown signal, draining remaining events"); diff --git a/bottlecap/src/logs/aggregator.rs b/bottlecap/src/logs/aggregator.rs index 6d4f8287a..e523e46b8 100644 --- a/bottlecap/src/logs/aggregator.rs +++ b/bottlecap/src/logs/aggregator.rs @@ -106,6 +106,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -130,6 +131,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -156,6 +158,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), @@ -196,6 +199,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), diff --git a/bottlecap/src/logs/aggregator_service.rs b/bottlecap/src/logs/aggregator_service.rs index d281bfb19..563bea32f 100644 --- a/bottlecap/src/logs/aggregator_service.rs +++ b/bottlecap/src/logs/aggregator_service.rs @@ -122,6 +122,7 @@ mod tests { lambda: Lambda { arn: "arn".to_string(), request_id: Some("request_id".to_string()), + ..Lambda::default() }, timestamp: 0, status: "status".to_string(), diff --git a/bottlecap/src/logs/lambda/mod.rs b/bottlecap/src/logs/lambda/mod.rs index e196c1ece..0a49c9418 100644 --- a/bottlecap/src/logs/lambda/mod.rs +++ b/bottlecap/src/logs/lambda/mod.rs @@ -2,6 +2,21 @@ use serde::Serialize; pub mod processor; +/// Context extracted from an `aws.lambda` span and forwarded to the logs pipeline. +#[derive(Clone)] +pub struct DurableContextUpdate { + pub request_id: String, + pub execution_id: String, + pub execution_name: String, +} + +/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`. +#[derive(Clone, Debug)] +pub struct DurableExecutionContext { + pub execution_id: String, + pub execution_name: String, +} + /// /// Intake Log for AWS Lambda Telemetry Events. /// @@ -30,10 +45,14 @@ pub struct Message { pub status: String, } -#[derive(Serialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Debug, Clone, Default, PartialEq)] pub struct Lambda { pub arn: String, pub request_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub durable_execution_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub durable_execution_name: Option, } impl Message { @@ -50,6 +69,7 @@ impl Message { lambda: Lambda { arn: function_arn, request_id, + ..Lambda::default() }, timestamp, status: status.unwrap_or("info".to_string()), diff --git a/bottlecap/src/logs/lambda/processor.rs b/bottlecap/src/logs/lambda/processor.rs index 8325dde32..d71dd526a 100644 --- a/bottlecap/src/logs/lambda/processor.rs +++ b/bottlecap/src/logs/lambda/processor.rs @@ -1,3 +1,4 @@ +use std::collections::{HashMap, VecDeque}; use std::error::Error; use std::fmt::Write; use std::sync::Arc; @@ -15,7 +16,7 @@ use crate::logs::aggregator_service::AggregatorHandle; use crate::logs::processor::{Processor, Rule}; use crate::tags::provider; -use crate::logs::lambda::{IntakeLog, Message}; +use crate::logs::lambda::{DurableExecutionContext, IntakeLog, Message}; #[allow(clippy::module_name_repetitions)] #[derive(Clone, Debug)] @@ -37,8 +38,30 @@ pub struct LambdaProcessor { logs_enabled: bool, // Managed Instance mode is_managed_instance_mode: bool, + // Whether this is a Durable Function runtime. + // None = not yet determined (hold all logs until known). Happens during cold start. + // Some(true) = durable function. Hold logs for a request_id until + // the durable execution id and execution name for this invocation are known. + // These two fields are extracted from the aws.lambda span sent by the tracer. + // Some(false) = not a durable function; mark logs as ready to be aggregated as normal. + is_durable_function: Option, + // Logs held pending resolution, keyed by request_id. + // While is_durable_function is None, every incoming log (except durable execution SDK logs) + // is stashed here so we can decide whether to filter/tag it once the flag is known. + // While is_durable_function is Some(true), logs whose request_id has no + // durable execution context yet are also stashed here; they are drained + // the moment that context arrives. + held_logs: HashMap>, + // Maps request_id -> (durable execution id, durable execution name) + durable_context_map: HashMap, + // Insertion order for FIFO eviction when map reaches capacity + durable_context_order: VecDeque, } +// Matches `lifecycle::invocation::ContextBuffer` default capacity: sized to absorb async +// event backlog where invocation contexts may arrive out of order. +const DURABLE_CONTEXT_MAP_CAPACITY: usize = 500; + const OOM_ERRORS: [&str; 7] = [ "fatal error: runtime: out of memory", // Go "java.lang.OutOfMemoryError", // Java @@ -55,6 +78,19 @@ fn is_oom_error(error_msg: &str) -> bool { .any(|&oom_str| error_msg.contains(oom_str)) } +/// Parses a Lambda durable execution ARN and returns `(execution_id, execution_name)`. +/// +/// Expected format: +/// `arn:aws:lambda:{region}:{account}:function:{name}:{version}/durable-execution/{exec_name}/{exec_id}` +fn parse_durable_execution_arn(arn: &str) -> Option<(String, String)> { + const SEPARATOR: &str = "/durable-execution/"; + let durable_part = arn.split(SEPARATOR).nth(1)?; + let mut parts = durable_part.splitn(2, '/'); + let exec_name = parts.next().filter(|s| !s.is_empty())?.to_string(); + let exec_id = parts.next().filter(|s| !s.is_empty())?.to_string(); + Some((exec_id, exec_name)) +} + /// Maps AWS/common log level strings to Datadog log status values. /// Case-insensitive and accepts both short and long forms /// (e.g. "WARN"/"WARNING", "INFO"/"INFORMATION", "ERR"/"ERROR"). @@ -103,6 +139,10 @@ impl LambdaProcessor { ready_logs: Vec::new(), event_bus, is_managed_instance_mode, + is_durable_function: None, + held_logs: HashMap::new(), + durable_context_map: HashMap::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY), + durable_context_order: VecDeque::with_capacity(DURABLE_CONTEXT_MAP_CAPACITY), } } @@ -111,7 +151,7 @@ impl LambdaProcessor { let copy = event.clone(); match event.record { TelemetryRecord::Function(v) => { - let (request_id, message) = match v { + let (request_id, message, durable_ctx) = match v { serde_json::Value::Object(obj) => { let request_id = if self.is_managed_instance_mode { obj.get("requestId") @@ -121,11 +161,17 @@ impl LambdaProcessor { } else { None }; + // When a message is logged from the durable execution SDK, it contains an `executionArn` field. + // In this case, extract the durable execution context from the `executionArn` field, and later + // set durable execution id and name as log attributes. + let durable_ctx = obj.get("executionArn") + .and_then(|v| v.as_str()) + .and_then(parse_durable_execution_arn); let msg = Some(serde_json::to_string(&obj).unwrap_or_default()); - (request_id, msg) + (request_id, msg, durable_ctx) }, - serde_json::Value::String(s) => (None, Some(s)), - _ => (None, None), + serde_json::Value::String(s) => (None, Some(s), None), + _ => (None, None, None), }; if let Some(message) = message { @@ -136,13 +182,20 @@ impl LambdaProcessor { } } - return Ok(Message::new( + let mut msg = Message::new( message, request_id, self.function_arn.clone(), event.time.timestamp_millis(), None, - )); + ); + // If the message is logged from the durable execution SDK, + // set durable execution id and name as log attributes. + if let Some((exec_id, exec_name)) = durable_ctx { + msg.lambda.durable_execution_id = Some(exec_id); + msg.lambda.durable_execution_name = Some(exec_name); + } + return Ok(msg); } Err("Unable to parse log".into()) @@ -185,6 +238,10 @@ impl LambdaProcessor { let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display + let is_durable = rv.contains("DurableFunction"); + self.is_durable_function = Some(is_durable); + self.resolve_held_logs_on_durable_function_set(is_durable); + Ok(Message::new( format!("INIT_START Runtime Version: {rv} Runtime Version ARN: {rv_arn}"), None, @@ -467,15 +524,177 @@ impl LambdaProcessor { } } + /// Inserts the durable execution context for a `request_id`, received from the `aws.lambda` + /// span. Evicts the oldest entry when the map is at capacity. If `is_durable_function` is + /// already `Some(true)`, drains any held logs for that `request_id`. + pub fn insert_to_durable_context_map( + &mut self, + request_id: &str, // key + execution_id: &str, // value + execution_name: &str, // value + ) { + if self.durable_context_map.contains_key(request_id) { + error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map"); + return; + } + if self.durable_context_order.len() >= DURABLE_CONTEXT_MAP_CAPACITY + && let Some(oldest) = self.durable_context_order.pop_front() + { + self.durable_context_map.remove(&oldest); + } + self.durable_context_order.push_back(request_id.to_string()); + self.durable_context_map.insert( + request_id.to_string(), + DurableExecutionContext { + execution_id: execution_id.to_string(), + execution_name: execution_name.to_string(), + }, + ); + self.drain_held_logs_for_request_id(request_id); + } + + pub fn take_ready_logs(&mut self) -> Vec { + std::mem::take(&mut self.ready_logs) + } + + /// Moves all logs held for `request_id` into `ready_logs`, tagging each with the + /// durable execution context that is now known for that `request_id`. + fn drain_held_logs_for_request_id(&mut self, request_id: &str) { + let Some(held) = self.held_logs.remove(request_id) else { + return; + }; + let durable_ctx = self.durable_context_map.get(request_id).cloned(); + if let Some(ctx) = durable_ctx { + for mut log in held { + log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone()); + log.message.lambda.durable_execution_name = Some(ctx.execution_name.clone()); + if let Ok(s) = serde_json::to_string(&log) { + drop(log); + self.ready_logs.push(s); + } + } + } + } + + /// Called once when `is_durable_function` is set, draining every entry in `held_logs`: + /// - `false` → drain all held logs. + /// - `true` → drain logs whose `request_id` is already in `durable_context_map`, + /// the rest stay in `held_logs` until their context arrives via an `aws.lambda` span. + fn resolve_held_logs_on_durable_function_set(&mut self, is_durable: bool) { + let held = std::mem::take(&mut self.held_logs); + if is_durable { + // Drain logs whose request_id is already in the map. + for (request_id, logs) in held { + let durable_ctx = self.durable_context_map.get(&request_id).cloned(); + if let Some(ctx) = durable_ctx { + // If the request_id is in the durable context map, set durable execution id + // and execution name, and add logs to ready_logs. + for mut log in logs { + log.message.lambda.durable_execution_id = Some(ctx.execution_id.clone()); + log.message.lambda.durable_execution_name = + Some(ctx.execution_name.clone()); + if let Ok(s) = serde_json::to_string(&log) { + self.ready_logs.push(s); + } + } + } else { + // No context yet — keep logs in held_logs until the aws.lambda span arrives. + self.held_logs.insert(request_id, logs); + } + } + } else { + // Drain all held logs. + for (_, logs) in held { + for log in logs { + if let Ok(s) = serde_json::to_string(&log) { + self.ready_logs.push(s); + } + } + } + } + } + /// Processes a log, applies filtering rules, serializes it, and queues it for aggregation fn process_and_queue_log(&mut self, mut log: IntakeLog) { let should_send_log = self.logs_enabled && LambdaProcessor::apply_rules(&self.rules, &mut log.message.message); - if should_send_log && let Ok(serialized_log) = serde_json::to_string(&log) { - // explicitly drop log so we don't accidentally re-use it and push - // duplicate logs to the aggregator - drop(log); - self.ready_logs.push(serialized_log); + if should_send_log { + self.queue_log_after_rules(log); + } + } + + /// Queues a log that has already had processing rules applied. + /// + /// Logs from the durable execution SDK include an `executionArn` field from which + /// `durable_execution_id` and `durable_execution_name` are extracted in `get_message()`. + /// Such logs are pushed directly to `ready_logs` without holding. + /// + /// For all other logs, routing depends on `is_durable_function`: + /// - `None` → stash in `held_logs[request_id]`; logs without a `request_id` are + /// marked as ready to be aggregated since they cannot carry durable context. + /// - `Some(false)` → serialize and push straight to `ready_logs`. + /// - `Some(true)` → mark this log as ready to be aggregated if its `request_id` is already in `durable_context_map` + /// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`. + fn queue_log_after_rules(&mut self, mut log: IntakeLog) { + // Durable execution SDK logs already carry execution context extracted from executionArn. + if log.message.lambda.durable_execution_id.is_some() { + if let Ok(serialized_log) = serde_json::to_string(&log) { + drop(log); + self.ready_logs.push(serialized_log); + } + return; + } + + match self.is_durable_function { + // We don't yet know if this is a durable function. Hold the log until we know. + None => { + if let Some(rid) = log.message.lambda.request_id.clone() { + self.held_logs.entry(rid).or_default().push(log); + } else { + error!("LOGS | queue_log_after_rules: log without request_id: {log:?}"); + drop(log); + } + } + // Not a durable function. Serialize and push the log. + Some(false) => { + if let Ok(serialized_log) = serde_json::to_string(&log) { + // explicitly drop log so we don't accidentally re-use it and push + // duplicate logs to the aggregator + drop(log); + self.ready_logs.push(serialized_log); + } + } + // Durable function. Mark this log as ready to be aggregated if its request_id already has context; otherwise hold. + Some(true) => { + let durable_ctx = log + .message + .lambda + .request_id + .as_ref() + .and_then(|rid| self.durable_context_map.get(rid)) + .cloned(); + + match durable_ctx { + Some(ctx) => { + log.message.lambda.durable_execution_id = Some(ctx.execution_id); + log.message.lambda.durable_execution_name = Some(ctx.execution_name); + if let Ok(serialized_log) = serde_json::to_string(&log) { + // explicitly drop log so we don't accidentally re-use it and push + // duplicate logs to the aggregator + drop(log); + self.ready_logs.push(serialized_log); + } + } + None => { + if let Some(rid) = log.message.lambda.request_id.clone() { + self.held_logs.entry(rid).or_default().push(log); + } else { + error!("LOGS | queue_log_after_rules: log without request_id: {log:?}"); + drop(log); + } + } + } + } } } @@ -569,6 +788,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -586,6 +806,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -608,6 +829,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: None, + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -628,6 +850,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -653,6 +876,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -678,6 +902,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "error".to_string(), @@ -708,6 +933,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -733,6 +959,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "info".to_string(), @@ -758,6 +985,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -783,6 +1011,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -808,6 +1037,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_058_627_000, status: "error".to_string(), @@ -888,6 +1118,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1039,6 +1270,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); let event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1059,6 +1291,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1199,6 +1432,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); let start_event = TelemetryEvent { time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), @@ -1233,6 +1467,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1248,6 +1483,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1322,6 +1558,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1383,6 +1620,7 @@ mod tests { lambda: Lambda { arn: "test-arn".to_string(), request_id: Some("test-request-id".to_string()), + ..Lambda::default() }, timestamp: 1_673_061_827_000, status: "info".to_string(), @@ -1433,6 +1671,7 @@ mod tests { tx.clone(), false, ); + processor.is_durable_function = Some(false); // First, send an extension log (orphan) that doesn't have a request_id let extension_event = TelemetryEvent { @@ -2123,4 +2362,149 @@ mod tests { let intake_log = processor.get_intake_log(lambda_message).unwrap(); assert_eq!(intake_log.message.status, "warn"); } + + // --- parse_durable_execution_arn --- + + #[test] + fn test_parse_durable_execution_arn_valid() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1/durable-execution/my-exec-name/exec-id-abc"; + let result = parse_durable_execution_arn(arn); + assert_eq!( + result, + Some(("exec-id-abc".to_string(), "my-exec-name".to_string())) + ); + } + + #[test] + fn test_parse_durable_execution_arn_latest() { + let arn = "arn:aws:lambda:us-west-2:999999999999:function:fn:$LATEST/durable-execution/exec-name/exec-uuid-123"; + let result = parse_durable_execution_arn(arn); + assert_eq!( + result, + Some(("exec-uuid-123".to_string(), "exec-name".to_string())) + ); + } + + #[test] + fn test_parse_durable_execution_arn_missing_separator() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1"; + assert!(parse_durable_execution_arn(arn).is_none()); + } + + #[test] + fn test_parse_durable_execution_arn_missing_exec_id() { + let arn = "arn:aws:lambda:us-east-1:123456789012:function:my-function:1/durable-execution/only-name"; + assert!(parse_durable_execution_arn(arn).is_none()); + } + + // --- executionArn extraction in get_message / queue_log_after_rules --- + + fn make_processor_for_durable_arn_tests() -> LambdaProcessor { + let tags = HashMap::new(); + let config = Arc::new(config::Config { + service: Some("test-service".to_string()), + tags: tags.clone(), + serverless_logs_enabled: true, + ..config::Config::default() + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (tx, _) = tokio::sync::mpsc::channel(2); + LambdaProcessor::new(tags_provider, config, tx, false) + } + + #[tokio::test] + async fn test_durable_sdk_log_sets_execution_context_from_execution_arn() { + let mut processor = make_processor_for_durable_arn_tests(); + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let msg = processor.get_message(event).await.unwrap(); + assert_eq!(msg.lambda.durable_execution_id, Some("my-id".to_string())); + assert_eq!( + msg.lambda.durable_execution_name, + Some("my-name".to_string()) + ); + } + + #[tokio::test] + async fn test_durable_sdk_log_pushed_to_ready_logs_without_holding() { + let mut processor = make_processor_for_durable_arn_tests(); + // is_durable_function is still None (PlatformInitStart not yet received) + assert!(processor.is_durable_function.is_none()); + // Durable SDK logs arrive during an invocation, so request_id is set + processor.invocation_context.request_id = "req-abc".to_string(); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + // Log should have been sent to aggregator (not held) + assert!(processor.held_logs.is_empty()); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + let logs: Vec = serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!( + logs[0]["message"]["lambda"]["durable_execution_id"], + "my-id" + ); + assert_eq!( + logs[0]["message"]["lambda"]["durable_execution_name"], + "my-name" + ); + } + + #[tokio::test] + async fn test_durable_sdk_log_pushed_to_ready_logs_even_in_durable_function_mode() { + let mut processor = make_processor_for_durable_arn_tests(); + processor.is_durable_function = Some(true); + processor.invocation_context.request_id = "req-abc".to_string(); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::Object(serde_json::from_str( + r#"{"message":"hello","executionArn":"arn:aws:lambda:us-east-1:123:function:fn:1/durable-execution/my-name/my-id"}"# + ).unwrap())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + assert!(processor.held_logs.is_empty()); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert_eq!(batches.len(), 1); + } + + #[tokio::test] + async fn test_function_log_without_execution_arn_is_held_in_durable_mode() { + let mut processor = make_processor_for_durable_arn_tests(); + processor.is_durable_function = Some(true); + // Simulate a known request_id with no durable context yet + processor.invocation_context.request_id = "req-123".to_string(); + + let event = TelemetryEvent { + time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(), + record: TelemetryRecord::Function(Value::String("plain log without ARN".to_string())), + }; + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(async move { aggregator_service.run().await }); + processor.process(event, &aggregator_handle).await; + + // Should be held, not sent to aggregator + assert!(processor.held_logs.contains_key("req-123")); + let batches = aggregator_handle.get_batches().await.unwrap(); + assert!(batches.is_empty()); + } } diff --git a/bottlecap/src/logs/processor.rs b/bottlecap/src/logs/processor.rs index cff6df5f0..9e692b360 100644 --- a/bottlecap/src/logs/processor.rs +++ b/bottlecap/src/logs/processor.rs @@ -1,14 +1,14 @@ use std::sync::Arc; use tokio::sync::mpsc::Sender; -use tracing::debug; +use tracing::{debug, error}; use crate::LAMBDA_RUNTIME_SLUG; use crate::config::{self, processing_rule}; use crate::event_bus::Event; use crate::extension::telemetry::events::TelemetryEvent; use crate::logs::aggregator_service::AggregatorHandle; -use crate::logs::lambda::processor::LambdaProcessor; +use crate::logs::lambda::{DurableContextUpdate, processor::LambdaProcessor}; use crate::tags; impl LogsProcessor { @@ -34,6 +34,7 @@ impl LogsProcessor { } } + // TODO: rename this method to process_telemetry_event() pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) { match self { LogsProcessor::Lambda(lambda_processor) => { @@ -41,6 +42,43 @@ impl LogsProcessor { } } } + + pub fn process_durable_context_update( + &mut self, + update: DurableContextUpdate, + aggregator_handle: &AggregatorHandle, + ) { + self.insert_to_durable_map( + &update.request_id, + &update.execution_id, + &update.execution_name, + ); + let ready_logs = self.take_ready_logs(); + if !ready_logs.is_empty() + && let Err(e) = aggregator_handle.insert_batch(ready_logs) + { + error!("LOGS_PROCESSOR | Failed to insert batch: {}", e); + } + } + + pub fn insert_to_durable_map( + &mut self, + request_id: &str, + execution_id: &str, + execution_name: &str, + ) { + match self { + LogsProcessor::Lambda(p) => { + p.insert_to_durable_context_map(request_id, execution_id, execution_name); + } + } + } + + pub fn take_ready_logs(&mut self) -> Vec { + match self { + LogsProcessor::Lambda(p) => p.take_ready_logs(), + } + } } #[allow(clippy::module_name_repetitions)] diff --git a/bottlecap/src/proxy/interceptor.rs b/bottlecap/src/proxy/interceptor.rs index ce3517c1b..21a018377 100644 --- a/bottlecap/src/proxy/interceptor.rs +++ b/bottlecap/src/proxy/interceptor.rs @@ -499,6 +499,7 @@ mod tests { initialization_type: "on-demand".into(), }); let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); + let (durable_context_tx, _durable_context_rx) = tokio::sync::mpsc::channel(1); let (invocation_processor_handle, invocation_processor_service) = InvocationProcessorService::new( Arc::clone(&tags_provider), @@ -506,6 +507,7 @@ mod tests { Arc::clone(&aws_config), metrics_aggregator, Arc::clone(&propagator), + durable_context_tx, ); tokio::spawn(async move { invocation_processor_service.run().await; diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 6ad5c00cc..6cc708c43 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -591,6 +591,24 @@ impl TraceAgent { { error!("Failed to add tracer span to processor: {}", e); } + + if span.name == "aws.lambda" + && let (Some(request_id), Some(execution_id), Some(execution_name)) = ( + span.meta.get("request_id"), + span.meta.get("durable_function_execution_id"), + span.meta.get("durable_function_execution_name"), + ) + && let Err(e) = invocation_processor_handle + .forward_durable_context( + request_id.clone(), + execution_id.clone(), + execution_name.clone(), + ) + .await + { + error!("Failed to forward durable context to processor: {e}"); + } + handle_reparenting(&mut reparenting_info, &mut span); // Keep the span diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 4b6367c9a..8f619fdb1 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -62,7 +62,7 @@ async fn test_logs() { logs_aggr_service.run().await; }); - let (mut logs_agent, logs_agent_tx) = LogsAgent::new( + let (mut logs_agent, logs_agent_tx, _durable_context_tx) = LogsAgent::new( tags_provider, Arc::clone(&arc_conf), bus_tx.clone(), @@ -75,7 +75,7 @@ async fn test_logs() { LogsFlusher::new(api_key_factory, logs_aggr_handle, arc_conf.clone(), client); let telemetry_events: Vec = serde_json::from_str( - r#"[{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) + r#"[{"time":"2022-10-21T14:05:03.000Z","type":"platform.initStart","record":{"initializationType":"on-demand","phase":"init"}},{"time":"2022-10-21T14:05:03.165Z","type":"platform.start","record":{"requestId":"459921b5-681c-4a96-beb0-81e0aa586026","version":"$LATEST","tracing":{"spanId":"24cd7d670fa455f0","type":"X-Amzn-Trace-Id","value":"Root=1-6352a70e-1e2c502e358361800241fd45;Parent=35465b3a9e2f7c6a;Sampled=1"}}}]"#) .map_err(|e| e.to_string()).expect("Failed parsing telemetry events"); let sender = logs_agent_tx.clone(); @@ -85,10 +85,9 @@ async fn test_logs() { .send(an_event.clone()) .await .expect("Failed sending telemetry events"); + logs_agent.sync_consume().await; } - logs_agent.sync_consume().await; - let _ = logs_flusher.flush(None).await; hello_mock.assert(); diff --git a/integration-tests/bin/app.ts b/integration-tests/bin/app.ts index a7cb568ee..900cd4027 100644 --- a/integration-tests/bin/app.ts +++ b/integration-tests/bin/app.ts @@ -2,6 +2,7 @@ import 'source-map-support/register'; import * as cdk from 'aws-cdk-lib'; import {Base} from '../lib/stacks/base'; +import {Durable} from '../lib/stacks/durable'; import {Otlp} from '../lib/stacks/otlp'; import {Snapstart} from '../lib/stacks/snapstart'; import {LambdaManagedInstancesStack} from '../lib/stacks/lmi'; @@ -25,6 +26,9 @@ const stacks = [ new Base(app, `integ-${identifier}-base`, { env, }), + new Durable(app, `integ-${identifier}-durable`, { + env, + }), new Otlp(app, `integ-${identifier}-otlp`, { env, }), diff --git a/integration-tests/lambda/durable-python/lambda_durable_function.py b/integration-tests/lambda/durable-python/lambda_durable_function.py new file mode 100644 index 000000000..0e7f5ad10 --- /dev/null +++ b/integration-tests/lambda/durable-python/lambda_durable_function.py @@ -0,0 +1,9 @@ +import logging + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def handler(event, context): + logger.info('Hello from durable function!') + # Return None rather than an HTTP-style dict; the Lambda durable execution + # runtime rejects {'statusCode': 200} with "Invalid Status in invocation output." diff --git a/integration-tests/lambda/durable-python/lambda_non_durable_function.py b/integration-tests/lambda/durable-python/lambda_non_durable_function.py new file mode 100644 index 000000000..ce47095cd --- /dev/null +++ b/integration-tests/lambda/durable-python/lambda_non_durable_function.py @@ -0,0 +1,10 @@ +import logging + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def handler(event, context): + logger.info('Hello from non-durable function!') + return { + 'statusCode': 200 + } diff --git a/integration-tests/lambda/durable-python/requirements.txt b/integration-tests/lambda/durable-python/requirements.txt new file mode 100644 index 000000000..e8e026e23 --- /dev/null +++ b/integration-tests/lambda/durable-python/requirements.txt @@ -0,0 +1 @@ +# No additional requirements needed - datadog-lambda layer provides dependencies diff --git a/integration-tests/lib/stacks/durable.ts b/integration-tests/lib/stacks/durable.ts new file mode 100644 index 000000000..ee252615d --- /dev/null +++ b/integration-tests/lib/stacks/durable.ts @@ -0,0 +1,72 @@ +import * as cdk from 'aws-cdk-lib'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import { Construct } from 'constructs'; +import { + createLogGroup, + defaultDatadogEnvVariables, + defaultDatadogSecretPolicy, + getExtensionLayer, + getDefaultPythonLayer, + defaultPythonRuntime, +} from '../util'; + +export class Durable extends cdk.Stack { + constructor(scope: Construct, id: string, props: cdk.StackProps) { + super(scope, id, props); + + const extensionLayer = getExtensionLayer(this); + const pythonLayer = getDefaultPythonLayer(this); + + // Non-durable Python Lambda - used to verify that logs do NOT have durable execution context. + const pythonFunctionName = `${id}-python-lambda`; + const pythonFunction = new lambda.Function(this, pythonFunctionName, { + runtime: defaultPythonRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'datadog_lambda.handler.handler', + code: lambda.Code.fromAsset('./lambda/durable-python'), + functionName: pythonFunctionName, + timeout: cdk.Duration.seconds(30), + memorySize: 256, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: pythonFunctionName, + DD_TRACE_ENABLED: 'true', + DD_LAMBDA_HANDLER: 'lambda_non_durable_function.handler', + DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126', + DD_LOG_LEVEL: 'DEBUG', + }, + logGroup: createLogGroup(this, pythonFunctionName), + }); + pythonFunction.addToRolePolicy(defaultDatadogSecretPolicy); + pythonFunction.addLayers(extensionLayer); + pythonFunction.addLayers(pythonLayer); + + // Durable Python Lambda - used to verify that logs DO have durable execution context. + const durablePythonFunctionName = `${id}-python-durable-lambda`; + const durablePythonFunction = new lambda.Function(this, durablePythonFunctionName, { + runtime: defaultPythonRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'datadog_lambda.handler.handler', + code: lambda.Code.fromAsset('./lambda/durable-python'), + functionName: durablePythonFunctionName, + timeout: cdk.Duration.seconds(30), + memorySize: 256, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: durablePythonFunctionName, + DD_TRACE_ENABLED: 'true', + DD_LAMBDA_HANDLER: 'lambda_durable_function.handler', + DD_TRACE_AGENT_URL: 'http://127.0.0.1:8126', + DD_LOG_LEVEL: 'DEBUG', + }, + logGroup: createLogGroup(this, durablePythonFunctionName), + durableConfig: { + executionTimeout: cdk.Duration.minutes(15), + retentionPeriod: cdk.Duration.days(14), + }, + }); + durablePythonFunction.addToRolePolicy(defaultDatadogSecretPolicy); + durablePythonFunction.addLayers(extensionLayer); + durablePythonFunction.addLayers(pythonLayer); + } +} diff --git a/integration-tests/lib/util.ts b/integration-tests/lib/util.ts index 073a86c14..184f8e8d1 100644 --- a/integration-tests/lib/util.ts +++ b/integration-tests/lib/util.ts @@ -10,12 +10,12 @@ export const datadogSecretArn = process.env.DATADOG_API_SECRET_ARN!; export const extensionLayerArn = process.env.EXTENSION_LAYER_ARN!; export const defaultNodeRuntime = lambda.Runtime.NODEJS_24_X; -export const defaultPythonRuntime = lambda.Runtime.PYTHON_3_13; +export const defaultPythonRuntime = lambda.Runtime.PYTHON_3_14; export const defaultJavaRuntime = lambda.Runtime.JAVA_21; export const defaultDotnetRuntime = lambda.Runtime.DOTNET_8; export const defaultNodeLayerArn = process.env.NODE_TRACER_LAYER_ARN || 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Node24-x:132'; -export const defaultPythonLayerArn = process.env.PYTHON_TRACER_LAYER_ARN || 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Python313-ARM:117'; +export const defaultPythonLayerArn = process.env.PYTHON_TRACER_LAYER_ARN || 'arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Python314-ARM:123'; export const defaultJavaLayerArn = process.env.JAVA_TRACER_LAYER_ARN || 'arn:aws:lambda:us-east-1:464622532012:layer:dd-trace-java:25'; export const defaultDotnetLayerArn = process.env.DOTNET_TRACER_LAYER_ARN || 'arn:aws:lambda:us-east-1:464622532012:layer:dd-trace-dotnet-ARM:23'; diff --git a/integration-tests/tests/durable.test.ts b/integration-tests/tests/durable.test.ts new file mode 100644 index 000000000..4a1d9eff8 --- /dev/null +++ b/integration-tests/tests/durable.test.ts @@ -0,0 +1,112 @@ +import { invokeLambdaAndGetDatadogData, LambdaInvocationDatadogData } from './utils/util'; +import { publishVersion } from './utils/lambda'; +import { getIdentifier } from '../config'; + +describe('Durable Function Log Tests', () => { + describe('Non-Durable Function', () => { + let result: LambdaInvocationDatadogData; + + beforeAll(async () => { + const identifier = getIdentifier(); + const functionName = `integ-${identifier}-durable-python-lambda`; + + console.log(`CloudWatch log group: /aws/lambda/${functionName}`); + console.log('Invoking non-durable Python Lambda...'); + result = await invokeLambdaAndGetDatadogData(functionName, {}, true); + console.log('Non-durable Lambda invocation and data fetching completed'); + }, 300000); // 5 minute timeout + + it('should invoke Lambda successfully and log message should have request_id but no durable execution context', () => { + expect(result.statusCode).toBe(200); + const log = result.logs?.find((log: any) => + log.message.includes('Hello from non-durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.request_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + }); + + it('platform logs should NOT have durable execution context', () => { + for (const marker of ['START RequestId:', 'END RequestId:']) { + const log = result.logs?.find((log: any) => log.message.includes(marker)); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + } + }); + + it('extension logs should NOT have durable execution context', () => { + // Extension logs (from debug!/info!/warn! in the Rust extension code) are forwarded + // in non-durable mode. They arrive without a request_id initially (orphan logs), then + // inherit the invocation request_id when PlatformStart fires. + // DD_LOG_LEVEL=DEBUG ensures these logs appear in Datadog. + const logs = result.logs?.filter((log: any) => + log.message.includes('DD_EXTENSION') + ); + expect(logs).toBeDefined(); + expect(logs?.length).toBeGreaterThan(0); + for (const log of logs ?? []) { + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeUndefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeUndefined(); + } + }); + }); + + // These tests require a Lambda runtime environment that reports 'DurableFunction' in + // PlatformInitStart.runtime_version so that the extension sets is_durable_function = true + // and activates the log-holding / durable-context-decorating pipeline. + describe('Durable Function', () => { + let result: LambdaInvocationDatadogData; + + beforeAll(async () => { + const identifier = getIdentifier(); + const baseFunctionName = `integ-${identifier}-durable-python-durable-lambda`; + + // Durable functions require a qualified ARN (version or alias); publish a new version + // to obtain a valid qualifier. + console.log('Publishing version for durable Python Lambda...'); + const version = await publishVersion(baseFunctionName); + const functionName = `${baseFunctionName}:${version}`; + + console.log('Invoking durable Python Lambda...'); + result = await invokeLambdaAndGetDatadogData(functionName, {}, false); + console.log('Durable Lambda invocation and data fetching completed'); + }, 300000); // 5 minute timeout + + it('function log should have durable execution context', () => { + const log = result.logs?.find((log: any) => + log.message.includes('Hello from durable function!') + ); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + }); + + it('platform logs should have durable execution context', () => { + for (const marker of ['START RequestId:', 'END RequestId:']) { + const log = result.logs?.find((log: any) => log.message.includes(marker)); + expect(log).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + } + }); + + it('extension logs should have durable execution context', () => { + // Extension logs (debug!/info!/warn! from the Rust extension code) arrive without a + // request_id and are stashed as orphan logs. When PlatformStart fires they inherit the + // invocation request_id, then follow the same held-log path as other durable logs: + // held until the aws.lambda span provides durable execution context, then decorated. + // DD_LOG_LEVEL=DEBUG ensures these logs are emitted and appear in Datadog. + const logs = result.logs?.filter((log: any) => + log.message.includes('DD_EXTENSION') + ); + expect(logs).toBeDefined(); + expect(logs?.length).toBeGreaterThan(0); + for (const log of logs ?? []) { + expect(log?.attributes?.attributes?.lambda?.durable_execution_id).toBeDefined(); + expect(log?.attributes?.attributes?.lambda?.durable_execution_name).toBeDefined(); + } + }); + }); +}); diff --git a/integration-tests/tests/utils/lambda.ts b/integration-tests/tests/utils/lambda.ts index d92ce1351..391d724b0 100644 --- a/integration-tests/tests/utils/lambda.ts +++ b/integration-tests/tests/utils/lambda.ts @@ -4,6 +4,9 @@ const lambdaClient = new LambdaClient({ region: 'us-east-1' }); export interface LambdaInvocationResult { requestId: string; + // Lambda execution request ID extracted from tail logs (START RequestId: ...). + // Undefined for durable functions because they do not support LogType=Tail. + executionRequestId?: string; statusCode: number; payload: any; } @@ -45,19 +48,36 @@ export async function invokeLambda( } let responsePayload; + const rawPayload = new TextDecoder().decode(response.Payload); try { - responsePayload = JSON.parse(new TextDecoder().decode(response.Payload)); + responsePayload = rawPayload ? JSON.parse(rawPayload) : null; console.log(`Response payload: ${JSON.stringify(responsePayload)}`); } catch (error: any) { console.error('Failed to parse response payload:', error.message); - console.log('Raw payload:', new TextDecoder().decode(response.Payload)); - throw error; + console.log('Raw payload:', rawPayload); + responsePayload = null; } const requestId: string = response.$metadata.requestId || ''; + // For durable functions, $metadata.requestId is the HTTP orchestration request ID which + // does NOT match the Lambda execution request ID stored as @lambda.request_id in Datadog. + // Durable functions also do not support LogType=Tail, so LogResult is absent. + // When LogResult is available, extract the real execution request ID from the START line. + let executionRequestId: string | undefined; + if (response.LogResult) { + const decodedLogs = Buffer.from(response.LogResult, 'base64').toString('utf-8'); + console.log(`Tail logs: ${decodedLogs}`); + const startMatch = decodedLogs.match(/START RequestId: ([a-f0-9-]+)/); + if (startMatch?.[1]) { + executionRequestId = startMatch[1]; + console.log(`Extracted execution requestId from tail logs: ${executionRequestId}`); + } + } + return { requestId, + executionRequestId, statusCode: response.StatusCode || 200, payload: responsePayload, }; diff --git a/integration-tests/tests/utils/util.ts b/integration-tests/tests/utils/util.ts index 028ec18c1..0b130ffa8 100644 --- a/integration-tests/tests/utils/util.ts +++ b/integration-tests/tests/utils/util.ts @@ -31,7 +31,10 @@ export async function invokeLambdaAndGetDatadogData( const baseFunctionName = functionName.split(':')[0]; const traces = await getTraces(baseFunctionName, result.requestId); - const logs = await getLogs(baseFunctionName, result.requestId); + // Use the Lambda execution request ID (from tail logs) for log filtering when available. + // For durable functions, tail logs are unsupported and executionRequestId is undefined, + // so getLogs falls back to a service-only query and returns all logs for that function. + const logs = await getLogs(baseFunctionName, result.executionRequestId); const lambdaInvocationData: LambdaInvocationDatadogData = { requestId: result.requestId,