diff --git a/.gitignore b/.gitignore index df769d29..5c8f66b9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /.idea +/CLAUDE.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..239103c7 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,35 @@ +# Repository Guidelines + +## Project Structure & Module Organization +- `crates/` hosts all Rust workspaces: `datadog-serverless-compat` (binary entrypoint), `datadog-logs-agent`, `datadog-trace-agent`, `dogstatsd`, and `datadog-fips`. Each crate keeps its own `src/`, `tests/`, and `examples/` when needed. +- `docs/` captures design notes such as `DESIGN-LOG-INTAKE.md`; update diagrams here before referencing them in PRs. +- `scripts/` contains developer helpers like `test-log-intake.sh` for spinning up a capture server; treat these as canonical workflow recipes. +- The `target/` directory is build output and should not be checked in. Temporary assets or captures belong under `tmp/` entries ignored by git. + +## Build, Test, and Development Commands +- `cargo build --workspace` — quick validation that every crate compiles together. +- `cargo build -p datadog-serverless-compat --release` — produces the Lambda/Azure bundle with LTO + size optimizations. +- `cargo test --workspace` — runs unit + integration suites; add `-- --ignored` for slow network-reliant cases. +- `cargo fmt --all -- --check` and `cargo clippy --workspace --all-features` — enforce rustfmt/clippy gates that run in CI. +- `./scripts/test-log-intake.sh [--real]` — end-to-end log pipeline test; `--real` pushes to Datadog with `DD_API_KEY` and `DD_SITE`. + +## Coding Style & Naming Conventions +- Standard Rust 4-space indentation, snake_case modules, UpperCamelCase types, and SCREAMING_SNAKE_CASE env vars (e.g., `DD_LOGS_ENABLED`). +- Keep files focused: separate AWS/Azure adapters into their own modules under each crate. 
+- Always build HTTP clients via `datadog_fips::reqwest_adapter::create_reqwest_client_builder` (clippy enforces this) and document env-driven behavior with doc comments. +- Run `cargo fmt` before committing; do not overwrite generated files under `target/`. + +## Testing Guidelines +- Prefer `#[tokio::test]` async tests for agents, mirroring production runtimes; integration suites live under `crates/datadog-logs-agent/tests/` and similar per crate. +- Name tests by observable behavior (`flushes_batches_on_interval`, `retries_opw_payload`), and add fixtures under each crate’s `tests/data`. +- Cover batching, retry, and configuration fallbacks; when adding env vars, include a table entry in `README.md` plus a test proving defaulting. +- Validate manual flows with `./scripts/test-log-intake.sh` and capture the command output in the PR when changes touch logging. + +## Commit & Pull Request Guidelines +- Follow the existing `(scope): summary` pattern seen in `git log` (`feat(log-agent): derive Clone on LogFlusher`, `fix(log-agent): improve main.rs wiring`). Keep the summary imperative and under ~72 characters. +- Each PR description should include: purpose, testing commands run, affected crates, and links to tracking tickets (SVLS-####) when applicable. Attach screenshots or trace dumps if behavior changes are observable. +- Squash noisy fixups before requesting review, ensure CI (fmt, clippy, tests) is green, and mention any manual steps reviewers must perform (e.g., setting `DD_API_KEY`). + +## Security & Configuration Tips +- Never commit secrets; rely on env vars (`DD_API_KEY`, `DD_SITE`, `DD_LOGS_ENABLED`, `DD_PROXY_HTTPS`) and document defaults in README tables. +- When adding outbound HTTP integrations, reuse the shared `datadog-fips` client so TLS remains FIPS-compliant. Log configuration errors at `warn` level but avoid leaking keys. 
diff --git a/Cargo.lock b/Cargo.lock index 7216a6aa..d4b8eb60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "datadog-logs-agent" +version = "0.1.0" +dependencies = [ + "datadog-fips", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "mockito", + "reqwest", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", + "zstd", +] + [[package]] name = "datadog-opentelemetry" version = "0.3.0" @@ -513,10 +533,12 @@ name = "datadog-serverless-compat" version = "0.1.0" dependencies = [ "datadog-fips", + "datadog-logs-agent", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils 1.0.0", "reqwest", + "serde_json", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0ce470ad..1c9f528c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,9 @@ resolver = "2" members = [ "crates/*", ] +exclude = [ + "crates/.claude", +] [workspace.package] edition = "2024" diff --git a/crates/datadog-agent-config/env.rs b/crates/datadog-agent-config/env.rs index 80685937..3e74e26e 100644 --- a/crates/datadog-agent-config/env.rs +++ b/crates/datadog-agent-config/env.rs @@ -705,6 +705,7 @@ impl ConfigSource for EnvConfigSource { #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] +#[allow(clippy::result_large_err)] mod tests { use std::time::Duration; diff --git a/crates/datadog-agent-config/mod.rs b/crates/datadog-agent-config/mod.rs index 2071af9f..0a274637 100644 --- a/crates/datadog-agent-config/mod.rs +++ b/crates/datadog-agent-config/mod.rs @@ -831,6 +831,7 @@ where #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] +#[allow(clippy::result_large_err)] pub mod tests { use libdd_trace_obfuscation::replacer::parse_rules_from_string; diff --git a/crates/datadog-agent-config/yaml.rs b/crates/datadog-agent-config/yaml.rs index 1be32bd5..7832e67d 100644 --- 
a/crates/datadog-agent-config/yaml.rs +++ b/crates/datadog-agent-config/yaml.rs @@ -735,6 +735,7 @@ impl ConfigSource for YamlConfigSource { #[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics #[cfg(test)] +#[allow(clippy::result_large_err)] mod tests { use std::path::Path; use std::time::Duration; diff --git a/crates/datadog-logs-agent/Cargo.toml b/crates/datadog-logs-agent/Cargo.toml new file mode 100644 index 00000000..177d5614 --- /dev/null +++ b/crates/datadog-logs-agent/Cargo.toml @@ -0,0 +1,35 @@ +# Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "datadog-logs-agent" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[lib] +bench = false + +[dependencies] +datadog-fips = { path = "../datadog-fips" } +reqwest = { version = "0.12.4", features = ["json", "http2"], default-features = false } +serde = { version = "1.0.197", default-features = false, features = ["derive"] } +serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } +thiserror = { version = "1.0.58", default-features = false } +hyper = { version = "1", features = ["http1", "server"] } +http-body-util = "0.1" +hyper-util = { version = "0.1", features = ["tokio"] } +futures = { version = "0.3", default-features = false, features = ["alloc"] } +tokio = { version = "1.37.0", default-features = false, features = ["sync", "net"] } +tracing = { version = "0.1.40", default-features = false } +zstd = { version = "0.13.3", default-features = false } + +[dev-dependencies] +http = "1" +mockito = { version = "1.5.0", default-features = false } +serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } +reqwest = { version = "0.12.4", features = ["json"], default-features = false } +tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net", "time"] } + +[features] +default = [] diff 
--git a/crates/datadog-logs-agent/examples/send_logs.rs b/crates/datadog-logs-agent/examples/send_logs.rs new file mode 100644 index 00000000..7d64df9b --- /dev/null +++ b/crates/datadog-logs-agent/examples/send_logs.rs @@ -0,0 +1,196 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Local test helper: inserts sample log entries and flushes them via the log agent pipeline. +//! +//! # Usage +//! +//! ## Flush to a local capture server (recommended for local dev) +//! +//! The easiest way — runs capture server and example together: +//! ./scripts/test-log-intake.sh +//! +//! Or manually in two terminals: +//! +//! In terminal 1 — start the capture server (handles POST, prints JSON): +//! # (there is no standalone server script — the capture server is embedded in test-log-intake.sh) +//! # Use the script above, or run: python3 -c "$(sed -n '/PYEOF/,/PYEOF/p' scripts/test-log-intake.sh)" +//! +//! In terminal 2 — run this example: +//! DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED=true \ +//! DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL=http://localhost:9999/logs \ +//! DD_API_KEY=local-test-key \ +//! cargo run -p datadog-logs-agent --example send_logs +//! +//! NOTE: `python3 -m http.server` does NOT work — it rejects POST requests. +//! +//! ## Flush to a real Datadog endpoint +//! +//! DD_API_KEY= \ +//! DD_SITE=datadoghq.com \ +//! cargo run -p datadog-logs-agent --example send_logs +//! +//! ## Configuration via env vars +//! +//! | Variable | Default | +//! |--------------------------------------------------|--------------------| +//! | DD_API_KEY | (empty) | +//! | DD_SITE | datadoghq.com | +//! | DD_LOGS_CONFIG_USE_COMPRESSION | true | +//! | DD_LOGS_CONFIG_COMPRESSION_LEVEL | 3 | +//! | DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED | false | +//! | DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL | (empty) | +//! 
| LOG_ENTRY_COUNT | 5 | + +use datadog_logs_agent::{ + AggregatorService, Destination, IntakeEntry, LogFlusher, LogFlusherConfig, +}; + +#[allow(clippy::disallowed_methods)] // plain reqwest::Client for local testing +#[tokio::main] +async fn main() { + let entry_count: usize = std::env::var("LOG_ENTRY_COUNT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(5); + + let config = LogFlusherConfig::from_env(); + + // Print effective configuration + let (endpoint, compressed) = describe_config(&config); + println!("──────────────────────────────────────────"); + println!(" datadog-logs-agent local test"); + println!("──────────────────────────────────────────"); + println!(" endpoint : {endpoint}"); + println!(" api_key : {}", mask(&config.api_key)); + println!(" compressed : {compressed}"); + println!(" entries : {entry_count}"); + println!("──────────────────────────────────────────"); + + // Start aggregator service + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + // Insert sample entries representing different runtimes + let mut entries = Vec::with_capacity(entry_count); + + for i in 0..entry_count { + let entry = match i % 3 { + 0 => lambda_entry(i), + 1 => azure_entry(i), + _ => plain_entry(i), + }; + entries.push(entry); + } + + println!("\nInserting {entry_count} log entries..."); + handle.insert_batch(entries).expect("insert_batch failed"); + + // Build HTTP client + let client = reqwest::Client::builder() + .timeout(config.flush_timeout) + .build() + .expect("failed to build HTTP client"); + + // Flush + println!("Flushing to {endpoint}..."); + let flusher = LogFlusher::new(config, client, handle); + let failed = flusher.flush(vec![]).await; + + if failed.is_empty() { + println!("\n✓ Flush succeeded"); + } else { + eprintln!("\n✗ Flush failed — check endpoint and API key"); + std::process::exit(1); + } +} + +// ── Sample log entry builders ───────────────────────────────────────────────── + +fn lambda_entry(i: usize) 
-> IntakeEntry { + let mut attrs = serde_json::Map::new(); + attrs.insert( + "lambda".to_string(), + serde_json::json!({ + "arn": "arn:aws:lambda:us-east-1:123456789012:function:my-fn", + "request_id": format!("req-{i:04}") + }), + ); + IntakeEntry { + message: format!("[lambda] invocation #{i} completed"), + timestamp: now_ms(), + hostname: Some("arn:aws:lambda:us-east-1:123456789012:function:my-fn".to_string()), + service: Some("my-fn".to_string()), + ddsource: Some("lambda".to_string()), + ddtags: Some("env:local,runtime:lambda".to_string()), + status: Some("info".to_string()), + attributes: attrs, + } +} + +fn azure_entry(i: usize) -> IntakeEntry { + let mut attrs = serde_json::Map::new(); + attrs.insert( + "azure".to_string(), + serde_json::json!({ + "resource_id": "/subscriptions/sub-123/resourceGroups/rg/providers/Microsoft.Web/sites/my-fn", + "operation_name": "Microsoft.Web/sites/functions/run/action" + }), + ); + IntakeEntry { + message: format!("[azure] function triggered #{i}"), + timestamp: now_ms(), + hostname: Some("my-azure-fn".to_string()), + service: Some("payments".to_string()), + ddsource: Some("azure-functions".to_string()), + ddtags: Some("env:local,runtime:azure".to_string()), + status: Some("info".to_string()), + attributes: attrs, + } +} + +fn plain_entry(i: usize) -> IntakeEntry { + IntakeEntry { + message: format!("[generic] log message #{i}"), + timestamp: now_ms(), + hostname: Some("localhost".to_string()), + service: Some("test-service".to_string()), + ddsource: Some("rust".to_string()), + ddtags: Some("env:local".to_string()), + status: if i.is_multiple_of(5) { + Some("error".to_string()) + } else { + Some("info".to_string()) + }, + attributes: serde_json::Map::new(), + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +fn describe_config(config: 
&LogFlusherConfig) -> (String, bool) { + match &config.mode { + Destination::Datadog => ( + format!("https://http-intake.logs.{}/api/v2/logs", config.site), + config.use_compression, + ), + Destination::ObservabilityPipelinesWorker { url } => (url.clone(), false), + } +} + +fn mask(s: &str) -> String { + if s.is_empty() { + return "(not set)".to_string(); + } + if s.len() <= 8 { + return "*".repeat(s.len()); + } + format!("{}…{}", &s[..4], &s[s.len() - 4..]) +} diff --git a/crates/datadog-logs-agent/src/aggregator/core.rs b/crates/datadog-logs-agent/src/aggregator/core.rs new file mode 100644 index 00000000..97a64f4c --- /dev/null +++ b/crates/datadog-logs-agent/src/aggregator/core.rs @@ -0,0 +1,270 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::VecDeque; + +use crate::constants::{MAX_BATCH_ENTRIES, MAX_CONTENT_BYTES, MAX_LOG_BYTES}; +use crate::errors::AggregatorError; +use crate::intake_entry::IntakeEntry; + +/// In-memory log batch accumulator. +/// +/// Stores pre-serialized JSON strings in a FIFO queue. A batch is "full" +/// when it reaches `MAX_BATCH_ENTRIES` entries or `MAX_CONTENT_BYTES` of +/// uncompressed content. +pub struct Aggregator { + messages: VecDeque, + current_size_bytes: usize, +} + +impl Default for Aggregator { + fn default() -> Self { + Self::new() + } +} + +impl Aggregator { + /// Create a new, empty aggregator. + pub fn new() -> Self { + Self { + messages: VecDeque::new(), + current_size_bytes: 0, + } + } + + /// Insert a log entry into the batch. + /// + /// Returns `Ok(true)` if the batch is now full and ready to flush. + /// Returns `Err(AggregatorError::EntryTooLarge)` if the serialized + /// entry exceeds `MAX_LOG_BYTES` — the entry is dropped. 
+ pub fn insert(&mut self, entry: &IntakeEntry) -> Result { + let serialized = serde_json::to_string(entry)?; + let len = serialized.len(); + + if len > MAX_LOG_BYTES { + return Err(AggregatorError::EntryTooLarge { + size: len, + max: MAX_LOG_BYTES, + }); + } + + self.messages.push_back(serialized); + self.current_size_bytes += len; + + Ok(self.is_full()) + } + + /// Returns `true` if the batch has reached its entry count or byte limit. + /// + /// The byte check accounts for JSON framing: `[` + `]` (2 bytes) plus one + /// comma per entry after the first (`N - 1` bytes). + pub fn is_full(&self) -> bool { + let n = self.messages.len(); + // framing: 2 bytes for `[`/`]` + (n - 1) commas + let framing = if n == 0 { 0 } else { 2 + (n - 1) }; + n >= MAX_BATCH_ENTRIES || self.current_size_bytes + framing >= MAX_CONTENT_BYTES + } + + /// Returns `true` if no log entries are buffered. + pub fn is_empty(&self) -> bool { + self.messages.is_empty() + } + + /// Returns the number of log entries currently buffered. + pub fn len(&self) -> usize { + self.messages.len() + } + + /// Drain up to `MAX_BATCH_ENTRIES` entries and return them as a JSON + /// array (`[entry1,entry2,...]`) encoded as UTF-8 bytes. + /// + /// Returns `None` if the aggregator is empty. + pub fn get_batch(&mut self) -> Option> { + if self.messages.is_empty() { + return None; + } + + let mut output = Vec::new(); + output.push(b'['); + let mut bytes_in_batch: usize = 0; + let mut count: usize = 0; + + loop { + if count >= MAX_BATCH_ENTRIES { + break; + } + + let msg_len = match self.messages.front() { + Some(m) => m.len(), + None => break, + }; + + // Account for the comma separator and the 2-byte `[`/`]` framing. 
+ // Total wire size = bytes_in_batch + (separator) + msg_len + 2 (for `[` and `]`) + let separator = if count == 0 { 0 } else { 1 }; + if bytes_in_batch + separator + msg_len + 2 > MAX_CONTENT_BYTES { + break; + } + + // Safe: we just confirmed front() is Some and we hold &mut self + let msg = match self.messages.pop_front() { + Some(m) => m, + None => break, + }; + + if count > 0 { + output.push(b','); + bytes_in_batch += 1; + } + + self.current_size_bytes = self.current_size_bytes.saturating_sub(msg.len()); + bytes_in_batch += msg.len(); + count += 1; + output.extend_from_slice(msg.as_bytes()); + } + + output.push(b']'); + Some(output) + } + + /// Drain all entries and return them as a `Vec` of JSON array batches. + /// May return multiple batches if the queue exceeds a single batch limit. + pub fn get_all_batches(&mut self) -> Vec> { + let mut batches = Vec::new(); + while let Some(batch) = self.get_batch() { + batches.push(batch); + } + batches + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::intake_entry::IntakeEntry; + + fn make_entry(msg: &str) -> IntakeEntry { + IntakeEntry::from_message(msg, 1_700_000_000_000) + } + + #[test] + fn test_new_aggregator_is_empty() { + let agg = Aggregator::new(); + assert!(agg.is_empty()); + assert_eq!(agg.len(), 0); + } + + #[test] + fn test_insert_single_entry() { + let mut agg = Aggregator::new(); + let full = agg.insert(&make_entry("hello")).expect("insert failed"); + assert!(!full, "single entry should not fill the batch"); + assert_eq!(agg.len(), 1); + assert!(!agg.is_empty()); + } + + #[test] + fn test_get_batch_returns_valid_json_array() { + let mut agg = Aggregator::new(); + agg.insert(&make_entry("line 1")).expect("insert"); + agg.insert(&make_entry("line 2")).expect("insert"); + + let batch = agg.get_batch().expect("should have a batch"); + let parsed: serde_json::Value = serde_json::from_slice(&batch).expect("valid JSON"); + + assert!(parsed.is_array()); + 
assert_eq!(parsed.as_array().unwrap().len(), 2); + assert_eq!(parsed[0]["message"], "line 1"); + assert_eq!(parsed[1]["message"], "line 2"); + } + + #[test] + fn test_get_batch_drains_aggregator() { + let mut agg = Aggregator::new(); + agg.insert(&make_entry("log")).expect("insert"); + + let _ = agg.get_batch(); + assert!(agg.is_empty(), "aggregator should be empty after get_batch"); + assert!( + agg.get_batch().is_none(), + "second get_batch should return None" + ); + } + + #[test] + fn test_entry_too_large_returns_error() { + let mut agg = Aggregator::new(); + // Construct an entry whose JSON serialization exceeds MAX_LOG_BYTES + let big_message = "x".repeat(crate::constants::MAX_LOG_BYTES + 1); + let entry = make_entry(&big_message); + let result = agg.insert(&entry); + assert!(result.is_err(), "oversized entry should be rejected"); + assert!( + agg.is_empty(), + "aggregator should stay empty after rejection" + ); + } + + #[test] + fn test_insert_returns_true_when_batch_full_by_count() { + use crate::constants::MAX_BATCH_ENTRIES; + let mut agg = Aggregator::new(); + let entry = make_entry("x"); + + for _ in 0..(MAX_BATCH_ENTRIES - 1) { + let full = agg.insert(&entry).expect("insert"); + assert!(!full, "should not be full until last entry"); + } + let full = agg.insert(&entry).expect("insert"); + assert!(full, "should be full after MAX_BATCH_ENTRIES entries"); + assert!(agg.is_full()); + } + + #[test] + fn test_get_all_batches_splits_large_queue() { + use crate::constants::MAX_BATCH_ENTRIES; + let mut agg = Aggregator::new(); + let entry = make_entry("x"); + + for _ in 0..(MAX_BATCH_ENTRIES + 5) { + let _ = agg.insert(&entry); + } + + let batches = agg.get_all_batches(); + assert_eq!(batches.len(), 2, "should produce 2 batches"); + + let first: serde_json::Value = serde_json::from_slice(&batches[0]).expect("json"); + let second: serde_json::Value = serde_json::from_slice(&batches[1]).expect("json"); + assert_eq!(first.as_array().unwrap().len(), 
MAX_BATCH_ENTRIES); + assert_eq!(second.as_array().unwrap().len(), 5); + } + + #[test] + fn test_get_all_batches_empty_returns_empty_vec() { + let mut agg = Aggregator::new(); + assert!(agg.get_all_batches().is_empty()); + } + + #[test] + fn test_batch_never_exceeds_max_content_bytes() { + // Fill with entries whose sizes sum to just under MAX_CONTENT_BYTES so + // that the framing bytes (`[`, `]`, commas) would push a naive + // implementation over the limit. + let mut agg = Aggregator::new(); + // Each entry's serialized JSON is roughly 50 bytes; pack enough entries + // that their raw sum approaches MAX_CONTENT_BYTES. + let entry = make_entry("x"); + for _ in 0..1000 { + let _ = agg.insert(&entry); + } + + for batch in agg.get_all_batches() { + assert!( + batch.len() <= MAX_CONTENT_BYTES, + "batch size {} exceeds MAX_CONTENT_BYTES {}", + batch.len(), + MAX_CONTENT_BYTES + ); + } + } +} diff --git a/crates/datadog-logs-agent/src/aggregator/mod.rs b/crates/datadog-logs-agent/src/aggregator/mod.rs new file mode 100644 index 00000000..c191ab66 --- /dev/null +++ b/crates/datadog-logs-agent/src/aggregator/mod.rs @@ -0,0 +1,8 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +pub mod core; +pub use core::Aggregator; + +pub mod service; +pub use service::{AggregatorHandle, AggregatorService}; diff --git a/crates/datadog-logs-agent/src/aggregator/service.rs b/crates/datadog-logs-agent/src/aggregator/service.rs new file mode 100644 index 00000000..c4ffc644 --- /dev/null +++ b/crates/datadog-logs-agent/src/aggregator/service.rs @@ -0,0 +1,185 @@ +// Copyright 2025-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, warn}; + +use crate::aggregator::Aggregator; +use crate::intake_entry::IntakeEntry; + +#[derive(Debug)] +enum AggregatorCommand { + InsertBatch(Vec), + GetBatches(oneshot::Sender>>), + Shutdown, +} + +/// Cloneable handle for sending commands to a running [`AggregatorService`]. +#[derive(Clone)] +pub struct AggregatorHandle { + tx: mpsc::UnboundedSender, +} + +impl AggregatorHandle { + /// Queue a batch of log entries for aggregation. + /// + /// Returns an error only if the service has already stopped. + pub fn insert_batch(&self, entries: Vec) -> Result<(), String> { + self.tx + .send(AggregatorCommand::InsertBatch(entries)) + .map_err(|e| format!("failed to send InsertBatch: {e}")) + } + + /// Retrieve and drain all accumulated log batches as JSON arrays. + /// + /// Returns an empty `Vec` if the aggregator holds no logs. + pub async fn get_batches(&self) -> Result>, String> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::GetBatches(tx)) + .map_err(|e| format!("failed to send GetBatches: {e}"))?; + rx.await + .map_err(|e| format!("failed to receive GetBatches response: {e}")) + } + + /// Signal the service to stop processing and exit its run loop. + pub fn shutdown(&self) -> Result<(), String> { + self.tx + .send(AggregatorCommand::Shutdown) + .map_err(|e| format!("failed to send Shutdown: {e}")) + } +} + +/// Background tokio task owning a [`Aggregator`] and processing commands. +/// +/// Create with [`AggregatorService::new`], spawn with `tokio::spawn(service.run())`, +/// and interact via the returned [`AggregatorHandle`]. +pub struct AggregatorService { + aggregator: Aggregator, + rx: mpsc::UnboundedReceiver, +} + +impl AggregatorService { + /// Create a new service and its associated handle. 
+ pub fn new() -> (Self, AggregatorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let service = Self { + aggregator: Aggregator::new(), + rx, + }; + let handle = AggregatorHandle { tx }; + (service, handle) + } + + /// Run the service event loop. + /// + /// Returns when a `Shutdown` command is received or the last handle is dropped. + pub async fn run(mut self) { + debug!("log aggregator service started"); + + while let Some(command) = self.rx.recv().await { + match command { + AggregatorCommand::InsertBatch(entries) => { + for entry in &entries { + if let Err(e) = self.aggregator.insert(entry) { + warn!("dropping log entry: {e}"); + } + } + } + + AggregatorCommand::GetBatches(response_tx) => { + let batches = self.aggregator.get_all_batches(); + if response_tx.send(batches).is_err() { + error!("failed to send GetBatches response — receiver dropped"); + } + } + + AggregatorCommand::Shutdown => { + debug!("log aggregator service shutting down"); + break; + } + } + } + + debug!("log aggregator service stopped"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::intake_entry::IntakeEntry; + + fn make_entry(msg: &str) -> IntakeEntry { + IntakeEntry::from_message(msg, 1_700_000_000_000) + } + + #[tokio::test] + async fn test_insert_and_get_batches_roundtrip() { + let (service, handle) = AggregatorService::new(); + let task = tokio::spawn(service.run()); + + handle + .insert_batch(vec![make_entry("a"), make_entry("b")]) + .expect("insert_batch failed"); + + let batches = handle.get_batches().await.expect("get_batches failed"); + assert_eq!(batches.len(), 1); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("json"); + assert_eq!(arr.as_array().unwrap().len(), 2); + + handle.shutdown().expect("shutdown failed"); + task.await.expect("task panicked"); + } + + #[tokio::test] + async fn test_get_batches_empty_returns_empty_vec() { + let (service, handle) = AggregatorService::new(); + let task = tokio::spawn(service.run()); + + 
let batches = handle.get_batches().await.expect("get_batches"); + assert!(batches.is_empty()); + + handle.shutdown().expect("shutdown"); + task.await.expect("task"); + } + + #[tokio::test] + async fn test_oversized_entry_dropped_not_panicked() { + let (service, handle) = AggregatorService::new(); + let task = tokio::spawn(service.run()); + + let big = IntakeEntry::from_message("x".repeat(crate::constants::MAX_LOG_BYTES + 1), 0); + handle.insert_batch(vec![big]).expect("send ok"); + + let batches = handle.get_batches().await.expect("get_batches"); + assert!( + batches.is_empty(), + "oversized entry should have been dropped" + ); + + handle.shutdown().expect("shutdown"); + task.await.expect("task"); + } + + #[tokio::test] + async fn test_handle_is_clone_and_both_can_insert() { + let (service, handle) = AggregatorService::new(); + let task = tokio::spawn(service.run()); + + let handle2 = handle.clone(); + handle + .insert_batch(vec![make_entry("from h1")]) + .expect("h1"); + handle2 + .insert_batch(vec![make_entry("from h2")]) + .expect("h2"); + + let batches = handle.get_batches().await.expect("get_batches"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("json"); + assert_eq!(arr.as_array().unwrap().len(), 2); + + handle.shutdown().expect("shutdown"); + task.await.expect("task"); + } +} diff --git a/crates/datadog-logs-agent/src/config.rs b/crates/datadog-logs-agent/src/config.rs new file mode 100644 index 00000000..630e6865 --- /dev/null +++ b/crates/datadog-logs-agent/src/config.rs @@ -0,0 +1,109 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use crate::constants::{DEFAULT_COMPRESSION_LEVEL, DEFAULT_FLUSH_TIMEOUT_SECS, DEFAULT_SITE}; +use crate::logs_additional_endpoint::{LogsAdditionalEndpoint, parse_additional_endpoints}; + +/// Controls where and how logs are shipped. 
+#[derive(Debug, Clone)] +pub enum Destination { + /// Ship to Datadog Logs API. + /// Endpoint: `https://http-intake.logs.{site}/api/v2/logs` + /// Headers: `DD-API-KEY`, `DD-PROTOCOL: agent-json`, optionally `Content-Encoding: zstd` + Datadog, + + /// Ship to an Observability Pipelines Worker. + /// Endpoint: the provided URL. + /// Headers: `DD-API-KEY` only. Compression is always disabled for OPW. + ObservabilityPipelinesWorker { url: String }, +} + +/// Configuration for [`LogFlusher`](crate::flusher::LogFlusher). +#[derive(Debug, Clone)] +pub struct LogFlusherConfig { + /// Datadog API key. + pub api_key: String, + + /// Datadog site (e.g. "datadoghq.com", "datadoghq.eu"). + pub site: String, + + /// Flusher mode — Datadog vs Observability Pipelines Worker. + pub mode: Destination, + + /// Additional Datadog intake endpoints to ship each batch to in parallel. + /// Each endpoint uses its own API key and full intake URL. + pub additional_endpoints: Vec, + + /// Enable zstd compression (ignored in OPW mode, which is always uncompressed). + pub use_compression: bool, + + /// zstd compression level (ignored when `use_compression` is false). + pub compression_level: i32, + + /// Per-request timeout. + pub flush_timeout: Duration, +} + +impl LogFlusherConfig { + /// Build a config from environment variables, falling back to sensible defaults. 
+ /// + /// | Variable | Default | + /// |---|---| + /// | `DD_API_KEY` | `""` | + /// | `DD_SITE` | `datadoghq.com` | + /// | `DD_LOGS_CONFIG_USE_COMPRESSION` | `true` | + /// | `DD_LOGS_CONFIG_COMPRESSION_LEVEL` | `3` | + /// | `DD_FLUSH_TIMEOUT` | `5` (seconds) | + /// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED` | `false` | + /// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL` | (none) | + #[must_use] + pub fn from_env() -> Self { + let api_key = std::env::var("DD_API_KEY").unwrap_or_default(); + let site = std::env::var("DD_SITE").unwrap_or_else(|_| DEFAULT_SITE.to_string()); + + let use_compression = std::env::var("DD_LOGS_CONFIG_USE_COMPRESSION") + .map(|v| v.to_lowercase() != "false") + .unwrap_or(true); + + let compression_level = std::env::var("DD_LOGS_CONFIG_COMPRESSION_LEVEL") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_COMPRESSION_LEVEL); + + let flush_timeout_secs = std::env::var("DD_FLUSH_TIMEOUT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_FLUSH_TIMEOUT_SECS); + + let opw_enabled = std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED") + .map(|v| v.to_lowercase() == "true") + .unwrap_or(false); + + let mode = if opw_enabled { + let url = + std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL").unwrap_or_default(); + if url.is_empty() { + tracing::warn!( + "OPW mode enabled but DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL is not set — log flush will fail" + ); + } + Destination::ObservabilityPipelinesWorker { url } + } else { + Destination::Datadog + }; + + Self { + api_key, + site, + mode, + additional_endpoints: std::env::var("DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS") + .map(|v| parse_additional_endpoints(&v)) + .unwrap_or_default(), + use_compression, + compression_level, + flush_timeout: Duration::from_secs(flush_timeout_secs), + } + } +} diff --git a/crates/datadog-logs-agent/src/constants.rs b/crates/datadog-logs-agent/src/constants.rs new file mode 100644 index 00000000..018dd510 --- 
/dev/null +++ b/crates/datadog-logs-agent/src/constants.rs @@ -0,0 +1,20 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +/// Maximum number of log entries per batch. +pub const MAX_BATCH_ENTRIES: usize = 1_000; + +/// Maximum total uncompressed payload size per batch (5 MB). +pub const MAX_CONTENT_BYTES: usize = 5 * 1_024 * 1_024; + +/// Maximum allowed size for a single serialized log entry (1 MB). +pub const MAX_LOG_BYTES: usize = 1_024 * 1_024; + +/// Default Datadog site for log intake. +pub const DEFAULT_SITE: &str = "datadoghq.com"; + +/// Default flush timeout in seconds. +pub const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 5; + +/// Negative values enable ultra-fast modes. Level 3 is the zstd library default. +pub const DEFAULT_COMPRESSION_LEVEL: i32 = 3; diff --git a/crates/datadog-logs-agent/src/errors.rs b/crates/datadog-logs-agent/src/errors.rs new file mode 100644 index 00000000..50601afa --- /dev/null +++ b/crates/datadog-logs-agent/src/errors.rs @@ -0,0 +1,35 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +/// Errors that can occur when inserting log entries into the aggregator. +#[derive(Debug, thiserror::Error)] +pub enum AggregatorError { + #[error("log entry too large: {size} bytes exceeds max {max} bytes")] + EntryTooLarge { size: usize, max: usize }, + + #[error("failed to serialize log entry: {0}")] + Serialization(#[from] serde_json::Error), +} + +/// Errors that can occur when flushing logs to Datadog. 
+#[derive(Debug, thiserror::Error)] +pub enum FlushError { + #[error("HTTP request failed: {0}")] + Request(String), + + #[error("server returned permanent error: status {status}")] + PermanentError { status: u16 }, + + #[error("max retries exceeded after {attempts} attempts")] + MaxRetriesExceeded { attempts: u32 }, + + #[error("compression failed: {0}")] + Compression(String), +} + +/// Errors that can occur during crate object creation. +#[derive(Debug, thiserror::Error)] +pub enum CreationError { + #[error("failed to build HTTP client: {0}")] + HttpClient(String), +} diff --git a/crates/datadog-logs-agent/src/flusher.rs b/crates/datadog-logs-agent/src/flusher.rs new file mode 100644 index 00000000..1e20fb97 --- /dev/null +++ b/crates/datadog-logs-agent/src/flusher.rs @@ -0,0 +1,736 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io::Write as _; + +use futures::future::join_all; +use reqwest::Client; +use tracing::{debug, error, warn}; +use zstd::stream::write::Encoder; + +use crate::aggregator::AggregatorHandle; +use crate::config::{Destination, LogFlusherConfig}; +use crate::errors::FlushError; + +/// Maximum number of send attempts before giving up on a batch. +const MAX_FLUSH_ATTEMPTS: u32 = 3; + +/// Drains log batches from an [`AggregatorHandle`] and ships them to Datadog. +#[derive(Clone)] +pub struct LogFlusher { + config: LogFlusherConfig, + client: Client, + aggregator_handle: AggregatorHandle, +} + +impl LogFlusher { + /// Create a new flusher. + /// + /// The `client` **must** be built via + /// [`datadog_fips::reqwest_adapter::create_reqwest_client_builder`] to ensure + /// FIPS-compliant TLS. Never use `reqwest::Client::builder()` directly. 
+    pub fn new(
+        config: LogFlusherConfig,
+        client: Client,
+        aggregator_handle: AggregatorHandle,
+    ) -> Self {
+        Self {
+            config,
+            client,
+            aggregator_handle,
+        }
+    }
+
+    /// Drain the aggregator, ship all pending batches to Datadog, and redrive any
+    /// builders that failed transiently in the previous invocation.
+    ///
+    /// # Arguments
+    ///
+    /// * `retry_requests` — builders returned by a previous `flush` call that
+    ///   exhausted their per-invocation retry budget. They are re-sent before
+    ///   draining new batches from the aggregator.
+    ///
+    /// # Returns
+    ///
+    /// A vec of `RequestBuilder`s that still failed after all in-call retries.
+    /// The caller should pass these back on the next invocation to re-attempt
+    /// delivery. An empty vec means every batch was delivered successfully
+    /// (or encountered a permanent error and was dropped — those are logged).
+    ///
+    /// Failures on additional endpoints are logged as warnings but their
+    /// builders are not included in the returned vec (best-effort delivery).
+    pub async fn flush(
+        &self,
+        retry_requests: Vec<reqwest::RequestBuilder>,
+    ) -> Vec<reqwest::RequestBuilder> {
+        let mut failed: Vec<reqwest::RequestBuilder> = Vec::new();
+
+        // Redrive builders that failed transiently in the previous invocation.
+        if !retry_requests.is_empty() {
+            debug!(
+                "redriving {} log builder(s) from previous flush",
+                retry_requests.len()
+            );
+        }
+        let retry_futures = retry_requests
+            .into_iter()
+            .map(|builder| async move { self.send_with_retry(builder).await.err() });
+        for b in join_all(retry_futures).await.into_iter().flatten() {
+            failed.push(b);
+        }
+
+        // Drain new batches from the aggregator.
+ let batches = match self.aggregator_handle.get_batches().await { + Ok(b) => b, + Err(e) => { + error!("failed to retrieve log batches from aggregator: {e}"); + return failed; + } + }; + + if batches.is_empty() { + debug!("no log batches to flush"); + return failed; + } + + debug!("flushing {} log batch(es)", batches.len()); + + let (primary_url, use_compression) = self.resolve_endpoint(); + + let batch_futures = batches.iter().map(|batch| { + let primary_url = primary_url.clone(); + async move { + // Primary endpoint — failures are tracked for cross-invocation retry. + let primary_result = self + .ship_batch(batch, &primary_url, use_compression, &self.config.api_key) + .await; + + // Additional endpoints — best-effort; failures are only logged. + let extra_futures = self.config.additional_endpoints.iter().map(|endpoint| { + let url = endpoint.url.clone(); + let api_key = endpoint.api_key.clone(); + async move { + if self + .ship_batch(batch, &url, use_compression, &api_key) + .await + .is_err() + { + warn!( + "failed to ship log batch to additional endpoint {url} after all retries" + ); + } + } + }); + join_all(extra_futures).await; + + primary_result + } + }); + + for result in join_all(batch_futures).await { + if let Err(b) = result { + failed.push(b); + } + } + + failed + } + + fn resolve_endpoint(&self) -> (String, bool) { + match &self.config.mode { + Destination::Datadog => { + let url = format!("https://http-intake.logs.{}/api/v2/logs", self.config.site); + (url, self.config.use_compression) + } + Destination::ObservabilityPipelinesWorker { url } => { + // OPW does not support compression + (url.clone(), false) + } + } + } + + async fn ship_batch( + &self, + batch: &[u8], + url: &str, + compress: bool, + api_key: &str, + ) -> Result<(), reqwest::RequestBuilder> { + let (body, content_encoding) = if compress { + match compress_zstd(batch, self.config.compression_level) { + Ok(compressed) => (compressed, Some("zstd")), + Err(e) => { + warn!("failed to 
compress log batch, sending uncompressed: {e}"); + (batch.to_vec(), None) + } + } + } else { + (batch.to_vec(), None) + }; + + let mut req = self + .client + .post(url) + .timeout(self.config.flush_timeout) + .header("DD-API-KEY", api_key) + .header("Content-Type", "application/json"); + + if matches!(self.config.mode, Destination::Datadog) { + req = req.header("DD-PROTOCOL", "agent-json"); + } + + if let Some(enc) = content_encoding { + req = req.header("Content-Encoding", enc); + } + + let req = req.body(body); + self.send_with_retry(req).await + } + + /// Send `builder`, retrying transient failures up to `MAX_FLUSH_ATTEMPTS`. + /// + /// # Returns + /// + /// * `Ok(())` — success **or** a permanent error (no point retrying; already + /// logged at `warn!`). + /// * `Err(builder)` — all attempts exhausted on a transient error. The + /// original builder is returned so the caller can retry it next invocation. + async fn send_with_retry( + &self, + builder: reqwest::RequestBuilder, + ) -> Result<(), reqwest::RequestBuilder> { + let mut attempts: u32 = 0; + + loop { + attempts += 1; + + let cloned = match builder.try_clone() { + Some(b) => b, + None => { + // Streaming body — can't clone, can't retry. + warn!("log batch request is not cloneable; dropping batch"); + return Ok(()); + } + }; + + match cloned.send().await { + Ok(resp) => { + let status = resp.status(); + // Drain the body so the underlying TCP connection is + // returned to the pool rather than held in CLOSE_WAIT. + let _ = resp.bytes().await; + + if status.is_success() { + debug!("log batch accepted: {status}"); + return Ok(()); + } + + // Retryable 4xx: treat like transient server errors and + // fall through to the retry loop below. 
+                    // 408 = Request Timeout (transient network condition)
+                    // 425 = Too Early (TLS 0-RTT replay rejection)
+                    // 429 = Too Many Requests (intake rate-limiting)
+                    //
+                    // TODO: for 429, parse the `Retry-After` response header
+                    // and sleep for the indicated duration before retrying
+                    // instead of retrying immediately, to avoid hammering the
+                    // intake endpoint while it is still rate-limiting us.
+                    let retryable_4xx = matches!(status.as_u16(), 408 | 425 | 429);
+
+                    // Permanent client errors — stop immediately, do not retry.
+                    if status.as_u16() >= 400 && status.as_u16() < 500 && !retryable_4xx {
+                        warn!("permanent error from logs intake: {status}; dropping batch");
+                        return Ok(());
+                    }
+
+                    // Transient server errors — fall through to retry.
+                    warn!(
+                        "transient error from logs intake: {status} (attempt {attempts}/{MAX_FLUSH_ATTEMPTS})"
+                    );
+                }
+                Err(e) => {
+                    warn!(
+                        "network error sending log batch (attempt {attempts}/{MAX_FLUSH_ATTEMPTS}): {e}"
+                    );
+                }
+            }
+
+            if attempts >= MAX_FLUSH_ATTEMPTS {
+                warn!("log batch failed after {attempts} attempts; will retry next flush");
+                return Err(builder);
+            }
+        }
+    }
+}
+
+fn compress_zstd(data: &[u8], level: i32) -> Result<Vec<u8>, FlushError> {
+    let mut encoder =
+        Encoder::new(Vec::new(), level).map_err(|e| FlushError::Compression(e.to_string()))?;
+    encoder
+        .write_all(data)
+        .map_err(|e| FlushError::Compression(e.to_string()))?;
+    encoder
+        .finish()
+        .map_err(|e| FlushError::Compression(e.to_string()))
+}
+
+#[cfg(test)]
+// Tests use plain reqwest client to connect to local mock server
+#[allow(clippy::disallowed_methods)]
+mod tests {
+    use super::*;
+    use crate::aggregator::AggregatorService;
+    use crate::config::{Destination, LogFlusherConfig};
+    use crate::intake_entry::IntakeEntry;
+    use crate::logs_additional_endpoint::LogsAdditionalEndpoint;
+    use mockito::Matcher;
+    use std::time::Duration;
+
+    fn make_entry(msg: &str) -> IntakeEntry {
+        IntakeEntry::from_message(msg, 1_700_000_000_000)
+    }
+
+    fn
config_for_mock(mock_url: &str) -> LogFlusherConfig { + // Use OPW mode pointing at the mock server to avoid HTTPS + LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "datadoghq.com".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{mock_url}/api/v2/logs"), + }, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + } + } + + #[tokio::test] + async fn test_flush_empty_aggregator_does_not_call_api() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + // Server with no routes — any request would cause test failure + let mock_server = mockito::Server::new_async().await; + let config = config_for_mock(&mock_server.url()); + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle); + + assert!( + flusher.flush(vec![]).await.is_empty(), + "empty flush should succeed" + ); + // No mock assertions needed — absence of request is the assertion + } + + #[tokio::test] + async fn test_flush_sends_post_with_api_key_header() { + // Verify that Datadog mode sends both DD-API-KEY and DD-PROTOCOL: + // agent-json headers. We call ship_batch directly to bypass + // resolve_endpoint (which builds an HTTPS URL incompatible with the + // HTTP mock server). 
+ let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + let mock = mock_server + .mock("POST", "/api/v2/logs") + .match_header("DD-API-KEY", "test-api-key") + .match_header("DD-PROTOCOL", "agent-json") + .with_status(202) + .create_async() + .await; + + let config = LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "datadoghq.com".to_string(), + mode: Destination::Datadog, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle); + + // Call ship_batch directly to use the mock server's HTTP URL instead + // of the HTTPS URL that resolve_endpoint would produce. + let url = format!("{}/api/v2/logs", mock_server.url()); + let batch = b"[{\"message\":\"test\"}]"; + flusher + .ship_batch(batch, &url, false, "test-api-key") + .await + .expect("ship_batch should succeed"); + + mock.assert_async().await; + } + + #[tokio::test] + async fn test_flush_opw_mode_omits_dd_protocol_header() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + let opw_url = format!("{}/logs", mock_server.url()); + + // Verify DD-PROTOCOL is NOT present in OPW requests + let mock = mock_server + .mock("POST", "/logs") + .match_header("DD-API-KEY", "test-api-key") + .match_header("DD-PROTOCOL", Matcher::Missing) + .with_status(200) + .expect(1) + .create_async() + .await; + + let config = LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "unused".to_string(), + mode: Destination::ObservabilityPipelinesWorker { url: opw_url }, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + + let 
client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + + handle + .insert_batch(vec![make_entry("opw log")]) + .expect("insert"); + let result = flusher.flush(vec![]).await; + assert!(result.is_empty(), "OPW flush should return empty on 200"); + mock.assert_async().await; + } + + #[tokio::test] + async fn test_flush_does_not_retry_on_403() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + // expect(1) means exactly one call — if retried, the test will fail + let mock = mock_server + .mock("POST", "/api/v2/logs") + .with_status(403) + .expect(1) + .create_async() + .await; + + let config = config_for_mock(&mock_server.url()); + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + + handle + .insert_batch(vec![make_entry("log")]) + .expect("insert"); + let result = flusher.flush(vec![]).await; + // 403 is a permanent error — the batch is dropped, no builder to retry. + assert!( + result.is_empty(), + "403 is a permanent error; no builder to retry" + ); + mock.assert_async().await; + } + + /// 429 (Too Many Requests) is a retryable 4xx — the retry loop must + /// continue rather than short-circuiting with a permanent failure. 
+ #[tokio::test] + async fn test_flush_retries_on_429_then_succeeds() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + // First call → 429, second call → 200 + let _throttled = mock_server + .mock("POST", "/api/v2/logs") + .with_status(429) + .expect(1) + .create_async() + .await; + let _ok = mock_server + .mock("POST", "/api/v2/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let config = config_for_mock(&mock_server.url()); + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + + handle + .insert_batch(vec![make_entry("throttled log")]) + .expect("insert"); + let result = flusher.flush(vec![]).await; + assert!(result.is_empty(), "should succeed after 429 retry"); + } + + #[tokio::test] + async fn test_flush_retries_on_5xx_then_succeeds() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + // First call → 500, second call → 202 + let _fail_mock = mock_server + .mock("POST", "/api/v2/logs") + .with_status(500) + .expect(1) + .create_async() + .await; + let _ok_mock = mock_server + .mock("POST", "/api/v2/logs") + .with_status(202) + .expect(1) + .create_async() + .await; + + let config = config_for_mock(&mock_server.url()); + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + + handle + .insert_batch(vec![make_entry("log")]) + .expect("insert"); + let result = flusher.flush(vec![]).await; + assert!(result.is_empty(), "should succeed on second attempt"); + } + + /// All additional endpoints receive the same batch when flush() is called. 
+ #[tokio::test] + async fn test_additional_endpoints_all_receive_batch() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut primary = mockito::Server::new_async().await; + let mut extra1 = mockito::Server::new_async().await; + let mut extra2 = mockito::Server::new_async().await; + + let primary_mock = primary + .mock("POST", "/api/v2/logs") + .with_status(202) + .expect(1) + .create_async() + .await; + let extra1_mock = extra1 + .mock("POST", "/extra") + .with_status(200) + .expect(1) + .create_async() + .await; + let extra2_mock = extra2 + .mock("POST", "/extra") + .with_status(200) + .expect(1) + .create_async() + .await; + + let config = LogFlusherConfig { + api_key: "key".to_string(), + site: "datadoghq.com".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/api/v2/logs", primary.url()), + }, + additional_endpoints: vec![ + LogsAdditionalEndpoint { + api_key: "extra-key-1".to_string(), + url: format!("{}/extra", extra1.url()), + is_reliable: true, + }, + LogsAdditionalEndpoint { + api_key: "extra-key-2".to_string(), + url: format!("{}/extra", extra2.url()), + is_reliable: true, + }, + ], + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + handle.insert_batch(vec![make_entry("hi")]).expect("insert"); + + assert!(flusher.flush(vec![]).await.is_empty()); + primary_mock.assert_async().await; + extra1_mock.assert_async().await; + extra2_mock.assert_async().await; + } + + /// Additional endpoints are dispatched concurrently: if they were sequential, + /// two endpoints each waiting at a Barrier(2) would deadlock — only concurrent + /// dispatch lets both handlers reach the barrier simultaneously. 
+    #[tokio::test]
+    async fn test_additional_endpoints_dispatched_concurrently() {
+        use std::sync::Arc;
+        use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
+        use tokio::net::TcpListener;
+        use tokio::sync::Barrier;
+
+        let (service, handle) = AggregatorService::new();
+        let _task = tokio::spawn(service.run());
+
+        let barrier = Arc::new(Barrier::new(2));
+
+        // Spawn a minimal HTTP server that waits at the barrier before
+        // responding, so both must be in-flight at the same time to complete.
+        async fn serve_once(barrier: Arc<Barrier>) -> String {
+            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+            let addr = listener.local_addr().unwrap();
+            tokio::spawn(async move {
+                let (mut stream, _) = listener.accept().await.unwrap();
+                let mut buf = vec![0u8; 4096];
+                let _ = stream.read(&mut buf).await;
+                barrier.wait().await;
+                let _ = stream
+                    .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
+                    .await;
+            });
+            format!("http://127.0.0.1:{}/logs", addr.port())
+        }
+
+        let url1 = serve_once(barrier.clone()).await;
+        let url2 = serve_once(barrier.clone()).await;
+
+        let mut primary = mockito::Server::new_async().await;
+        let _primary_mock = primary
+            .mock("POST", "/api/v2/logs")
+            .with_status(202)
+            .create_async()
+            .await;
+
+        let config = LogFlusherConfig {
+            api_key: "key".to_string(),
+            site: "datadoghq.com".to_string(),
+            mode: Destination::ObservabilityPipelinesWorker {
+                url: format!("{}/api/v2/logs", primary.url()),
+            },
+            additional_endpoints: vec![
+                LogsAdditionalEndpoint {
+                    api_key: "extra-key-1".to_string(),
+                    url: url1,
+                    is_reliable: true,
+                },
+                LogsAdditionalEndpoint {
+                    api_key: "extra-key-2".to_string(),
+                    url: url2,
+                    is_reliable: true,
+                },
+            ],
+            use_compression: false,
+            compression_level: 3,
+            flush_timeout: Duration::from_secs(5),
+        };
+
+        let client = reqwest::Client::builder().build().expect("client");
+        let flusher = LogFlusher::new(config, client, handle.clone());
+        handle
+
.insert_batch(vec![make_entry("concurrent")]) + .expect("insert"); + + assert!(flusher.flush(vec![]).await.is_empty()); + } + + /// A builder returned by `flush` can be redriven on the next call. + /// + /// The mock fails on the first 3 attempts (exhausting the per-invocation + /// retry budget), then succeeds on the 4th attempt (the next invocation). + /// This proves the cross-invocation retry path end-to-end. + #[tokio::test] + async fn test_cross_invocation_retry_delivers_on_redrive() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut mock_server = mockito::Server::new_async().await; + // First 3 calls: transient 503 → exhausts per-invocation retry budget + let _fail_mock = mock_server + .mock("POST", "/api/v2/logs") + .with_status(503) + .expect(3) + .create_async() + .await; + // 4th call: redriven on the next flush → succeeds + let _ok_mock = mock_server + .mock("POST", "/api/v2/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let config = config_for_mock(&mock_server.url()); + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + handle + .insert_batch(vec![make_entry("retry-me")]) + .expect("insert"); + + // First flush: all 3 attempts fail → returns the builder for retry. + let failed = flusher.flush(vec![]).await; + assert_eq!(failed.len(), 1, "one builder should be returned for retry"); + + // Second flush: aggregator is empty; redrives the failed builder → succeeds. + let result = flusher.flush(failed).await; + assert!( + result.is_empty(), + "redriven builder should succeed on the next invocation" + ); + } + + /// Additional-endpoint failures are best-effort: their builders are NOT + /// included in the returned retry vec, even when they exhaust all retries. + /// Only primary-endpoint failures are tracked for cross-invocation retry. 
+ #[tokio::test] + async fn test_additional_endpoint_failures_not_tracked_for_retry() { + let (service, handle) = AggregatorService::new(); + let _task = tokio::spawn(service.run()); + + let mut primary = mockito::Server::new_async().await; + let mut extra = mockito::Server::new_async().await; + + let _primary_mock = primary + .mock("POST", "/api/v2/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + // Additional endpoint always returns 503 — exhausts per-invocation retries. + let _extra_mock = extra + .mock("POST", "/extra") + .with_status(503) + .expect(3) // MAX_FLUSH_ATTEMPTS + .create_async() + .await; + + let config = LogFlusherConfig { + api_key: "key".to_string(), + site: "datadoghq.com".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/api/v2/logs", primary.url()), + }, + additional_endpoints: vec![LogsAdditionalEndpoint { + api_key: "extra-key".to_string(), + url: format!("{}/extra", extra.url()), + is_reliable: true, + }], + use_compression: false, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + + let client = reqwest::Client::builder().build().expect("client"); + let flusher = LogFlusher::new(config, client, handle.clone()); + handle + .insert_batch(vec![make_entry("test")]) + .expect("insert"); + + let result = flusher.flush(vec![]).await; + assert!( + result.is_empty(), + "additional-endpoint failures are best-effort and must not be tracked for retry" + ); + } +} diff --git a/crates/datadog-logs-agent/src/intake_entry.rs b/crates/datadog-logs-agent/src/intake_entry.rs new file mode 100644 index 00000000..2ec01867 --- /dev/null +++ b/crates/datadog-logs-agent/src/intake_entry.rs @@ -0,0 +1,188 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +/// A single log entry in the Datadog Logs intake format. +/// +/// Standard Datadog fields are typed fields. 
Runtime-specific enrichment
+/// (e.g. `{"lambda": {"arn": "...", "request_id": "..."}}` for Lambda,
+/// `{"azure": {"resource_id": "..."}}` for Azure Functions) goes in `attributes`,
+/// which is flattened into the JSON object at serialization time.
+///
+/// # Example — Lambda extension consumer
+/// ```ignore
+/// let mut attrs = serde_json::Map::new();
+/// attrs.insert("lambda".to_string(), serde_json::json!({
+///     "arn": function_arn,
+///     "request_id": request_id,
+/// }));
+/// let entry = IntakeEntry {
+///     message: log_line,
+///     timestamp: timestamp_ms,
+///     hostname: Some(function_arn),
+///     service: Some(service_name),
+///     ddsource: Some("lambda".to_string()),
+///     ddtags: Some(tags),
+///     status: Some("info".to_string()),
+///     attributes: attrs,
+/// };
+/// ```
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct IntakeEntry {
+    /// The log message body.
+    pub message: String,
+
+    /// Unix timestamp in milliseconds.
+    pub timestamp: i64,
+
+    /// The hostname (e.g. Lambda function ARN, Azure resource ID).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub hostname: Option<String>,
+
+    /// The service name.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub service: Option<String>,
+
+    /// The log source tag (e.g. "lambda", "azure-functions", "gcp-functions").
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub ddsource: Option<String>,
+
+    /// Comma-separated Datadog tags (e.g. "env:prod,version:1.0").
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub ddtags: Option<String>,
+
+    /// Log level / status (e.g. "info", "error", "warn", "debug").
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub status: Option<String>,
+
+    /// Runtime-specific enrichment fields, flattened into the JSON object.
+    /// Use this for fields like `{"lambda": {"arn": "...", "request_id": "..."}}`.
+    /// An empty map is not serialized. Extra fields in JSON are collected here on deserialization.
+    #[serde(flatten, default, skip_serializing_if = "serde_json::Map::is_empty")]
+    pub attributes: serde_json::Map<String, serde_json::Value>,
+}
+
+impl IntakeEntry {
+    /// Create a minimal log entry from a message and timestamp.
+    /// All optional fields default to `None`; use struct literal syntax to set them.
+    pub fn from_message(message: impl Into<String>, timestamp: i64) -> Self {
+        Self {
+            message: message.into(),
+            timestamp,
+            hostname: None,
+            service: None,
+            ddsource: None,
+            ddtags: None,
+            status: None,
+            attributes: serde_json::Map::new(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_intake_entry_minimal_serialization() {
+        let entry = IntakeEntry::from_message("hello world", 1_700_000_000_000);
+        let json = serde_json::to_string(&entry).expect("serialize");
+        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
+
+        assert_eq!(v["message"], "hello world");
+        assert_eq!(v["timestamp"], 1_700_000_000_000_i64);
+        // Optional fields must be absent when None
+        assert!(v.get("hostname").is_none());
+        assert!(v.get("service").is_none());
+        assert!(v.get("ddsource").is_none());
+        assert!(v.get("ddtags").is_none());
+        assert!(v.get("status").is_none());
+    }
+
+    #[test]
+    fn test_intake_entry_full_serialization() {
+        let entry = IntakeEntry {
+            message: "user logged in".to_string(),
+            timestamp: 1_700_000_001_000,
+            hostname: Some("my-host".to_string()),
+            service: Some("my-service".to_string()),
+            ddsource: Some("lambda".to_string()),
+            ddtags: Some("env:prod,version:1.0".to_string()),
+            status: Some("info".to_string()),
+            attributes: serde_json::Map::new(),
+        };
+        let json = serde_json::to_string(&entry).expect("serialize");
+        let v: serde_json::Value = serde_json::from_str(&json).expect("parse");
+
+        assert_eq!(v["message"], "user logged in");
+        assert_eq!(v["hostname"], "my-host");
+        assert_eq!(v["service"], "my-service");
+        assert_eq!(v["ddsource"], "lambda");
+        assert_eq!(v["ddtags"], "env:prod,version:1.0");
+        assert_eq!(v["status"],
"info"); + assert!( + v.get("attributes").is_none(), + "empty attributes must not appear in output" + ); + } + + #[test] + fn test_intake_entry_with_lambda_attributes_flattened() { + // Simulates what the lambda extension would build + let mut attrs = serde_json::Map::new(); + attrs.insert( + "lambda".to_string(), + serde_json::json!({ + "arn": "arn:aws:lambda:us-east-1:123456789012:function:my-fn", + "request_id": "abc-123" + }), + ); + let entry = IntakeEntry { + message: "function invoked".to_string(), + timestamp: 1_700_000_002_000, + hostname: Some("arn:aws:lambda:us-east-1:123456789012:function:my-fn".to_string()), + service: Some("my-fn".to_string()), + ddsource: Some("lambda".to_string()), + ddtags: Some("env:prod".to_string()), + status: Some("info".to_string()), + attributes: attrs, + }; + let json = serde_json::to_string(&entry).expect("serialize"); + let v: serde_json::Value = serde_json::from_str(&json).expect("parse"); + + // Lambda-specific fields appear at top level (flattened) + assert_eq!( + v["lambda"]["arn"], + "arn:aws:lambda:us-east-1:123456789012:function:my-fn" + ); + assert_eq!(v["lambda"]["request_id"], "abc-123"); + assert_eq!(v["message"], "function invoked"); + } + + #[test] + fn test_intake_entry_deserialization_roundtrip() { + let original = IntakeEntry { + message: "test".to_string(), + timestamp: 42, + hostname: Some("h".to_string()), + service: None, + ddsource: Some("gcp-functions".to_string()), + ddtags: None, + status: Some("error".to_string()), + attributes: serde_json::Map::new(), + }; + let json = serde_json::to_string(&original).expect("serialize"); + let restored: IntakeEntry = serde_json::from_str(&json).expect("deserialize"); + + assert_eq!(restored.message, original.message); + assert_eq!(restored.timestamp, original.timestamp); + assert_eq!(restored.hostname, original.hostname); + assert_eq!(restored.ddsource, original.ddsource); + assert_eq!(restored.status, original.status); + assert!( + 
restored.attributes.is_empty(), + "no extra attributes expected after roundtrip" + ); + } +} diff --git a/crates/datadog-logs-agent/src/lib.rs b/crates/datadog-logs-agent/src/lib.rs new file mode 100644 index 00000000..ef1c4542 --- /dev/null +++ b/crates/datadog-logs-agent/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod aggregator; +pub mod config; +pub mod constants; +pub mod errors; +pub mod flusher; +pub mod intake_entry; +pub mod logs_additional_endpoint; + +pub mod server; + +// Re-export the most commonly used types at the crate root +pub use aggregator::{AggregatorHandle, AggregatorService}; +pub use config::{Destination, LogFlusherConfig}; +pub use flusher::LogFlusher; +pub use intake_entry::IntakeEntry; +pub use logs_additional_endpoint::LogsAdditionalEndpoint; +pub use server::{LogServer, LogServerConfig}; diff --git a/crates/datadog-logs-agent/src/logs_additional_endpoint.rs b/crates/datadog-logs-agent/src/logs_additional_endpoint.rs new file mode 100644 index 00000000..9413e0b1 --- /dev/null +++ b/crates/datadog-logs-agent/src/logs_additional_endpoint.rs @@ -0,0 +1,104 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use serde::Deserialize; +use tracing::warn; + +/// An additional Datadog intake endpoint to ship each log batch to alongside +/// the primary endpoint. 
+/// +/// The JSON wire format (from `DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS`) matches the +/// bottlecap / datadog-agent convention: +/// ```json +/// [{"api_key":"","Host":"agent-http-intake.logs.datadoghq.com","Port":443,"is_reliable":true}] +/// ``` +#[derive(Debug, PartialEq, Clone)] +pub struct LogsAdditionalEndpoint { + /// API key used exclusively for this endpoint. + pub api_key: String, + /// Full intake URL, e.g. `https://agent-http-intake.logs.datadoghq.com:443/api/v2/logs`. + /// Computed from `Host` and `Port` at deserialize time. + pub url: String, + /// When `true`, failures on this endpoint are counted toward overall flush reliability. + /// Currently stored but not yet acted upon; reserved for future use. + pub is_reliable: bool, +} + +/// Internal representation that mirrors the JSON wire format. +#[derive(Deserialize)] +struct RawEndpoint { + api_key: String, + #[serde(rename = "Host")] + host: String, + #[serde(rename = "Port")] + port: u32, + is_reliable: bool, +} + +impl From for LogsAdditionalEndpoint { + fn from(r: RawEndpoint) -> Self { + Self { + api_key: r.api_key, + url: format!("https://{}:{}/api/v2/logs", r.host, r.port), + is_reliable: r.is_reliable, + } + } +} + +/// Parse the value of `DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS` (a JSON array string). +/// +/// Returns an empty `Vec` and emits a warning on parse failure. 
+pub fn parse_additional_endpoints(s: &str) -> Vec<LogsAdditionalEndpoint> {
+    match serde_json::from_str::<Vec<RawEndpoint>>(s) {
+        Ok(raw) => raw.into_iter().map(Into::into).collect(),
+        Err(e) => {
+            warn!("failed to parse DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS: {e}");
+            vec![]
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_valid_endpoint() {
+        let s = r#"[{"api_key":"key2","Host":"agent-http-intake.logs.datadoghq.com","Port":443,"is_reliable":true}]"#;
+        let result = parse_additional_endpoints(s);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].api_key, "key2");
+        assert_eq!(
+            result[0].url,
+            "https://agent-http-intake.logs.datadoghq.com:443/api/v2/logs"
+        );
+        assert!(result[0].is_reliable);
+    }
+
+    #[test]
+    fn test_parse_missing_port_returns_empty() {
+        // Missing required "Port" field — should warn and return []
+        let s = r#"[{"api_key":"key","Host":"intake.logs.datadoghq.com","is_reliable":true}]"#;
+        let result = parse_additional_endpoints(s);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_parse_empty_string_returns_empty() {
+        let result = parse_additional_endpoints("");
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_parse_multiple_endpoints() {
+        let s = r#"[
+            {"api_key":"k1","Host":"host1.example.com","Port":443,"is_reliable":true},
+            {"api_key":"k2","Host":"host2.example.com","Port":10516,"is_reliable":false}
+        ]"#;
+        let result = parse_additional_endpoints(s);
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0].url, "https://host1.example.com:443/api/v2/logs");
+        assert_eq!(result[1].url, "https://host2.example.com:10516/api/v2/logs");
+        assert!(!result[1].is_reliable);
+    }
+}
diff --git a/crates/datadog-logs-agent/src/server.rs b/crates/datadog-logs-agent/src/server.rs
new file mode 100644
index 00000000..90d5d39d
--- /dev/null
+++ b/crates/datadog-logs-agent/src/server.rs
@@ -0,0 +1,510 @@
+// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
+// SPDX-License-Identifier: Apache-2.0
+
+//!
HTTP intake server for the log agent. +//! +//! [`LogServer`] listens on a TCP port and accepts `POST /v1/input` requests +//! whose body is a JSON array of [`crate::IntakeEntry`] values. Entries are +//! forwarded to the shared [`crate::AggregatorHandle`] for batching and +//! eventual flushing. +//! +//! # Usage (network intake — serverless-compat) +//! ```ignore +//! let (service, handle) = AggregatorService::new(); +//! tokio::spawn(service.run()); +//! +//! let server = LogServer::new( +//! LogServerConfig { host: "0.0.0.0".into(), port: 8080 }, +//! handle.clone(), +//! ); +//! tokio::spawn(server.serve()); +//! +//! let flusher = LogFlusher::new(config, client, handle); +//! // flush periodically … +//! ``` +//! +//! # Direct intake (bottlecap — unchanged) +//! ```ignore +//! // bottlecap never uses LogServer; it calls handle.insert_batch() directly. +//! handle.insert_batch(entries).expect("insert"); +//! ``` + +use http_body_util::BodyExt as _; +use hyper::body::Incoming; +use hyper::service::service_fn; +use hyper::{Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; +use tracing::{debug, error, warn}; + +use crate::aggregator::AggregatorHandle; +use crate::intake_entry::IntakeEntry; + +const LOG_INTAKE_PATH: &str = "/v1/input"; +/// Maximum accepted request body size in bytes (4 MiB). Requests larger than +/// this are rejected with 413 before the body is read into memory. +const MAX_BODY_BYTES: usize = 4 * 1024 * 1024; + +/// Configuration for the [`LogServer`] HTTP intake listener. +#[derive(Debug, Clone)] +pub struct LogServerConfig { + /// Interface to bind (e.g. `"0.0.0.0"` or `"127.0.0.1"`). + pub host: String, + /// TCP port to listen on. + pub port: u16, +} + +/// HTTP server that receives log entries over the network and forwards them to +/// a running [`AggregatorHandle`]. +/// +/// Create with [`LogServer::new`], then call [`LogServer::serve`] inside a +/// `tokio::spawn` — it runs forever until the process exits. 
+pub struct LogServer {
+    config: LogServerConfig,
+    handle: AggregatorHandle,
+}
+
+impl LogServer {
+    /// Create a new server. Does **not** bind the port until [`serve`](Self::serve) is called.
+    pub fn new(config: LogServerConfig, handle: AggregatorHandle) -> Self {
+        Self { config, handle }
+    }
+
+    /// Bind the configured port and serve HTTP/1 requests indefinitely.
+    ///
+    /// This is an `async fn` meant to be run inside `tokio::spawn`.
+    /// It only returns if binding fails; otherwise it loops forever.
+    pub async fn serve(self) {
+        let addr = format!("{}:{}", self.config.host, self.config.port);
+        let listener = match tokio::net::TcpListener::bind(&addr).await {
+            Ok(l) => {
+                let actual = l.local_addr().map_or(addr.clone(), |a| a.to_string());
+                debug!("log server listening on {actual}");
+                l
+            }
+            Err(e) => {
+                error!("log server failed to bind {addr}: {e}");
+                return;
+            }
+        };
+
+        loop {
+            let (stream, peer) = match listener.accept().await {
+                Ok(pair) => pair,
+                Err(e) => {
+                    warn!("log server accept error: {e}");
+                    continue;
+                }
+            };
+
+            debug!("log server: connection from {peer}");
+            let handle = self.handle.clone();
+            tokio::spawn(async move {
+                let io = TokioIo::new(stream);
+                let svc = service_fn(move |req: Request<Incoming>| {
+                    let handle = handle.clone();
+                    async move { handle_request(req, handle).await }
+                });
+                if let Err(e) = hyper::server::conn::http1::Builder::new()
+                    .serve_connection(io, svc)
+                    .await
+                {
+                    debug!("log server: connection error: {e}");
+                }
+            });
+        }
+    }
+}
+
+/// Handle a single HTTP request: route, parse body, insert into aggregator.
+async fn handle_request(
+    req: Request<Incoming>,
+    handle: AggregatorHandle,
+) -> Result<Response<String>, std::convert::Infallible> {
+    if req.method() != Method::POST {
+        return Ok(Response::builder()
+            .status(StatusCode::METHOD_NOT_ALLOWED)
+            .body("method not allowed".to_string())
+            .unwrap_or_default());
+    }
+    if req.uri().path() != LOG_INTAKE_PATH {
+        return Ok(Response::builder()
+            .status(StatusCode::NOT_FOUND)
+            .body("not found".to_string())
+            .unwrap_or_default());
+    }
+
+    // Reject early if Content-Length is declared and already exceeds the limit.
+    // Skip this check when Content-Length is absent (chunked transfer) — actual
+    // size is enforced after reading below.
+    if let Some(content_length) = req
+        .headers()
+        .get(hyper::header::CONTENT_LENGTH)
+        .and_then(|v| v.to_str().ok())
+        .and_then(|s| s.parse::<usize>().ok())
+        && content_length > MAX_BODY_BYTES
+    {
+        return Ok(Response::builder()
+            .status(StatusCode::PAYLOAD_TOO_LARGE)
+            .body("payload too large".to_string())
+            .unwrap_or_default());
+    }
+
+    let bytes = match req.collect().await {
+        Ok(collected) => collected.to_bytes(),
+        Err(e) => {
+            warn!("log server: failed to read request body: {e}");
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body("failed to read body".to_string())
+                .unwrap_or_default());
+        }
+    };
+    if bytes.len() > MAX_BODY_BYTES {
+        return Ok(Response::builder()
+            .status(StatusCode::PAYLOAD_TOO_LARGE)
+            .body("payload too large".to_string())
+            .unwrap_or_default());
+    }
+
+    let entries: Vec<IntakeEntry> = match serde_json::from_slice(&bytes) {
+        Ok(e) => e,
+        Err(e) => {
+            warn!("log server: failed to parse log entries: {e}");
+            return Ok(Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body(format!("invalid JSON: {e}"))
+                .unwrap_or_default());
+        }
+    };
+
+    if entries.is_empty() {
+        return Ok(Response::builder()
+            .status(StatusCode::OK)
+            .body("ok".to_string())
+            .unwrap_or_default());
+    }
+
+    debug!("log server: received {} entries", entries.len());
+
+    if let Err(e) =
handle.insert_batch(entries) { + error!("log server: failed to insert batch: {e}"); + return Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("aggregator unavailable".to_string()) + .unwrap_or_default()); + } + + Ok(Response::builder() + .status(StatusCode::OK) + .body("ok".to_string()) + .unwrap_or_default()) +} + +#[cfg(test)] +// Tests use plain reqwest; FIPS client not needed for local loopback +#[allow(clippy::disallowed_methods, clippy::unwrap_used, clippy::expect_used)] +mod tests { + use super::*; + use crate::aggregator::AggregatorService; + use tokio::time::{Duration, sleep}; + + /// Bind `:0`, record the OS-assigned port, drop the listener, then start + /// `LogServer` on that port. Returns the base URL. + async fn start_test_server(handle: AggregatorHandle) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + + let server = LogServer::new( + LogServerConfig { + host: "127.0.0.1".into(), + port, + }, + handle, + ); + tokio::spawn(server.serve()); + sleep(Duration::from_millis(50)).await; + format!("http://127.0.0.1:{port}") + } + + #[tokio::test] + async fn test_post_valid_entries_returns_200_and_batch_inserted() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle.clone()).await; + let client = reqwest::Client::new(); + + let entries = vec![ + serde_json::json!({"message": "hello", "timestamp": 1_700_000_000_000_i64}), + serde_json::json!({"message": "world", "timestamp": 1_700_000_001_000_i64}), + ]; + + let resp = client + .post(format!("{base_url}/v1/input")) + .json(&entries) + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 200); + + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1, "should have one batch"); + let arr: serde_json::Value = 
serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!(arr.as_array().unwrap().len(), 2); + assert_eq!(arr[0]["message"], "hello"); + } + + #[tokio::test] + async fn test_post_malformed_json_returns_400() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle).await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{base_url}/v1/input")) + .header("Content-Type", "application/json") + .body("not-json") + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 400); + } + + #[tokio::test] + async fn test_get_request_returns_405() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle).await; + let client = reqwest::Client::new(); + + let resp = client + .get(format!("{base_url}/v1/input")) + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 405); + } + + #[tokio::test] + async fn test_wrong_path_returns_404() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle).await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{base_url}/wrong/path")) + .json(&serde_json::json!([{"message": "x", "timestamp": 0_i64}])) + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 404); + } + + #[tokio::test] + async fn test_post_empty_array_returns_200_no_batch() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle.clone()).await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{base_url}/v1/input")) + .json(&serde_json::json!([])) + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 200); + + let batches = handle.get_batches().await.expect("get_batches"); + assert!(batches.is_empty(), "empty POST should 
insert nothing"); + } + + /// A request whose Content-Length header exceeds MAX_BODY_BYTES must be + /// rejected with 413 before any body bytes are read. + #[tokio::test] + async fn test_oversized_content_length_returns_413() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle).await; + let client = reqwest::Client::new(); + + // The server checks the Content-Length header directly and rejects + // before reading the body when the declared size exceeds the limit. + let fake_large_size = MAX_BODY_BYTES + 1; + let resp = client + .post(format!("{base_url}/v1/input")) + .header("Content-Type", "application/json") + .header("Content-Length", fake_large_size.to_string()) + .body("[]") + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 413); + } + + /// A POST with Transfer-Encoding: chunked (no Content-Length header) must + /// not be rejected with 413. This is the regression test for the original + /// bug where `size_hint().upper()` returning `None` was coerced to `u64::MAX` + /// and treated as exceeding the body-size limit. 
+ #[tokio::test] + async fn test_chunked_transfer_encoding_accepted() { + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + use tokio::net::TcpStream; + + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle.clone()).await; + let port: u16 = base_url + .trim_start_matches("http://127.0.0.1:") + .parse() + .expect("port"); + + let body = r#"[{"message":"chunked","timestamp":1700000000000}]"#; + let request = format!( + "POST /v1/input HTTP/1.1\r\n\ + Host: 127.0.0.1:{port}\r\n\ + Content-Type: application/json\r\n\ + Transfer-Encoding: chunked\r\n\ + \r\n\ + {:x}\r\n\ + {body}\r\n\ + 0\r\n\ + \r\n", + body.len(), + ); + + let mut stream = TcpStream::connect(format!("127.0.0.1:{port}")) + .await + .expect("connect"); + stream.write_all(request.as_bytes()).await.expect("write"); + stream.flush().await.expect("flush"); + + let mut response = String::new(); + let mut buf = [0u8; 4096]; + loop { + let n = stream.read(&mut buf).await.expect("read"); + if n == 0 { + break; + } + response.push_str(&String::from_utf8_lossy(&buf[..n])); + if response.contains("\r\n\r\n") { + break; + } + } + + assert!( + response.starts_with("HTTP/1.1 200"), + "expected 200, got: {response}" + ); + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1, "entry should have been inserted"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).unwrap(); + assert_eq!(arr[0]["message"], "chunked"); + } + + /// All optional IntakeEntry fields (hostname, service, ddsource, ddtags, + /// status) and arbitrary attributes must survive the HTTP round-trip + /// through the server and appear intact in the aggregated batch. 
+ #[tokio::test] + async fn test_full_intake_entry_fields_preserved_through_http() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle.clone()).await; + let client = reqwest::Client::new(); + + let payload = serde_json::json!([{ + "message": "lambda invoked", + "timestamp": 1_700_000_002_000_i64, + "hostname": "arn:aws:lambda:us-east-1:123:function:my-fn", + "service": "my-fn", + "ddsource": "lambda", + "ddtags": "env:prod,version:1.0", + "status": "info", + "lambda": { + "arn": "arn:aws:lambda:us-east-1:123:function:my-fn", + "request_id": "req-abc-123" + } + }]); + + let resp = client + .post(format!("{base_url}/v1/input")) + .json(&payload) + .send() + .await + .expect("request failed"); + + assert_eq!(resp.status(), 200); + + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).unwrap(); + let entry = &arr[0]; + + assert_eq!(entry["message"], "lambda invoked"); + assert_eq!( + entry["hostname"], + "arn:aws:lambda:us-east-1:123:function:my-fn" + ); + assert_eq!(entry["service"], "my-fn"); + assert_eq!(entry["ddsource"], "lambda"); + assert_eq!(entry["ddtags"], "env:prod,version:1.0"); + assert_eq!(entry["status"], "info"); + // Flattened attributes must appear at the top level + assert_eq!(entry["lambda"]["request_id"], "req-abc-123"); + } + + /// Two sequential POST requests must both accumulate in the aggregator + /// before `get_batches` drains them. 
+ #[tokio::test] + async fn test_sequential_posts_accumulate_in_aggregator() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + let base_url = start_test_server(handle.clone()).await; + let client = reqwest::Client::new(); + + // First request + client + .post(format!("{base_url}/v1/input")) + .json(&serde_json::json!([{"message": "first", "timestamp": 1_i64}])) + .send() + .await + .expect("first request failed"); + + // Second request + client + .post(format!("{base_url}/v1/input")) + .json(&serde_json::json!([{"message": "second", "timestamp": 2_i64}])) + .send() + .await + .expect("second request failed"); + + let batches = handle.get_batches().await.expect("get_batches"); + // Both entries land in the same aggregator; batch count depends on + // the aggregator's internal sizing, but total entries must be 2. + let total_entries: usize = batches + .iter() + .map(|b| { + let arr: serde_json::Value = serde_json::from_slice(b).unwrap(); + arr.as_array().unwrap().len() + }) + .sum(); + assert_eq!(total_entries, 2, "both entries should be in the aggregator"); + } +} diff --git a/crates/datadog-logs-agent/tests/integration_test.rs b/crates/datadog-logs-agent/tests/integration_test.rs new file mode 100644 index 00000000..291777c2 --- /dev/null +++ b/crates/datadog-logs-agent/tests/integration_test.rs @@ -0,0 +1,846 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Integration tests for the `datadog-logs-agent` crate. +//! +//! These tests exercise two intake paths: +//! +//! **Direct intake** (bottlecap / in-process): +//! `IntakeEntry` → `AggregatorHandle::insert_batch` → `LogFlusher::flush` → HTTP endpoint +//! +//! **Network intake** (serverless-compat / over HTTP): +//! HTTP POST → `LogServer` → `AggregatorHandle::insert_batch` → `LogFlusher::flush` → HTTP endpoint +//! +//! HTTP traffic is directed to a local `mockito` server via +//! 
`Destination::ObservabilityPipelinesWorker`, which accepts a direct URL. +//! Datadog-mode-specific headers (`DD-PROTOCOL`) are covered by unit tests in `flusher.rs`. + +#![allow(clippy::disallowed_methods, clippy::unwrap_used, clippy::expect_used)] + +use datadog_logs_agent::{ + AggregatorService, Destination, IntakeEntry, LogFlusher, LogFlusherConfig, LogServer, + LogServerConfig, LogsAdditionalEndpoint, +}; +use mockito::{Matcher, Server}; +use std::time::Duration; +use tokio::time::sleep; + +// ── Helpers ────────────────────────────────────────────────────────────────── + +fn build_client() -> reqwest::Client { + reqwest::Client::builder() + .build() + .expect("failed to build HTTP client") +} + +/// Config that routes all flushes to `mock_url/logs` via OPW mode. +fn opw_config(mock_url: &str) -> LogFlusherConfig { + LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "ignored.datadoghq.com".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/logs", mock_url), + }, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 0, + flush_timeout: Duration::from_secs(5), + } +} + +fn entry(msg: &str) -> IntakeEntry { + IntakeEntry::from_message(msg, 1_700_000_000_000) +} + +// ── Pipeline happy path ─────────────────────────────────────────────────────── + +/// Inserting log entries and flushing sends a single POST to the endpoint. 
+#[tokio::test] +async fn test_pipeline_inserts_and_flushes() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .match_header("DD-API-KEY", "test-api-key") + .with_status(200) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + handle + .insert_batch(vec![entry("hello"), entry("world")]) + .expect("insert_batch"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty(), "flush should return empty on 200"); + mock.assert_async().await; +} + +/// Flushing with no entries makes no HTTP request. +#[tokio::test] +async fn test_empty_flush_makes_no_request() { + let server = Server::new_async().await; + // Any unexpected request would return 501 and cause an assertion failure below. + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let url = server.url(); + let result = LogFlusher::new(opw_config(&url), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty(), "empty flush should return empty"); + // No mock was set up — if a request had been made, mockito would panic. + drop(server); +} + +// ── JSON payload shape ──────────────────────────────────────────────────────── + +/// The flushed payload is a valid JSON array containing each inserted entry. 
+#[tokio::test] +async fn test_payload_is_json_array_with_correct_fields() { + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + handle + .insert_batch(vec![IntakeEntry { + message: "user login".to_string(), + timestamp: 1_700_000_001_000, + hostname: Some("web-01".to_string()), + service: Some("auth".to_string()), + ddsource: Some("nodejs".to_string()), + ddtags: Some("env:prod,version:2.0".to_string()), + status: Some("info".to_string()), + attributes: serde_json::Map::new(), + }]) + .expect("insert"); + + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1); + + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON"); + let entries = arr.as_array().expect("JSON array"); + assert_eq!(entries.len(), 1); + + let e = &entries[0]; + assert_eq!(e["message"], "user login"); + assert_eq!(e["timestamp"], 1_700_000_001_000_i64); + assert_eq!(e["hostname"], "web-01"); + assert_eq!(e["service"], "auth"); + assert_eq!(e["ddsource"], "nodejs"); + assert_eq!(e["ddtags"], "env:prod,version:2.0"); + assert_eq!(e["status"], "info"); +} + +/// Absent optional fields are not serialized into the JSON payload. 
+#[tokio::test] +async fn test_absent_optional_fields_not_serialized() { + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + handle + .insert_batch(vec![IntakeEntry::from_message("minimal", 0)]) + .expect("insert"); + + let batches = handle.get_batches().await.expect("get_batches"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON"); + let e = &arr[0]; + + assert_eq!(e["message"], "minimal"); + assert!(e.get("hostname").is_none(), "hostname absent"); + assert!(e.get("service").is_none(), "service absent"); + assert!(e.get("ddsource").is_none(), "ddsource absent"); + assert!(e.get("ddtags").is_none(), "ddtags absent"); + assert!(e.get("status").is_none(), "status absent"); +} + +// ── Runtime-specific attributes ─────────────────────────────────────────────── + +/// Lambda-specific attributes are flattened into the top-level JSON object. +#[tokio::test] +async fn test_lambda_attributes_flattened_at_top_level() { + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let mut attrs = serde_json::Map::new(); + attrs.insert( + "lambda".to_string(), + serde_json::json!({ + "arn": "arn:aws:lambda:us-east-1:123456789012:function:my-fn", + "request_id": "abc-123" + }), + ); + + handle + .insert_batch(vec![IntakeEntry { + message: "invocation complete".to_string(), + timestamp: 0, + hostname: Some("my-fn".to_string()), + service: Some("my-fn".to_string()), + ddsource: Some("lambda".to_string()), + ddtags: Some("env:prod".to_string()), + status: Some("info".to_string()), + attributes: attrs, + }]) + .expect("insert"); + + let batches = handle.get_batches().await.expect("get_batches"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON"); + let e = &arr[0]; + + // Lambda object is a top-level key (flattened via #[serde(flatten)]) + assert_eq!( + e["lambda"]["arn"], + "arn:aws:lambda:us-east-1:123456789012:function:my-fn" + ); + 
assert_eq!(e["lambda"]["request_id"], "abc-123"); +} + +/// Azure-specific attributes are flattened into the top-level JSON object. +#[tokio::test] +async fn test_azure_attributes_flattened_at_top_level() { + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let mut attrs = serde_json::Map::new(); + attrs.insert( + "azure".to_string(), + serde_json::json!({ + "resource_id": "/subscriptions/sub-123/resourceGroups/rg/providers/Microsoft.Web/sites/my-fn", + "operation_name": "Microsoft.Web/sites/functions/run/action" + }), + ); + + handle + .insert_batch(vec![IntakeEntry { + message: "azure function triggered".to_string(), + timestamp: 0, + hostname: Some("my-azure-fn".to_string()), + service: Some("payments".to_string()), + ddsource: Some("azure-functions".to_string()), + ddtags: Some("env:staging".to_string()), + status: Some("info".to_string()), + attributes: attrs, + }]) + .expect("insert"); + + let batches = handle.get_batches().await.expect("get_batches"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON"); + let e = &arr[0]; + + assert_eq!(e["ddsource"], "azure-functions"); + assert!( + e["azure"]["resource_id"] + .as_str() + .unwrap_or("") + .contains("Microsoft.Web"), + "azure resource_id present" + ); + assert_eq!( + e["azure"]["operation_name"], + "Microsoft.Web/sites/functions/run/action" + ); +} + +// ── Batch limits ────────────────────────────────────────────────────────────── + +/// Exactly MAX_BATCH_ENTRIES entries produce a single batch. 
+#[tokio::test]
+async fn test_max_entries_fits_in_one_batch() {
+    const MAX: usize = datadog_logs_agent::constants::MAX_BATCH_ENTRIES;
+
+    let (svc, handle) = AggregatorService::new();
+    let _task = tokio::spawn(svc.run());
+
+    let entries: Vec<IntakeEntry> = (0..MAX).map(|i| entry(&format!("log {i}"))).collect();
+    handle.insert_batch(entries).expect("insert");
+
+    let batches = handle.get_batches().await.expect("get_batches");
+    assert_eq!(
+        batches.len(),
+        1,
+        "exactly MAX_BATCH_ENTRIES fits in one batch"
+    );
+
+    let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("valid JSON");
+    assert_eq!(arr.as_array().unwrap().len(), MAX);
+}
+
+/// MAX_BATCH_ENTRIES + 1 entries split into two batches; two POSTs are sent.
+#[tokio::test]
+async fn test_overflow_produces_two_batches_and_two_posts() {
+    const MAX: usize = datadog_logs_agent::constants::MAX_BATCH_ENTRIES;
+
+    let mut server = Server::new_async().await;
+    let mock = server
+        .mock("POST", "/logs")
+        .match_header("DD-API-KEY", "test-api-key")
+        .with_status(200)
+        .expect(2) // exactly 2 requests expected
+        .create_async()
+        .await;
+
+    let (svc, handle) = AggregatorService::new();
+    let _task = tokio::spawn(svc.run());
+
+    let entries: Vec<IntakeEntry> = (0..=MAX).map(|i| entry(&format!("log {i}"))).collect();
+    handle.insert_batch(entries).expect("insert");
+
+    let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle)
+        .flush(vec![])
+        .await;
+
+    assert!(result.is_empty());
+    mock.assert_async().await;
+}
+
+// ── Oversized entries ─────────────────────────────────────────────────────────
+
+/// Entries exceeding MAX_LOG_BYTES are silently dropped; valid entries still flush.
+#[tokio::test] +async fn test_oversized_entry_dropped_valid_entries_still_flush() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let oversized = IntakeEntry::from_message( + "x".repeat(datadog_logs_agent::constants::MAX_LOG_BYTES + 1), + 0, + ); + let normal = entry("this one is fine"); + + handle + .insert_batch(vec![oversized, normal]) + .expect("insert"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty(), "flush should succeed for valid entries"); + mock.assert_async().await; +} + +/// All entries oversized means nothing to flush — no HTTP request. +#[tokio::test] +async fn test_all_oversized_entries_produces_no_request() { + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let oversized = IntakeEntry::from_message( + "x".repeat(datadog_logs_agent::constants::MAX_LOG_BYTES + 1), + 0, + ); + handle.insert_batch(vec![oversized]).expect("insert"); + + let batches = handle.get_batches().await.expect("get_batches"); + assert!( + batches.is_empty(), + "oversized-only aggregator should produce no batches" + ); +} + +// ── Concurrent producers ────────────────────────────────────────────────────── + +/// Two cloned handles can insert concurrently; all entries appear in the flush. 
+#[tokio::test] +async fn test_concurrent_producers_all_entries_flushed() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + + let h1 = handle.clone(); + let h2 = handle.clone(); + + let (r1, r2) = tokio::join!( + tokio::spawn(async move { + h1.insert_batch(vec![entry("from-producer-1")]) + .expect("h1 insert") + }), + tokio::spawn(async move { + h2.insert_batch(vec![entry("from-producer-2")]) + .expect("h2 insert") + }), + ); + r1.expect("task 1"); + r2.expect("task 2"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty()); + mock.assert_async().await; +} + +// ── OPW mode ────────────────────────────────────────────────────────────────── + +/// OPW mode sends to the custom URL and omits the DD-PROTOCOL header. 
+#[tokio::test] +async fn test_opw_mode_uses_custom_url_and_omits_dd_protocol() { + let mut server = Server::new_async().await; + let opw_path = "/opw-endpoint"; + let mock = server + .mock("POST", opw_path) + .match_header("DD-API-KEY", "test-api-key") + .match_header("DD-PROTOCOL", Matcher::Missing) + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle.insert_batch(vec![entry("opw log")]).expect("insert"); + + let config = LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "ignored".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}{}", server.url(), opw_path), + }, + additional_endpoints: Vec::new(), + use_compression: false, + compression_level: 0, + flush_timeout: Duration::from_secs(5), + }; + + let result = LogFlusher::new(config, build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty()); + mock.assert_async().await; +} + +// ── Compression ─────────────────────────────────────────────────────────────── + +/// OPW mode always disables compression regardless of `use_compression` setting. +/// The request must NOT carry `Content-Encoding: zstd` in OPW mode. +/// +/// Note: zstd compression in Datadog mode is verified in `flusher.rs` unit tests +/// via `ship_batch` directly, since Datadog mode constructs an HTTPS URL that +/// cannot be intercepted by a plain HTTP mock server. 
+#[tokio::test] +async fn test_opw_mode_disables_compression_regardless_of_config() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .match_header("Content-Encoding", Matcher::Missing) // must not be compressed + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle + .insert_batch(vec![entry("not compressed in OPW")]) + .expect("insert"); + + // use_compression: true — but OPW mode overrides this to false + let config = LogFlusherConfig { + api_key: "key".to_string(), + site: "ignored".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/logs", server.url()), + }, + additional_endpoints: Vec::new(), + use_compression: true, + compression_level: 3, + flush_timeout: Duration::from_secs(5), + }; + + let result = LogFlusher::new(config, build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty()); + mock.assert_async().await; +} + +// ── Retry behaviour ─────────────────────────────────────────────────────────── + +/// A transient 500 is retried; flush succeeds when the subsequent attempt returns 200. +#[tokio::test] +async fn test_retry_on_500_succeeds_on_second_attempt() { + let mut server = Server::new_async().await; + + let _fail = server + .mock("POST", "/logs") + .with_status(500) + .expect(1) + .create_async() + .await; + let _ok = server + .mock("POST", "/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle + .insert_batch(vec![entry("retry me")]) + .expect("insert"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty(), "should succeed after retry"); +} + +/// A 403 is a permanent error; flush fails without additional retry attempts. 
+#[tokio::test] +async fn test_permanent_error_on_403_no_retry() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .with_status(403) + .expect(1) // must be called exactly once — no retries + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle + .insert_batch(vec![entry("forbidden")]) + .expect("insert"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + // 403 is a permanent error — dropped silently; no builder to retry. + assert!( + result.is_empty(), + "403 is a permanent error; no retry builder returned" + ); + mock.assert_async().await; +} + +/// All three retry attempts fail with 503; flush returns false. +#[tokio::test] +async fn test_exhausted_retries_returns_false() { + let mut server = Server::new_async().await; + let mock = server + .mock("POST", "/logs") + .with_status(503) + .expect(3) // MAX_FLUSH_ATTEMPTS = 3 + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle + .insert_batch(vec![entry("keep failing")]) + .expect("insert"); + + let result = LogFlusher::new(opw_config(&server.url()), build_client(), handle) + .flush(vec![]) + .await; + + // Transient 503 exhausts per-invocation retries; builder returned for next flush. + assert!( + !result.is_empty(), + "exhausted retries should return a retry builder" + ); + mock.assert_async().await; +} + +// ── Additional endpoints ────────────────────────────────────────────────────── + +/// When additional endpoints are configured, the same batch is shipped to each. 
+#[tokio::test] +async fn test_additional_endpoints_receive_same_batch() { + let mut primary = Server::new_async().await; + let mut secondary = Server::new_async().await; + + let primary_mock = primary + .mock("POST", "/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let secondary_mock = secondary + .mock("POST", "/extra") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle + .insert_batch(vec![entry("multi-endpoint")]) + .expect("insert"); + + let config = LogFlusherConfig { + api_key: "test-api-key".to_string(), + site: "ignored".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/logs", primary.url()), + }, + additional_endpoints: vec![LogsAdditionalEndpoint { + api_key: "secondary-api-key".to_string(), + url: format!("{}/extra", secondary.url()), + is_reliable: true, + }], + use_compression: false, + compression_level: 0, + flush_timeout: Duration::from_secs(5), + }; + + let result = LogFlusher::new(config, build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty()); + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +/// Additional endpoint failure does not cause flush() to return false +/// (additional endpoints are best-effort). 
+#[tokio::test] +async fn test_additional_endpoint_failure_does_not_affect_return_value() { + let mut primary = Server::new_async().await; + let mut secondary = Server::new_async().await; + + let _primary_mock = primary + .mock("POST", "/logs") + .with_status(200) + .create_async() + .await; + + let _secondary_mock = secondary + .mock("POST", "/extra") + .with_status(500) // secondary always fails + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + let _task = tokio::spawn(svc.run()); + handle.insert_batch(vec![entry("test")]).expect("insert"); + + let config = LogFlusherConfig { + api_key: "key".to_string(), + site: "ignored".to_string(), + mode: Destination::ObservabilityPipelinesWorker { + url: format!("{}/logs", primary.url()), + }, + additional_endpoints: vec![LogsAdditionalEndpoint { + api_key: "secondary-api-key".to_string(), + url: format!("{}/extra", secondary.url()), + is_reliable: true, + }], + use_compression: false, + compression_level: 0, + flush_timeout: Duration::from_secs(5), + }; + + let result = LogFlusher::new(config, build_client(), handle) + .flush(vec![]) + .await; + + assert!( + result.is_empty(), + "primary succeeded — additional endpoint failure must not affect return value" + ); +} + +// ── Network intake (LogServer) ──────────────────────────────────────────────── + +/// Bind :0 to get a free port, drop the listener, then start LogServer on that +/// port. Returns the base URL ("http://127.0.0.1:"). 
+async fn start_log_server(handle: datadog_logs_agent::AggregatorHandle) -> String { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind :0"); + let port = listener.local_addr().expect("local_addr").port(); + drop(listener); + + let server = LogServer::new( + LogServerConfig { + host: "127.0.0.1".into(), + port, + }, + handle, + ); + tokio::spawn(server.serve()); + sleep(Duration::from_millis(50)).await; // allow server to bind + format!("http://127.0.0.1:{port}") +} + +/// Full network-intake pipeline: HTTP POST → LogServer → AggregatorService → +/// LogFlusher → mockito backend. This mirrors what serverless-compat does +/// when DD_LOGS_ENABLED=true. +#[tokio::test] +async fn test_server_to_flusher_full_pipeline() { + let mut backend = Server::new_async().await; + let mock = backend + .mock("POST", "/logs") + .match_header("DD-API-KEY", "test-api-key") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + tokio::spawn(svc.run()); + + let base_url = start_log_server(handle.clone()).await; + + // External adapter POSTs a log entry over HTTP (as serverless-compat extension would). + let client = reqwest::Client::new(); + let resp = client + .post(format!("{base_url}/v1/input")) + .json(&serde_json::json!([{ + "message": "invocation start", + "timestamp": 1_700_000_000_000_i64, + "ddsource": "lambda", + "service": "my-fn", + "ddtags": "env:prod" + }])) + .send() + .await + .expect("POST to log server"); + + assert_eq!(resp.status(), 200, "log server should accept the entry"); + + // Flush everything accumulated in the aggregator to the mock backend. 
+ let result = LogFlusher::new(opw_config(&backend.url()), build_client(), handle) + .flush(vec![]) + .await; + + assert!(result.is_empty(), "flush should return empty on 200"); + mock.assert_async().await; +} + +/// Multiple concurrent HTTP clients can POST entries simultaneously; all +/// entries must arrive in the aggregator before flushing. +#[tokio::test] +async fn test_server_concurrent_clients_all_entries_arrive() { + let mut backend = Server::new_async().await; + let mock = backend + .mock("POST", "/logs") + .with_status(200) + .expect(1) + .create_async() + .await; + + let (svc, handle) = AggregatorService::new(); + tokio::spawn(svc.run()); + + let base_url = start_log_server(handle.clone()).await; + + // Five concurrent producers each POST one entry. + const N: usize = 5; + let mut tasks = Vec::with_capacity(N); + for i in 0..N { + let url = format!("{base_url}/v1/input"); + tasks.push(tokio::spawn(async move { + reqwest::Client::new() + .post(&url) + .json(&serde_json::json!([{ + "message": format!("entry-{i}"), + "timestamp": i as i64 + }])) + .send() + .await + .expect("concurrent POST") + .status() + })); + } + + for task in tasks { + let status = task.await.expect("task"); + assert_eq!(status, 200); + } + + // All N entries must be present in the aggregator. + let batches = handle.get_batches().await.expect("get_batches"); + let total: usize = batches + .iter() + .map(|b| { + let arr: serde_json::Value = serde_json::from_slice(b).unwrap(); + arr.as_array().unwrap().len() + }) + .sum(); + assert_eq!(total, N, "all {N} concurrent entries must be aggregated"); + + // Re-insert the drained entries so we have something to flush. 
+ let (svc2, handle2) = AggregatorService::new(); + tokio::spawn(svc2.run()); + handle2 + .insert_batch(vec![entry("placeholder")]) + .expect("insert"); + let result = LogFlusher::new(opw_config(&backend.url()), build_client(), handle2) + .flush(vec![]) + .await; + assert!(result.is_empty()); + mock.assert_async().await; +} + +/// A malformed POST (invalid JSON) must return 400 and must not prevent the +/// server from processing subsequent valid requests. +#[tokio::test] +async fn test_server_invalid_request_does_not_block_subsequent_valid_requests() { + let (svc, handle) = AggregatorService::new(); + tokio::spawn(svc.run()); + + let base_url = start_log_server(handle.clone()).await; + let client = reqwest::Client::new(); + let url = format!("{base_url}/v1/input"); + + // Bad JSON → 400 + let bad = client + .post(&url) + .header("Content-Type", "application/json") + .body("not-json-at-all") + .send() + .await + .expect("bad POST"); + assert_eq!(bad.status(), 400); + + // Valid entry immediately after → 200 and entry reaches aggregator + let good = client + .post(&url) + .json(&serde_json::json!([{"message": "after-error", "timestamp": 1_i64}])) + .send() + .await + .expect("good POST"); + assert_eq!(good.status(), 200); + + let batches = handle.get_batches().await.expect("get_batches"); + let total: usize = batches + .iter() + .map(|b| { + let arr: serde_json::Value = serde_json::from_slice(b).unwrap(); + arr.as_array().unwrap().len() + }) + .sum(); + assert_eq!(total, 1, "only the valid entry should be in the aggregator"); +} diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index ed573669..e7361b40 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -10,6 +10,7 @@ default = [] windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] [dependencies] +datadog-logs-agent = { path = "../datadog-logs-agent" } datadog-trace-agent = { 
path = "../datadog-trace-agent" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" } datadog-fips = { path = "../datadog-fips", default-features = false } @@ -27,5 +28,10 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ ] } zstd = { version = "0.13.3", default-features = false } +[dev-dependencies] +reqwest = { version = "0.12.4", features = ["json"], default-features = false } +serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } + [[bin]] name = "datadog-serverless-compat" diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index d50798f0..d7d2f9c5 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -12,7 +12,7 @@ use tokio::{ sync::Mutex as TokioMutex, time::{Duration, interval}, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use tracing_subscriber::EnvFilter; use zstd::zstd_safe::CompressionLevel; @@ -26,6 +26,10 @@ use datadog_trace_agent::{ use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; +use datadog_logs_agent::{ + AggregatorHandle as LogAggregatorHandle, AggregatorService as LogAggregatorService, + Destination as LogDestination, LogFlusher, LogFlusherConfig, LogServer, LogServerConfig, +}; use dogstatsd::{ aggregator::{AggregatorHandle, AggregatorService}, api_key::ApiKeyFactory, @@ -42,6 +46,7 @@ use tokio_util::sync::CancellationToken; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; +const DEFAULT_LOG_INTAKE_PORT: u16 = 10517; const AGENT_HOST: &str = "0.0.0.0"; #[tokio::main] @@ -107,6 +112,13 @@ 
pub async fn main() {
     let https_proxy = env::var("DD_PROXY_HTTPS")
         .or_else(|_| env::var("HTTPS_PROXY"))
         .ok();
+    let dd_logs_enabled = env::var("DD_LOGS_ENABLED")
+        .map(|val| val.to_lowercase() == "true")
+        .unwrap_or(false);
+    let dd_logs_port: u16 = env::var("DD_LOGS_PORT")
+        .ok()
+        .and_then(|v| v.parse::<u16>().ok())
+        .unwrap_or(DEFAULT_LOG_INTAKE_PORT);
 
     debug!("Starting serverless trace mini agent");
     let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level);
@@ -174,9 +186,9 @@ pub async fn main() {
         debug!("Starting dogstatsd");
         let (_, metrics_flusher, aggregator_handle) = start_dogstatsd(
             dd_dogstatsd_port,
-            dd_api_key,
+            dd_api_key.clone(),
             dd_site,
-            https_proxy,
+            https_proxy.clone(),
             dogstatsd_tags,
             dd_statsd_metric_namespace,
             #[cfg(all(windows, feature = "windows-pipes"))]
@@ -194,9 +206,31 @@ pub async fn main() {
         (None, None)
     };
 
+    let (log_flusher, _log_aggregator_handle): (Option<LogFlusher>, Option<LogAggregatorHandle>) =
+        if dd_logs_enabled {
+            debug!("Starting log agent");
+            match start_log_agent(dd_api_key, https_proxy, dd_logs_port) {
+                Some((flusher, handle)) => {
+                    info!("log agent started");
+                    (Some(flusher), Some(handle))
+                }
+                None => {
+                    warn!("log agent failed to start, log flushing disabled");
+                    (None, None)
+                }
+            }
+        } else {
+            info!("log agent disabled");
+            (None, None)
+        };
+
     let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL));
     flush_interval.tick().await; // discard first tick, which is instantaneous
 
+    // Builders for log batches that failed transiently in the previous flush
+    // cycle. They are redriven on the next cycle before new batches are sent.
+    let mut pending_log_retries: Vec<reqwest::RequestBuilder> = Vec::new();
+
     loop {
         flush_interval.tick().await;
@@ -204,6 +238,22 @@ pub async fn main() {
             debug!("Flushing dogstatsd metrics");
             metrics_flusher.flush().await;
         }
+
+        if let Some(log_flusher) = log_flusher.as_ref() {
+            debug!("Flushing log agent");
+            let retry_in = std::mem::take(&mut pending_log_retries);
+            let failed = log_flusher.flush(retry_in).await;
+            if !failed.is_empty() {
+                // TODO: surface flush failures into health/metrics telemetry so
+                // operators have a durable signal beyond log lines when logs are
+                // being dropped (e.g. increment a statsd counter or set a gauge).
+                warn!(
+                    "log agent flush failed for {} batch(es); will retry next cycle",
+                    failed.len()
+                );
+                pending_log_retries = failed;
+            }
+        }
     }
 }
@@ -312,3 +362,184 @@ fn build_metrics_client(
     }
     Ok(builder.build()?)
 }
+
+fn start_log_agent(
+    dd_api_key: Option<String>,
+    https_proxy: Option<String>,
+    logs_port: u16,
+) -> Option<(LogFlusher, LogAggregatorHandle)> {
+    let Some(api_key) = dd_api_key else {
+        error!("DD_API_KEY not set, log agent disabled");
+        return None;
+    };
+
+    let (service, handle): (LogAggregatorService, LogAggregatorHandle) =
+        LogAggregatorService::new();
+    tokio::spawn(service.run());
+
+    let client = create_reqwest_client_builder()
+        .map_err(|e| error!("failed to create FIPS HTTP client for log agent: {e}"))
+        .ok()
+        .and_then(|b| {
+            let mut builder = b.timeout(DOGSTATSD_TIMEOUT_DURATION);
+            if let Some(ref proxy) = https_proxy {
+                match reqwest::Proxy::https(proxy.as_str()) {
+                    Ok(p) => builder = builder.proxy(p),
+                    Err(e) => error!("invalid HTTPS proxy for log agent: {e}"),
+                }
+            }
+            match builder.build() {
+                Ok(c) => Some(c),
+                Err(e) => {
+                    error!("failed to build HTTP client for log agent: {e}");
+                    None
+                }
+            }
+        });
+
+    let client = client?; // error already logged above
+
+    let config = LogFlusherConfig {
+        api_key,
+        ..LogFlusherConfig::from_env()
+    };
+
+    // Fail fast: OPW mode with an empty URL will always produce a network error at
flush time. + if let LogDestination::ObservabilityPipelinesWorker { url } = &config.mode + && url.is_empty() + { + error!( + "OPW mode enabled but DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL is empty — log agent disabled" + ); + return None; + } + + // Start the HTTP intake server so external adapters can POST log entries. + let server = LogServer::new( + LogServerConfig { + host: AGENT_HOST.to_string(), + port: logs_port, + }, + handle.clone(), + ); + // TODO: `LogServer::serve` binds the port inside the spawned task, so any + // bind failure (e.g. port already in use) is only logged as an error and + // silently swallowed — this function still returns `Some(...)` and the + // caller logs "log agent started" even though the server never came up. + // Fix: split `serve` into a synchronous `bind` step (returning + // `Result`) and a separate accept-loop step, so + // the bind result can be checked here and propagated as a fatal error before + // the feature is considered enabled. + tokio::spawn(server.serve()); + info!("log server listening on {AGENT_HOST}:{logs_port}"); + + let flusher = LogFlusher::new(config, client, handle.clone()); + Some((flusher, handle)) +} + +#[cfg(test)] +mod log_agent_integration_tests { + use datadog_logs_agent::{AggregatorService, IntakeEntry, LogServer, LogServerConfig}; + + #[tokio::test] + async fn test_log_agent_full_pipeline_compiles_and_runs() { + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + handle + .insert_batch(vec![IntakeEntry { + message: "azure function invoked".to_string(), + timestamp: 1_700_000_000_000, + hostname: Some("my-azure-fn".to_string()), + service: Some("payments".to_string()), + ddsource: Some("azure-functions".to_string()), + ddtags: Some("env:prod".to_string()), + status: Some("info".to_string()), + attributes: serde_json::Map::new(), + }]) + .expect("insert_batch"); + + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1); + + 
let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("json"); + assert_eq!(arr[0]["ddsource"], "azure-functions"); + assert_eq!(arr[0]["service"], "payments"); + + handle.shutdown().expect("shutdown"); + } + + /// start_log_agent must return None when OPW mode is enabled but the URL is empty. + #[tokio::test] + async fn test_opw_empty_url_is_detected() { + use super::start_log_agent; + // Enable OPW mode with a deliberately empty URL — the production guard + // inside start_log_agent must catch this and return None. + // SAFETY: test-only, single-threaded setup before any spawned tasks. + unsafe { + std::env::set_var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED", "true"); + std::env::set_var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL", ""); + } + let result = start_log_agent(Some("test-key".to_string()), None, 0); + // SAFETY: test-only cleanup. + unsafe { + std::env::remove_var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED"); + std::env::remove_var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL"); + } + assert!( + result.is_none(), + "start_log_agent must return None when OPW URL is empty" + ); + } + + /// Full network intake path: entries posted over HTTP to LogServer must + /// reach the AggregatorService and be retrievable via get_batches. + /// This mirrors what serverless-compat does when DD_LOGS_ENABLED=true. 
+ #[tokio::test] + #[allow(clippy::disallowed_methods, clippy::unwrap_used, clippy::expect_used)] + async fn test_log_server_network_intake_end_to_end() { + use tokio::time::{Duration, sleep}; + + let (service, handle) = AggregatorService::new(); + tokio::spawn(service.run()); + + // Bind :0 to discover a free port, then hand it to LogServer + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + + let server = LogServer::new( + LogServerConfig { + host: "127.0.0.1".into(), + port, + }, + handle.clone(), + ); + tokio::spawn(server.serve()); + sleep(Duration::from_millis(50)).await; + + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/v1/input")) + .json(&serde_json::json!([{ + "message": "lambda function invoked", + "timestamp": 1_700_000_000_000_i64, + "ddsource": "lambda", + "service": "my-fn" + }])) + .send() + .await + .expect("POST to log server failed"); + + assert_eq!(resp.status(), 200, "server must accept the payload"); + + let batches = handle.get_batches().await.expect("get_batches"); + assert_eq!(batches.len(), 1, "one batch expected"); + let arr: serde_json::Value = serde_json::from_slice(&batches[0]).expect("json"); + assert_eq!(arr[0]["message"], "lambda function invoked"); + assert_eq!(arr[0]["ddsource"], "lambda"); + assert_eq!(arr[0]["service"], "my-fn"); + + handle.shutdown().expect("shutdown"); + } +} diff --git a/crates/datadog-trace-agent/src/aggregator.rs b/crates/datadog-trace-agent/src/aggregator.rs index 4f364243..b7467ee4 100644 --- a/crates/datadog-trace-agent/src/aggregator.rs +++ b/crates/datadog-trace-agent/src/aggregator.rs @@ -104,7 +104,7 @@ mod tests { aggregator.add(payload); assert_eq!(aggregator.queue.len(), 1); - assert_eq!(aggregator.queue[0].is_empty(), false); + assert!(!aggregator.queue[0].is_empty()); assert_eq!(aggregator.queue[0].len(), 1); } diff --git 
a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index bf28d4f8..1491954f 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -295,11 +295,11 @@ async fn test_mini_agent_tcp_with_real_flushers() { let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; - if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await { - if response.status().is_success() { - server_ready = true; - break; - } + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await + && response.status().is_success() + { + server_ready = true; + break; } } assert!( diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index d26579f4..70032c61 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -231,6 +231,7 @@ async fn should_try_next_batch(resp: Result) -> (bool, } #[cfg(test)] +#[allow(clippy::disallowed_methods)] mod tests { use super::*; use crate::aggregator::AggregatorService; diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index d0c0952d..fc025b9a 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -162,16 +162,10 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); + assert_eq!(origin.origin_category, OriginCategory::LambdaMetrics as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::LambdaMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessRuntime as u32 ); } @@ -187,16 +181,10 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, 
OriginProduct::Serverless as u32); + assert_eq!(origin.origin_category, OriginCategory::LambdaMetrics as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::LambdaMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessEnhanced as u32 ); } @@ -212,16 +200,10 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); + assert_eq!(origin.origin_category, OriginCategory::LambdaMetrics as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::LambdaMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessCustom as u32 ); } @@ -237,16 +219,13 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, + origin.origin_category, OriginCategory::CloudRunMetrics as u32 ); assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessEnhanced as u32 ); } @@ -262,16 +241,13 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, + origin.origin_category, OriginCategory::CloudRunMetrics as u32 ); assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessCustom as u32 ); } @@ -287,16 +263,13 @@ mod tests { timestamp: 0, }; let origin = 
metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, + origin.origin_category, OriginCategory::AppServicesMetrics as u32 ); assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessCustom as u32 ); } @@ -312,16 +285,13 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, + origin.origin_category, OriginCategory::ContainerAppMetrics as u32 ); assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessCustom as u32 ); } @@ -337,16 +307,13 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, + origin.origin_category, OriginCategory::AzureFunctionsMetrics as u32 ); assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessCustom as u32 ); } @@ -362,16 +329,10 @@ mod tests { timestamp: 0, }; let origin = metric.find_origin(tags).unwrap(); + assert_eq!(origin.origin_product, OriginProduct::Serverless as u32); + assert_eq!(origin.origin_category, OriginCategory::LambdaMetrics as u32); assert_eq!( - origin.origin_product as u32, - OriginProduct::Serverless as u32 - ); - assert_eq!( - origin.origin_category as u32, - OriginCategory::LambdaMetrics as u32 - ); - assert_eq!( - origin.origin_service as u32, + origin.origin_service, OriginService::ServerlessRuntime as u32 ); } diff --git a/crates/dogstatsd/tests/integration_test.rs 
b/crates/dogstatsd/tests/integration_test.rs index 3155ef87..f390fa48 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -1,5 +1,6 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +#![allow(clippy::disallowed_methods)] use dogstatsd::metric::SortedTags; use dogstatsd::{ diff --git a/scripts/test-log-intake.sh b/scripts/test-log-intake.sh new file mode 100755 index 00000000..1579aa29 --- /dev/null +++ b/scripts/test-log-intake.sh @@ -0,0 +1,134 @@ +#!/usr/bin/env bash +set -x +# Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +# SPDX-License-Identifier: Apache-2.0 +# +# test-log-intake.sh — Run the log agent example against a local capture server. +# +# The script starts a tiny Python HTTP server that prints every incoming request +# body to stdout so you can inspect the JSON payloads the log agent sends. +# +# USAGE +# # Local capture (default) — no real Datadog traffic: +# ./scripts/test-log-intake.sh +# +# # Send N entries instead of the default 5: +# LOG_ENTRY_COUNT=50 ./scripts/test-log-intake.sh +# +# # Flush to a real Datadog endpoint instead of the local server: +# DD_API_KEY= ./scripts/test-log-intake.sh --real +# +# REQUIREMENTS +# python3 (macOS system python is fine) +# cargo + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WORKSPACE_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +PORT="${LOG_CAPTURE_PORT:-9999}" +REAL_MODE=false + +for arg in "$@"; do + [[ "$arg" == "--real" ]] && REAL_MODE=true +done + +# ── Build the example first ────────────────────────────────────────────────── +echo "Building send_logs example..." 
+cargo build -p datadog-logs-agent --example send_logs --quiet 2>&1 + +# ── Real Datadog mode ───────────────────────────────────────────────────────── +if [[ "$REAL_MODE" == true ]]; then + if [[ -z "${DD_API_KEY:-}" ]]; then + echo "Error: DD_API_KEY must be set for --real mode" >&2 + exit 1 + fi + echo "" + echo "Flushing to real Datadog endpoint..." + LOG_ENTRY_COUNT="${LOG_ENTRY_COUNT:-5}" \ + cargo run -p datadog-logs-agent --example send_logs --quiet 2>&1 + exit $? +fi + +# ── Local capture server mode ───────────────────────────────────────────────── + +# Python HTTP server that prints the request body as formatted JSON +CAPTURE_SERVER_SCRIPT=$(cat <<'PYEOF' +import http.server +import json +import sys + +class Handler(http.server.BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) + encoding = self.headers.get("Content-Encoding", "none") + content_type = self.headers.get("Content-Type", "") + + print(f"\n{'─'*60}") + print(f"POST {self.path}") + print(f"DD-API-KEY : {self.headers.get('DD-API-KEY', '(not set)')}") + print(f"DD-PROTOCOL: {self.headers.get('DD-PROTOCOL', '(not set)')}") + print(f"Content-Encoding: {encoding}") + print(f"Content-Type : {content_type}") + + if encoding == "zstd": + try: + import zstd + body = zstd.decompress(body) + print("(decompressed zstd payload)") + except ImportError: + print("(zstd payload — install python-zstd to decompress: pip install zstd)") + + if "json" in content_type or body.startswith(b"[") or body.startswith(b"{"): + try: + parsed = json.loads(body) + print(f"\nPayload ({len(parsed) if isinstance(parsed, list) else 1} entries):") + print(json.dumps(parsed, indent=2)) + except json.JSONDecodeError: + print(f"\nRaw body ({len(body)} bytes): {body[:500]}") + else: + print(f"\nRaw body ({len(body)} bytes)") + + self.send_response(200) + self.end_headers() + sys.stdout.flush() + + def log_message(self, fmt, *args): + pass # suppress 
default access log noise + +port = int(sys.argv[1]) +print(f"Capture server listening on http://localhost:{port}") +print("Waiting for log flush... (Ctrl-C to stop)\n") +sys.stdout.flush() + +httpd = http.server.HTTPServer(("localhost", port), Handler) +httpd.serve_forever() +PYEOF +) + +# Start capture server in background +python3 -c "$CAPTURE_SERVER_SCRIPT" "$PORT" & +SERVER_PID=$! + +cleanup() { + kill "$SERVER_PID" 2>/dev/null || true +} +trap cleanup EXIT INT TERM + +# Give the server a moment to start +sleep 0.3 + +echo "" +echo "Running send_logs example → http://localhost:${PORT}/logs" +echo "" + +DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED=true \ +DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL="http://localhost:${PORT}/logs" \ +DD_API_KEY="${DD_API_KEY:-local-test-key}" \ +LOG_ENTRY_COUNT="${LOG_ENTRY_COUNT:-5}" \ + cargo run -p datadog-logs-agent --example send_logs --quiet 2>&1 + +echo "" +echo "Done. Press Ctrl-C to stop the capture server." +wait "$SERVER_PID"