diff --git a/include/datadog/environment.h b/include/datadog/environment.h index f2846b37..1d96a223 100644 --- a/include/datadog/environment.h +++ b/include/datadog/environment.h @@ -69,6 +69,7 @@ namespace environment { MACRO(DD_VERSION, STRING, "") \ MACRO(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED, BOOLEAN, true) \ MACRO(DD_TELEMETRY_HEARTBEAT_INTERVAL, DECIMAL, 10) \ + MACRO(DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL, DECIMAL, 86400) \ MACRO(DD_TELEMETRY_METRICS_ENABLED, BOOLEAN, true) \ MACRO(DD_TELEMETRY_METRICS_INTERVAL_SECONDS, DECIMAL, 60) \ MACRO(DD_TELEMETRY_DEBUG, BOOLEAN, false) \ diff --git a/include/datadog/telemetry/configuration.h b/include/datadog/telemetry/configuration.h index 51693f26..865cbd81 100644 --- a/include/datadog/telemetry/configuration.h +++ b/include/datadog/telemetry/configuration.h @@ -29,6 +29,11 @@ struct Configuration { // Interval at which the heartbeat payload will be sent. // Can be overriden by `DD_TELEMETRY_HEARTBEAT_INTERVAL` environment variable. tracing::Optional heartbeat_interval_seconds; + // Interval at which the extended heartbeat payload will be sent. + // Default: 86400s (24h). + // Can be overriden by `DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL` environment + // variable. + tracing::Optional extended_heartbeat_interval_seconds; // `integration_name` is the name of the product integrating this library. // Example: "nginx", "envoy" or "istio". tracing::Optional integration_name; @@ -52,6 +57,7 @@ struct FinalizedConfiguration { bool report_logs; std::chrono::steady_clock::duration metrics_interval; std::chrono::steady_clock::duration heartbeat_interval; + std::chrono::steady_clock::duration extended_heartbeat_interval; std::string integration_name; std::string integration_version; std::vector products; diff --git a/src/datadog/telemetry/configuration.cpp b/src/datadog/telemetry/configuration.cpp index cc8d2e85..fdea034d 100644 --- a/src/datadog/telemetry/configuration.cpp +++ b/src/datadog/telemetry/configuration.cpp @@ -48,6 +48,16 @@ tracing::Expected load_telemetry_env_config() { env_cfg.heartbeat_interval_seconds = *maybe_value; } + if (auto extended_heartbeat_interval_seconds = + lookup(environment::DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL)) { + auto maybe_value = parse_double(*extended_heartbeat_interval_seconds); + if (auto error = maybe_value.if_error()) { + return *error; + } + + env_cfg.extended_heartbeat_interval_seconds = *maybe_value; + } + return env_cfg; } @@ -112,6 +122,19 @@ tracing::Expected finalize_config( std::chrono::duration_cast( std::chrono::duration(heartbeat_interval.second)); + // extended_heartbeat_interval_seconds + auto extended_heartbeat_interval = + pick(env_config->extended_heartbeat_interval_seconds, + user_config.extended_heartbeat_interval_seconds, 86400); + if (extended_heartbeat_interval.second <= 0.) { + return Error{Error::Code::OUT_OF_RANGE_INTEGER, + "Telemetry extended heartbeat polling interval must be a " + "positive value"}; + } + result.extended_heartbeat_interval = + std::chrono::duration_cast( + std::chrono::duration(extended_heartbeat_interval.second)); + // integration_name std::tie(origin, result.integration_name) = pick(env_config->integration_name, user_config.integration_name, diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index d9464dd7..35675113 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -198,6 +198,39 @@ nlohmann::json encode_distributions( return j; } +nlohmann::json encode_configuration_field(const ConfigMetadata& config_metadata, + std::size_t seq_id) { + auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, + {"value", config_metadata.value}, + {"seq_id", seq_id}}; + + switch (config_metadata.origin) { + case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: + j["origin"] = "env_var"; + break; + case ConfigMetadata::Origin::CODE: + j["origin"] = "code"; + break; + case ConfigMetadata::Origin::REMOTE_CONFIG: + j["origin"] = "remote_config"; + break; + case ConfigMetadata::Origin::DEFAULT: + j["origin"] = "default"; + break; + } + + if (config_metadata.error) { + // clang-format off + j["error"] = { + {"code", config_metadata.error->code}, + {"message", config_metadata.error->message} + }; + // clang-format on + } + + return j; +} + } // namespace Telemetry::Telemetry(FinalizedConfiguration config, @@ -214,6 +247,16 @@ Telemetry::Telemetry(FinalizedConfiguration config, clock_(std::move(clock)), scheduler_(event_scheduler), host_info_(get_host_info()) { + // Initialize current_configuration_ from the product configurations provided + // at startup. Each config name maps to a vector ordered by precedence + // (lowest to highest), so we take the last entry as the effective value. + for (const auto& product : config_.products) { + for (const auto& [config_name, config_metadatas] : product.configurations) { + if (!config_metadatas.empty()) { + current_configuration_[config_name] = config_metadatas.back(); + } + } + } app_started(); schedule_tasks(); } @@ -227,6 +270,12 @@ void Telemetry::schedule_tasks() { tasks_.emplace_back(scheduler_->schedule_recurring_event( config_.metrics_interval, [this]() mutable { capture_metrics(); })); } + + tasks_.emplace_back(scheduler_->schedule_recurring_event( + config_.extended_heartbeat_interval, + [this]() { + send_payload("app-extended-heartbeat", extended_heartbeat_payload()); + })); } Telemetry::~Telemetry() { @@ -249,6 +298,7 @@ Telemetry::Telemetry(Telemetry&& rhs) rates_(std::move(rhs.rates_)), rates_snapshot_(std::move(rhs.rates_snapshot_)), distributions_(std::move(rhs.distributions_)), + current_configuration_(std::move(rhs.current_configuration_)), seq_id_(rhs.seq_id_), config_seq_ids_(rhs.config_seq_ids_), host_info_(rhs.host_info_) { @@ -274,6 +324,7 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(distributions_, rhs.distributions_); std::swap(seq_id_, rhs.seq_id_); std::swap(config_seq_ids_, rhs.config_seq_ids_); + std::swap(current_configuration_, rhs.current_configuration_); std::swap(host_info_, rhs.host_info_); schedule_tasks(); } @@ -425,13 +476,13 @@ void Telemetry::send_payload(StringView request_type, std::string payload) { void Telemetry::send_configuration_change() { if (configuration_snapshot_.empty()) return; - std::vector current_configuration; - std::swap(current_configuration, configuration_snapshot_); + std::vector pending; + std::swap(pending, configuration_snapshot_); auto configuration_json = nlohmann::json::array(); - for (const auto& config_metadata : current_configuration) { + for (const auto& config_metadata : pending) { configuration_json.emplace_back( - generate_configuration_field(config_metadata)); + report_new_config_field(config_metadata)); } auto telemetry_body = @@ -591,7 +642,7 @@ std::string Telemetry::app_started_payload() { // if (config_metadata.value.empty()) continue; for (const auto& config_metadata : config_metadatas) { configuration_json.emplace_back( - generate_configuration_field(config_metadata)); + report_new_config_field(config_metadata)); } } @@ -678,6 +729,36 @@ std::string Telemetry::app_started_payload() { return batch.dump(); } +std::string Telemetry::extended_heartbeat_payload() { + auto configuration_json = nlohmann::json::array(); + for (const auto& [_, config_metadata] : current_configuration_) { + // Use the current seq_id without incrementing — this is a snapshot + // refresh, not a new change event. + auto seq_id = config_seq_ids_[config_metadata.name]; + configuration_json.emplace_back( + encode_configuration_field(config_metadata, seq_id)); + } + + auto payload = nlohmann::json{{"configuration", configuration_json}}; + + if (!config_.integration_name.empty()) { + payload["integrations"] = nlohmann::json::array({ + nlohmann::json{{"name", config_.integration_name}, + {"version", config_.integration_version}, + {"enabled", true}}, + }); + } + + auto extended_heartbeat = nlohmann::json{ + {"request_type", "app-extended-heartbeat"}, + {"payload", std::move(payload)}, + }; + + auto batch = generate_telemetry_body("message-batch"); + batch["payload"] = nlohmann::json::array({std::move(extended_heartbeat)}); + return batch.dump(); +} + nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) { std::time_t tracer_time = std::chrono::duration_cast( clock_().wall.time_since_epoch()) @@ -711,49 +792,21 @@ nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) { }); } -nlohmann::json Telemetry::generate_configuration_field( +nlohmann::json Telemetry::report_new_config_field( const ConfigMetadata& config_metadata) { // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can // detect between non set fields. config_seq_ids_[config_metadata.name] += 1; - auto seq_id = config_seq_ids_[config_metadata.name]; - - auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, - {"value", config_metadata.value}, - {"seq_id", seq_id}}; - - switch (config_metadata.origin) { - case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: - j["origin"] = "env_var"; - break; - case ConfigMetadata::Origin::CODE: - j["origin"] = "code"; - break; - case ConfigMetadata::Origin::REMOTE_CONFIG: - j["origin"] = "remote_config"; - break; - case ConfigMetadata::Origin::DEFAULT: - j["origin"] = "default"; - break; - } - - if (config_metadata.error) { - // clang-format off - j["error"] = { - {"code", config_metadata.error->code}, - {"message", config_metadata.error->message} - }; - // clang-format on - } - - return j; + return encode_configuration_field(config_metadata, + config_seq_ids_[config_metadata.name]); } void Telemetry::capture_configuration_change( const std::vector& new_configuration) { - configuration_snapshot_.insert(configuration_snapshot_.begin(), - new_configuration.begin(), - new_configuration.end()); + for (const auto& config : new_configuration) { + configuration_snapshot_.push_back(config); + current_configuration_[config.name] = config; + } } void Telemetry::capture_metrics() { diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 7c92db3b..dd35e1bb 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -57,7 +57,13 @@ class Telemetry final { distributions_; /// Configuration + // Pending configuration changes to be sent in the next + // `app-client-configuration-change` event. std::vector configuration_snapshot_; + // Full current configuration state (initialized from app-started, updated on + // every configuration change). Used for `app-extended-heartbeat`. + std::unordered_map + current_configuration_; std::mutex log_mutex_; std::vector logs_; @@ -143,7 +149,7 @@ class Telemetry final { tracing::Optional stacktrace = tracing::nullopt); nlohmann::json generate_telemetry_body(std::string request_type); - nlohmann::json generate_configuration_field( + nlohmann::json report_new_config_field( const tracing::ConfigMetadata& config_metadata); // Constructs an `app-started` message using information provided when @@ -155,6 +161,9 @@ class Telemetry final { // Constructs a message-batch containing `app-closing`, and if metrics have // been modified, a `generate-metrics` message. std::string app_closing_payload(); + // Constructs a message-batch containing `app-extended-heartbeat` with the + // full current configuration state. + std::string extended_heartbeat_payload(); }; } // namespace datadog::telemetry diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 83590773..1c8fd3f8 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -46,11 +46,17 @@ struct FakeEventScheduler : public EventScheduler { size_t count_tasks = 0; std::function heartbeat_callback = nullptr; std::function metrics_callback = nullptr; + std::function extended_heartbeat_callback = nullptr; Optional heartbeat_interval; Optional metrics_interval; + Optional extended_heartbeat_interval; bool cancelled = false; // NOTE: White box testing. This is a limitation of the event scheduler API. + // Task ordering from Telemetry::schedule_tasks(): + // 0: heartbeat (always) + // 1: metrics capture (only when report_metrics=true) + // 1 or 2: extended heartbeat (always; index depends on report_metrics) Cancel schedule_recurring_event(std::chrono::steady_clock::duration interval, std::function callback) override { if (count_tasks == 0) { @@ -59,6 +65,9 @@ struct FakeEventScheduler : public EventScheduler { } else if (count_tasks == 1) { metrics_callback = callback; metrics_interval = interval; + } else if (count_tasks == 2) { + extended_heartbeat_callback = callback; + extended_heartbeat_interval = interval; } count_tasks++; return [this]() { cancelled = true; }; @@ -74,6 +83,11 @@ struct FakeEventScheduler : public EventScheduler { metrics_callback(); } + void trigger_extended_heartbeat() { + assert(extended_heartbeat_callback != nullptr); + extended_heartbeat_callback(); + } + std::string config() const override { return nlohmann::json::object({{"type", "FakeEventScheduler"}}).dump(); } @@ -858,6 +872,75 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } } +TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry extended heartbeat") { + auto logger = std::make_shared(); + auto client = std::make_shared(); + auto scheduler = std::make_shared(); + + const TracerSignature tracer_signature{ + /* runtime_id = */ RuntimeID::generate(), + /* service = */ "testsvc", + /* environment = */ "test"}; + + auto url = HTTPClient::URL::parse("http://localhost:8000"); + + SECTION("sends full configuration state") { + Product product; + product.name = Product::Name::tracing; + product.enabled = true; + product.version = tracer_version; + product.configurations = { + {ConfigName::SERVICE_NAME, + {ConfigMetadata(ConfigName::SERVICE_NAME, "default-service", + ConfigMetadata::Origin::DEFAULT), + ConfigMetadata(ConfigName::SERVICE_NAME, "env-service", + ConfigMetadata::Origin::ENVIRONMENT_VARIABLE)}}, + {ConfigName::REPORT_TRACES, + {ConfigMetadata(ConfigName::REPORT_TRACES, "true", + ConfigMetadata::Origin::DEFAULT)}}, + }; + + Configuration cfg; + cfg.products.emplace_back(std::move(product)); + + Telemetry telemetry{*finalize_config(cfg), tracer_signature, logger, client, + scheduler, *url}; + + // Simulate a remote config update changing SERVICE_NAME and REPORT_TRACES + const std::vector rc_update{ + {ConfigName::SERVICE_NAME, "rc-service", + ConfigMetadata::Origin::REMOTE_CONFIG}, + }; + telemetry.capture_configuration_change(rc_update); + + client->clear(); + scheduler->trigger_extended_heartbeat(); + + auto payload = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(payload)); + REQUIRE(payload["request_type"] == "message-batch"); + + auto extended_hb = + find_payload(payload["payload"], "app-extended-heartbeat"); + REQUIRE(extended_hb.has_value()); + + auto configuration = (*extended_hb)["payload"]["configuration"]; + REQUIRE(configuration.is_array()); + + // Full state: SERVICE_NAME should reflect the remote config override + bool found_service_name = false; + for (const auto& entry : configuration) { + if (entry["name"] == "service") { + found_service_name = true; + CHECK(entry["value"] == "rc-service"); + CHECK(entry["origin"] == "remote_config"); + } + } + CHECK(found_service_name); + } + +} + TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { // Cases: // - when `report_metrics` is set to false. No metrics are reported. @@ -885,25 +968,32 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { Telemetry telemetry(*final_cfg, tracer_signature, logger, client, scheduler, *url); - CHECK(scheduler->metrics_callback == nullptr); - CHECK(scheduler->metrics_interval == nullopt); + // When report_metrics=false, only 2 tasks are registered: + // 0: heartbeat, 1: extended_heartbeat (no metrics capture task). + CHECK(scheduler->count_tasks == 2); + CHECK(scheduler->extended_heartbeat_callback == nullptr); } SECTION("intervals are respected") { Configuration cfg; cfg.metrics_interval_seconds = .5; cfg.heartbeat_interval_seconds = 30; + cfg.extended_heartbeat_interval_seconds = 3600; auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); Telemetry telemetry(*final_cfg, tracer_signature, logger, client, scheduler, *url); + CHECK(scheduler->heartbeat_callback != nullptr); + CHECK(scheduler->heartbeat_interval == 30s); + CHECK(scheduler->metrics_callback != nullptr); CHECK(scheduler->metrics_interval == 500ms); - CHECK(scheduler->heartbeat_callback != nullptr); - CHECK(scheduler->metrics_interval != 30s); + CHECK(scheduler->extended_heartbeat_callback != nullptr); + CHECK(scheduler->extended_heartbeat_interval == + std::chrono::milliseconds(3600000)); } SECTION("disabling logs reporting do not collect logs") {