From 513f0ca91f47e66d007e338f1c2e0464dc1305a7 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:56:52 -0500 Subject: [PATCH 1/8] handle MSK events --- .../invocation/triggers/msk_event.rs | 47 ++++++++++++++++++- .../payloads/msk_event_with_headers.json | 23 +++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 bottlecap/tests/payloads/msk_event_with_headers.json diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 91bd85d17..d8e9dba36 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -21,6 +21,8 @@ pub struct MSKRecord { pub topic: String, pub partition: i32, pub timestamp: f64, + #[serde(default)] + pub headers: Vec>>, } impl Trigger for MSKEvent { @@ -105,7 +107,17 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - HashMap::new() + let mut carrier = HashMap::new(); + if let Some(record) = self.records.values().find_map(|arr| arr.first()) { + for header_map in &record.headers { + for (key, value_bytes) in header_map { + if let Ok(value_str) = String::from_utf8(value_bytes.clone()) { + carrier.insert(key.to_lowercase(), value_str); + } + } + } + } + carrier } fn is_async(&self) -> bool { @@ -142,6 +154,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, + headers: vec![], }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -335,4 +348,36 @@ mod tests { "msk" // fallback value ); } + + #[test] + fn test_new_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); + + let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + assert_eq!(record.topic, "topic1"); + assert_eq!(record.headers.len(), 3); + } + + #[test] + fn test_get_carrier_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); + let carrier = event.get_carrier(); + + assert_eq!( + carrier.get("x-datadog-trace-id").map(String::as_str), + Some("36979754430890456950") + ); + assert_eq!( + carrier.get("x-datadog-parent-id").map(String::as_str), + Some("7431398482019833808") + ); + assert_eq!( + carrier.get("x-datadog-sampling-priority").map(String::as_str), + Some("1") + ); + } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json new file mode 100644 index 000000000..b5a6e6238 --- /dev/null +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -0,0 +1,23 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "topic1": [ + { + "topic": "topic1", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType":"CREATE_TIME", + "key": "b3JkZXJJZA==", + "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", + "headers": [ + {"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]}, + {"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]}, + {"x-datadog-sampling-priority": [49]} + ] + } + ] + } +} From fe2c84b99a7ed832441d060bd83fa976ef9f0eef Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:20:19 -0400 Subject: [PATCH 2/8] cargo fmt and merge main --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index d8e9dba36..cb5cf8796 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -376,7 +376,9 @@ mod tests { Some("7431398482019833808") ); assert_eq!( - carrier.get("x-datadog-sampling-priority").map(String::as_str), + carrier + .get("x-datadog-sampling-priority") + .map(String::as_str), Some("1") ); } From 1dbb7d6c8dc9939466f8797de8f1b9b627290b8e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:32:35 -0400 Subject: [PATCH 3/8] fix --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index cb5cf8796..58ae9da97 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,7 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 4683724ea818605ad5e472fadcbbc18035733676 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:33:38 -0400 Subject: [PATCH 4/8] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 58ae9da97..723a60bbb 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,11 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); + let record = result + .records + .values() + .find_map(|arr| arr.first()) + .expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 22b24a23b5e5c2979147192b277122757ccbcbff Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 11 Mar 2026 13:44:58 -0400 Subject: [PATCH 5/8] fix critical vulnerability --- bottlecap/Cargo.lock | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 37517a6ae..cb3b02ab8 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -383,7 +383,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "log", "prettyplease", "proc-macro2", @@ -1640,15 +1640,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2585,9 +2576,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", From 40cd56f4aba77f436e89e019abdbfd2fa30a7af7 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:38:13 -0400 Subject: [PATCH 6/8] fix(msk): handle real-world header format from Java Lambda runtime MSK event headers delivered by the Java Lambda runtime use a JSON object with numeric string keys and decimal string values rather than an array of integers. Records are similarly delivered as an object with numeric string keys instead of an array. Update deserialization and carrier extraction to support both formats, and update the fixture and tests to reflect the real-world payload shape. Co-Authored-By: Claude Sonnet 4.6 --- .../invocation/triggers/msk_event.rs | 139 ++++++++++++++---- .../payloads/msk_event_with_headers.json | 29 ++-- 2 files changed, 123 insertions(+), 45 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 723a60bbb..04250bbd6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -22,23 +22,95 @@ pub struct MSKRecord { pub partition: i32, pub timestamp: f64, #[serde(default)] - pub headers: Vec>>, + pub headers: Value, +} + +/// Decodes a header value into raw bytes. Two formats have been observed: +/// +/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]` +/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` +fn bytes_from_header_value(val: &Value) -> Option> { + match val { + // Array format: elements may be integers `[104, 101]` or decimal strings `["49"]` + Value::Array(arr) => arr + .iter() + .map(|v| match v { + Value::Number(n) => n.as_u64().map(|n| n as u8), + Value::String(s) => s.parse::().ok(), + _ => None, + }) + .collect(), + // Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` + Value::Object(obj) => { + let mut pairs: Vec<(u64, u8)> = obj + .iter() + .filter_map(|(k, v)| { + Some((k.parse::().ok()?, v.as_str()?.parse::().ok()?)) + }) + .collect(); + pairs.sort_by_key(|(idx, _)| *idx); + Some(pairs.into_iter().map(|(_, b)| b).collect()) + } + _ => None, + } +} + +/// Extracts trace propagation headers from an MSK record's `headers` field into a carrier map. +/// The `headers` field is a JSON object with numeric string keys, one entry per Kafka header. +fn carrier_from_headers(headers: &Value) -> HashMap { + let mut carrier = HashMap::new(); + + let entries: Vec<&Value> = match headers { + Value::Array(arr) => arr.iter().collect(), + Value::Object(obj) => { + let mut pairs: Vec<(u64, &Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + pairs.into_iter().map(|(_, v)| v).collect() + } + _ => return carrier, + }; + + for entry in entries { + if let Value::Object(header_map) = entry { + for (key, val) in header_map { + if let Some(bytes) = bytes_from_header_value(val) { + if let Ok(s) = String::from_utf8(bytes) { + carrier.insert(key.to_lowercase(), s); + } + } + } + } + } + + carrier } impl Trigger for MSKEvent { fn new(mut payload: Value) -> Option { - // We only care about the first item in the first record, so drop the others before deserializing. + // We only care about the first item in the first record, so drop the others before + // deserializing. Records are delivered as a JSON object with numeric string keys; + // normalize to a single-element array before deserializing. if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) { - match records_map.iter_mut().next() { - Some((first_key, Value::Array(arr))) => { - arr.truncate(1); - let key = first_key.clone(); - records_map.retain(|k, _| k == &key); - } - _ => { - records_map.clear(); + let first_key = records_map.keys().next()?.clone(); + let normalized = match records_map.get(&first_key)? { + Value::Array(arr) => Value::Array(vec![arr.first()?.clone()]), + Value::Object(obj) => { + // Records delivered as object with numeric string keys: {"0": {...}, "1": {...}, ...} + // Take the record with the lowest numeric key. + let mut pairs: Vec<(u64, Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v.clone()))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + let (_, first_record) = pairs.into_iter().next()?; + Value::Array(vec![first_record]) } - } + _ => return None, + }; + *records_map = serde_json::Map::from_iter([(first_key, normalized)]); } match serde_json::from_value::(payload) { @@ -51,13 +123,16 @@ impl Trigger for MSKEvent { } fn is_match(payload: &Value) -> bool { - payload + let first_record_group = payload .get("records") .and_then(Value::as_object) - .and_then(|map| map.values().next()) - .and_then(Value::as_array) - .and_then(|arr| arr.first()) - .is_some_and(|rec| rec.get("topic").is_some()) + .and_then(|map| map.values().next()); + let first_record = match first_record_group { + Some(Value::Array(arr)) => arr.first(), + Some(Value::Object(obj)) => obj.values().next(), + _ => return false, + }; + first_record.is_some_and(|rec| rec.get("topic").is_some()) } #[allow(clippy::cast_possible_truncation)] @@ -107,17 +182,10 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - let mut carrier = HashMap::new(); - if let Some(record) = self.records.values().find_map(|arr| arr.first()) { - for header_map in &record.headers { - for (key, value_bytes) in header_map { - if let Ok(value_str) = String::from_utf8(value_bytes.clone()) { - carrier.insert(key.to_lowercase(), value_str); - } - } - } - } - carrier + self.records + .values() + .find_map(|arr| arr.first()) + .map_or_else(HashMap::new, |record| carrier_from_headers(&record.headers)) } fn is_async(&self) -> bool { @@ -154,7 +222,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, - headers: vec![], + headers: Value::Array(vec![]), }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -360,8 +428,9 @@ mod tests { .values() .find_map(|arr| arr.first()) .expect("Expected at least one record"); - assert_eq!(record.topic, "topic1"); - assert_eq!(record.headers.len(), 3); + assert_eq!(record.topic, "demo-topic"); + // headers is an object with 6 entries (2 non-datadog + 4 datadog) + assert_eq!(record.headers.as_object().map(|o| o.len()), Some(6)); } #[test] @@ -371,13 +440,15 @@ mod tests { let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); let carrier = event.get_carrier(); + // Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded + // but won't be used by the propagator. assert_eq!( carrier.get("x-datadog-trace-id").map(String::as_str), - Some("36979754430890456950") + Some("1497116011738644768") ); assert_eq!( carrier.get("x-datadog-parent-id").map(String::as_str), - Some("7431398482019833808") + Some("2239801583077304042") ); assert_eq!( carrier @@ -385,5 +456,9 @@ mod tests { .map(String::as_str), Some("1") ); + assert_eq!( + carrier.get("x-datadog-tags").map(String::as_str), + Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000") + ); } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json index b5a6e6238..939658526 100644 --- a/bottlecap/tests/payloads/msk_event_with_headers.json +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -3,21 +3,24 @@ "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { - "topic1": [ - { - "topic": "topic1", - "partition": 0, - "offset": 101, - "timestamp": 1745846213022, - "timestampType":"CREATE_TIME", + "demo-topic-0": { + "0": { + "topic": "demo-topic", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType": "CREATE_TIME", "key": "b3JkZXJJZA==", "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", - "headers": [ - {"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]}, - {"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]}, - {"x-datadog-sampling-priority": [49]} - ] + "headers": { + "0": {"someId": ["70","114","111","109","66","114","117","110","111"]}, + "1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}}, + "2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}}, + "3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}}, + "4": {"x-datadog-sampling-priority": ["49"]}, + "5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}} + } } - ] + } } } From f64e58340da53b5fed765a4c8c29ab23755483db Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:55:15 -0400 Subject: [PATCH 7/8] fix(msk): fix clippy lints in msk_event header parsing - Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation - Collapse nested `if let` blocks into a single `if let ... && let ...` - Replace redundant closure `|o| o.len()` with `serde_json::Map::len` Co-Authored-By: Claude Sonnet 4.6 --- .../src/lifecycle/invocation/triggers/msk_event.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 04250bbd6..1d265b203 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -35,7 +35,7 @@ fn bytes_from_header_value(val: &Value) -> Option> { Value::Array(arr) => arr .iter() .map(|v| match v { - Value::Number(n) => n.as_u64().map(|n| n as u8), + Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()), Value::String(s) => s.parse::().ok(), _ => None, }) @@ -76,10 +76,10 @@ fn carrier_from_headers(headers: &Value) -> HashMap { for entry in entries { if let Value::Object(header_map) = entry { for (key, val) in header_map { - if let Some(bytes) = bytes_from_header_value(val) { - if let Ok(s) = String::from_utf8(bytes) { - carrier.insert(key.to_lowercase(), s); - } + if let Some(bytes) = bytes_from_header_value(val) + && let Ok(s) = String::from_utf8(bytes) + { + carrier.insert(key.to_lowercase(), s); } } } @@ -430,7 +430,7 @@ mod tests { .expect("Expected at least one record"); assert_eq!(record.topic, "demo-topic"); // headers is an object with 6 entries (2 non-datadog + 4 datadog) - assert_eq!(record.headers.as_object().map(|o| o.len()), Some(6)); + assert_eq!(record.headers.as_object().map(serde_json::Map::len), Some(6)); } #[test] From 46014bb888a0801c2a1d6d3b8ed2019b9a9bdb12 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 13 Mar 2026 00:31:59 -0400 Subject: [PATCH 8/8] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 1d265b203..1bf2e3db8 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -430,7 +430,10 @@ mod tests { .expect("Expected at least one record"); assert_eq!(record.topic, "demo-topic"); // headers is an object with 6 entries (2 non-datadog + 4 datadog) - assert_eq!(record.headers.as_object().map(serde_json::Map::len), Some(6)); + assert_eq!( + record.headers.as_object().map(serde_json::Map::len), + Some(6) + ); } #[test]