
feat(log-agent): add datadog-log-agent crate and wire into serverless-compat #95

Open
litianningdatadog wants to merge 41 commits into main from tianning.li/SVLS-8573-datadog-log-agent

Conversation


@litianningdatadog litianningdatadog commented Mar 10, 2026

https://datadoghq.atlassian.net/browse/SVLS-8573

Background

This change converts log intake from an embedded Bottlecap component into a standalone crate. The ultimate goal is to enable serverless-compat to support log forwarding.

Summary

  • Adds a new datadog-log-agent library crate providing generic log aggregation and flushing to the Datadog Logs API
  • Wires the log agent into the datadog-serverless-compat binary so serverless runtimes (Lambda, Azure Functions, etc.) can ship logs
  • Integrates LogFlusher into datadog-serverless-compat's flush pipeline alongside traces and metrics

This change is safe to release because DD_LOGS_ENABLED=false by default (i.e., log intake is disabled unless explicitly enabled).

Key components

datadog-log-agent crate:

  • LogEntry — flat serde struct with standard Datadog fields + #[serde(flatten)] attributes map for runtime-specific enrichment (e.g. lambda.arn, azure.resource_id)
  • LogAggregator — size/count-bounded batch collector (≤1000 entries, ≤5 MB per batch)
  • AggregatorService / AggregatorHandle — channel-based async service; callers insert via handle, flusher drains via get_batches()
  • LogFlusher — ships batches to /api/v2/logs with zstd compression, configurable retry (3 attempts), OPW mode, and additional endpoints
  • LogFlusherConfig::from_env() — reads all config from environment variables with sensible defaults
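The batching rules above can be sketched in a few lines of plain Rust. The struct and method names here are illustrative stand-ins for the crate's actual API, and the size arithmetic reflects the later commit that counts the `[`/`]` and comma framing bytes toward the 5 MB cap:

```rust
/// Minimal sketch of a size/count-bounded batch collector, assuming
/// entries arrive pre-serialized as JSON objects. Limits mirror the
/// documented <=1000 entries / <=5 MB per batch; names are illustrative.
const MAX_ENTRIES: usize = 1000;
const MAX_CONTENT_BYTES: usize = 5 * 1024 * 1024;

struct Aggregator {
    entries: Vec<String>, // serialized JSON objects
    bytes: usize,         // payload bytes of the entries alone
}

impl Aggregator {
    fn new() -> Self {
        Aggregator { entries: Vec::new(), bytes: 0 }
    }

    /// Wire size after inserting one more entry:
    /// entries + comma separators + the surrounding `[` `]`.
    fn wire_size_with(&self, extra: usize) -> usize {
        let n = self.entries.len() + 1;
        self.bytes + extra + (n - 1) + 2
    }

    /// Returns false (batch full) when adding `entry` would exceed a limit.
    fn try_insert(&mut self, entry: String) -> bool {
        if self.entries.len() + 1 > MAX_ENTRIES
            || self.wire_size_with(entry.len()) > MAX_CONTENT_BYTES
        {
            return false;
        }
        self.bytes += entry.len();
        self.entries.push(entry);
        true
    }

    /// Drains the current batch as one JSON-array payload.
    fn get_batch(&mut self) -> String {
        let batch = format!("[{}]", self.entries.join(","));
        self.entries.clear();
        self.bytes = 0;
        batch
    }
}
```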

datadog-serverless-compat integration:

  • start_log_agent() spawns the aggregator and wires a LogFlusher into the flush pipeline
  • Disabled by default — requires DD_LOGS_ENABLED=true
  • Supports OPW mode via DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED / DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL
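The enablement gate can be sketched as follows; this is a minimal stand-in, assuming anything other than an explicit "true" leaves log intake disabled:

```rust
use std::env;

/// Sketch of the DD_LOGS_ENABLED gate: log intake runs only when the
/// variable is explicitly set to "true". Illustrative, not the crate's
/// exact parsing.
fn logs_enabled() -> bool {
    env::var("DD_LOGS_ENABLED")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false) // disabled by default
}
```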

Test plan

  • Unit tests: cargo test -p datadog-log-agent (39 tests across aggregator, flusher, config, log_entry)
  • Integration tests: cargo test -p datadog-log-agent --test integration_test (covers full pipeline, batching, retries, OPW mode)
  • Manual tests
# build serverless-compat
cargo build -p datadog-serverless-compat

# start the service
AWS_LAMBDA_FUNCTION_NAME=local-test DD_API_KEY=<YOUR_API_KEY> DD_LOGS_ENABLED=true DD_LOG_LEVEL=debug ./target/debug/datadog-serverless-compat

# pump logs in another terminal
hey -n 10000 -c 50 -m POST -H "Content-Type: application/json" -d '[{"message": "stress test log", "timestamp": '$(date +%s000)', "service": "my-fn", "ddsource": "lambda"}]' http://localhost:10517/v1/input

Check DD Logs

  • Size impact: a local release build is 5.8 MB with the change vs. 5.7 MB without

This change was made with significant help from Claude Code.

@litianningdatadog litianningdatadog requested review from a team as code owners March 10, 2026 19:06
@litianningdatadog litianningdatadog requested review from Lewis-E and duncanista and removed request for a team March 10, 2026 19:06
@duncanista duncanista requested a review from Copilot March 10, 2026 19:08
@litianningdatadog litianningdatadog marked this pull request as draft March 10, 2026 19:10
@litianningdatadog litianningdatadog force-pushed the tianning.li/SVLS-8573-datadog-log-agent branch from 7652f6f to eda2718 on March 10, 2026 19:13
Copilot AI left a comment

Pull request overview

This PR introduces a new datadog-log-agent crate to batch/flush logs to Datadog (or OPW) and wires a LogFlusher into the existing datadog-serverless-compat periodic flush loop.

Changes:

  • Added datadog-log-agent crate (log entry model, batch aggregator service, flusher, env-based config) plus integration tests.
  • Integrated log flushing into datadog-serverless-compat behind DD_LOGS_ENABLED=true.
  • Updated workspace/build metadata (Cargo workspace exclude, lockfile, gitignore).

Reviewed changes

Copilot reviewed 16 out of 18 changed files in this pull request and generated 6 comments.

Summary per file:

  • crates/datadog-serverless-compat/src/main.rs — Adds log-agent startup + periodic flush; includes new tests in binary crate.
  • crates/datadog-serverless-compat/Cargo.toml — Adds dependency on datadog-log-agent and dev-deps for new tests.
  • crates/datadog-log-agent/Cargo.toml — New crate manifest with reqwest/serde/tokio/tracing deps and dev-deps.
  • crates/datadog-log-agent/src/lib.rs — New crate root with module exports and re-exports.
  • crates/datadog-log-agent/src/log_entry.rs — Defines LogEntry serde model with flattened runtime attributes + unit tests.
  • crates/datadog-log-agent/src/constants.rs — Adds batching and default config constants (entry count/byte limits, defaults).
  • crates/datadog-log-agent/src/errors.rs — Adds error types for aggregation/flush (and creation).
  • crates/datadog-log-agent/src/config.rs — Adds LogFlusherConfig + from_env() and FlusherMode (Datadog vs OPW).
  • crates/datadog-log-agent/src/aggregator/mod.rs — Aggregator module wiring and public re-exports.
  • crates/datadog-log-agent/src/aggregator/core.rs — Implements in-memory batch aggregation and JSON-array batch building.
  • crates/datadog-log-agent/src/aggregator/service.rs — Adds async command-driven aggregator service + handle API.
  • crates/datadog-log-agent/src/flusher.rs — Implements batch shipping with retry, optional compression, OPW behavior + tests.
  • crates/datadog-log-agent/tests/integration_test.rs — End-to-end integration tests using mockito OPW endpoint.
  • Cargo.toml — Excludes crates/.claude from workspace members.
  • Cargo.lock — Adds datadog-log-agent and serverless-compat dep updates.
  • .gitignore — Ignores /CLAUDE.md.


litianningdatadog and others added 13 commits March 12, 2026 11:29
…errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- add generic LogEntry with flatten attributes for runtime enrichment
- add features table and fix zstd compression level docs
…ection

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n tally

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l pattern

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nd OPW mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…inary

- Clean public re-exports in datadog-log-agent lib.rs
- Add datadog-log-agent dependency to datadog-serverless-compat
- Wire log agent startup in main.rs following DogStatsD pattern
- Respect DD_LOGS_ENABLED env var (default: true)
- Use FIPS-compliant HTTP client via create_reqwest_client_builder
- Flush logs on same interval as DogStatsD metrics
- Add integration test verifying full pipeline compiles and runs
- Update CLAUDE.md with log-agent architecture and env vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…key, use crate re-exports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- disable log agent by default — require explicit DD_LOGS_ENABLED=true
- log the actual error when reqwest client build() fails in start_log_agent
- fail fast in start_log_agent when OPW URL is empty
- apply rustfmt to integration test formatting.
litianningdatadog and others added 21 commits March 12, 2026 11:37
Cover the full HTTP→LogServer→AggregatorService→LogFlusher→backend
pipeline, concurrent client ingestion, and error recovery after a
malformed request.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All 4xx responses were previously short-circuited as permanent failures, causing
rate-limited (429) and timed-out (408) batches to be silently dropped. These are
transient conditions that should go through the existing retry loop.

TODO: parse Retry-After header on 429 to add proper backoff.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
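The retry classification described in that commit can be sketched as follows; `is_transient` is an illustrative helper, not the crate's actual function:

```rust
/// Sketch of the fixed retry classification: 429 (rate limited) and
/// 408 (request timeout) are transient and retryable, all other 4xx
/// are permanent failures, and 5xx remain retryable. Illustrative
/// stand-in for the flusher's status handling.
fn is_transient(status: u16) -> bool {
    matches!(status, 408 | 429) || (500..600).contains(&status)
}
```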
Previously the boolean returned by LogFlusher::flush() was silently
discarded, giving operators no signal when logs were being dropped.
Now logs a warning on each failed flush cycle.

TODO: expose as a statsd counter/gauge for durable telemetry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eader instead of size_hint

size_hint().upper() returns None for chunked/streaming bodies; coercing
that to u64::MAX caused every request without a Content-Length header to
be rejected with 413 before any body bytes were read.

Replace the pre-read guard with a direct Content-Length header parse:
reject early only when the header is present and exceeds MAX_BODY_BYTES,
and fall through to the post-read bytes.len() check otherwise.

Adds a regression test that sends a raw Transfer-Encoding: chunked
request (no Content-Length) via TcpStream and asserts 200 + correct
aggregator insertion.
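The corrected pre-read guard can be sketched as follows; `MAX_BODY_BYTES` and the function name are illustrative stand-ins for the server's actual code:

```rust
/// Sketch of the fixed pre-read guard: reject early only when a
/// Content-Length header is present AND exceeds the cap. Chunked
/// requests (no header) fall through to the post-read length check
/// instead of being coerced to u64::MAX and rejected with 413.
const MAX_BODY_BYTES: usize = 5 * 1024 * 1024; // illustrative cap

fn reject_before_read(content_length: Option<&str>) -> bool {
    match content_length.and_then(|v| v.parse::<usize>().ok()) {
        Some(len) => len > MAX_BODY_BYTES,
        None => false, // absent/unparseable header: decide after reading the body
    }
}
```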
…t bind-failure gap

- Extract DEFAULT_LOG_INTAKE_PORT = 10517 constant (was hardcoded 8080)
- Add TODO explaining that LogServer::serve binds inside the spawned task,
  so a port-conflict failure is silently swallowed while the caller still
  returns Some(...) and logs "log agent started"
- Replace sequential for-loop over additional_endpoints with join_all()
- Add futures crate dependency for join_all
- Add unit tests: one verifying all endpoints receive the batch, one
  using Barrier(2) to prove concurrent in-flight dispatch

Rationale: LogFlusherConfig documented additional_endpoints as shipped
"in parallel" but the implementation was sequential — this aligns the
implementation with the documented contract

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Replace sequential for-batch loop with join_all over all batches
- Each batch now ships to the primary and additional endpoints concurrently
- Collect per-batch primary results via join_all then fold with .all()

Rationale: multiple batches were flushed one at a time; concurrent
dispatch reduces total flush latency when the aggregator produces
more than one batch

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Call resp.bytes().await after receiving a response to consume the body
- Ensures the TCP connection is returned to the pool instead of lingering
  in CLOSE_WAIT, which would exhaust the connection pool under high flush
  frequency

Rationale: reqwest reuses connections only after the response body is
fully consumed; skipping this keeps connections open unnecessarily

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add LogsAdditionalEndpoint {api_key, url, is_reliable} matching the
  bottlecap/datadog-agent wire format (Host+Port deserialized to url)
- Add parse_additional_endpoints() and read DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS
  in LogFlusherConfig::from_env()
- Update ship_batch to accept explicit api_key param so each additional
  endpoint uses its own key instead of the primary key
- Re-export LogsAdditionalEndpoint from crate root
- Update all test fixtures to use the new struct

Rationale: aligns with the datadog-lambda-extension bottlecap model where
each additional endpoint authenticates independently with its own API key
- Replace ? propagation with match in ship_batch compression block
- On compress error, warn and send raw bytes without Content-Encoding header
- Avoids dropping the batch entirely due to a transient encoder failure

Rationale: compression failures are rare (OOM, corrupted encoder state)
and silently dropping the batch is worse than sending it uncompressed
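The fallback described in that commit can be sketched as follows; `prepare_body` and its closure parameter are hypothetical stand-ins for the flusher's zstd path:

```rust
/// Sketch of the compression fallback: on encoder failure, warn and
/// send the raw bytes without a Content-Encoding header rather than
/// dropping the batch. `compress` stands in for the zstd encoder.
fn prepare_body(
    payload: Vec<u8>,
    compress: impl Fn(&[u8]) -> Result<Vec<u8>, String>,
) -> (Vec<u8>, Option<&'static str>) {
    match compress(&payload) {
        Ok(compressed) => (compressed, Some("zstd")),
        Err(e) => {
            eprintln!("compression failed, sending uncompressed: {e}");
            (payload, None) // no Content-Encoding header
        }
    }
}
```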
…passback

- Change flush() -> bool to flush(Vec<RequestBuilder>) -> Vec<RequestBuilder>
- send_with_retry returns Err(builder) on transient exhaustion instead of FlushError
- serverless-compat flush loop stores and redrives failed builders each cycle
- Additional endpoint failures remain best-effort (not tracked for retry)
- Add tests: cross-invocation redrive succeeds, additional endpoint failures excluded

Rationale: aligns with bottlecap FlushingService retry pattern; batches that
hit transient intake errors survive across Lambda invocations instead of being
silently dropped after MAX_FLUSH_ATTEMPTS

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
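The passback pattern from that commit can be sketched generically; here `send` is a boolean stand-in for the real send path, whereas the PR returns the failed `reqwest::RequestBuilder`s themselves:

```rust
/// Sketch of the retry-passback pattern: flush() returns the requests
/// that hit transient failures so the caller can redrive them on the
/// next flush cycle instead of dropping them after exhausting retries.
fn flush<R>(pending: Vec<R>, send: impl Fn(&R) -> bool) -> Vec<R> {
    pending.into_iter().filter(|req| !send(req)).collect()
}
```

A caller following this pattern stores the returned vector and feeds it back in, so a batch that hit a transient intake error survives across invocations.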
- Include `[`/`]` (2 bytes) and comma separators in `is_full()` and `get_batch()` overflow guards
- Batch wire size could silently exceed MAX_CONTENT_BYTES by up to N+1 bytes

Rationale: JSON array framing is part of the wire payload but was not counted in the 5 MB cap checks

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
… check

- Old test only asserted String::new() is empty — never called production code
- New test calls start_log_agent() with OPW enabled + empty URL and asserts None

Rationale: the test was a no-op; it now exercises the actual guard in start_log_agent()

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Delete _assert_log_flusher_constructible and its unused imports
- size_of checks never fail and provide no constructibility guarantee

Rationale: cargo check and existing integration tests already cover API visibility; the dead function only created maintenance noise

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…ession_level range from docs

- Rename LogEntry::new → LogEntry::from_message and update all call sites
- Remove inaccurate "1–21" range from compression_level doc comment

Rationale: new() implies all fields are provided; from_message makes the partial construction explicit per Rust API Guidelines

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…take_entry

- Rename struct, file, and all call sites across the codebase
- Name now references the Datadog Logs Intake API format explicitly

Rationale: reviewer feedback — the name should reflect the Intake Log format; LogEntry was too generic

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…mmand

- Rename LogAggregator → Aggregator and LogAggregatorCommand → AggregatorCommand
- No need for Log prefix inside the logs crate

Rationale: reviewer feedback — redundant Log prefix within datadog-log-agent crate

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…] to from_env

- FlusherMode renamed to Destination — it describes where logs are sent, not how
- #[must_use] added to LogFlusherConfig::from_env() to catch ignored return values

Rationale: reviewer feedback — FlusherMode name is misleading; Destination is more accurate

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
@litianningdatadog litianningdatadog force-pushed the tianning.li/SVLS-8573-datadog-log-agent branch from 341facc to 445909d on March 12, 2026 15:52
- Wrap env::set_var/remove_var in unsafe blocks (Rust 2024 requirement)
- Collapse nested if-let + if into let-chain patterns
- Replace match Ok/Err with .err(), if let Err(_) with .is_err()
- Use .is_multiple_of() and assert!(!…) idioms
- Remove redundant as u32 casts on already-u32 fields
- Suppress result_large_err for external figment::Error in test modules
- Suppress disallowed_methods for reqwest::Client::builder in tests

Rationale: cargo clippy --workspace --all-targets and cargo fmt reported errors and warnings that needed to be resolved

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add `pub type LogEntry = IntakeEntry` alias
- Add `pub type FlusherMode = Destination` alias

Rationale: bottlecap's datadog-log-agent adoption (SVLS-8573) was
written against these names. The crate used IntakeEntry/Destination
but never exported the aliases the plan specified, leaving bottlecap
unable to compile against the crate.
Bottlecap now uses IntakeEntry and Destination directly,
so the aliases are not needed.
@litianningdatadog litianningdatadog marked this pull request as ready for review March 12, 2026 17:14
- Renamed crates/datadog-log-agent/ directory to crates/datadog-logs-agent/
- Updated package name in crate Cargo.toml
- Updated dependency reference in datadog-serverless-compat/Cargo.toml
- Updated all use datadog_log_agent:: identifiers in main.rs, examples, and tests
- Updated references in scripts/test-log-intake.sh and AGENTS.md

Rationale: Align crate name with the plural "logs" convention used elsewhere in the codebase.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
@litianningdatadog litianningdatadog force-pushed the tianning.li/SVLS-8573-datadog-log-agent branch from 563457d to 8cb5daa on March 12, 2026 20:20
timestamp: 0,
};
let origin = metric.find_origin(tags).unwrap();
assert_eq!(origin.origin_product, OriginProduct::Serverless as u32);
Why are we having formatting changes for this? Not related to this PR, but can we standardize this, a bit annoying when looking at diffs for a PR to see what actually changed.

@jchrostek-dd

@litianningdatadog Is this just a copy paste of what is in the datadog-lambda-extension?
