Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
05ea425
feat(logs): track durable execution context and gate log flushing on it
lym953 Mar 2, 2026
c34304c
refactor(logs): replace pending_durable_logs vec with held_logs map k…
lym953 Mar 2, 2026
6d7c68a
refactor(logs): remove PlatformStart fallback for unset is_durable_fu…
lym953 Mar 2, 2026
299e972
refactor(logs): extract durable ID map capacity into a named constant
lym953 Mar 2, 2026
98d76af
fix(logs): resolve clippy errors in durable function log processor
lym953 Mar 3, 2026
0ca047e
feat(logs): detect durable function context from aws.lambda span tags
lym953 Mar 3, 2026
a7d0f00
fix(logs): prefix durable execution tags with lambda.
lym953 Mar 3, 2026
6cae5ce
fix(logs): add durable execution context as lambda attributes instead…
lym953 Mar 3, 2026
6a32778
fmt
lym953 Mar 3, 2026
9cfd5ce
chore: remove debug logging added during durable function development
lym953 Mar 3, 2026
59a12bc
Modify comments and rename variables
lym953 Mar 4, 2026
41442f2
Rename DURABLE_CONTEXT_MAP_CAPACITY
lym953 Mar 4, 2026
ae1d91c
Update comments
lym953 Mar 4, 2026
beb39d2
Make resolve_held_logs_on_durable_function_set() take is_durable
lym953 Mar 4, 2026
5211191
Fix comment: flush -> mark as ready to be aggregated
lym953 Mar 4, 2026
09fd968
Rename: update_durable_map() -> self.processor.insert_to_durable_map()
lym953 Mar 4, 2026
3edba2a
Rename a function
lym953 Mar 4, 2026
1204f93
Modify comments. Add error handling.
lym953 Mar 4, 2026
2582c99
fmt
lym953 Mar 4, 2026
6889344
Check error log
lym953 Mar 4, 2026
a72f713
Collapse if condition
lym953 Mar 4, 2026
1e118a7
Add integration test
lym953 Mar 5, 2026
c3626f3
Bump python layer version from 117 to 123
lym953 Mar 5, 2026
3038c3d
Add durable to test-suites.yaml
lym953 Mar 6, 2026
b686408
Add platform and extension log assertions to durable integration tests
lym953 Mar 6, 2026
52094ca
fix(durable-integ-test): correct attribute path and durable invocation
lym953 Mar 6, 2026
ce4d1f8
fix(merge): resolve conflict in main.rs between durable-id-map and main
lym953 Mar 6, 2026
c889d30
fix(durable-integ-test): return None from durable handler to satisfy …
lym953 Mar 6, 2026
fe1ecbe
Change durable context map capacity from 5 to 100
lym953 Mar 6, 2026
15f7d76
feat(logs): fast-path durable SDK logs via executionArn field
lym953 Mar 6, 2026
4f15c7e
fmt
lym953 Mar 6, 2026
b0b08b2
refactor(logs): set DURABLE_CONTEXT_MAP_CAPACITY to 500
lym953 Mar 9, 2026
63f1937
fix(durable-integ-test): use Python 3.14 runtime and layer; log CW lo…
lym953 Mar 9, 2026
daecc9f
fix(logs): destructure durable_context_tx from LogsAgent::new in inte…
lym953 Mar 9, 2026
b0bdec4
fix(integ-tests): use Lambda execution requestId for Datadog log filt…
lym953 Mar 9, 2026
52624e9
refactor: route durable context through lifecycle processor instead o…
lym953 Mar 10, 2026
5d55d8e
fmt
lym953 Mar 10, 2026
6f0b412
fix: update quinn-proto to 0.11.14 (RUSTSEC-2026-0037) and fix clippy…
lym953 Mar 10, 2026
d9e5af3
rename: lambda_function.py → lambda_non_durable_function.py in durabl…
lym953 Mar 11, 2026
96a512a
Modify comment and log message
lym953 Mar 11, 2026
c65b22c
refactor(integ-tests): reduce duplicate code in durable.test.ts
lym953 Mar 11, 2026
93f4a3e
refactor(integ-tests): simplify durable tests to a single invocation
lym953 Mar 11, 2026
207bbf9
refactor: forward durable context via a dedicated ProcessorCommand
lym953 Mar 12, 2026
422e193
Update comments
lym953 Mar 12, 2026
cd0f017
refactor(logs): add process_durable_context_update to LogsProcessor
lym953 Mar 12, 2026
cdbf1e9
fmt
lym953 Mar 12, 2026
f638e6b
fmt and renaming
lym953 Mar 12, 2026
bf94ca5
refactor(logs): introduce DurableContextUpdate and DurableExecutionCo…
lym953 Mar 12, 2026
16ab0da
fmt and change comments
lym953 Mar 12, 2026
e5bcb87
merge: resolve conflict in util.ts — keep env-var overrides, update P…
lym953 Mar 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitlab/datasources/test-suites.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ test_suites:
- name: otlp
- name: snapstart
- name: lmi
- name: durable
15 changes: 3 additions & 12 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 25 additions & 11 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use bottlecap::{
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
},
flusher::LogsFlusher,
lambda::DurableContextUpdate,
},
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
proxy::{interceptor, should_start_proxy},
Expand Down Expand Up @@ -298,15 +299,20 @@ async fn extension_loop_active(
// and shares the connection pool.
let shared_client = bottlecap::http::get_client(config);

let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
&shared_client,
);
let (
logs_agent_channel,
logs_flusher,
logs_agent_cancel_token,
logs_aggregator_handle,
durable_context_tx,
) = start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
&shared_client,
);

let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
tags_provider.clone(),
Expand All @@ -325,6 +331,7 @@ async fn extension_loop_active(
Arc::clone(&aws_config),
metrics_aggregator_handle.clone(),
Arc::clone(&propagator),
durable_context_tx,
);
tokio::spawn(async move {
invocation_processor_service.run().await;
Expand Down Expand Up @@ -1039,14 +1046,15 @@ fn start_logs_agent(
LogsFlusher,
CancellationToken,
LogsAggregatorHandle,
Sender<DurableContextUpdate>,
) {
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
// Start service in background
tokio::spawn(async move {
aggregator_service.run().await;
});

let (mut agent, tx) = LogsAgent::new(
let (mut agent, tx, durable_context_tx) = LogsAgent::new(
Arc::clone(tags_provider),
Arc::clone(config),
event_bus,
Expand All @@ -1068,7 +1076,13 @@ fn start_logs_agent(
config.clone(),
client.clone(),
);
(tx, flusher, cancel_token, aggregator_handle)
(
tx,
flusher,
cancel_token,
aggregator_handle,
durable_context_tx,
)
}

#[allow(clippy::type_complexity)]
Expand Down
48 changes: 45 additions & 3 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use libdd_trace_protobuf::pb::Span;
use libdd_trace_utils::tracer_header_tags;
use serde_json::Value;
use tokio::time::Instant;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};

use tokio::sync::mpsc;

use crate::{
config::{self, aws::AwsConfig},
Expand All @@ -24,6 +26,7 @@ use crate::{
span_inferrer::{self, SpanInferrer},
triggers::get_default_service_name,
},
logs::lambda::DurableContextUpdate,
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{
self, CPUData, NetworkData,
Expand Down Expand Up @@ -88,6 +91,10 @@ pub struct Processor {
/// Tracks whether if first invocation after init has been received in Managed Instance mode.
/// Used to determine if we should search for the empty context on an invocation.
awaiting_first_invocation: bool,
/// Sender used to forward durable execution context extracted from `aws.lambda` spans to the
/// logs agent. Decouples the trace agent from the logs agent: the trace agent sends spans
/// to the lifecycle processor, which extracts durable context and relays it here.
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
}

impl Processor {
Expand All @@ -98,6 +105,7 @@ impl Processor {
aws_config: Arc<AwsConfig>,
metrics_aggregator: dogstatsd::aggregator::AggregatorHandle,
propagator: Arc<DatadogCompositePropagator>,
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
) -> Self {
let resource = tags_provider
.get_canonical_resource_name()
Expand Down Expand Up @@ -127,6 +135,7 @@ impl Processor {
dynamic_tags: HashMap::new(),
active_invocations: 0,
awaiting_first_invocation: false,
durable_context_tx,
}
}

Expand Down Expand Up @@ -1342,6 +1351,23 @@ impl Processor {
self.context_buffer.add_tracer_span(request_id, span);
}
}

/// Forwards durable execution context extracted from an `aws.lambda` span to the logs
/// pipeline so it can release held logs and tag them with durable execution metadata.
///
/// Uses a non-blocking `try_send`; if the channel is full or closed the update is
/// dropped and an error is logged rather than blocking the lifecycle processor.
pub fn forward_durable_context(
    &mut self,
    request_id: &str,
    execution_id: &str,
    execution_name: &str,
) {
    // Build the owned update message before handing it to the channel.
    let update = DurableContextUpdate {
        request_id: request_id.to_owned(),
        execution_id: execution_id.to_owned(),
        execution_name: execution_name.to_owned(),
    };
    if let Err(e) = self.durable_context_tx.try_send(update) {
        error!("Invocation Processor | Failed to forward durable context to logs agent: {e}");
    }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1387,7 +1413,15 @@ mod tests {
tokio::spawn(service.run());

let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
Processor::new(tags_provider, config, aws_config, handle, propagator)
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
Processor::new(
tags_provider,
config,
aws_config,
handle,
propagator,
durable_context_tx,
)
}

#[test]
Expand Down Expand Up @@ -1924,7 +1958,15 @@ mod tests {

let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));

let processor = Processor::new(tags_provider, config, aws_config, handle, propagator);
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
let processor = Processor::new(
tags_provider,
config,
aws_config,
handle,
propagator,
durable_context_tx,
);

assert!(
processor.is_managed_instance_mode(),
Expand Down
34 changes: 34 additions & 0 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
context::{Context, ReparentingInfo},
processor::Processor,
},
logs::lambda::DurableContextUpdate,
tags::provider,
traces::{
context::SpanContext, propagation::DatadogCompositePropagator,
Expand Down Expand Up @@ -111,6 +112,11 @@ pub enum ProcessorCommand {
AddTracerSpan {
span: Box<Span>,
},
ForwardDurableContext {
request_id: String,
execution_id: String,
execution_name: String,
},
OnOutOfMemoryError {
timestamp: i64,
},
Expand Down Expand Up @@ -380,6 +386,21 @@ impl InvocationProcessorHandle {
.await
}

/// Sends a `ForwardDurableContext` command to the processor task, awaiting channel
/// capacity. Returns the underlying send error if the processor has shut down.
pub async fn forward_durable_context(
    &self,
    request_id: String,
    execution_id: String,
    execution_name: String,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
    let command = ProcessorCommand::ForwardDurableContext {
        request_id,
        execution_id,
        execution_name,
    };
    self.sender.send(command).await
}

pub async fn on_out_of_memory_error(
&self,
timestamp: i64,
Expand Down Expand Up @@ -430,6 +451,7 @@ impl InvocationProcessorService {
aws_config: Arc<AwsConfig>,
metrics_aggregator_handle: AggregatorHandle,
propagator: Arc<DatadogCompositePropagator>,
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
) -> (InvocationProcessorHandle, Self) {
let (sender, receiver) = mpsc::channel(1000);

Expand All @@ -439,6 +461,7 @@ impl InvocationProcessorService {
aws_config,
metrics_aggregator_handle,
propagator,
durable_context_tx,
);

let handle = InvocationProcessorHandle { sender };
Expand Down Expand Up @@ -588,6 +611,17 @@ impl InvocationProcessorService {
ProcessorCommand::AddTracerSpan { span } => {
self.processor.add_tracer_span(&span);
}
ProcessorCommand::ForwardDurableContext {
request_id,
execution_id,
execution_name,
} => {
self.processor.forward_durable_context(
&request_id,
&execution_id,
&execution_name,
);
}
ProcessorCommand::OnOutOfMemoryError { timestamp } => {
self.processor.on_out_of_memory_error(timestamp);
}
Expand Down
12 changes: 10 additions & 2 deletions bottlecap/src/logs/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ use crate::{LAMBDA_RUNTIME_SLUG, config};

const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);

use crate::logs::lambda::DurableContextUpdate;

#[allow(clippy::module_name_repetitions)]
pub struct LogsAgent {
rx: mpsc::Receiver<TelemetryEvent>,
durable_context_rx: mpsc::Receiver<DurableContextUpdate>,
processor: LogsProcessor,
aggregator_handle: AggregatorHandle,
cancel_token: CancellationToken,
Expand All @@ -28,7 +31,7 @@ impl LogsAgent {
event_bus: Sender<Event>,
aggregator_handle: AggregatorHandle,
is_managed_instance_mode: bool,
) -> (Self, Sender<TelemetryEvent>) {
) -> (Self, Sender<TelemetryEvent>, Sender<DurableContextUpdate>) {
let processor = LogsProcessor::new(
Arc::clone(&datadog_config),
tags_provider,
Expand All @@ -38,16 +41,18 @@ impl LogsAgent {
);

let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);
let (durable_context_tx, durable_context_rx) = mpsc::channel::<DurableContextUpdate>(500);
let cancel_token = CancellationToken::new();

let agent = Self {
rx,
durable_context_rx,
processor,
aggregator_handle,
cancel_token,
};

(agent, tx)
(agent, tx, durable_context_tx)
}

pub async fn spin(&mut self) {
Expand All @@ -56,6 +61,9 @@ impl LogsAgent {
Some(event) = self.rx.recv() => {
self.processor.process(event, &self.aggregator_handle).await;
}
Some(update) = self.durable_context_rx.recv() => {
self.processor.process_durable_context_update(update, &self.aggregator_handle);
}
() = self.cancel_token.cancelled() => {
debug!("LOGS_AGENT | Received shutdown signal, draining remaining events");

Expand Down
4 changes: 4 additions & 0 deletions bottlecap/src/logs/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand All @@ -130,6 +131,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand All @@ -156,6 +158,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down Expand Up @@ -196,6 +199,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/logs/aggregator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ mod tests {
lambda: Lambda {
arn: "arn".to_string(),
request_id: Some("request_id".to_string()),
..Lambda::default()
},
timestamp: 0,
status: "status".to_string(),
Expand Down
Loading
Loading