Skip to content
Open
15 changes: 3 additions & 12 deletions bottlecap/Cargo.lock
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the changes here?

it's just trying to fix the test failures. I also put this change in a separate pr here #1099 if you prefer that way.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 145 additions & 16 deletions bottlecap/src/lifecycle/invocation/triggers/msk_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,96 @@ pub struct MSKRecord {
pub topic: String,
pub partition: i32,
pub timestamp: f64,
#[serde(default)]
pub headers: Value,
}

/// Decodes a header value into raw bytes. Two formats have been observed:
///
/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]`
/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
fn bytes_from_header_value(val: &Value) -> Option<Vec<u8>> {
match val {
// Array format: elements may be integers `[104, 101]` or decimal strings `["49"]`
Value::Array(arr) => arr
.iter()
.map(|v| match v {
Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()),
Value::String(s) => s.parse::<u8>().ok(),
_ => None,
})
.collect(),
// Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
Value::Object(obj) => {
let mut pairs: Vec<(u64, u8)> = obj
.iter()
.filter_map(|(k, v)| {
Some((k.parse::<u64>().ok()?, v.as_str()?.parse::<u8>().ok()?))
})
.collect();
pairs.sort_by_key(|(idx, _)| *idx);
Some(pairs.into_iter().map(|(_, b)| b).collect())
}
_ => None,
}
}

/// Extracts trace propagation headers from an MSK record's `headers` field into a carrier map.
/// The `headers` field is a JSON object with numeric string keys, one entry per Kafka header.
fn carrier_from_headers(headers: &Value) -> HashMap<String, String> {
let mut carrier = HashMap::new();

let entries: Vec<&Value> = match headers {
Value::Array(arr) => arr.iter().collect(),
Value::Object(obj) => {
let mut pairs: Vec<(u64, &Value)> = obj
.iter()
.filter_map(|(k, v)| k.parse::<u64>().ok().map(|n| (n, v)))
.collect();
pairs.sort_by_key(|(n, _)| *n);
pairs.into_iter().map(|(_, v)| v).collect()
}
_ => return carrier,
};

for entry in entries {
if let Value::Object(header_map) = entry {
for (key, val) in header_map {
if let Some(bytes) = bytes_from_header_value(val)
&& let Ok(s) = String::from_utf8(bytes)
{
carrier.insert(key.to_lowercase(), s);
}
}
}
}

carrier
}

impl Trigger for MSKEvent {
fn new(mut payload: Value) -> Option<Self> {
// We only care about the first item in the first record, so drop the others before deserializing.
// We only care about the first item in the first record, so drop the others before
// deserializing. Records are delivered as a JSON object with numeric string keys;
// normalize to a single-element array before deserializing.
if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) {
match records_map.iter_mut().next() {
Some((first_key, Value::Array(arr))) => {
arr.truncate(1);
let key = first_key.clone();
records_map.retain(|k, _| k == &key);
}
_ => {
records_map.clear();
let first_key = records_map.keys().next()?.clone();
let normalized = match records_map.get(&first_key)? {
Value::Array(arr) => Value::Array(vec![arr.first()?.clone()]),
Value::Object(obj) => {
// Records delivered as object with numeric string keys: {"0": {...}, "1": {...}, ...}
// Take the record with the lowest numeric key.
let mut pairs: Vec<(u64, Value)> = obj
.iter()
.filter_map(|(k, v)| k.parse::<u64>().ok().map(|n| (n, v.clone())))
.collect();
pairs.sort_by_key(|(n, _)| *n);
let (_, first_record) = pairs.into_iter().next()?;
Value::Array(vec![first_record])
}
}
_ => return None,
};
*records_map = serde_json::Map::from_iter([(first_key, normalized)]);
}

match serde_json::from_value::<Self>(payload) {
Expand All @@ -49,13 +123,16 @@ impl Trigger for MSKEvent {
}

fn is_match(payload: &Value) -> bool {
payload
let first_record_group = payload
.get("records")
.and_then(Value::as_object)
.and_then(|map| map.values().next())
.and_then(Value::as_array)
.and_then(|arr| arr.first())
.is_some_and(|rec| rec.get("topic").is_some())
.and_then(|map| map.values().next());
let first_record = match first_record_group {
Some(Value::Array(arr)) => arr.first(),
Some(Value::Object(obj)) => obj.values().next(),
_ => return false,
};
first_record.is_some_and(|rec| rec.get("topic").is_some())
}

#[allow(clippy::cast_possible_truncation)]
Expand Down Expand Up @@ -105,7 +182,10 @@ impl Trigger for MSKEvent {
}

fn get_carrier(&self) -> HashMap<String, String> {
HashMap::new()
self.records
.values()
.find_map(|arr| arr.first())
.map_or_else(HashMap::new, |record| carrier_from_headers(&record.headers))
}

fn is_async(&self) -> bool {
Expand Down Expand Up @@ -142,6 +222,7 @@ mod tests {
topic: String::from("topic1"),
partition: 0,
timestamp: 1_745_846_213_022f64,
headers: Value::Array(vec![]),
};
let mut expected_records = HashMap::new();
expected_records.insert(String::from("topic1"), vec![record]);
Expand Down Expand Up @@ -335,4 +416,52 @@ mod tests {
"msk" // fallback value
);
}

#[test]
fn test_new_with_headers() {
let json = read_json_file("msk_event_with_headers.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent");

let record = result
.records
.values()
.find_map(|arr| arr.first())
.expect("Expected at least one record");
assert_eq!(record.topic, "demo-topic");
// headers is an object with 6 entries (2 non-datadog + 4 datadog)
assert_eq!(
record.headers.as_object().map(serde_json::Map::len),
Some(6)
);
}

#[test]
fn test_get_carrier_with_headers() {
let json = read_json_file("msk_event_with_headers.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
let carrier = event.get_carrier();

// Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded
// but won't be used by the propagator.
assert_eq!(
carrier.get("x-datadog-trace-id").map(String::as_str),
Some("1497116011738644768")
);
assert_eq!(
carrier.get("x-datadog-parent-id").map(String::as_str),
Some("2239801583077304042")
);
assert_eq!(
carrier
.get("x-datadog-sampling-priority")
.map(String::as_str),
Some("1")
);
assert_eq!(
carrier.get("x-datadog-tags").map(String::as_str),
Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000")
);
}
}
26 changes: 26 additions & 0 deletions bottlecap/tests/payloads/msk_event_with_headers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"demo-topic-0": {
"0": {
"topic": "demo-topic",
"partition": 0,
"offset": 101,
"timestamp": 1745846213022,
"timestampType": "CREATE_TIME",
"key": "b3JkZXJJZA==",
"value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==",
"headers": {
"0": {"someId": ["70","114","111","109","66","114","117","110","111"]},
"1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}},
"2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}},
"3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}},
"4": {"x-datadog-sampling-priority": ["49"]},
"5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}}
}
}
}
}
}
Loading