diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index a9bab75a7..65177900a 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -383,7 +383,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "log", "prettyplease", "proc-macro2", @@ -1652,15 +1652,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2607,9 +2598,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 91bd85d17..1bf2e3db8 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -21,22 +21,96 @@ pub struct MSKRecord { pub topic: String, pub partition: i32, pub timestamp: f64, + #[serde(default)] + pub headers: Value, +} + +/// Decodes a header value into raw bytes. Two formats have been observed: +/// +/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]` +/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` +fn bytes_from_header_value(val: &Value) -> Option> { + match val { + // Array format: elements may be integers `[104, 101]` or decimal strings `["49"]` + Value::Array(arr) => arr + .iter() + .map(|v| match v { + Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()), + Value::String(s) => s.parse::().ok(), + _ => None, + }) + .collect(), + // Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` + Value::Object(obj) => { + let mut pairs: Vec<(u64, u8)> = obj + .iter() + .filter_map(|(k, v)| { + Some((k.parse::().ok()?, v.as_str()?.parse::().ok()?)) + }) + .collect(); + pairs.sort_by_key(|(idx, _)| *idx); + Some(pairs.into_iter().map(|(_, b)| b).collect()) + } + _ => None, + } +} + +/// Extracts trace propagation headers from an MSK record's `headers` field into a carrier map. +/// The `headers` field is a JSON object with numeric string keys, one entry per Kafka header. +fn carrier_from_headers(headers: &Value) -> HashMap { + let mut carrier = HashMap::new(); + + let entries: Vec<&Value> = match headers { + Value::Array(arr) => arr.iter().collect(), + Value::Object(obj) => { + let mut pairs: Vec<(u64, &Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + pairs.into_iter().map(|(_, v)| v).collect() + } + _ => return carrier, + }; + + for entry in entries { + if let Value::Object(header_map) = entry { + for (key, val) in header_map { + if let Some(bytes) = bytes_from_header_value(val) + && let Ok(s) = String::from_utf8(bytes) + { + carrier.insert(key.to_lowercase(), s); + } + } + } + } + + carrier } impl Trigger for MSKEvent { fn new(mut payload: Value) -> Option { - // We only care about the first item in the first record, so drop the others before deserializing. + // We only care about the first item in the first record, so drop the others before + // deserializing. Records are delivered as a JSON object with numeric string keys; + // normalize to a single-element array before deserializing. if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) { - match records_map.iter_mut().next() { - Some((first_key, Value::Array(arr))) => { - arr.truncate(1); - let key = first_key.clone(); - records_map.retain(|k, _| k == &key); - } - _ => { - records_map.clear(); + let first_key = records_map.keys().next()?.clone(); + let normalized = match records_map.get(&first_key)? { + Value::Array(arr) => Value::Array(vec![arr.first()?.clone()]), + Value::Object(obj) => { + // Records delivered as object with numeric string keys: {"0": {...}, "1": {...}, ...} + // Take the record with the lowest numeric key. + let mut pairs: Vec<(u64, Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v.clone()))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + let (_, first_record) = pairs.into_iter().next()?; + Value::Array(vec![first_record]) } - } + _ => return None, + }; + *records_map = serde_json::Map::from_iter([(first_key, normalized)]); } match serde_json::from_value::(payload) { @@ -49,13 +123,16 @@ impl Trigger for MSKEvent { } fn is_match(payload: &Value) -> bool { - payload + let first_record_group = payload .get("records") .and_then(Value::as_object) - .and_then(|map| map.values().next()) - .and_then(Value::as_array) - .and_then(|arr| arr.first()) - .is_some_and(|rec| rec.get("topic").is_some()) + .and_then(|map| map.values().next()); + let first_record = match first_record_group { + Some(Value::Array(arr)) => arr.first(), + Some(Value::Object(obj)) => obj.values().next(), + _ => return false, + }; + first_record.is_some_and(|rec| rec.get("topic").is_some()) } #[allow(clippy::cast_possible_truncation)] @@ -105,7 +182,10 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - HashMap::new() + self.records + .values() + .find_map(|arr| arr.first()) + .map_or_else(HashMap::new, |record| carrier_from_headers(&record.headers)) } fn is_async(&self) -> bool { @@ -142,6 +222,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, + headers: Value::Array(vec![]), }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -335,4 +416,52 @@ mod tests { "msk" // fallback value ); } + + #[test] + fn test_new_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); + + let record = result + .records + .values() + .find_map(|arr| arr.first()) + .expect("Expected at least one record"); + assert_eq!(record.topic, "demo-topic"); + // headers is an object with 6 entries (2 non-datadog + 4 datadog) + assert_eq!( + record.headers.as_object().map(serde_json::Map::len), + Some(6) + ); + } + + #[test] + fn test_get_carrier_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); + let carrier = event.get_carrier(); + + // Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded + // but won't be used by the propagator. + assert_eq!( + carrier.get("x-datadog-trace-id").map(String::as_str), + Some("1497116011738644768") + ); + assert_eq!( + carrier.get("x-datadog-parent-id").map(String::as_str), + Some("2239801583077304042") + ); + assert_eq!( + carrier + .get("x-datadog-sampling-priority") + .map(String::as_str), + Some("1") + ); + assert_eq!( + carrier.get("x-datadog-tags").map(String::as_str), + Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000") + ); + } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json new file mode 100644 index 000000000..939658526 --- /dev/null +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -0,0 +1,26 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "demo-topic-0": { + "0": { + "topic": "demo-topic", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType": "CREATE_TIME", + "key": "b3JkZXJJZA==", + "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", + "headers": { + "0": {"someId": ["70","114","111","109","66","114","117","110","111"]}, + "1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}}, + "2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}}, + "3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}}, + "4": {"x-datadog-sampling-priority": ["49"]}, + "5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}} + } + } + } + } +}