Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::stats_exporter;
use arc_swap::ArcSwap;
use libdd_common::{Endpoint, HttpClient, MutexExt};
use libdd_trace_stats::span_concentrator::SpanConcentrator;
use libdd_trace_utils::span::TraceProjector;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -230,15 +231,22 @@ pub(crate) fn process_traces_for_stats<T: libdd_trace_utils::span::TraceData>(
stats_concentrator, ..
} = &**client_side_stats.load()
{
// Wrap the traces in a TraceCollection to use the TraceProjector API
let mut collection = libdd_trace_utils::span::v04::TraceCollection::new(std::mem::take(traces));

if !client_computed_top_level {
for chunk in traces.iter_mut() {
libdd_trace_utils::span::trace_utils::compute_top_level_span(chunk);
let mut projected = collection.project_mut();
for mut chunk in projected.chunks_mut() {
libdd_trace_utils::span::trace_utils::compute_top_level_span(&mut chunk);
}
}
add_spans_to_stats(stats_concentrator, traces);
add_spans_to_stats(stats_concentrator, &mut collection.traces);
// Once stats have been computed we can drop all chunks that are not going to be
// sampled by the agent
let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces);
let mut projected = collection.project_mut();
let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(&mut projected);
// Extract the traces back
*traces = collection.traces;

// Update the headers to indicate that stats have been computed and forward dropped
// traces counts
Expand Down
3 changes: 3 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/trace_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl<'a> TraceSerializer<'a> {
tracer_payload::TraceChunks::V05(p) => {
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)
}
tracer_payload::TraceChunks::V1(_) => {
todo!("V1 serialization not yet implemented")
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions libdd-data-pipeline/tests/test_trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod tracing_integration_tests {
use libdd_data_pipeline::trace_exporter::{
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
};
use libdd_trace_utils::span::v05::dict::SharedDict;
use libdd_trace_utils::span::SharedDictBytes;
use libdd_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent;
use libdd_trace_utils::test_utils::{create_test_json_span, create_test_v05_span};
use serde_json::json;
Expand Down Expand Up @@ -68,7 +68,7 @@ mod tracing_integration_tests {
}

fn get_v05_trace_snapshot_test_payload() -> Vec<u8> {
let mut dict = SharedDict::default();
let mut dict = SharedDictBytes::default();

let span_1 = create_test_v05_span(
1234,
Expand Down
2 changes: 2 additions & 0 deletions libdd-tinybytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ rmp-serde = "1.1.1"

[dependencies]
serde = { version = "1.0.209", optional = true }
hashbrown = { version = "0.15", optional = true }

[features]
bytes_string = []
serialization = ["serde"]
hashbrown_support = ["hashbrown"]
26 changes: 23 additions & 3 deletions libdd-tinybytes/src/bytes_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ impl From<String> for BytesString {
}
}

impl From<&'static str> for BytesString {
fn from(value: &'static str) -> Self {
Self::from_static(value)
impl From<&str> for BytesString {
fn from(value: &str) -> Self {
Self::from_string(value.to_string())
}
}

Expand All @@ -199,6 +199,12 @@ impl hash::Hash for BytesString {
}
}

impl PartialEq<str> for BytesString {
fn eq(&self, other: &str) -> bool {
self.as_str() == other
}
}

impl PartialEq<&str> for BytesString {
fn eq(&self, other: &&str) -> bool {
self.as_str() == *other
Expand All @@ -211,6 +217,20 @@ impl Debug for BytesString {
}
}

#[cfg(feature = "hashbrown_support")]
impl hashbrown::Equivalent<BytesString> for String {
fn equivalent(&self, key: &BytesString) -> bool {
self.as_str() == key.as_ref()
}
}

#[cfg(feature = "hashbrown_support")]
impl<'a> hashbrown::Equivalent<BytesString> for &'a str {
fn equivalent(&self, key: &BytesString) -> bool {
*self == key.as_ref()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions libdd-tinybytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ impl<T: UnderlyingBytes> From<T> for Bytes {
}
}

impl<'a> From<&'a [u8]> for Bytes {
fn from(value: &'a [u8]) -> Self {
Self::copy_from_slice(value)
}
}

impl AsRef<[u8]> for Bytes {
#[inline]
fn as_ref(&self) -> &[u8] {
Expand Down
9 changes: 5 additions & 4 deletions libdd-trace-stats/src/span_concentrator/stat_span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! support both trace-utils' Span and pb::Span.

use libdd_trace_protobuf::pb;
use libdd_trace_utils::span::{trace_utils, v04::Span, TraceData};
use libdd_trace_utils::span::{v04::Span, TraceData};
use libdd_trace_utils::trace_utils as pb_utils;
use std::borrow::Borrow;

Expand Down Expand Up @@ -73,15 +73,16 @@ impl<'a, T: TraceData> StatSpan<'a> for Span<T> {
}

fn is_measured(&'a self) -> bool {
trace_utils::is_measured(self)
self.metrics.get("_dd.measured").is_some_and(|v| *v == 1.0)
}

fn is_partial_snapshot(&'a self) -> bool {
trace_utils::is_partial_snapshot(self)
self.metrics.get("_dd.partial_version").is_some_and(|v| *v >= 0.0)
}

fn has_top_level(&'a self) -> bool {
trace_utils::has_top_level(self)
self.metrics.get("_dd.top_level").is_some_and(|v| *v == 1.0)
|| self.metrics.get("_top_level").is_some_and(|v| *v == 1.0)
}

fn get_meta(&'a self, key: &str) -> Option<&'a str> {
Expand Down
3 changes: 3 additions & 0 deletions libdd-trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ rand = "0.8.5"
bytes = "1.6.0"
rmpv = { version = "1.3.0", default-features = false }
rmp = { version = "0.8.14", default-features = false }
hashbrown = { version = "0.15", features = ["serde"] }
strum = { version = "0.26.2", features = ["derive"] }

libdd-common = { version = "1.1.0", path = "../libdd-common", default-features = false }
libdd-trace-protobuf = { version = "1.0.0", path = "../libdd-trace-protobuf" }
libdd-trace-normalization = { version = "1.0.0", path = "../libdd-trace-normalization" }
libdd-tinybytes = { version = "1.0.0", path = "../libdd-tinybytes", features = [
"bytes_string",
"serialization",
"hashbrown_support",
] }

# Compression feature
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/msgpack_decoder/decode/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::msgpack_decoder::decode::{buffer::Buffer, error::DecodeError};
use crate::span::DeserializableTraceData;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;
use hashbrown::HashMap;

/// Reads a map from the buffer and returns it as a `HashMap`.
///
Expand Down
6 changes: 3 additions & 3 deletions libdd-trace-utils/src/msgpack_decoder/decode/meta_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::string::handle_null_marker;
use crate::span::DeserializableTraceData;
use rmp::decode;
use std::collections::HashMap;
use hashbrown::HashMap;

fn read_byte_array_len<T: DeserializableTraceData>(
buf: &mut Buffer<T>,
Expand Down Expand Up @@ -52,7 +52,7 @@ mod tests {

#[test]
fn read_meta_test() {
let meta = HashMap::from([("key".to_string(), Bytes::from(vec![1, 2, 3, 4]))]);
let meta: HashMap<String, Bytes> = HashMap::from([("key".to_string(), Bytes::from(vec![1, 2, 3, 4]))]);

let serialized = rmp_serde::to_vec_named(&meta).unwrap();
let mut slice = Buffer::<SliceData>::new(serialized.as_ref());
Expand All @@ -63,7 +63,7 @@ mod tests {

#[test]
fn read_meta_wrong_family_test() {
let meta = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]);
let meta: HashMap<String, Vec<u8>> = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]);

let serialized = rmp_serde::to_vec_named(&meta).unwrap();
let mut slice = Buffer::<SliceData>::new(serialized.as_ref());
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/msgpack_decoder/decode/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number;
use crate::msgpack_decoder::decode::string::handle_null_marker;
use crate::span::DeserializableTraceData;
use std::collections::HashMap;
use hashbrown::HashMap;

#[inline]
pub fn read_metric_pair<T: DeserializableTraceData>(
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/msgpack_decoder/decode/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl TryFrom<Number> for f64 {
}
}

fn read_num(buf: &mut &[u8], allow_null: bool) -> Result<Number, DecodeError> {
pub fn read_num(buf: &mut &[u8], allow_null: bool) -> Result<Number, DecodeError> {
match rmp::decode::read_marker(buf)
.map_err(|_| DecodeError::InvalidFormat("Unable to read marker for number".to_owned()))?
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::msgpack_decoder::decode::string::handle_null_marker;
use crate::span::v04::{AttributeAnyValue, AttributeArrayValue, SpanEvent};
use crate::span::DeserializableTraceData;
use std::borrow::Borrow;
use std::collections::HashMap;
use hashbrown::HashMap;
use std::str::FromStr;

/// Reads a slice of bytes and decodes it into a vector of `SpanEvent` objects.
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/msgpack_decoder/decode/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::msgpack_decoder::decode::buffer::Buffer;
use crate::msgpack_decoder::decode::error::DecodeError;
use crate::span::DeserializableTraceData;
use rmp::decode;
use std::collections::HashMap;
use hashbrown::HashMap;

// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192)
const NULL_MARKER: &u8 = &0xc0;
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-utils/src/msgpack_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
pub mod decode;
pub mod v04;
pub mod v05;
pub mod v1;
6 changes: 3 additions & 3 deletions libdd-trace-utils/src/msgpack_decoder/v04/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ mod tests {
use rmp_serde;
use rmp_serde::to_vec_named;
use serde_json::json;
use std::collections::HashMap;
use hashbrown::HashMap;

#[test]
fn test_empty_array() {
Expand Down Expand Up @@ -288,7 +288,7 @@ mod tests {

#[test]
fn test_decoder_meta_fixed_map_success() {
let expected_meta = HashMap::from([
let expected_meta: HashMap<String, String> = HashMap::from([
("key1".to_string(), "value1".to_string()),
("key2".to_string(), "value2".to_string()),
]);
Expand Down Expand Up @@ -360,7 +360,7 @@ mod tests {

#[test]
fn test_decoder_metrics_fixed_map_success() {
let expected_metrics = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]);
let expected_metrics: HashMap<&str, f64> = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]);

let mut span = create_test_json_span(1, 2, 0, 0, false);
span["metrics"] = json!(expected_metrics.clone());
Expand Down
2 changes: 1 addition & 1 deletion libdd-trace-utils/src/msgpack_decoder/v05/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::msgpack_decoder::decode::{
};
use crate::span::v04::{Span, SpanBytes, SpanSlice};
use crate::span::DeserializableTraceData;
use std::collections::HashMap;
use hashbrown::HashMap;

const PAYLOAD_LEN: u32 = 2;
const SPAN_ELEM_COUNT: u32 = 12;
Expand Down
Loading
Loading