feat(log-agent): add datadog-log-agent crate and wire into serverless-compat #95

litianningdatadog wants to merge 41 commits into main
Conversation
Pull request overview
This PR introduces a new datadog-log-agent crate to batch/flush logs to Datadog (or OPW) and wires a LogFlusher into the existing datadog-serverless-compat periodic flush loop.
Changes:
- Added datadog-log-agent crate (log entry model, batch aggregator service, flusher, env-based config) plus integration tests.
- Integrated log flushing into datadog-serverless-compat behind DD_LOGS_ENABLED=true.
- Updated workspace/build metadata (Cargo workspace exclude, lockfile, gitignore).
Reviewed changes
Copilot reviewed 16 out of 18 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| crates/datadog-serverless-compat/src/main.rs | Adds log-agent startup + periodic flush; includes new tests in binary crate. |
| crates/datadog-serverless-compat/Cargo.toml | Adds dependency on datadog-log-agent and dev-deps for new tests. |
| crates/datadog-log-agent/Cargo.toml | New crate manifest with reqwest/serde/tokio/tracing deps and dev-deps. |
| crates/datadog-log-agent/src/lib.rs | New crate root with module exports and re-exports. |
| crates/datadog-log-agent/src/log_entry.rs | Defines LogEntry serde model with flattened runtime attributes + unit tests. |
| crates/datadog-log-agent/src/constants.rs | Adds batching and default config constants (entry count/byte limits, defaults). |
| crates/datadog-log-agent/src/errors.rs | Adds error types for aggregation/flush (and creation). |
| crates/datadog-log-agent/src/config.rs | Adds LogFlusherConfig + from_env() and FlusherMode (Datadog vs OPW). |
| crates/datadog-log-agent/src/aggregator/mod.rs | Aggregator module wiring and public re-exports. |
| crates/datadog-log-agent/src/aggregator/core.rs | Implements in-memory batch aggregation and JSON-array batch building. |
| crates/datadog-log-agent/src/aggregator/service.rs | Adds async command-driven aggregator service + handle API. |
| crates/datadog-log-agent/src/flusher.rs | Implements batch shipping with retry, optional compression, OPW behavior + tests. |
| crates/datadog-log-agent/tests/integration_test.rs | End-to-end integration tests using mockito OPW endpoint. |
| Cargo.toml | Excludes crates/.claude from workspace members. |
| Cargo.lock | Adds datadog-log-agent and serverless-compat dep updates. |
| .gitignore | Ignores /CLAUDE.md. |
…errors Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- add generic LogEntry with flatten attributes for runtime enrichment
- add features table and fix zstd compression level docs
…ection Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n tally Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l pattern Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nd OPW mode Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…inary
- Clean public re-exports in datadog-log-agent lib.rs
- Add datadog-log-agent dependency to datadog-serverless-compat
- Wire log agent startup in main.rs following DogStatsD pattern
- Respect DD_LOGS_ENABLED env var (default: true)
- Use FIPS-compliant HTTP client via create_reqwest_client_builder
- Flush logs on same interval as DogStatsD metrics
- Add integration test verifying full pipeline compiles and runs
- Update CLAUDE.md with log-agent architecture and env vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…key, use crate re-exports Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…atching, retries, and OPW mode
- disable log agent by default — require explicit DD_LOGS_ENABLED=true
- log the actual error when reqwest client build() fails in start_log_agent
- fail fast in start_log_agent when OPW URL is empty
- apply rustfmt to integration test formatting
Cover the full HTTP→LogServer→AggregatorService→LogFlusher→backend pipeline, concurrent client ingestion, and error recovery after a malformed request. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All 4xx responses were previously short-circuited as permanent failures, causing rate-limited (429) and timed-out (408) batches to be silently dropped. These are transient conditions that should go through the existing retry loop. TODO: parse Retry-After header on 429 to add proper backoff. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
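The transient/permanent split described above can be sketched as a small status classifier. This is a minimal sketch under stated assumptions: the helper name `is_retryable` and the exact treatment of other codes are illustrations, not the crate's actual code.

```rust
/// Hypothetical helper: decide whether a failed intake response should
/// re-enter the retry loop. 5xx is transient; most 4xx are permanent,
/// but 408 (request timeout) and 429 (rate limited) are transient.
fn is_retryable(status: u16) -> bool {
    match status {
        408 | 429 => true,  // transient client-side conditions: retry
        500..=599 => true,  // server errors: retry
        _ => false,         // remaining 4xx etc.: permanent, do not retry
    }
}
```

A classifier like this keeps the "which failures retry" policy in one place, which is also where the TODO's Retry-After parsing for 429 would slot in.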
Previously the boolean returned by LogFlusher::flush() was silently discarded, giving operators no signal when logs were being dropped. Now logs a warning on each failed flush cycle. TODO: expose as a statsd counter/gauge for durable telemetry. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eader instead of size_hint size_hint().upper() returns None for chunked/streaming bodies; coercing that to u64::MAX caused every request without a Content-Length header to be rejected with 413 before any body bytes were read. Replace the pre-read guard with a direct Content-Length header parse: reject early only when the header is present and exceeds MAX_BODY_BYTES, and fall through to the post-read bytes.len() check otherwise. Adds a regression test that sends a raw Transfer-Encoding: chunked request (no Content-Length) via TcpStream and asserts 200 + correct aggregator insertion.
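The pre-read guard described above can be modeled with a std-only sketch; the function name and the `MAX_BODY_BYTES` value (assumed to match the 5 MB batch cap) are illustrative assumptions.

```rust
const MAX_BODY_BYTES: usize = 5 * 1024 * 1024; // assumed cap for illustration

/// Sketch of the pre-read guard: reject early only when a Content-Length
/// header is present AND exceeds the cap. Chunked/streaming requests
/// (no header, or an unparsable one) fall through to the post-read
/// bytes.len() check instead of being rejected with 413 up front.
fn reject_before_read(content_length: Option<&str>) -> bool {
    match content_length.and_then(|v| v.parse::<usize>().ok()) {
        Some(len) => len > MAX_BODY_BYTES,
        None => false, // defer to the post-read length check
    }
}
```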
…t bind-failure gap
- Extract DEFAULT_LOG_INTAKE_PORT = 10517 constant (was hardcoded 8080)
- Add TODO explaining that LogServer::serve binds inside the spawned task, so a port-conflict failure is silently swallowed while the caller still returns Some(...) and logs "log agent started"
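One way to close the gap the TODO describes is to bind before spawning, so a port conflict surfaces to the caller instead of being swallowed inside the task. A minimal std-only sketch (names hypothetical; the real server is async):

```rust
use std::net::TcpListener;

/// Bind synchronously in the caller so a port conflict is returned as an
/// error, then (in the real code) hand the listener to the spawned accept
/// loop rather than binding inside the task.
fn start_server(port: u16) -> std::io::Result<TcpListener> {
    let listener = TcpListener::bind(("127.0.0.1", port))?;
    // real code would move `listener` into the spawned serve task here
    Ok(listener)
}
```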
- Replace sequential for-loop over additional_endpoints with join_all()
- Add futures crate dependency for join_all
- Add unit tests: one verifying all endpoints receive the batch, one using Barrier(2) to prove concurrent in-flight dispatch

Rationale: LogFlusherConfig documented additional_endpoints as shipped "in parallel" but the implementation was sequential; this aligns the implementation with the documented contract.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Replace sequential for-batch loop with join_all over all batches
- Each batch now ships to primary and extras concurrently
- Collect per-batch primary results via join_all then fold with .all()

Rationale: multiple batches were flushed one at a time; concurrent dispatch reduces total flush latency when the aggregator produces more than one batch.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
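The fan-out-then-fold shape of these two commits can be illustrated with a std-only sketch using scoped threads; the real code drives async futures with futures::future::join_all, and the names here are hypothetical.

```rust
use std::thread;

/// Dispatch one send per endpoint concurrently, then fold the
/// per-endpoint results with all(): overall success requires every
/// endpoint to succeed. Scoped threads stand in for join_all here.
fn ship_to_all<F>(endpoints: &[String], send: F) -> bool
where
    F: Fn(&str) -> bool + Sync,
{
    let send = &send;
    thread::scope(|s| {
        // fan out: one in-flight send per endpoint
        let handles: Vec<_> = endpoints
            .iter()
            .map(|ep| s.spawn(move || send(ep)))
            .collect();
        // fold: join every result and require all successes
        handles.into_iter().all(|h| h.join().unwrap())
    })
}
```

The same shape applies at both levels: per-endpoint within a batch, and per-batch across the flush.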
- Call resp.bytes().await after receiving a response to consume the body
- Ensures the TCP connection is returned to the pool instead of lingering in CLOSE_WAIT, which would exhaust the connection pool under high flush frequency

Rationale: reqwest reuses connections only after the response body is fully consumed; skipping this keeps connections open unnecessarily.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add LogsAdditionalEndpoint {api_key, url, is_reliable} matching the
bottlecap/datadog-agent wire format (Host+Port deserialized to url)
- Add parse_additional_endpoints() and read DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS
in LogFlusherConfig::from_env()
- Update ship_batch to accept explicit api_key param so each additional
endpoint uses its own key instead of the primary key
- Re-export LogsAdditionalEndpoint from crate root
- Update all test fixtures to use the new struct
Rationale: aligns with the datadog-lambda-extension bottlecap model where
each additional endpoint authenticates independently with its own API key
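A minimal sketch of the Host+Port-to-url folding described above. The field handling and the https scheme are assumptions for illustration; the real type is deserialized via serde from the agent wire format.

```rust
/// Shape of the additional-endpoint config after deserialization
/// (field names taken from the commit message above).
struct LogsAdditionalEndpoint {
    api_key: String,
    url: String,
    is_reliable: bool,
}

/// Fold the wire format's separate Host and Port into a single url,
/// keeping the per-endpoint api_key so each endpoint authenticates
/// independently. The https scheme is an assumption.
fn endpoint_from_wire(api_key: &str, host: &str, port: u16, is_reliable: bool) -> LogsAdditionalEndpoint {
    LogsAdditionalEndpoint {
        api_key: api_key.to_string(),
        url: format!("https://{host}:{port}"),
        is_reliable,
    }
}
```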
- Replace ? propagation with match in ship_batch compression block
- On compress error, warn and send raw bytes without Content-Encoding header
- Avoids dropping the batch entirely due to a transient encoder failure

Rationale: compression failures are rare (OOM, corrupted encoder state) and silently dropping the batch is worse than sending it uncompressed.
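A std-only sketch of that fallback, with a stand-in for the real zstd encoder (names hypothetical):

```rust
/// Try to compress the payload; on failure, warn and ship the raw bytes
/// with no Content-Encoding header instead of dropping the batch.
/// Returns the body to send plus the encoding header value, if any.
fn payload_for_send(
    raw: Vec<u8>,
    compress: impl Fn(&[u8]) -> Result<Vec<u8>, String>,
) -> (Vec<u8>, Option<&'static str>) {
    match compress(&raw) {
        Ok(z) => (z, Some("zstd")),
        Err(e) => {
            eprintln!("warn: compression failed ({e}); sending uncompressed");
            (raw, None) // omit Content-Encoding entirely
        }
    }
}
```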
…passback
- Change flush() -> bool to flush(Vec<RequestBuilder>) -> Vec<RequestBuilder>
- send_with_retry returns Err(builder) on transient exhaustion instead of FlushError
- serverless-compat flush loop stores and redrives failed builders each cycle
- Additional endpoint failures remain best-effort (not tracked for retry)
- Add tests: cross-invocation redrive succeeds, additional endpoint failures excluded

Rationale: aligns with bottlecap FlushingService retry pattern; batches that hit transient intake errors survive across Lambda invocations instead of being silently dropped after MAX_FLUSH_ATTEMPTS.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
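The passback pattern can be sketched generically. The real signature operates on reqwest::RequestBuilder; this std-only model uses a generic item type, and the names are assumptions.

```rust
/// Try up to max_attempts; on exhaustion, return the item itself so the
/// caller can redrive it later instead of dropping it.
fn send_with_retry<T>(item: T, max_attempts: u32, send: impl Fn(&T) -> bool) -> Result<(), T> {
    for _ in 0..max_attempts {
        if send(&item) {
            return Ok(());
        }
    }
    Err(item) // hand the unsent item back for a later flush cycle
}

/// Flush everything pending; return the items that still failed so the
/// next cycle's call can include them again.
fn flush<T>(pending: Vec<T>, send: impl Fn(&T) -> bool) -> Vec<T> {
    pending
        .into_iter()
        .filter_map(|it| send_with_retry(it, 3, &send).err())
        .collect()
}
```

The caller keeps the returned vector and prepends it to the next cycle's input, which is what lets a batch survive across invocations.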
- Include `[`/`]` (2 bytes) and comma separators in `is_full()` and `get_batch()` overflow guards
- Batch wire size could previously exceed MAX_CONTENT_BYTES by up to N+1 bytes

Rationale: JSON array framing is part of the wire payload but was not counted in the 5 MB cap checks.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
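The framing arithmetic is small enough to sketch directly. This is a model of the check, not the crate's actual code: a JSON array payload `[e1,e2,...,eN]` costs 2 bracket bytes plus N-1 comma separators on top of the serialized entries.

```rust
/// Wire size of a JSON array containing entries of the given
/// serialized sizes: brackets + commas + entry bytes.
fn wire_size(entry_sizes: &[usize]) -> usize {
    let brackets = 2;
    let commas = entry_sizes.len().saturating_sub(1);
    brackets + commas + entry_sizes.iter().sum::<usize>()
}

/// Would appending an entry of size `next` push the framed payload
/// over the cap? Counts framing, unlike the pre-fix guard.
fn would_overflow(current: &[usize], next: usize, max_content_bytes: usize) -> bool {
    let mut sizes = current.to_vec();
    sizes.push(next);
    wire_size(&sizes) > max_content_bytes
}
```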
… check
- Old test only asserted String::new() is empty — never called production code
- New test calls start_log_agent() with OPW enabled + empty URL and asserts None

Rationale: the test was a no-op; it now exercises the actual guard in start_log_agent().

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Delete _assert_log_flusher_constructible and its unused imports
- size_of checks never fail and provide no constructibility guarantee

Rationale: cargo check and existing integration tests already cover API visibility; the dead function only created maintenance noise.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…ession_level range from docs
- Rename LogEntry::new → LogEntry::from_message and update all call sites
- Remove inaccurate "1–21" range from compression_level doc comment

Rationale: new() implies all fields are provided; from_message makes the partial construction explicit per Rust API Guidelines.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…take_entry
- Rename struct, file, and all call sites across the codebase
- Name now references the Datadog Logs Intake API format explicitly

Rationale: reviewer feedback — the name should reflect the Intake Log format; LogEntry was too generic.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…mmand
- Rename LogAggregator → Aggregator and LogAggregatorCommand → AggregatorCommand
- No need for Log prefix inside the logs crate

Rationale: reviewer feedback — redundant Log prefix within the datadog-log-agent crate.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
…] to from_env
- FlusherMode renamed to Destination — it describes where logs are sent, not how
- #[must_use] added to LogFlusherConfig::from_env() to catch ignored return values

Rationale: reviewer feedback — the FlusherMode name is misleading; Destination is more accurate.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Wrap env::set_var/remove_var in unsafe blocks (Rust 2024 requirement)
- Collapse nested if-let + if into let-chain patterns
- Replace match Ok/Err with .err(), if let Err(_) with .is_err()
- Use .is_multiple_of() and assert!(!…) idioms
- Remove redundant as u32 casts on already-u32 fields
- Suppress result_large_err for external figment::Error in test modules
- Suppress disallowed_methods for reqwest::Client::builder in tests

Rationale: cargo clippy --workspace --all-targets and cargo fmt reported errors and warnings that needed to be resolved.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add `pub type LogEntry = IntakeEntry` alias
- Add `pub type FlusherMode = Destination` alias

Rationale: bottlecap's datadog-log-agent adoption (SVLS-8573) was written against these names. The crate used IntakeEntry/Destination but never exported the aliases the plan specified, leaving bottlecap unable to compile against the crate.
Bottlecap now uses IntakeEntry and Destination directly, so the aliases are not needed.
- Renamed crates/datadog-log-agent/ directory to crates/datadog-logs-agent/
- Updated package name in crate Cargo.toml
- Updated dependency reference in datadog-serverless-compat/Cargo.toml
- Updated all use datadog_log_agent:: identifiers in main.rs, examples, and tests
- Updated references in scripts/test-log-intake.sh and AGENTS.md

Rationale: align crate name with the plural "logs" convention used elsewhere in the codebase.

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
```rust
    timestamp: 0,
};
let origin = metric.find_origin(tags).unwrap();
assert_eq!(origin.origin_product, OriginProduct::Serverless as u32);
```
Why are we having formatting changes for this? Not related to this PR, but can we standardize this, a bit annoying when looking at diffs for a PR to see what actually changed.
@litianningdatadog Is this just a copy paste of what is in the datadog-lambda-extension?
https://datadoghq.atlassian.net/browse/SVLS-8573
Background
This change converts log intake from an embedded Bottlecap component into a standalone crate. The ultimate goal is to enable serverless-compat to support log forwarding.
Summary
- Adds a datadog-log-agent library crate providing generic log aggregation and flushing to the Datadog Logs API
- Wires it into the datadog-serverless-compat binary so serverless runtimes (Lambda, Azure Functions, etc.) can ship logs
- Integrates a LogFlusher into datadog-serverless-compat's flush pipeline alongside traces and metrics

This change is safe to release because DD_LOGS_ENABLED=false by default (i.e., log intake is disabled unless explicitly enabled).
Key components
datadog-log-agent crate:

- LogEntry — flat serde struct with standard Datadog fields + #[serde(flatten)] attributes map for runtime-specific enrichment (e.g. lambda.arn, azure.resource_id)
- LogAggregator — size/count-bounded batch collector (≤1000 entries, ≤5 MB per batch)
- AggregatorService / AggregatorHandle — channel-based async service; callers insert via the handle, the flusher drains via get_batches()
- LogFlusher — ships batches to /api/v2/logs with zstd compression, configurable retry (3 attempts), OPW mode, and additional endpoints
- LogFlusherConfig::from_env() — reads all config from environment variables with sensible defaults

datadog-serverless-compat integration:

- start_log_agent() spawns the aggregator and wires a LogFlusher into the flush pipeline
- Enabled only when DD_LOGS_ENABLED=true
- OPW mode via DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED / DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL

Test plan
- cargo test -p datadog-log-agent (39 tests across aggregator, flusher, config, log_entry)
- cargo test -p datadog-log-agent --test integration_test (covers full pipeline, batching, retries, OPW mode)

Check DD Logs

This change was made with significant help from Claude Code.