Skip to content

feature: handle MSK events#1066

Draft
joeyzhao2018 wants to merge 12 commits intomainfrom
joey/handle_msk
Draft

feature: handle MSK events#1066
joeyzhao2018 wants to merge 12 commits intomainfrom
joey/handle_msk

Conversation

@joeyzhao2018
Copy link
Contributor

@joeyzhao2018 joeyzhao2018 commented Mar 5, 2026

https://datadoghq.atlassian.net/browse/SLES-2739

In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:

Tracer Injection code Mechanism
dd-trace-java headers.add(key, value.getBytes(UTF_8)) String.getBytes() → byte[]
dd-trace-go Value: []byte(val) Go type conversion → []byte
dd-trace-dotnet _headers.Add(name, Encoding.UTF8.GetBytes(value)) UTF8.GetBytes() → byte[]

All three tracers accept string trace context values from the propagation layer, convert to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client.
This isn't a quirk of Java's getBytes() — it's the only way Kafka headers work.

What MSK Lambda does

When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON
shape depends on the Lambda runtime:

  • Array format (observed in the existing msk_event.json testing payloads, i didn't change the support for this to be safe): byte values as a JSON array of integers
    "headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]
  • Object format (observed with the Java Lambda runtime): both the records list and the per-header byte values are JSON objects with numeric string keys, and byte values are
    decimal strings
    "records": {
    "topic-0": {
    "0": {
    "headers": {
    "0": {"someOtherHeader": ["70", "114", ...]},
    "2": {"x-datadog-trace-id": {"0":"52","1":"54",...}},
    "4": {"x-datadog-sampling-priority": ["49"]}
    }
    }
    }
    }
  • Note that Datadog headers can appear at any index — non-instrumentation headers may precede them.

What's the difference between the msk_event.json and the newly added msk_event_with_headers.json here?

  • msk_event.json represents a standard MSK trigger where the producer didn't attach any Kafka headers — i.e. no Datadog tracer was running on the producer side (or it's a non-instrumented producer like a raw Kafka client, a Kinesis Firehose delivery stream, or a schema-registry message). In those cases Lambda still delivers the event but with "headers": []. It's also the format you get when testing MSK triggers manually in the AWS console, which doesn't inject headers. ( source: Claude Code)
  • msk_event_with_headers.json reflects the real-world object format produced by the Java Lambda runtime, with a producer instrumented with a Datadog tracer injecting trace context
    as Kafka headers. It includes non-Datadog headers at lower indices to verify that the carrier extraction correctly finds Datadog headers regardless of their position. (source: I did a real world example and below is the evidence of testing)
Screenshot 2026-03-12 at 11 14 33 PM

joeyzhao2018 and others added 10 commits March 9, 2026 15:32
MSK event headers delivered by the Java Lambda runtime use a JSON object
with numeric string keys and decimal string values rather than an array
of integers. Records are similarly delivered as an object with numeric
string keys instead of an array.

Update deserialization and carrier extraction to support both formats,
and update the fixture and tests to reflect the real-world payload shape.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation
- Collapse nested `if let` blocks into a single `if let ... && let ...`
- Replace redundant closure `|o| o.len()` with `serde_json::Map::len`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant