diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 355fcd512..a354274c6 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -440,9 +440,6 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env, return true; } -// Static variable to track the current published context -static otel_process_ctx_result* current_published_context = nullptr; - extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_OTelContext_setProcessCtx0(JNIEnv *env, jclass unused, @@ -460,16 +457,19 @@ Java_com_datadoghq_profiler_OTelContext_setProcessCtx0(JNIEnv *env, JniString version_str(env, version); JniString tracer_version_str(env, tracer_version); + const char *host_name_attrs[] = {"host.name", hostname_str.c_str(), NULL}; + otel_process_ctx_data data = { - .deployment_environment_name = const_cast(env_str.c_str()), - .host_name = const_cast(hostname_str.c_str()), - .service_instance_id = const_cast(runtime_id_str.c_str()), - .service_name = const_cast(service_str.c_str()), - .service_version = const_cast(version_str.c_str()), - .telemetry_sdk_language = const_cast("java"), - .telemetry_sdk_version = const_cast(tracer_version_str.c_str()), - .telemetry_sdk_name = const_cast("dd-trace-java"), - .resources = NULL // TODO: Arbitrary tags not supported yet for Java + .deployment_environment_name = env_str.c_str(), + .service_instance_id = runtime_id_str.c_str(), + .service_name = service_str.c_str(), + .service_version = version_str.c_str(), + .telemetry_sdk_language = "java", + .telemetry_sdk_version = tracer_version_str.c_str(), + .telemetry_sdk_name = "dd-trace-java", + .resource_attributes = host_name_attrs, + .extra_attributes = NULL, + .thread_ctx_config = NULL }; otel_process_ctx_result result = otel_process_ctx_publish(&data); @@ -491,8 +491,6 @@ Java_com_datadoghq_profiler_OTelContext_readProcessCtx0(JNIEnv *env, jclass unus // Convert C strings to Java strings jstring jDeploymentEnvironmentName = result.data.deployment_environment_name ? env->NewStringUTF(result.data.deployment_environment_name) : nullptr; - jstring jHostName = result.data.host_name ? - env->NewStringUTF(result.data.host_name) : nullptr; jstring jServiceInstanceId = result.data.service_instance_id ? env->NewStringUTF(result.data.service_instance_id) : nullptr; jstring jServiceName = result.data.service_name ? @@ -505,7 +503,17 @@ Java_com_datadoghq_profiler_OTelContext_readProcessCtx0(JNIEnv *env, jclass unus env->NewStringUTF(result.data.telemetry_sdk_version) : nullptr; jstring jTelemetrySdkName = result.data.telemetry_sdk_name ? env->NewStringUTF(result.data.telemetry_sdk_name) : nullptr; - // TODO: result.data.resources not supported yet for Java + + // Extract host.name from resource_attributes + jstring jHostName = nullptr; + if (result.data.resource_attributes != NULL) { + for (int i = 0; result.data.resource_attributes[i] != NULL; i += 2) { + if (strcmp(result.data.resource_attributes[i], "host.name") == 0 && result.data.resource_attributes[i + 1] != NULL) { + jHostName = env->NewStringUTF(result.data.resource_attributes[i + 1]); + break; + } + } + } otel_process_ctx_read_drop(&result); diff --git a/ddprof-lib/src/main/cpp/otel_process_ctx.cpp b/ddprof-lib/src/main/cpp/otel_process_ctx.cpp index c7ce0c4ce..2993027f7 100644 --- a/ddprof-lib/src/main/cpp/otel_process_ctx.cpp +++ b/ddprof-lib/src/main/cpp/otel_process_ctx.cpp @@ -7,37 +7,24 @@ #define _GNU_SOURCE #endif -#ifdef __cplusplus - #include - using std::atomic_thread_fence; - using std::memory_order_seq_cst; -#else - #include -#endif -#include +// Note: Things here are needed for NOOP. Things that are only for non-NOOP get added further below. + #include -#include -#include -#include #define ADD_QUOTES_HELPER(x) #x #define ADD_QUOTES(x) ADD_QUOTES_HELPER(x) -#ifndef PR_SET_VMA - #define PR_SET_VMA 0x53564d41 - #define PR_SET_VMA_ANON_NAME 0 -#endif - static const otel_process_ctx_data empty_data = { .deployment_environment_name = NULL, - .host_name = NULL, .service_instance_id = NULL, .service_name = NULL, .service_version = NULL, .telemetry_sdk_language = NULL, .telemetry_sdk_version = NULL, .telemetry_sdk_name = NULL, - .resources = NULL + .resource_attributes = NULL, + .extra_attributes = NULL, + .thread_ctx_config = NULL }; #if (defined(OTEL_PROCESS_CTX_NOOP) && OTEL_PROCESS_CTX_NOOP) || !defined(__linux__) @@ -64,28 +51,50 @@ static const otel_process_ctx_data empty_data = { #endif // OTEL_PROCESS_CTX_NO_READ #else // OTEL_PROCESS_CTX_NOOP +#ifdef __cplusplus + #include + using std::atomic_thread_fence; + using std::memory_order_seq_cst; +#else + #include +#endif +#include +#include +#include #include #include +#include +#include + +#define KEY_VALUE_LIMIT 4096 +#define UINT14_MAX 16383 +#define OTEL_CTX_SIGNATURE "OTEL_CTX" + +#ifndef PR_SET_VMA + #define PR_SET_VMA 0x53564d41 + #define PR_SET_VMA_ANON_NAME 0 +#endif + +#ifndef MFD_NOEXEC_SEAL + #define MFD_NOEXEC_SEAL 8U +#endif + /** * The process context data that's written into the published anonymous mapping. * * An outside-of-process reader will read this struct + otel_process_payload to get the data. */ typedef struct __attribute__((packed, aligned(8))) { - char otel_process_ctx_signature[8]; // Always "OTEL_CTX" - // TODO: Is version useful? Should we just get rid of it? - uint32_t otel_process_ctx_version; // Always > 0, incremented when the data structure changes - // TODO: Is size useful? Should we just get rid of it? - uint32_t otel_process_payload_size; // Always > 0, size of storage - // TODO: Should we just inline the data in the mapping itself? - char *otel_process_payload; // Always non-null, points to the storage for the data; expected to be a msgpack map of string key/value pairs, null-terminated + char otel_process_ctx_signature[8]; // Always "OTEL_CTX" + uint32_t otel_process_ctx_version; // Always > 0, incremented when the data structure changes, currently v2 + uint32_t otel_process_payload_size; // Always > 0, size of storage + uint64_t otel_process_monotonic_published_at_ns; // Timestamp from when the context was published in nanoseconds from CLOCK_BOOTTIME. 0 during updates. + char *otel_process_payload; // Always non-null, points to the storage for the data; expected to be a protobuf map of string key/value pairs, null-terminated } otel_process_ctx_mapping; /** * The full state of a published process context. * - * This is returned as an opaque type to the caller. - * * It is used to store the all data for the process context and that needs to be kept around while the context is published. */ typedef struct { @@ -103,47 +112,74 @@ typedef struct { */ static otel_process_ctx_state published_state; -static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint32_t *out_size, otel_process_ctx_data data); +static otel_process_ctx_result otel_process_ctx_update(uint64_t monotonic_published_at_ns, const otel_process_ctx_data *data); +static otel_process_ctx_result otel_process_ctx_encode_protobuf_payload(char **out, uint32_t *out_size, otel_process_ctx_data data); -// We use a mapping size of 2 pages explicitly as a hint when running on legacy kernels that don't support the -// PR_SET_VMA_ANON_NAME prctl call; see below for more details. -static long size_for_mapping(void) { - long page_size_bytes = sysconf(_SC_PAGESIZE); - if (page_size_bytes < 4096) { - return -1; - } - return page_size_bytes * 2; +static uint64_t monotonic_time_now_ns(void) { + struct timespec ts; + if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) return 0; + return ts.tv_sec * 1000000000ULL + ts.tv_nsec; +} + +static bool ctx_is_published(otel_process_ctx_state state) { + return state.mapping != NULL && state.mapping != MAP_FAILED && getpid() == state.publisher_pid; } // The process context is designed to be read by an outside-of-process reader. Thus, for concurrency purposes the steps // on this method are ordered in a way to avoid races, or if not possible to avoid, to allow the reader to detect if there was a race. otel_process_ctx_result otel_process_ctx_publish(const otel_process_ctx_data *data) { - // Step: Drop any previous context it if it exists + if (!data) return (otel_process_ctx_result) {.success = false, .error_message = "otel_process_ctx_data is NULL (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + + uint64_t monotonic_published_at_ns = monotonic_time_now_ns(); + if (monotonic_published_at_ns == 0) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to get current time (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + // Step: If the context has been published by this process, update it in place + if (ctx_is_published(published_state)) return otel_process_ctx_update(monotonic_published_at_ns, data); + + // Step: Drop any previous context state if it exists // No state should be around anywhere after this step. if (!otel_process_ctx_drop_current()) { return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop previous context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } - // Step: Determine size for mapping - long mapping_size = size_for_mapping(); - if (mapping_size == -1) { - return (otel_process_ctx_result) {.success = false, .error_message = "Failed to get page size (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; - } - // Step: Prepare the payload to be published // The payload SHOULD be ready and valid before trying to actually create the mapping. - if (!data) return (otel_process_ctx_result) {.success = false, .error_message = "otel_process_ctx_data is NULL (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; uint32_t payload_size = 0; - otel_process_ctx_result result = otel_process_ctx_encode_payload(&published_state.payload, &payload_size, *data); + otel_process_ctx_result result = otel_process_ctx_encode_protobuf_payload(&published_state.payload, &payload_size, *data); if (!result.success) return result; // Step: Create the mapping + const ssize_t mapping_size = sizeof(otel_process_ctx_mapping); published_state.publisher_pid = getpid(); // This allows us to detect in forks that we shouldn't touch the mapping - published_state.mapping = (otel_process_ctx_mapping *) - mmap(NULL, mapping_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (published_state.mapping == MAP_FAILED) { + int fd = memfd_create("OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING | MFD_NOEXEC_SEAL); + if (fd < 0) { + // MFD_NOEXEC_SEAL is a newer flag; older kernels reject unknown flags, so let's retry without it + fd = memfd_create("OTEL_CTX", MFD_CLOEXEC | MFD_ALLOW_SEALING); + } + bool failed_to_close_fd = false; + if (fd >= 0) { + // Try to create mapping from memfd + if (ftruncate(fd, mapping_size) == -1) { + close(fd); // Swallow errors here, truncation already failed anyway + otel_process_ctx_drop_current(); + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to truncate memfd (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + published_state.mapping = (otel_process_ctx_mapping *) mmap(NULL, mapping_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0); + failed_to_close_fd = (close(fd) == -1); + } else { + // Fallback: Use an anonymous mapping instead + published_state.mapping = (otel_process_ctx_mapping *) mmap(NULL, mapping_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + } + if (published_state.mapping == MAP_FAILED || failed_to_close_fd) { otel_process_ctx_drop_current(); - return (otel_process_ctx_result) {.success = false, .error_message = "Failed to allocate mapping (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + + if (failed_to_close_fd) { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to close memfd (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } else { + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to allocate mapping (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } } // Step: Setup MADV_DONTFORK @@ -152,50 +188,42 @@ otel_process_ctx_result otel_process_ctx_publish(const otel_process_ctx_data *da if (otel_process_ctx_drop_current()) { return (otel_process_ctx_result) {.success = false, .error_message = "Failed to setup MADV_DONTFORK (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } else { - return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop previous context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } } // Step: Populate the mapping - // The payload and any extra fields must come first and not be reordered with the signature by the compiler. + // The payload and any extra fields must come first and not be reordered with the monotonic_published_at_ns by the compiler. *published_state.mapping = (otel_process_ctx_mapping) { - .otel_process_ctx_signature = {0}, // Set in "Step: Populate the signature into the mapping" below - .otel_process_ctx_version = 1, + .otel_process_ctx_signature = { 'O', 'T', 'E', 'L', '_', 'C', 'T', 'X' }, + .otel_process_ctx_version = 2, .otel_process_payload_size = payload_size, + .otel_process_monotonic_published_at_ns = 0, // Set in "Step: Populate the monotonic_published_at_ns into the mapping" below .otel_process_payload = published_state.payload }; - // Step: Synchronization - Mapping has been filled and is missing signature - // Make sure the initialization of the mapping + payload above does not get reordered with setting the signature below. Setting - // the signature is what tells an outside reader that the context is fully published. + // Step: Synchronization - Mapping has been filled and is missing monotonic_published_at_ns + // Make sure the initialization of the mapping + payload above does not get reordered with setting the monotonic_published_at_ns below. Setting + // the monotonic_published_at_ns is what tells an outside reader that the context is fully published. atomic_thread_fence(memory_order_seq_cst); - // Step: Populate the signature into the mapping - // The signature must come last and not be reordered with the fields above by the compiler. After this step, external readers - // can read the signature and know that the payload is ready to be read. - memcpy(published_state.mapping->otel_process_ctx_signature, "OTEL_CTX", sizeof(published_state.mapping->otel_process_ctx_signature)); - - // Step: Change permissions on the mapping to only read permission - // We've observed the combination of anonymous mapping + a given number of pages + read-only permission is not very common, - // so this is left as a hint for when running on older kernels and the naming the mapping feature below isn't available. - // For modern kernels, doing this is harmless so we do it unconditionally. - if (mprotect(published_state.mapping, mapping_size, PROT_READ) == -1) { - if (otel_process_ctx_drop_current()) { - return (otel_process_ctx_result) {.success = false, .error_message = "Failed to change permissions on mapping (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; - } else { - return (otel_process_ctx_result) {.success = false, .error_message = "Failed to drop previous context (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; - } - } + // Step: Populate the monotonic_published_at_ns into the mapping + // The monotonic_published_at_ns must come last and not be reordered with the fields above by the compiler. After this step, external readers + // can read the monotonic_published_at_ns and know that the payload is ready to be read. + published_state.mapping->otel_process_monotonic_published_at_ns = monotonic_published_at_ns; - // Step: Name the mapping so outside readers can: + // Step: Attempt to name the mapping so outside readers can: // * Find it by name // * Hook on prctl to detect when new mappings are published - if (prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, published_state.mapping, mapping_size, "OTEL_CTX") == -1) { - // Naming an anonymous mapping is a Linux 5.17+ feature. On earlier versions, this method call can fail. Thus it's OK - // for this to fail because: - // 1. Things that hook on prctl are still able to see this call, even though it's not supported (TODO: Confirm this is actually the case) - // 2. As a fallback, on older kernels, it's possible to scan the mappings and look for the "OTEL_CTX" signature in the memory itself, - // after observing the mapping has the expected number of pages and permissions. + if (prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, published_state.mapping, mapping_size, OTEL_CTX_SIGNATURE) == -1) { + // Naming an anonymous mapping is an optional Linux 5.17+ feature (`CONFIG_ANON_VMA_NAME`). + // Many distros, such as Ubuntu and Arch enable it. On earlier kernel versions or kernels without the feature, this call can fail. + // + // It's OK for this to fail because (per-usecase): + // 1. "Find it by name" => As a fallback, it's possible to scan the mappings and for the memfd name. + // 2. "Hook on prctl" => When hooking on prctl via eBPF it's still possible to see this call, even when it's not supported/enabled. + // This works even on older kernels! For this reason we unconditionally make this call even on older kernels -- to + // still allow detection via hooking onto prctl. } // All done! @@ -210,67 +238,189 @@ bool otel_process_ctx_drop_current(void) { published_state = (otel_process_ctx_state) {.publisher_pid = 0, .mapping = NULL, .payload = NULL}; atomic_thread_fence(memory_order_seq_cst); + bool success = true; + // The mapping only exists if it was created by the current process; if it was inherited by a fork it doesn't exist anymore // (due to the MADV_DONTFORK) and we don't need to do anything to it. - if (state.mapping != NULL && state.mapping != MAP_FAILED && getpid() == state.publisher_pid) { - long mapping_size = size_for_mapping(); - if (mapping_size == -1 || munmap(state.mapping, mapping_size) == -1) return false; + if (ctx_is_published(state)) { + success = munmap(state.mapping, sizeof(otel_process_ctx_mapping)) == 0; } // The payload may have been inherited from a parent. This is a regular malloc so we need to free it so we don't leak. - if (state.payload) free(state.payload); + free(state.payload); - return true; + return success; } -static otel_process_ctx_result validate_and_calculate_payload_size(size_t *out_pairs_size, size_t *out_num_pairs, char **pairs) { +static otel_process_ctx_result otel_process_ctx_update(uint64_t monotonic_published_at_ns, const otel_process_ctx_data *data) { + if (data == NULL || !ctx_is_published(published_state)) { + return (otel_process_ctx_result) {.success = false, .error_message = "Unexpected: otel_process_ctx_data is NULL or context is not published (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + + if (monotonic_published_at_ns == published_state.mapping->otel_process_monotonic_published_at_ns) { + // Advance published_at_ns to allow readers to detect the update + monotonic_published_at_ns++; + } + + // Step: Prepare the new payload to be published + // The payload SHOULD be ready and valid before trying to actually update the mapping. + uint32_t payload_size = 0; + char *payload; + otel_process_ctx_result result = otel_process_ctx_encode_protobuf_payload(&payload, &payload_size, *data); + if (!result.success) return result; + + // Step: Zero out monotonic_published_at_ns in the mapping + // This enables readers to detect that an update is in-progress + published_state.mapping->otel_process_monotonic_published_at_ns = 0; + + // Step: Synchronization - Make sure readers observe the zeroing above before anything else below + atomic_thread_fence(memory_order_seq_cst); + + // Step: Install updated data + published_state.mapping->otel_process_payload_size = payload_size; + published_state.mapping->otel_process_payload = payload; + + // Step: Synchronization - Make sure readers observe the updated data before anything else below + atomic_thread_fence(memory_order_seq_cst); + + // Step: Install new monotonic_published_at_ns + // The update is now complete -- readers that observe the new timestamp will observe the updated payload + published_state.mapping->otel_process_monotonic_published_at_ns = monotonic_published_at_ns; + + // Step: Attempt to name the mapping so outside readers can detect the update + if (prctl(PR_SET_VMA, PR_SET_VMA_ANON_NAME, published_state.mapping, sizeof(otel_process_ctx_mapping), OTEL_CTX_SIGNATURE) == -1) { + // It's OK for this to fail -- see otel_process_ctx_publish for why + } + + // Step: Update bookkeeping + free(published_state.payload); // This was still pointing to the old payload + published_state.payload = payload; + + // All done! + + return (otel_process_ctx_result) {.success = true, .error_message = NULL}; +} + +// The caller is responsible for enforcing that value fits within UINT14_MAX +static size_t protobuf_varint_size(uint16_t value) { return value >= 128 ? 2 : 1; } + +// Field tag for record + varint len + data +static size_t protobuf_record_size(size_t len) { return 1 + protobuf_varint_size(len) + len; } + +static size_t protobuf_string_size(const char *str) { return protobuf_record_size(strlen(str)); } + +static size_t protobuf_otel_keyvalue_string_size(const char *key, const char *value) { + size_t key_field_size = protobuf_string_size(key); // String + size_t value_field_size = protobuf_record_size(protobuf_string_size(value)); // Nested AnyValue message with a string inside + return key_field_size + value_field_size; // Does not include the keyvalue record tag + size, only its payload +} + +static size_t protobuf_otel_array_value_content_size(const char **strings) { + size_t total = 0; + for (size_t i = 0; strings[i] != NULL; i++) { + total += protobuf_record_size(protobuf_string_size(strings[i])); // ArrayValue.values[i]: AnyValue{string_value} + } + return total; +} + +// As a simplification, we enforce that keys and values are <= 4096 (KEY_VALUE_LIMIT) so that their size + extra bytes always fits within UINT14_MAX +static otel_process_ctx_result validate_and_calculate_protobuf_payload_size(size_t *out_pairs_size, const char **pairs) { size_t num_entries = 0; for (size_t i = 0; pairs[i] != NULL; i++) num_entries++; if (num_entries % 2 != 0) { return (otel_process_ctx_result) {.success = false, .error_message = "Value in otel_process_ctx_data is NULL (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } - *out_num_pairs = num_entries / 2; *out_pairs_size = 0; - for (size_t i = 0; i < *out_num_pairs; i++) { - size_t key_len = strlen(pairs[i * 2]); - if (key_len > INT16_MAX) { - return (otel_process_ctx_result) {.success = false, .error_message = "Length of key in otel_process_ctx_data exceeds INT16_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + for (size_t i = 0; pairs[i * 2] != NULL; i++) { + const char *key = pairs[i * 2]; + const char *value = pairs[i * 2 + 1]; + + if (strlen(key) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of key in otel_process_ctx_data exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } - size_t value_len = strlen(pairs[i * 2 + 1]); - if (value_len > INT16_MAX) { - return (otel_process_ctx_result) {.success = false, .error_message = "Length of value in otel_process_ctx_data exceeds INT16_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + if (strlen(value) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of value in otel_process_ctx_data exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } - *out_pairs_size += 1 + 2 + key_len; // str 16 for key - *out_pairs_size += 1 + 2 + value_len; // str 16 for value - } + *out_pairs_size += protobuf_record_size(protobuf_otel_keyvalue_string_size(key, value)); // KeyValue message + } return (otel_process_ctx_result) {.success = true, .error_message = NULL}; } -static void write_msgpack_string(char **ptr, const char *str) { +/** + * Writes a protobuf varint encoding for the given value. + * As a simplification, only supports values that fit in 1 or 2 bytes (0-16383 UINT14_MAX). + */ +static void write_protobuf_varint(char **ptr, uint16_t value) { + if (protobuf_varint_size(value) == 1) { + *(*ptr)++ = (char)value; + } else { + // Two bytes: first byte has MSB set, second byte has value + *(*ptr)++ = (char)((value & 0x7F) | 0x80); // Low 7 bits + continuation bit + *(*ptr)++ = (char)(value >> 7); // High 7 bits + } +} + +static void write_protobuf_string(char **ptr, const char *str) { size_t len = strlen(str); - // Write str 16 header - *(*ptr)++ = 0xda; - *(*ptr)++ = (len >> 8) & 0xFF; // high byte of length - *(*ptr)++ = len & 0xFF; // low byte of length + write_protobuf_varint(ptr, len); memcpy(*ptr, str, len); *ptr += len; } -// TODO: The serialization format is still under discussion and is not considered stable yet. -// Comments **very** welcome: Should we use JSON instead? Or protobuf? -// -// Encode the payload as a msgpack map of string key/value pairs. +static void write_protobuf_tag(char **ptr, uint8_t field_number) { + *(*ptr)++ = (char)((field_number << 3) | 2); // Field type is always 2 (LEN) +} + +static void write_attribute(char **ptr, uint8_t field_number, const char *key, const char *value) { + write_protobuf_tag(ptr, field_number); + write_protobuf_varint(ptr, protobuf_otel_keyvalue_string_size(key, value)); + + // KeyValue + write_protobuf_tag(ptr, 1); // KeyValue.key (field 1) + write_protobuf_string(ptr, key); + write_protobuf_tag(ptr, 2); // KeyValue.value (field 2) + write_protobuf_varint(ptr, protobuf_string_size(value)); + + // AnyValue + write_protobuf_tag(ptr, 1); // AnyValue.string_value (field 1) + write_protobuf_string(ptr, value); +} + +static void write_array_attribute(char **ptr, uint8_t field_number, const char *key, const char **strings) { + size_t array_value_content_size = protobuf_otel_array_value_content_size(strings); + size_t any_value_content_size = protobuf_record_size(array_value_content_size); + size_t kv_content_size = protobuf_string_size(key) + protobuf_record_size(any_value_content_size); + + write_protobuf_tag(ptr, field_number); + write_protobuf_varint(ptr, kv_content_size); + + write_protobuf_tag(ptr, 1); // KeyValue.key (field 1) + write_protobuf_string(ptr, key); + + write_protobuf_tag(ptr, 2); // KeyValue.value (field 2) = AnyValue message + write_protobuf_varint(ptr, any_value_content_size); + + write_protobuf_tag(ptr, 5); // AnyValue.array_value (field 5) = ArrayValue message + write_protobuf_varint(ptr, array_value_content_size); + + for (size_t i = 0; strings[i] != NULL; i++) { // ArrayValue.values (field 1) - repeated AnyValue entries + write_protobuf_tag(ptr, 1); // ArrayValue.values[i] + write_protobuf_varint(ptr, protobuf_string_size(strings[i])); // Inner AnyValue size + write_protobuf_tag(ptr, 1); // AnyValue.string_value (field 1) + write_protobuf_string(ptr, strings[i]); + } +} + +// Encode the payload as protobuf bytes. // -// This method implements an extremely compact but limited msgpack encoder. This encoder supports only encoding a single -// flat key-value map where every key and value is a string. -// For extra compact code, it uses only a "map 16" encoding format with only "str 16" strings, rather than attempting to -// use some of the other encoding alternatives. -static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint32_t *out_size, otel_process_ctx_data data) { +// This method implements an extremely compact but limited protobuf encoder for the ProcessContext message. +// It encodes all fields as Resource attributes (KeyValue pairs). +// For extra compact code, it fixes strings at up to 4096 bytes. +static otel_process_ctx_result otel_process_ctx_encode_protobuf_payload(char **out, uint32_t *out_size, otel_process_ctx_data data) { const char *pairs[] = { "deployment.environment.name", data.deployment_environment_name, - "host.name", data.host_name, "service.instance.id", data.service_instance_id, "service.name", data.service_name, "service.version", data.service_version, @@ -280,22 +430,53 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 NULL }; - size_t num_pairs = 0, pairs_size = 0; - otel_process_ctx_result validation_result = validate_and_calculate_payload_size(&pairs_size, &num_pairs, (char **) pairs); + size_t pairs_size = 0; + otel_process_ctx_result validation_result = validate_and_calculate_protobuf_payload_size(&pairs_size, (const char **) pairs); if (!validation_result.success) return validation_result; - size_t resources_pairs_size = 0, resources_num_pairs = 0; - if (data.resources != NULL) { - validation_result = validate_and_calculate_payload_size(&resources_pairs_size, &resources_num_pairs, data.resources); + size_t resource_attributes_pairs_size = 0; + if (data.resource_attributes != NULL) { + validation_result = validate_and_calculate_protobuf_payload_size(&resource_attributes_pairs_size, data.resource_attributes); + if (!validation_result.success) return validation_result; + } + + size_t extra_attributes_pairs_size = 0; + if (data.extra_attributes != NULL) { + validation_result = validate_and_calculate_protobuf_payload_size(&extra_attributes_pairs_size, data.extra_attributes); if (!validation_result.success) return validation_result; } - size_t total_pairs = num_pairs + resources_num_pairs; - size_t total_size = pairs_size + resources_pairs_size + 1 + 2; // map 16 header (1 byte + 2 bytes for count) + size_t thread_ctx_pairs_size = 0; + if (data.thread_ctx_config != NULL) { + if (data.thread_ctx_config->schema_version != NULL) { + const char *thread_ctx_pairs[] = {"threadlocal.schema_version", data.thread_ctx_config->schema_version, NULL}; + validation_result = validate_and_calculate_protobuf_payload_size(&thread_ctx_pairs_size, thread_ctx_pairs); + if (!validation_result.success) return validation_result; + } + if (data.thread_ctx_config->attribute_key_map != NULL) { + if (data.thread_ctx_config->schema_version == NULL) { + return (otel_process_ctx_result) {.success = false, .error_message = "attribute_key_map requires schema_version to be set (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + for (size_t i = 0; data.thread_ctx_config->attribute_key_map[i] != NULL; i++) { + if (strlen(data.thread_ctx_config->attribute_key_map[i]) > KEY_VALUE_LIMIT) { + return (otel_process_ctx_result) {.success = false, .error_message = "Length of attribute_key_map entry exceeds 4096 limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + } + size_t array_value_content_size = protobuf_otel_array_value_content_size(data.thread_ctx_config->attribute_key_map); + size_t any_value_content_size = protobuf_record_size(array_value_content_size); + size_t kv_content_size = protobuf_string_size("threadlocal.attribute_key_map") + protobuf_record_size(any_value_content_size); + if (kv_content_size > UINT14_MAX) { + return (otel_process_ctx_result) {.success = false, .error_message = "Encoded size of attribute_key_map exceeds UINT14_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + } + thread_ctx_pairs_size += protobuf_record_size(kv_content_size); + } + } - if (total_pairs > INT16_MAX) { - return (otel_process_ctx_result) {.success = false, .error_message = "Total number of pairs exceeds INT16_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; + size_t resource_size = pairs_size + resource_attributes_pairs_size; + if (resource_size > UINT14_MAX) { + return (otel_process_ctx_result) {.success = false, .error_message = "Encoded size of resource attributes exceeds UINT14_MAX limit (" __FILE__ ":" ADD_QUOTES(__LINE__) ")"}; } + size_t total_size = protobuf_record_size(resource_size) + extra_attributes_pairs_size + thread_ctx_pairs_size; char *encoded = (char *) calloc(total_size, 1); if (!encoded) { @@ -303,20 +484,29 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 } char *ptr = encoded; - // Write map 16 header (0xde) followed by count - *ptr++ = 0xde; - *ptr++ = (total_pairs >> 8) & 0xFF; // high byte of count - *ptr++ = total_pairs & 0xFF; // low byte of count + // ProcessContext.resource (field 1) + write_protobuf_tag(&ptr, 1); + write_protobuf_varint(&ptr, resource_size); - for (size_t i = 0; i < num_pairs; i++) { - write_msgpack_string(&ptr, pairs[i * 2]); // Write key - write_msgpack_string(&ptr, pairs[i * 2 + 1]); // Write value + for (size_t i = 0; pairs[i * 2] != NULL; i++) { + write_attribute(&ptr, 1, pairs[i * 2], pairs[i * 2 + 1]); } - if (data.resources != NULL) { - for (size_t i = 0; i < resources_num_pairs; i++) { - write_msgpack_string(&ptr, data.resources[i * 2]); // Write key - write_msgpack_string(&ptr, data.resources[i * 2 + 1]); // Write value + for (size_t i = 0; data.resource_attributes != NULL && data.resource_attributes[i * 2] != NULL; i++) { + write_attribute(&ptr, 1, data.resource_attributes[i * 2], data.resource_attributes[i * 2 + 1]); + } + + // ProcessContext.extra_attributes (field 2) + for (size_t i = 0; data.extra_attributes != NULL && data.extra_attributes[i * 2] != NULL; i++) { + write_attribute(&ptr, 2, data.extra_attributes[i * 2], data.extra_attributes[i * 2 + 1]); + } + + if (data.thread_ctx_config != NULL) { + if (data.thread_ctx_config->schema_version != NULL) { + write_attribute(&ptr, 2, "threadlocal.schema_version", data.thread_ctx_config->schema_version); + } + if (data.thread_ctx_config->attribute_key_map != NULL) { + write_array_attribute(&ptr, 2, "threadlocal.attribute_key_map", data.thread_ctx_config->attribute_key_map); } } @@ -335,14 +525,6 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 // Note: The below parsing code is only for otel_process_ctx_read and is only provided for debugging // and testing purposes. - // Named mappings are supported on Linux 5.17+ - static bool named_mapping_supported(void) { - struct utsname uts; - int major, minor; - if (uname(&uts) != 0 || sscanf(uts.release, "%d.%d", &major, &minor) != 2) return false; - return (major > 5) || (major == 5 && minor >= 17); - } - static void *parse_mapping_start(char *line) { char *endptr = NULL; unsigned long long start = strtoull(line, &endptr, 16); @@ -350,41 +532,6 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 return (void *)(uintptr_t) start; } - static bool is_otel_process_ctx_mapping(char *line) { - size_t name_len = sizeof("[anon:OTEL_CTX]") - 1; - size_t line_len = strlen(line); - if (line_len < name_len) return false; - if (line[line_len-1] == '\n') line[--line_len] = '\0'; - - // Validate expected permission - if (strstr(line, " r--p ") == NULL) return false; - - // Validate expected context size - int64_t start, end; - if (sscanf(line, "%" PRIx64 "-%" PRIx64, &start, &end) != 2) return false; - if (start == 0 || end == 0 || end <= start) return false; - if ((end - start) != size_for_mapping()) return false; - - if (named_mapping_supported()) { - // On Linux 5.17+, check if the line ends with [anon:OTEL_CTX] - return memcmp(line + (line_len - name_len), "[anon:OTEL_CTX]", name_len) == 0; - } else { - // On older kernels, parse the address to to find the OTEL_CTX signature - void *addr = parse_mapping_start(line); - if (addr == NULL) return false; - - // Read 8 bytes at the address using process_vm_readv (to avoid any issues with concurrency/races) - char buffer[8]; - struct iovec local[] = {{.iov_base = buffer, .iov_len = sizeof(buffer)}}; - struct iovec remote[] = {{.iov_base = addr, .iov_len = sizeof(buffer)}}; - - ssize_t bytes_read = process_vm_readv(getpid(), local, 1, remote, 1, 0); - if (bytes_read != sizeof(buffer)) return false; - - return memcmp(buffer, "OTEL_CTX", sizeof(buffer)) == 0; - } - } - static otel_process_ctx_mapping *try_finding_mapping(void) { char line[8192]; otel_process_ctx_mapping *result = NULL; @@ -393,7 +540,8 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 if (!fp) return result; while (fgets(line, sizeof(line), fp)) { - if (is_otel_process_ctx_mapping(line)) { + bool is_process_ctx = strstr(line, "[anon_shmem:OTEL_CTX]") != NULL || strstr(line, "[anon:OTEL_CTX]") != NULL || strstr(line, "/memfd:OTEL_CTX") != NULL; + if (is_process_ctx) { result = (otel_process_ctx_mapping *)parse_mapping_start(line); break; } @@ -403,100 +551,248 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 return result; } - // Simplified msgpack decoder to match the exact encoder above. If the msgpack string doesn't match the encoder, this will - // return false. - static bool otel_process_ctx_decode_payload(char *payload, otel_process_ctx_data *data_out) { - char *ptr = payload; + // Helper function to read a protobuf varint (limited to 1-2 bytes, max value UINT14_MAX, matching write_protobuf_varint above) + static bool read_protobuf_varint(char **ptr, char *end_ptr, uint16_t *value) { + if (*ptr >= end_ptr) return false; + + unsigned char first_byte = (unsigned char)**ptr; + (*ptr)++; + + if (first_byte < 128) { + *value = first_byte; + return true; + } else { + if (*ptr >= end_ptr) return false; + unsigned char second_byte = (unsigned char)**ptr; + (*ptr)++; + + *value = (first_byte & 0x7F) | (second_byte << 7); + return *value <= UINT14_MAX; + } + } + + // Helper function to read a protobuf string into a buffer, within the same limits as the encoder imposes + static bool read_protobuf_string(char **ptr, char *end_ptr, char *buffer) { + uint16_t len; + if (!read_protobuf_varint(ptr, end_ptr, &len) || len >= KEY_VALUE_LIMIT + 1 || *ptr + len > end_ptr) return false; + + memcpy(buffer, *ptr, len); + buffer[len] = '\0'; + *ptr += len; + + return true; + } - // Check map 16 header (0xde) - if ((unsigned char)*ptr++ != 0xde) return false; + // Reads field name and validates the fixed LEN wire type + static bool read_protobuf_tag(char **ptr, char *end_ptr, uint8_t *field_number) { + if (*ptr >= end_ptr) return false; - // Read count (2 bytes, big endian) - uint16_t count = ((uint8_t)*ptr << 8) | (uint8_t)*(ptr + 1); - ptr += 2; + unsigned char tag = (unsigned char)**ptr; + (*ptr)++; - // We expect at least 8 pairs (the standard fields) - if (count < 8) return false; + uint8_t wire_type = tag & 0x07; + *field_number = tag >> 3; - // Initialize output data - data_out->deployment_environment_name = NULL; - data_out->host_name = NULL; - data_out->service_instance_id = NULL; - data_out->service_name = NULL; - data_out->service_version = NULL; - data_out->telemetry_sdk_language = NULL; - data_out->telemetry_sdk_version = NULL; - data_out->telemetry_sdk_name = NULL; - data_out->resources = NULL; + return wire_type == 2; // We only need the LEN wire type for now + } + + // Peeks at the key of an OTel KeyValue message without advancing the pointer. + static bool peek_protobuf_key(char *ptr, char *end_ptr, char *key_buffer) { + char *p = ptr; + uint8_t kv_field; + if (!read_protobuf_tag(&p, end_ptr, &kv_field)) return false; + if (kv_field != 1) return false; // KeyValue.key is field 1 + return read_protobuf_string(&p, end_ptr, key_buffer); + } - // Allocate resources array with space for all pairs as a simplification (2 entries per pair + 1 for NULL terminator) - data_out->resources = (char **) calloc(count * 2 + 1, sizeof(char *)); - if (!data_out->resources) return false; + // Reads an OTel KeyValue message (key string + AnyValue-wrapped string) into the provided buffers. + static bool read_protobuf_keyvalue(char **ptr, char *end_ptr, char *key_buffer, char *value_buffer) { + bool key_found = false; + bool value_found = false; + + while (*ptr < end_ptr) { + uint8_t kv_field; + if (!read_protobuf_tag(ptr, end_ptr, &kv_field)) return false; + + if (kv_field == 1) { // KeyValue.key + if (!read_protobuf_string(ptr, end_ptr, key_buffer)) return false; + key_found = true; + } else if (kv_field == 2) { // KeyValue.value (AnyValue) + uint16_t _any_len; // Unused, but we still need to consume + validate the varint + if (!read_protobuf_varint(ptr, end_ptr, &_any_len)) return false; + uint8_t any_field; + if (!read_protobuf_tag(ptr, end_ptr, &any_field)) return false; + + if (any_field == 1) { // AnyValue.string_value + if (!read_protobuf_string(ptr, end_ptr, value_buffer)) return false; + value_found = true; + } + } + } - int resources_index = 0; + return key_found && value_found; + } - // Decode each key-value pair - for (int i = 0; i < count; i++) { - // Check str 16 header for key (0xda) - if ((unsigned char)*ptr++ != 0xda) return false; + // Reads an AnyValue.array_value (field 5) from ptr; ptr must be at KeyValue.value (tag 2). + // Allocates a NULL-terminated array of strings and sets *out_array immediately. On error the caller must free it. + static bool read_protobuf_array_value_strings(char **ptr, char *end_ptr, char *value_buffer, const char ***out_array) { + uint8_t field; + if (!read_protobuf_tag(ptr, end_ptr, &field) || field != 2) return false; + uint16_t any_len; + if (!read_protobuf_varint(ptr, end_ptr, &any_len)) return false; + char *any_end = *ptr + any_len; + if (any_end > end_ptr) return false; + + if (!read_protobuf_tag(ptr, any_end, &field) || field != 5) return false; + uint16_t array_len; + if (!read_protobuf_varint(ptr, any_end, &array_len)) return false; + char *array_end = *ptr + array_len; + if (array_end > any_end) return false; + + size_t max = 100; + size_t capacity = max + 1; + const char **arr = (const char **) calloc(capacity, sizeof(char *)); + if (!arr) return false; + *out_array = arr; + size_t count = 0; + + while (*ptr < array_end) { + if (count >= max) return false; + if (!read_protobuf_tag(ptr, array_end, &field) || field != 1) return false; + uint16_t item_len; + if (!read_protobuf_varint(ptr, array_end, &item_len)) return false; + char *item_end = *ptr + item_len; + if (item_end > array_end) return false; + if (!read_protobuf_tag(ptr, item_end, &field) || field != 1) return false; + if (!read_protobuf_string(ptr, item_end, value_buffer)) return false; + char *dup = strdup(value_buffer); + if (!dup) return false; + arr[count++] = dup; + } - // Read key length (2 bytes, big endian) - uint16_t key_len = ((uint8_t)*ptr << 8) | (uint8_t)*(ptr + 1); - ptr += 2; + return true; + } - // Get pointer to key (not null-terminated) - char *key_not_terminated = ptr; - ptr += key_len; + // Simplified protobuf decoder to match the exact encoder above. If the protobuf data doesn't match the encoder, this will + // return false. + static bool otel_process_ctx_decode_payload(char *payload, uint32_t payload_size, otel_process_ctx_data *data_out, char *key_buffer, char *value_buffer) { + char *ptr = payload; + char *end_ptr = payload + payload_size; - // Check str 16 header for value (0xda) - if ((unsigned char)*ptr++ != 0xda) return false; + *data_out = empty_data; - // Read value length (2 bytes, big endian) - uint16_t value_len = ((uint8_t)*ptr << 8) | (uint8_t)*(ptr + 1); - ptr += 2; + // Parse ProcessContext wrapper - expect field 1 (resource) + uint8_t process_ctx_field; + if (!read_protobuf_tag(&ptr, end_ptr, &process_ctx_field) || process_ctx_field != 1) return false; - // Read value - char *value = (char *) calloc(value_len + 1, 1); + uint16_t resource_len; + if (!read_protobuf_varint(&ptr, end_ptr, &resource_len)) return false; + char *resource_end = ptr + resource_len; + if (resource_end > end_ptr) return false; + + size_t resource_index = 0; + size_t resource_capacity = 201; // Allocate space for 100 pairs + NULL terminator entry + data_out->resource_attributes = (const char **) calloc(resource_capacity, sizeof(char *)); + if (data_out->resource_attributes == NULL) return false; + + size_t extra_attributes_index = 0; + size_t extra_attributes_capacity = 201; // Allocate space for 100 pairs + NULL terminator entry + data_out->extra_attributes = (const char **) calloc(extra_attributes_capacity, sizeof(char *)); + if (data_out->extra_attributes == NULL) return false; + + // Parse resource attributes (field 1) + while (ptr < resource_end) { + uint8_t field_number; + if (!read_protobuf_tag(&ptr, resource_end, &field_number) || field_number != 1) return false; + + uint16_t kv_len; + if (!read_protobuf_varint(&ptr, resource_end, &kv_len)) return false; + char *kv_end = ptr + kv_len; + if (kv_end > resource_end) return false; + + if (!read_protobuf_keyvalue(&ptr, kv_end, key_buffer, value_buffer)) return false; + + char *value = strdup(value_buffer); if (!value) return false; - memcpy(value, ptr, value_len); - value[value_len] = '\0'; - ptr += value_len; - - // Assign to appropriate field based on key - if (key_len == strlen("deployment.environment.name") && memcmp(key_not_terminated, "deployment.environment.name", strlen("deployment.environment.name")) == 0) { - data_out->deployment_environment_name = value; - } else if (key_len == strlen("host.name") && memcmp(key_not_terminated, "host.name", strlen("host.name")) == 0) { - data_out->host_name = value; - } else if (key_len == strlen("service.instance.id") && memcmp(key_not_terminated, "service.instance.id", strlen("service.instance.id")) == 0) { - data_out->service_instance_id = value; - } else if (key_len == strlen("service.name") && memcmp(key_not_terminated, "service.name", strlen("service.name")) == 0) { - data_out->service_name = value; - } else if (key_len == strlen("service.version") && memcmp(key_not_terminated, "service.version", strlen("service.version")) == 0) { - data_out->service_version = value; - } else if (key_len == strlen("telemetry.sdk.language") && memcmp(key_not_terminated, "telemetry.sdk.language", strlen("telemetry.sdk.language")) == 0) { - data_out->telemetry_sdk_language = value; - } else if (key_len == strlen("telemetry.sdk.version") && memcmp(key_not_terminated, "telemetry.sdk.version", strlen("telemetry.sdk.version")) == 0) { - data_out->telemetry_sdk_version = value; - } else if (key_len == strlen("telemetry.sdk.name") && memcmp(key_not_terminated, "telemetry.sdk.name", strlen("telemetry.sdk.name")) == 0) { - data_out->telemetry_sdk_name = value; + + // Dispatch based on key + const char **field = NULL; + if (strcmp(key_buffer, "deployment.environment.name") == 0) { field = &data_out->deployment_environment_name; } + else if (strcmp(key_buffer, "service.instance.id") == 0) { field = &data_out->service_instance_id; } + else if (strcmp(key_buffer, "service.name") == 0) { field = &data_out->service_name; } + else if (strcmp(key_buffer, "service.version") == 0) { field = &data_out->service_version; } + else if (strcmp(key_buffer, "telemetry.sdk.language") == 0) { field = &data_out->telemetry_sdk_language; } + else if (strcmp(key_buffer, "telemetry.sdk.version") == 0) { field = &data_out->telemetry_sdk_version; } + else if (strcmp(key_buffer, "telemetry.sdk.name") == 0) { field = &data_out->telemetry_sdk_name; } + + if (field != NULL) { + if (*field != NULL) { free(value); return false; } + *field = value; } else { - // Unknown key, put it into resources - char *key = (char *) calloc(key_len + 1, 1); - if (!key) { + char *key = strdup(key_buffer); + + if (!key || resource_index + 2 >= resource_capacity) { + free(key); free(value); return false; } - memcpy(key, key_not_terminated, key_len); - key[key_len] = '\0'; + data_out->resource_attributes[resource_index] = key; + data_out->resource_attributes[resource_index + 1] = value; + resource_index += 2; + } + } - data_out->resources[resources_index++] = key; - data_out->resources[resources_index++] = value; + // Parse extra attributes (field 2) + while (ptr < end_ptr) { + uint8_t extra_ctx_field; + if (!read_protobuf_tag(&ptr, end_ptr, &extra_ctx_field) || extra_ctx_field != 2) return false; + + uint16_t kv_len; + if (!read_protobuf_varint(&ptr, end_ptr, &kv_len)) return false; + char *kv_end = ptr + kv_len; + if (kv_end > end_ptr) return false; + + if (!peek_protobuf_key(ptr, kv_end, key_buffer)) return false; + + if (strcmp(key_buffer, "threadlocal.attribute_key_map") == 0) { + // Consume key to advance ptr + uint8_t kv_field; + if (!read_protobuf_tag(&ptr, kv_end, &kv_field) || kv_field != 1) return false; + if (!read_protobuf_string(&ptr, kv_end, key_buffer)) return false; + if (!data_out->thread_ctx_config) { + otel_thread_ctx_config_data *setup = (otel_thread_ctx_config_data *) calloc(1, sizeof(otel_thread_ctx_config_data)); + if (!setup) return false; + data_out->thread_ctx_config = setup; + } + if (!read_protobuf_array_value_strings(&ptr, kv_end, value_buffer, &((otel_thread_ctx_config_data *)data_out->thread_ctx_config)->attribute_key_map)) return false; + } else { + if (!read_protobuf_keyvalue(&ptr, kv_end, key_buffer, value_buffer)) return false; + + char *value = strdup(value_buffer); + if (!value) return false; + + // Dispatch based on key + if (strcmp(key_buffer, "threadlocal.schema_version") == 0) { + otel_thread_ctx_config_data *setup = (otel_thread_ctx_config_data *) calloc(1, sizeof(otel_thread_ctx_config_data)); + if (!setup) { free(value); return false; } + setup->schema_version = value; + data_out->thread_ctx_config = setup; + } else { + char *key = strdup(key_buffer); + if (!key || extra_attributes_index + 2 >= extra_attributes_capacity) { + free(key); + free(value); + return false; + } + data_out->extra_attributes[extra_attributes_index] = key; + data_out->extra_attributes[extra_attributes_index + 1] = value; + extra_attributes_index += 2; + } } } - // Verify all required fields were found + // Validate all required fields were found return data_out->deployment_environment_name != NULL && - data_out->host_name != NULL && data_out->service_instance_id != NULL && data_out->service_name != NULL && data_out->service_version != NULL && @@ -506,17 +802,30 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 } void otel_process_ctx_read_data_drop(otel_process_ctx_data data) { - if (data.deployment_environment_name) free(data.deployment_environment_name); - if (data.host_name) free(data.host_name); - if (data.service_instance_id) free(data.service_instance_id); - if (data.service_name) free(data.service_name); - if (data.service_version) free(data.service_version); - if (data.telemetry_sdk_language) free(data.telemetry_sdk_language); - if (data.telemetry_sdk_version) free(data.telemetry_sdk_version); - if (data.telemetry_sdk_name) free(data.telemetry_sdk_name); - if (data.resources) { - for (int i = 0; data.resources[i] != NULL; i++) free(data.resources[i]); - free(data.resources); + if (data.deployment_environment_name) free((void *)data.deployment_environment_name); + if (data.service_instance_id) free((void *)data.service_instance_id); + if (data.service_name) free((void *)data.service_name); + if (data.service_version) free((void *)data.service_version); + if (data.telemetry_sdk_language) free((void *)data.telemetry_sdk_language); + if (data.telemetry_sdk_version) free((void *)data.telemetry_sdk_version); + if (data.telemetry_sdk_name) free((void *)data.telemetry_sdk_name); + if (data.resource_attributes) { + for (int i = 0; data.resource_attributes[i] != NULL; i++) free((void *)data.resource_attributes[i]); + free((void *)data.resource_attributes); + } + if (data.extra_attributes) { + for (int i = 0; data.extra_attributes[i] != NULL; i++) free((void *)data.extra_attributes[i]); + free((void *)data.extra_attributes); + } + if (data.thread_ctx_config) { + if (data.thread_ctx_config->schema_version) free((void *)data.thread_ctx_config->schema_version); + if (data.thread_ctx_config->attribute_key_map) { + for (int i = 0; data.thread_ctx_config->attribute_key_map[i] != NULL; i++) { + free((void *)data.thread_ctx_config->attribute_key_map[i]); + } + free((void *)data.thread_ctx_config->attribute_key_map); + } + free((void *)data.thread_ctx_config); } } @@ -526,13 +835,25 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 return (otel_process_ctx_read_result) {.success = false, .error_message = "No OTEL_CTX mapping found (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; } - if (strncmp(mapping->otel_process_ctx_signature, "OTEL_CTX", sizeof(mapping->otel_process_ctx_signature)) != 0 || mapping->otel_process_ctx_version != 1) { + if (strncmp(mapping->otel_process_ctx_signature, OTEL_CTX_SIGNATURE, sizeof(mapping->otel_process_ctx_signature)) != 0 || mapping->otel_process_ctx_version != 2) { return (otel_process_ctx_read_result) {.success = false, .error_message = "Invalid OTEL_CTX signature or version (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; } otel_process_ctx_data data = empty_data; - if (!otel_process_ctx_decode_payload(mapping->otel_process_payload, &data)) { + char *key_buffer = (char *) calloc(KEY_VALUE_LIMIT + 1, 1); + char *value_buffer = (char *) calloc(KEY_VALUE_LIMIT + 1, 1); + if (!key_buffer || !value_buffer) { + free(key_buffer); + free(value_buffer); + return (otel_process_ctx_read_result) {.success = false, .error_message = "Failed to allocate decode buffers (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; + } + + bool success = otel_process_ctx_decode_payload(mapping->otel_process_payload, mapping->otel_process_payload_size, &data, key_buffer, value_buffer); + free(key_buffer); + free(value_buffer); + + if (!success) { otel_process_ctx_read_data_drop(data); return (otel_process_ctx_read_result) {.success = false, .error_message = "Failed to decode payload (" __FILE__ ":" ADD_QUOTES(__LINE__) ")", .data = empty_data}; } @@ -542,13 +863,8 @@ static otel_process_ctx_result otel_process_ctx_encode_payload(char **out, uint3 bool otel_process_ctx_read_drop(otel_process_ctx_read_result *result) { if (!result || !result->success) return false; - - // Free allocated strings in the data otel_process_ctx_read_data_drop(result->data); - - // Reset the result to empty state *result = (otel_process_ctx_read_result) {.success = false, .error_message = "Data dropped", .data = empty_data}; - return true; } #endif // OTEL_PROCESS_CTX_NO_READ diff --git a/ddprof-lib/src/main/cpp/otel_process_ctx.h b/ddprof-lib/src/main/cpp/otel_process_ctx.h index 878949a5e..4e83a4805 100644 --- a/ddprof-lib/src/main/cpp/otel_process_ctx.h +++ b/ddprof-lib/src/main/cpp/otel_process_ctx.h @@ -4,9 +4,9 @@ #pragma once #define OTEL_PROCESS_CTX_VERSION_MAJOR 0 -#define OTEL_PROCESS_CTX_VERSION_MINOR 0 -#define OTEL_PROCESS_CTX_VERSION_PATCH 7 -#define OTEL_PROCESS_CTX_VERSION_STRING "0.0.7" +#define OTEL_PROCESS_CTX_VERSION_MINOR 1 +#define OTEL_PROCESS_CTX_VERSION_PATCH 0 +#define OTEL_PROCESS_CTX_VERSION_STRING "0.1.0" #ifdef __cplusplus extern "C" { @@ -18,12 +18,25 @@ extern "C" { * # OpenTelemetry Process Context reference implementation * * `otel_process_ctx.h` and `otel_process_ctx.c` provide a reference implementation for the OpenTelemetry - * process-level context sharing specification. (TODO Link) + * process-level context sharing specification. + * (https://github.com/open-telemetry/opentelemetry-specification/pull/4719/) * * This reference implementation is Linux-only, as the specification currently only covers Linux. * On non-Linux OS's (or when OTEL_PROCESS_CTX_NOOP is defined) no-op versions of functions are supplied. */ + /** + * Config for the experimental thread context sharing mechanism, see + * https://docs.google.com/document/d/1eatbHpEXXhWZEPrXZpfR58-5RIx-81mUgF69Zpn3Rz4/edit?tab=t.bmgoq3yor67o for usage + * details. + */ +typedef struct { + const char *schema_version; + // NULL-terminated array of attribute key strings to be used in thread context. + // Can be NULL if not needed. + const char **attribute_key_map; +} otel_thread_ctx_config_data; + /** * Data that can be published as a process context. * @@ -37,38 +50,32 @@ extern "C" { * * Strings MAY be: * * Empty - * - * The below fields map to usual datadog attributes as follows (TODO: Remove this once we share the header publicly) - * * deployment_environment_name -> env - * * host_name -> hostname - * * service_instance_id -> runtime-id - * * service_name -> service - * * service_version -> version - * * telemetry_sdk_language -> tracer_language - * * telemetry_sdk_version -> tracer_version - * * telemetry_sdk_name -> name of library (e.g. dd-trace-java) */ typedef struct { // https://opentelemetry.io/docs/specs/semconv/registry/attributes/deployment/#deployment-environment-name - char *deployment_environment_name; - // https://opentelemetry.io/docs/specs/semconv/registry/attributes/host/#host-name - char *host_name; + const char *deployment_environment_name; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-instance-id - char *service_instance_id; + const char *service_instance_id; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-name - char *service_name; + const char *service_name; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/service/#service-version - char *service_version; + const char *service_version; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-language - char *telemetry_sdk_language; + const char *telemetry_sdk_language; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-version - char *telemetry_sdk_version; + const char *telemetry_sdk_version; // https://opentelemetry.io/docs/specs/semconv/registry/attributes/telemetry/#telemetry-sdk-name - char *telemetry_sdk_name; - // Additional key/value pairs as resources https://opentelemetry.io/docs/specs/otel/resource/sdk/ - // Can be NULL if no resources are needed; if non-NULL, this array MUST be terminated with a NULL entry. + const char *telemetry_sdk_name; + // Additional key/value pairs as resource attributes https://opentelemetry.io/docs/specs/otel/resource/sdk/ + // Can be NULL if no resource attributes are needed; if non-NULL, this array MUST be terminated with a NULL entry. + // Every even entry is a key, every odd entry is a value (E.g. "key1", "value1", "key2", "value2", NULL). + const char **resource_attributes; + // Additional key/value pairs as extra attributes (ProcessContext.extra_attributes in process_context.proto) + // Can be NULL if no extra attributes are needed; if non-NULL, this array MUST be terminated with a NULL entry. // Every even entry is a key, every odd entry is a value (E.g. "key1", "value1", "key2", "value2", NULL). - char **resources; + const char **extra_attributes; + // Experimental thread context sharing mechanism configuration. See struct definition for details. Can be NULL. + const otel_thread_ctx_config_data *thread_ctx_config; } otel_process_ctx_data; /** Number of entries in the `otel_process_ctx_data` struct. Can be used to easily detect when the struct is updated. */ diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/OTelContext.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/OTelContext.java index d3b74a7cb..203db349e 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/OTelContext.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/OTelContext.java @@ -63,7 +63,7 @@ public ProcessContext(String deploymentEnvironmentName, String hostName, String this.telemetrySdkVersion = telemetrySdkVersion; this.telemetrySdkName = telemetrySdkName; } - + @Override public String toString() { return String.format("ProcessContext{deploymentEnvironmentName='%s', hostName='%s', serviceInstanceId='%s', serviceName='%s', serviceVersion='%s', telemetrySdkLanguage='%s', telemetrySdkVersion='%s', telemetrySdkName='%s'}", @@ -181,7 +181,7 @@ public ProcessContext readProcessContext() { * OTelContext.getInstance().setProcessContext( * "staging", // env * "my-hostname", // hostname - * "instance-12345" // runtime-id + * "instance-12345", // runtime-id * "my-service", // service * "1.0.0", // version * "3.5.0" // tracer-version @@ -191,8 +191,8 @@ public ProcessContext readProcessContext() { * @param env the deployment environment name as defined by OpenTelemetry * semantic conventions (deployment.environment.name). Must not be null. * Examples: "production", "staging", "development", "test" - * @param hostname the hostname of the service as defined by OpenTelemetry - * semantic conventions (host.name). Must not be null. + * @param hostname the hostname of the service, recorded under the OpenTelemetry + * semantic convention key host.name as a resource attribute. Must not be null. * Examples: "my-hostname", "my-hostname.example.com" * @param runtimeId the unique identifier for this specific instance of the service * as defined by OpenTelemetry semantic conventions (service.instance.id). @@ -219,7 +219,7 @@ public void setProcessContext(String env, String hostname, String runtimeId, Str setProcessCtx0(env, hostname, runtimeId, service, version, tracerVersion); } finally { lock.writeLock().unlock(); - } + } } private static native void setProcessCtx0(String env, String hostname, String runtimeId, String service, String version, String tracerVersion); diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/context/ProcessContextTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/context/ProcessContextTest.java index fac9421a1..902bb9ed5 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/context/ProcessContextTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/context/ProcessContextTest.java @@ -7,7 +7,6 @@ import java.io.BufferedReader; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -33,30 +32,31 @@ public void testProcessContextMappingCreation() throws IOException { OtelMappingInfo mapping = findOtelMapping(); assertNotNull(mapping, "OTEL mapping should exist after setProcessContext"); - + verifyMappingPermissions(mapping); } - + private static class OtelMappingInfo { final String startAddress; final String endAddress; final String permissions; - + OtelMappingInfo(String startAddress, String endAddress, String permissions) { this.startAddress = startAddress; this.endAddress = endAddress; this.permissions = permissions; } } - + private OtelMappingInfo findOtelMapping() throws IOException { Path mapsFile = Paths.get("/proc/self/maps"); if (!Files.exists(mapsFile)) { return null; } - - Pattern otelPattern = Pattern.compile("^([0-9a-f]+)-([0-9a-f]+)\\s+(\\S+)\\s+\\S+\\s+\\S+\\s+\\S+\\s*\\[anon:OTEL_CTX\\].*$"); - + + // Match memfd-backed mapping (/memfd:OTEL_CTX), anon mapping ([anon:OTEL_CTX]), or anon shmem ([anon_shmem:OTEL_CTX]) + Pattern otelPattern = Pattern.compile("^([0-9a-f]+)-([0-9a-f]+)\\s+(\\S+)\\s+\\S+\\s+\\S+\\s+\\S+\\s*(?:/memfd:OTEL_CTX|\\[anon:OTEL_CTX\\]|\\[anon_shmem:OTEL_CTX\\]).*$"); + try (BufferedReader reader = Files.newBufferedReader(mapsFile)) { String line; while ((line = reader.readLine()) != null) { @@ -76,8 +76,6 @@ private OtelMappingInfo findOtelMapping() throws IOException { private void verifyMappingPermissions(OtelMappingInfo mapping) { assertTrue(mapping.permissions.contains("r"), "OTEL mapping should have read permission, got: " + mapping.permissions); - assertFalse(mapping.permissions.contains("w"), - "OTEL mapping should not have write permission, got: " + mapping.permissions); assertFalse(mapping.permissions.contains("x"), "OTEL mapping should not have execute permission, got: " + mapping.permissions); }