Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/datadog/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace environment {
MACRO(DD_VERSION, STRING, "") \
MACRO(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED, BOOLEAN, true) \
MACRO(DD_TELEMETRY_HEARTBEAT_INTERVAL, DECIMAL, 10) \
MACRO(DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL, DECIMAL, 86400) \
MACRO(DD_TELEMETRY_METRICS_ENABLED, BOOLEAN, true) \
MACRO(DD_TELEMETRY_METRICS_INTERVAL_SECONDS, DECIMAL, 60) \
MACRO(DD_TELEMETRY_DEBUG, BOOLEAN, false) \
Expand Down
6 changes: 6 additions & 0 deletions include/datadog/telemetry/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct Configuration {
// Interval at which the heartbeat payload will be sent.
// Can be overriden by `DD_TELEMETRY_HEARTBEAT_INTERVAL` environment variable.
tracing::Optional<double> heartbeat_interval_seconds;
// Interval at which the extended heartbeat payload will be sent.
// Default: 86400s (24h).
// Can be overriden by `DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL` environment
// variable.
tracing::Optional<double> extended_heartbeat_interval_seconds;
// `integration_name` is the name of the product integrating this library.
// Example: "nginx", "envoy" or "istio".
tracing::Optional<std::string> integration_name;
Expand All @@ -52,6 +57,7 @@ struct FinalizedConfiguration {
bool report_logs;
std::chrono::steady_clock::duration metrics_interval;
std::chrono::steady_clock::duration heartbeat_interval;
std::chrono::steady_clock::duration extended_heartbeat_interval;
std::string integration_name;
std::string integration_version;
std::vector<Product> products;
Expand Down
23 changes: 23 additions & 0 deletions src/datadog/telemetry/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ tracing::Expected<Configuration> load_telemetry_env_config() {
env_cfg.heartbeat_interval_seconds = *maybe_value;
}

if (auto extended_heartbeat_interval_seconds =
lookup(environment::DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL)) {
auto maybe_value = parse_double(*extended_heartbeat_interval_seconds);
if (auto error = maybe_value.if_error()) {
return *error;
}

env_cfg.extended_heartbeat_interval_seconds = *maybe_value;
}

return env_cfg;
}

Expand Down Expand Up @@ -112,6 +122,19 @@ tracing::Expected<FinalizedConfiguration> finalize_config(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(heartbeat_interval.second));

// extended_heartbeat_interval_seconds
auto extended_heartbeat_interval =
pick(env_config->extended_heartbeat_interval_seconds,
user_config.extended_heartbeat_interval_seconds, 86400);
if (extended_heartbeat_interval.second <= 0.) {
return Error{Error::Code::OUT_OF_RANGE_INTEGER,
"Telemetry extended heartbeat polling interval must be a "
"positive value"};
}
result.extended_heartbeat_interval =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(extended_heartbeat_interval.second));

// integration_name
std::tie(origin, result.integration_name) =
pick(env_config->integration_name, user_config.integration_name,
Expand Down
133 changes: 93 additions & 40 deletions src/datadog/telemetry/telemetry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,39 @@ nlohmann::json encode_distributions(
return j;
}

nlohmann::json encode_configuration_field(const ConfigMetadata& config_metadata,
std::size_t seq_id) {
auto j = nlohmann::json{{"name", to_string(config_metadata.name)},
{"value", config_metadata.value},
{"seq_id", seq_id}};

switch (config_metadata.origin) {
case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE:
j["origin"] = "env_var";
break;
case ConfigMetadata::Origin::CODE:
j["origin"] = "code";
break;
case ConfigMetadata::Origin::REMOTE_CONFIG:
j["origin"] = "remote_config";
break;
case ConfigMetadata::Origin::DEFAULT:
j["origin"] = "default";
break;
}

if (config_metadata.error) {
// clang-format off
j["error"] = {
{"code", config_metadata.error->code},
{"message", config_metadata.error->message}
};
// clang-format on
}

return j;
}

} // namespace

Telemetry::Telemetry(FinalizedConfiguration config,
Expand All @@ -214,6 +247,16 @@ Telemetry::Telemetry(FinalizedConfiguration config,
clock_(std::move(clock)),
scheduler_(event_scheduler),
host_info_(get_host_info()) {
// Initialize current_configuration_ from the product configurations provided
// at startup. Each config name maps to a vector ordered by precedence
// (lowest to highest), so we take the last entry as the effective value.
for (const auto& product : config_.products) {
for (const auto& [config_name, config_metadatas] : product.configurations) {
if (!config_metadatas.empty()) {
current_configuration_[config_name] = config_metadatas.back();
}
}
}
app_started();
schedule_tasks();
}
Expand All @@ -227,6 +270,12 @@ void Telemetry::schedule_tasks() {
tasks_.emplace_back(scheduler_->schedule_recurring_event(
config_.metrics_interval, [this]() mutable { capture_metrics(); }));
}

tasks_.emplace_back(scheduler_->schedule_recurring_event(
config_.extended_heartbeat_interval,
[this]() {
send_payload("app-extended-heartbeat", extended_heartbeat_payload());
}));
}

Telemetry::~Telemetry() {
Expand All @@ -249,6 +298,7 @@ Telemetry::Telemetry(Telemetry&& rhs)
rates_(std::move(rhs.rates_)),
rates_snapshot_(std::move(rhs.rates_snapshot_)),
distributions_(std::move(rhs.distributions_)),
current_configuration_(std::move(rhs.current_configuration_)),
seq_id_(rhs.seq_id_),
config_seq_ids_(rhs.config_seq_ids_),
host_info_(rhs.host_info_) {
Expand All @@ -274,6 +324,7 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) {
std::swap(distributions_, rhs.distributions_);
std::swap(seq_id_, rhs.seq_id_);
std::swap(config_seq_ids_, rhs.config_seq_ids_);
std::swap(current_configuration_, rhs.current_configuration_);
std::swap(host_info_, rhs.host_info_);
schedule_tasks();
}
Expand Down Expand Up @@ -425,13 +476,13 @@ void Telemetry::send_payload(StringView request_type, std::string payload) {
void Telemetry::send_configuration_change() {
if (configuration_snapshot_.empty()) return;

std::vector<ConfigMetadata> current_configuration;
std::swap(current_configuration, configuration_snapshot_);
std::vector<ConfigMetadata> pending;
std::swap(pending, configuration_snapshot_);

auto configuration_json = nlohmann::json::array();
for (const auto& config_metadata : current_configuration) {
for (const auto& config_metadata : pending) {
configuration_json.emplace_back(
generate_configuration_field(config_metadata));
report_new_config_field(config_metadata));
}

auto telemetry_body =
Expand Down Expand Up @@ -591,7 +642,7 @@ std::string Telemetry::app_started_payload() {
// if (config_metadata.value.empty()) continue;
for (const auto& config_metadata : config_metadatas) {
configuration_json.emplace_back(
generate_configuration_field(config_metadata));
report_new_config_field(config_metadata));
}
}

Expand Down Expand Up @@ -678,6 +729,36 @@ std::string Telemetry::app_started_payload() {
return batch.dump();
}

std::string Telemetry::extended_heartbeat_payload() {
auto configuration_json = nlohmann::json::array();
for (const auto& [_, config_metadata] : current_configuration_) {
// Use the current seq_id without incrementing — this is a snapshot
// refresh, not a new change event.
auto seq_id = config_seq_ids_[config_metadata.name];
configuration_json.emplace_back(
encode_configuration_field(config_metadata, seq_id));
}

auto payload = nlohmann::json{{"configuration", configuration_json}};

if (!config_.integration_name.empty()) {
payload["integrations"] = nlohmann::json::array({
nlohmann::json{{"name", config_.integration_name},
{"version", config_.integration_version},
{"enabled", true}},
});
}

auto extended_heartbeat = nlohmann::json{
{"request_type", "app-extended-heartbeat"},
{"payload", std::move(payload)},
};

auto batch = generate_telemetry_body("message-batch");
batch["payload"] = nlohmann::json::array({std::move(extended_heartbeat)});
return batch.dump();
}

nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) {
std::time_t tracer_time = std::chrono::duration_cast<std::chrono::seconds>(
clock_().wall.time_since_epoch())
Expand Down Expand Up @@ -711,49 +792,21 @@ nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) {
});
}

nlohmann::json Telemetry::generate_configuration_field(
nlohmann::json Telemetry::report_new_config_field(
const ConfigMetadata& config_metadata) {
// NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can
// detect between non set fields.
config_seq_ids_[config_metadata.name] += 1;
auto seq_id = config_seq_ids_[config_metadata.name];

auto j = nlohmann::json{{"name", to_string(config_metadata.name)},
{"value", config_metadata.value},
{"seq_id", seq_id}};

switch (config_metadata.origin) {
case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE:
j["origin"] = "env_var";
break;
case ConfigMetadata::Origin::CODE:
j["origin"] = "code";
break;
case ConfigMetadata::Origin::REMOTE_CONFIG:
j["origin"] = "remote_config";
break;
case ConfigMetadata::Origin::DEFAULT:
j["origin"] = "default";
break;
}

if (config_metadata.error) {
// clang-format off
j["error"] = {
{"code", config_metadata.error->code},
{"message", config_metadata.error->message}
};
// clang-format on
}

return j;
return encode_configuration_field(config_metadata,
config_seq_ids_[config_metadata.name]);
}

void Telemetry::capture_configuration_change(
const std::vector<tracing::ConfigMetadata>& new_configuration) {
configuration_snapshot_.insert(configuration_snapshot_.begin(),
new_configuration.begin(),
new_configuration.end());
for (const auto& config : new_configuration) {
configuration_snapshot_.push_back(config);
current_configuration_[config.name] = config;
}
}

void Telemetry::capture_metrics() {
Expand Down
11 changes: 10 additions & 1 deletion src/datadog/telemetry/telemetry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ class Telemetry final {
distributions_;

/// Configuration
// Pending configuration changes to be sent in the next
// `app-client-configuration-change` event.
std::vector<tracing::ConfigMetadata> configuration_snapshot_;
// Full current configuration state (initialized from app-started, updated on
// every configuration change). Used for `app-extended-heartbeat`.
std::unordered_map<tracing::ConfigName, tracing::ConfigMetadata>
current_configuration_;

std::mutex log_mutex_;
std::vector<telemetry::LogMessage> logs_;
Expand Down Expand Up @@ -143,7 +149,7 @@ class Telemetry final {
tracing::Optional<std::string> stacktrace = tracing::nullopt);

nlohmann::json generate_telemetry_body(std::string request_type);
nlohmann::json generate_configuration_field(
nlohmann::json report_new_config_field(
const tracing::ConfigMetadata& config_metadata);

// Constructs an `app-started` message using information provided when
Expand All @@ -155,6 +161,9 @@ class Telemetry final {
// Constructs a message-batch containing `app-closing`, and if metrics have
// been modified, a `generate-metrics` message.
std::string app_closing_payload();
// Constructs a message-batch containing `app-extended-heartbeat` with the
// full current configuration state.
std::string extended_heartbeat_payload();
};

} // namespace datadog::telemetry
Loading
Loading