diff --git a/.github/workflows/build-datadog-serverless-compat.yml b/.github/workflows/build-datadog-serverless-compat.yml index 13d18ee6..0b04eba6 100644 --- a/.github/workflows/build-datadog-serverless-compat.yml +++ b/.github/workflows/build-datadog-serverless-compat.yml @@ -45,7 +45,7 @@ jobs: retention-days: 3 - if: ${{ inputs.runner == 'windows-2022' }} shell: bash - run: cargo build --release -p datadog-serverless-compat --features windows-pipes + run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics - if: ${{ inputs.runner == 'windows-2022' }} uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2 with: diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index fc640c5d..7863afd2 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -95,7 +95,7 @@ jobs: - shell: bash run: | if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then - cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes + cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics else cargo nextest run --workspace fi diff --git a/Cargo.lock b/Cargo.lock index 19805168..9571b01c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -390,6 +390,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "datadog-metrics-collector" +version = "0.1.0" +dependencies = [ + "dogstatsd", + "libdd-common", + "num_cpus", + "tracing", +] + [[package]] name = "datadog-protos" version = "0.1.0" @@ -409,6 +419,7 @@ name = "datadog-serverless-compat" version = "0.1.0" dependencies = [ "datadog-fips", + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils", @@ -855,6 +866,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version 
= "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1494,6 +1511,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.3" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 26f606f9..ce85550b 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -69,6 +69,7 @@ headers,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,Without Boats +hermit-abi,https://github.com/hermit-os/hermit-rs,MIT OR Apache-2.0,Stefan Lankes hex,https://github.com/KokaKiwi/rust-hex,MIT OR Apache-2.0,KokaKiwi home,https://github.com/rust-lang/cargo,MIT OR Apache-2.0,Brian Anderson http,https://github.com/hyperium/http,MIT OR Apache-2.0,"Alex Crichton , Carl Lerche , Sean McArthur " @@ -119,6 +120,7 @@ multimap,https://github.com/havarnov/multimap,MIT OR Apache-2.0,Håvar Nøvik , Josh Triplett , The Nushell Project Developers" num-traits,https://github.com/rust-num/num-traits,MIT OR Apache-2.0,The Rust Project Developers +num_cpus,https://github.com/seanmonstar/num_cpus,MIT OR Apache-2.0,Sean McArthur once_cell,https://github.com/matklad/once_cell,MIT OR Apache-2.0,Aleksey Kladov openssl-probe,https://github.com/rustls/openssl-probe,MIT OR Apache-2.0,Alex Crichton ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem , Matt Brubeck " diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml new file mode 100644 index 00000000..98ee2a48 --- /dev/null +++ 
b/crates/datadog-metrics-collector/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +num_cpus = "1.16" +tracing = { version = "0.1", default-features = false } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", default-features = false } + +[features] +windows-enhanced-metrics = [] diff --git a/crates/datadog-metrics-collector/src/cpu.rs b/crates/datadog-metrics-collector/src/cpu.rs new file mode 100644 index 00000000..7e6522bb --- /dev/null +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -0,0 +1,167 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides OS-agnostic CPU stats collection, CPU usage +//! and limit computation, and metrics submission to Datadog. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). 
+ +use dogstatsd::aggregator::AggregatorHandle; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; +use libdd_common::azure_app_services; +use std::env; +use tracing::{debug, error}; + +const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.cpu.usage"; +const CPU_LIMIT_METRIC: &str = "azure.functions.enhanced.cpu.limit"; + +/// Computed CPU total and limit metrics +pub struct CpuStats { + pub total: u64, // Cumulative CPU usage in nanoseconds + pub limit: Option<f64>, // CPU limit in nanocores + pub defaulted_limit: bool, // Whether CPU limit was defaulted to host CPU count +} + +pub trait CpuStatsReader { + fn read(&self) -> Option<CpuStats>; +} + +pub struct CpuMetricsCollector { + reader: Box<dyn CpuStatsReader>, + aggregator: AggregatorHandle, + tags: Option<SortedTags>, + last_usage_ns: Option<u64>, + last_collection_time: std::time::Instant, +} + +impl CpuMetricsCollector { + /// Creates a new CpuMetricsCollector + /// + /// # Arguments + /// + /// * `aggregator` - The aggregator handle to submit metrics to + /// * `tags` - Optional tags to attach to all metrics + pub fn new(aggregator: AggregatorHandle, tags: Option<SortedTags>) -> Self { + #[cfg(feature = "windows-enhanced-metrics")] + let reader: Box<dyn CpuStatsReader> = Box::new(crate::windows::WindowsCpuStatsReader); + #[cfg(not(feature = "windows-enhanced-metrics"))] + let reader: Box<dyn CpuStatsReader> = Box::new(crate::linux::LinuxCpuStatsReader); + Self { + reader, + aggregator, + tags, + last_usage_ns: None, + last_collection_time: std::time::Instant::now(), + } + } + + pub fn collect_and_submit(&mut self) { + if let Some(cpu_stats) = self.reader.read() { + // Submit metrics + let current_usage_ns = cpu_stats.total; + let now_instant = std::time::Instant::now(); + + // Skip first collection + let Some(last_usage_ns) = self.last_usage_ns else { + debug!("First CPU collection, skipping interval"); + self.last_usage_ns = Some(current_usage_ns); + self.last_collection_time = now_instant; + return; + }; + + if current_usage_ns < last_usage_ns { + debug!("Current CPU usage is less than last 
usage, skipping interval"); + self.last_usage_ns = Some(current_usage_ns); + self.last_collection_time = now_instant; + return; + } + let delta_ns = (current_usage_ns - last_usage_ns) as f64; + self.last_usage_ns = Some(current_usage_ns); + let elapsed_secs = now_instant + .duration_since(self.last_collection_time) + .as_secs_f64(); + self.last_collection_time = now_instant; + + // Divide nanoseconds delta by elapsed time to get usage rate in nanocores + let usage_rate_nc = delta_ns / elapsed_secs; + + let now = std::time::UNIX_EPOCH + .elapsed() + .map(|d| d.as_secs()) + .unwrap_or(0) + .try_into() + .unwrap_or(0); + + let usage_metric = Metric::new( + CPU_USAGE_METRIC.into(), + MetricValue::distribution(usage_rate_nc), + self.tags.clone(), + Some(now), + ); + + if let Err(e) = self.aggregator.insert_batch(vec![usage_metric]) { + error!("Failed to insert CPU usage metric: {}", e); + } + + if let Some(limit) = cpu_stats.limit { + if cpu_stats.defaulted_limit { + debug!("CPU limit defaulted to host CPU count"); + } + let limit_metric = Metric::new( + CPU_LIMIT_METRIC.into(), + MetricValue::distribution(limit), + self.tags.clone(), + Some(now), + ); + if let Err(e) = self.aggregator.insert_batch(vec![limit_metric]) { + error!("Failed to insert CPU limit metric: {}", e); + } + } + } else { + debug!( + "Skipping CPU metrics collection - could not find data to generate CPU usage and limit enhanced metrics" + ); + } + } +} + +pub fn build_cpu_metrics_tags() -> Option<SortedTags> { + let mut tag_parts = Vec::new(); + // Azure tags from ddcommon + if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION { + let aas_tags = [ + ("resource_id", aas_metadata.get_resource_id()), + ("resource_group", aas_metadata.get_resource_group()), + ("subscription_id", aas_metadata.get_subscription_id()), + ("name", aas_metadata.get_site_name()), + ]; + for (name, value) in aas_tags { + if value != "unknown" { + tag_parts.push(format!("{}:{}", name, value)); + } + } + } + + // Tags from 
env vars (not in ddcommon) - origin tag is added by DogStatsD + for (tag_name, env_var) in [ + ("region", "REGION_NAME"), + ("plan_tier", "WEBSITE_SKU"), + ("service", "DD_SERVICE"), + ("env", "DD_ENV"), + ("version", "DD_VERSION"), + ("serverless_compat_version", "DD_SERVERLESS_COMPAT_VERSION"), + ] { + if let Ok(val) = env::var(env_var) { + if !val.is_empty() { + tag_parts.push(format!("{}:{}", tag_name, val)); + } + } + } + + if tag_parts.is_empty() { + return None; + } + SortedTags::parse(&tag_parts.join(",")).ok() +} diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs new file mode 100644 index 00000000..aa565e5f --- /dev/null +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -0,0 +1,14 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod cpu; +#[cfg(not(feature = "windows-enhanced-metrics"))] +pub(crate) mod linux; +#[cfg(feature = "windows-enhanced-metrics")] +pub(crate) mod windows; diff --git a/crates/datadog-metrics-collector/src/linux.rs b/crates/datadog-metrics-collector/src/linux.rs new file mode 100644 index 00000000..d8f7b59d --- /dev/null +++ b/crates/datadog-metrics-collector/src/linux.rs @@ -0,0 +1,273 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides functionality to read raw CPU statistics from cgroup v1 files +//! and compute the CPU usage and limit in Linux environments. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). 
+ +use crate::cpu::{CpuStats, CpuStatsReader}; +use std::fs; +use std::io; +use tracing::debug; + +const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup +const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access +const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated +const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period + +/// Statistics from cgroup v1 files, normalized to nanoseconds +struct CgroupStats { + total: Option<u64>, // Cumulative CPU usage (from cpuacct.usage) in nanoseconds + cpu_count: Option<u64>, // Number of accessible logical CPUs (from cpuset.cpus) + scheduler_period: Option<u64>, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds + scheduler_quota: Option<u64>, // CFS scheduler quota (from cpu.cfs_quota_us) in nanoseconds +} + +pub struct LinuxCpuStatsReader; + +impl CpuStatsReader for LinuxCpuStatsReader { + fn read(&self) -> Option<CpuStats> { + let cgroup_stats = read_cgroup_stats(); + build_cpu_stats(&cgroup_stats) + } +} + +/// Builds CPU stats - rate and limit +fn build_cpu_stats(cgroup_stats: &CgroupStats) -> Option<CpuStats> { + let total = cgroup_stats.total?; + + let (limit_nc, defaulted) = compute_cpu_limit_nc(cgroup_stats); + + Some(CpuStats { + total: total, + limit: Some(limit_nc), + defaulted_limit: defaulted, + }) +} + +/// Reads raw CPU statistics from cgroup v1 files and converts to nanoseconds +fn read_cgroup_stats() -> CgroupStats { + let total = fs::read_to_string(CGROUP_CPU_USAGE_PATH) + .ok() + .and_then(|contents| contents.trim().parse::<u64>().ok()); + if total.is_none() { + debug!("Could 
not read CPU usage from {CGROUP_CPU_USAGE_PATH}"); + } + + let cpu_count = read_cpu_count_from_file(CGROUP_CPUSET_CPUS_PATH).ok(); + if cpu_count.is_none() { + debug!("Could not read CPU count from {CGROUP_CPUSET_CPUS_PATH}"); + } + + let scheduler_period = fs::read_to_string(CGROUP_CPU_PERIOD_PATH) + .ok() + .and_then(|contents| contents.trim().parse::<u64>().map(|v| v * 1000).ok()); // Convert from microseconds to nanoseconds + if scheduler_period.is_none() { + debug!("Could not read scheduler period from {CGROUP_CPU_PERIOD_PATH}"); + } + + let scheduler_quota = fs::read_to_string(CGROUP_CPU_QUOTA_PATH) + .ok() + .and_then(|contents| { + contents.trim().parse::<i64>().ok().and_then(|quota| { + // Convert from microseconds to nanoseconds + if quota == -1 { + debug!("CFS scheduler quota is -1, setting to None"); + None + } else { + Some((quota * 1000) as u64) + } + }) + }); + + CgroupStats { + total, + cpu_count, + scheduler_period, + scheduler_quota, + } +} + +/// Reads CPU count from cpuset.cpus +/// +/// The cpuset.cpus file contains a comma-separated list, with dashes to represent ranges of CPUs, +/// e.g., "0-2,16" represents CPUs 0, 1, 2, and 16 +/// This function returns the count of CPUs, in this case 4. 
+fn read_cpu_count_from_file(path: &str) -> Result<u64, io::Error> { + let contents = fs::read_to_string(path)?; + let cpuset_str = contents.trim(); + if cpuset_str.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("File {path} is empty"), + )); + } + + let mut cpu_count: u64 = 0; + + for part in cpuset_str.split(',') { + let range: Vec<&str> = part.split('-').collect(); + if range.len() == 2 { + // Range like "0-3" + let start: u64 = range[0].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + let end: u64 = range[1].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + cpu_count += end - start + 1; + } else { + // Single CPU like "2" + cpu_count += 1; + } + } + Ok(cpu_count) +} + +/// Computes the CPU limit in nanocores, with fallback to host CPU count +fn compute_cpu_limit_nc(cgroup_stats: &CgroupStats) -> (f64, bool) { + match compute_cgroup_cpu_limit_nc(cgroup_stats) { + Some(limit) => (limit, false), + None => { + let host_cpu_count = num_cpus::get() as f64; + (host_cpu_count * 1000000000.0, true) // Convert to nanocores + } + } +} + +/// Computes the CPU limit in nanocores from cgroup statistics +/// Limit is computed using min(CPUSet, CFS CPU Quota) +fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option<f64> { + let mut limit_nc = None; + + if let Some(cpu_count) = cgroup_stats.cpu_count { + let host_cpu_count = num_cpus::get() as u64; + if cpu_count != host_cpu_count { + let cpuset_limit_nc = cpu_count as f64 * 1000000000.0; // Convert to nanocores + limit_nc = Some(cpuset_limit_nc); + } + } + + if let (Some(scheduler_quota), Some(scheduler_period)) = + (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) + { + let quota_limit_nc = 1000000000.0 * (scheduler_quota as f64 / scheduler_period as f64); + match limit_nc { + None => { + limit_nc = 
Some(quota_limit_nc); + debug!( + "limit_nc is None, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc + ); + } + Some(current_limit_nc) if quota_limit_nc < current_limit_nc => { + limit_nc = Some(quota_limit_nc); + debug!( + "CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc + ); + } + _ => { + debug!("Keeping cpuset limit: {:?} nanocores", limit_nc); + } + } + } + limit_nc +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_stats( + cpu_count: Option<u64>, + scheduler_quota: Option<u64>, + scheduler_period: Option<u64>, + ) -> CgroupStats { + CgroupStats { + total: Some(0), + cpu_count, + scheduler_quota, + scheduler_period, + } + } + + #[test] + fn test_no_limit_returns_none() { + let stats = make_stats(None, None, None); + assert!(compute_cgroup_cpu_limit_nc(&stats).is_none()); + } + + #[test] + fn test_quota_unlimited_minus_one_returns_none() { + // quota=-1 is filtered out during parsing, so None here means unlimited + let stats = make_stats(None, None, Some(100_000_000)); + assert!(compute_cgroup_cpu_limit_nc(&stats).is_none()); + } + + #[test] + fn test_limited_to_2_cores_by_quota() { + let stats = make_stats(None, Some(200_000_000), Some(100_000_000)); // 200ms / 100ms = 2 cores + let result = compute_cgroup_cpu_limit_nc(&stats); + assert!((result.unwrap() - 2_000_000_000.0).abs() < 1_000.0); // Tolerance of 1,000 nanocores due to floating point arithmetic rounding errors + } + + #[test] + fn test_limited_to_half_core_by_quota() { + let stats = make_stats(None, Some(50_000_000), Some(100_000_000)); // 50ms / 100ms = 0.5 cores + let result = compute_cgroup_cpu_limit_nc(&stats); + assert!((result.unwrap() - 500_000_000.0).abs() < 1_000.0); + } + + #[test] + fn test_read_cpu_count_single() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_single.txt"); + std::fs::write(&path, "0-3\n").unwrap(); + let count = read_cpu_count_from_file(path.to_str().unwrap()).unwrap(); 
+ assert_eq!(count, 4); + } + + #[test] + fn test_read_cpu_count_mixed() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_mixed.txt"); + std::fs::write(&path, "0-2,16\n").unwrap(); + let count = read_cpu_count_from_file(path.to_str().unwrap()).unwrap(); + assert_eq!(count, 4); // 0,1,2 + 16 + } + + #[test] + fn test_read_cpu_count_empty_file() { + let dir = std::env::temp_dir(); + let path = dir.join("cpuset_empty.txt"); + std::fs::write(&path, "").unwrap(); + assert!(read_cpu_count_from_file(path.to_str().unwrap()).is_err()); + } + + #[test] + fn test_compute_cpu_limit_nc_with_quota() { + let stats = make_stats(None, Some(200_000_000), Some(100_000_000)); + let (limit, defaulted) = compute_cpu_limit_nc(&stats); + assert!((limit - 2_000_000_000.0).abs() < 1_000.0); + assert!(!defaulted); + } + + #[test] + fn test_compute_cpu_limit_nc_defaults_to_host() { + let stats = make_stats(None, None, None); + let (limit, defaulted) = compute_cpu_limit_nc(&stats); + let expected = num_cpus::get() as f64 * 1_000_000_000.0; + assert!((limit - expected).abs() < 1_000.0); + assert!(defaulted); + } +} diff --git a/crates/datadog-metrics-collector/src/windows.rs b/crates/datadog-metrics-collector/src/windows.rs new file mode 100644 index 00000000..346b556b --- /dev/null +++ b/crates/datadog-metrics-collector/src/windows.rs @@ -0,0 +1,21 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides functionality to read raw CPU statistics +//! and compute the CPU usage and limit in Windows environments. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). 
+ +use crate::cpu::{CpuStats, CpuStatsReader}; +use tracing::debug; + +pub struct WindowsCpuStatsReader; + +impl CpuStatsReader for WindowsCpuStatsReader { + fn read(&self) -> Option<CpuStats> { + debug!("CPU enhanced metrics are not yet supported on Windows Azure Functions"); + None + } +} diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index ed573669..6fb19a4d 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -8,9 +8,11 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [features] default = [] windows-pipes = ["datadog-trace-agent/windows-pipes", "dogstatsd/windows-pipes"] +windows-enhanced-metrics = ["datadog-metrics-collector/windows-enhanced-metrics"] [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } +datadog-metrics-collector = { path = "../datadog-metrics-collector" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" } datadog-fips = { path = "../datadog-fips", default-features = false } dogstatsd = { path = "../dogstatsd", default-features = true } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 6d764815..f4c3009c 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -23,6 +23,8 @@ use datadog_trace_agent::{ trace_processor, }; + +use datadog_metrics_collector::cpu::CpuMetricsCollector; + use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use datadog_fips::reqwest_adapter::create_reqwest_client_builder; @@ -39,6 +41,7 @@ use dogstatsd::{ use dogstatsd::metric::{SortedTags, EMPTY_TAGS}; use tokio_util::sync::CancellationToken; +const CPU_METRICS_COLLECTION_INTERVAL_SECS: u64 = 1; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; const DOGSTATSD_TIMEOUT_DURATION: Duration = 
Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; @@ -104,6 +107,12 @@ pub async fn main() { .ok() .and_then(|val| parse_metric_namespace(&val)); + // Only enable enhanced metrics for Azure Functions + let dd_enhanced_metrics = env_type == EnvironmentType::AzureFunction + && env::var("DD_ENHANCED_METRICS_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + let https_proxy = env::var("DD_PROXY_HTTPS") .or_else(|_| env::var("HTTPS_PROXY")) .ok(); @@ -170,53 +179,87 @@ pub async fn main() { } }); - let (metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { - debug!("Starting dogstatsd"); - let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( - dd_dogstatsd_port, - dd_api_key, - dd_site, - https_proxy, - dogstatsd_tags, - dd_statsd_metric_namespace, - #[cfg(all(windows, feature = "windows-pipes"))] - dd_dogstatsd_windows_pipe_name.clone(), - ) - .await; - if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { - info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + let needs_aggregator = dd_use_dogstatsd || dd_enhanced_metrics; + + // The aggregator is shared between dogstatsd and enhanced metrics. + // It is started independently so that either can be enabled without the other. 
+ // Only dogstatsd needs the dogstatsd listener + let (metrics_flusher, aggregator_handle) = if needs_aggregator { + debug!("Creating metrics flusher and aggregator"); + + let (flusher, handle) = + start_aggregator(dd_api_key, dd_site, https_proxy, dogstatsd_tags).await; + + if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let _ = start_dogstatsd_listener( + dd_dogstatsd_port, + handle.clone(), + dd_statsd_metric_namespace, + #[cfg(all(windows, feature = "windows-pipes"))] + dd_dogstatsd_windows_pipe_name.clone(), + ) + .await; + if let Some(ref windows_pipe_name) = dd_dogstatsd_windows_pipe_name { + info!("dogstatsd-pipe: starting to listen on pipe {windows_pipe_name}"); + } else { + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + } } else { - info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + info!("dogstatsd disabled"); } - (metrics_flusher, Some(aggregator_handle)) + (flusher, Some(handle)) } else { - info!("dogstatsd disabled"); + info!("dogstatsd and enhanced metrics disabled"); (None, None) }; + let mut cpu_collector = if dd_enhanced_metrics && metrics_flusher.is_some() { + aggregator_handle.as_ref().map(|handle| { + let tags = datadog_metrics_collector::cpu::build_cpu_metrics_tags(); + CpuMetricsCollector::new(handle.clone(), tags) + }) + } else { + if !dd_enhanced_metrics { + info!("Enhanced metrics disabled"); + } else { + info!("Enhanced metrics enabled but metrics flusher not found"); + } + None + }; + let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + let mut cpu_collection_interval = + interval(Duration::from_secs(CPU_METRICS_COLLECTION_INTERVAL_SECS)); flush_interval.tick().await; // discard first tick, which is instantaneous + cpu_collection_interval.tick().await; loop { - flush_interval.tick().await; - - if let Some(metrics_flusher) = metrics_flusher.as_ref() { - debug!("Flushing dogstatsd metrics"); - metrics_flusher.flush().await; + tokio::select! 
{ + _ = flush_interval.tick() => { + if let Some(metrics_flusher) = metrics_flusher.clone() { + debug!("Flushing dogstatsd metrics"); + tokio::spawn(async move { + metrics_flusher.flush().await; + }); + } + } + _ = cpu_collection_interval.tick() => { + if let Some(ref mut collector) = cpu_collector { + collector.collect_and_submit(); + } + } } } } -async fn start_dogstatsd( - port: u16, +async fn start_aggregator( dd_api_key: Option, dd_site: String, https_proxy: Option, dogstatsd_tags: &str, - metric_namespace: Option, - #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, -) -> (CancellationToken, Option, AggregatorHandle) { - // 1. Create the aggregator service +) -> (Option, AggregatorHandle) { + // Create the aggregator service #[allow(clippy::expect_used)] let (service, handle) = AggregatorService::new( SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS), @@ -224,53 +267,18 @@ async fn start_dogstatsd( ) .expect("Failed to create aggregator service"); - // 2. Start the aggregator service in the background + // Start the aggregator service in the background tokio::spawn(service.run()); - #[cfg(all(windows, feature = "windows-pipes"))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - windows_pipe_name, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - - #[cfg(not(all(windows, feature = "windows-pipes")))] - let dogstatsd_config = DogStatsDConfig { - host: AGENT_HOST.to_string(), - port, - metric_namespace, - so_rcvbuf: None, - buffer_size: None, - queue_size: None, - }; - let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); - - // 3. 
Use handle in DogStatsD (cheap to clone) - let dogstatsd_client = DogStatsD::new( - &dogstatsd_config, - handle.clone(), - dogstatsd_cancel_token.clone(), - ) - .await; - - tokio::spawn(async move { - dogstatsd_client.spin().await; - }); - let metrics_flusher = match dd_api_key { Some(dd_api_key) => { let client = match build_metrics_client(https_proxy, DOGSTATSD_TIMEOUT_DURATION) { Ok(client) => client, Err(e) => { error!("Failed to build HTTP client: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; - let metrics_intake_url_prefix = match Site::new(dd_site) .map_err(|e| e.to_string()) .and_then(|site| { @@ -279,7 +287,7 @@ async fn start_dogstatsd( Ok(prefix) => prefix, Err(e) => { error!("Failed to create metrics intake URL: {e}, won't flush metrics"); - return (dogstatsd_cancel_token, None, handle); + return (None, handle); } }; @@ -299,7 +307,50 @@ async fn start_dogstatsd( } }; - (dogstatsd_cancel_token, metrics_flusher, handle) + (metrics_flusher, handle) +} + +async fn start_dogstatsd_listener( + port: u16, + handle: AggregatorHandle, + metric_namespace: Option, + #[cfg(all(windows, feature = "windows-pipes"))] windows_pipe_name: Option, +) -> CancellationToken { + #[cfg(all(windows, feature = "windows-pipes"))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + windows_pipe_name, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + + #[cfg(not(all(windows, feature = "windows-pipes")))] + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + metric_namespace, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + + // Use handle in DogStatsD (cheap to clone) + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + handle.clone(), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { + 
dogstatsd_client.spin().await; + }); + + dogstatsd_cancel_token } fn build_metrics_client( diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index d0c0952d..818766d5 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -18,6 +18,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -83,15 +84,17 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) - || metric_name.starts_with(RUNTIME_PREFIX) - { - OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; + let service = + if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; // Then determine the category based on tags let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") {