From 11ebe02b2c2ab7126d95f76fcfbea95b68e69bdc Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 6 Mar 2026 10:15:49 -0500 Subject: [PATCH 1/3] Listen on both TCP and named pipes when pipe name is configured MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a named pipe name is configured, the mini agent now starts both a TCP listener and a named pipe listener concurrently. This ensures backwards compatibility — older tracers that only support TCP can still communicate with the compat binary, while new tracers use named pipes for multi-function isolation. When no pipe name is configured, behavior is unchanged (TCP only). Co-Authored-By: Claude Opus 4.6 --- crates/datadog-trace-agent/src/mini_agent.rs | 211 ++++++++++++++++--- 1 file changed, 185 insertions(+), 26 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 6af32b12..bb59b62a 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -128,53 +128,75 @@ impl MiniAgent { ) }); - // Determine which transport to use based on configuration + // Determine which transports to use based on configuration #[cfg(any(all(windows, feature = "windows-pipes"), test))] let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref(); #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] let pipe_name_opt: Option<&String> = None; - if let Some(pipe_name) = pipe_name_opt { - debug!("Mini Agent started: listening on named pipe {}", pipe_name); - } else { - debug!( - "Mini Agent started: listening on port {}", - self.config.dd_apm_receiver_port - ); - } debug!( "Time taken to start the Mini Agent: {} ms", now.elapsed().as_millis() ); + // Always start TCP listener + let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); + let tcp_listener = tokio::net::TcpListener::bind(&addr).await?; + debug!( + "Mini Agent listening on TCP port {}", + self.config.dd_apm_receiver_port + ); + if let Some(pipe_name) = pipe_name_opt { - // Windows named pipe transport + // Both TCP and named pipe transports + debug!("Mini Agent also listening on named pipe {}", pipe_name); + #[cfg(all(windows, feature = "windows-pipes"))] { - Self::serve_named_pipe( - pipe_name, - service, - trace_flusher_handle, - stats_flusher_handle, - ) - .await?; + let tcp_service = service.clone(); + let pipe_service = service; + + let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp( + tcp_listener, + tcp_service, + )); + + let mut pipe_handle = tokio::spawn(Self::serve_accept_loop_named_pipe( + pipe_name.clone(), + pipe_service, + )); + + // Monitor all tasks — if any critical task dies, shut down + tokio::select! { + result = &mut tcp_handle => { + error!("TCP accept loop died: {:?}", result); + return Err("TCP accept loop terminated unexpectedly".into()); + }, + result = &mut pipe_handle => { + error!("Named pipe accept loop died: {:?}", result); + return Err("Named pipe accept loop terminated unexpectedly".into()); + }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, + } } #[cfg(not(all(windows, feature = "windows-pipes")))] { - let _ = pipe_name; // Suppress unused variable warning + let _ = pipe_name; unreachable!( - "Named pipes are only supported on Windows with the windows-pipes feature \ - enabled, cannot use pipe: {}.", - pipe_name + "Named pipes are only supported on Windows with the windows-pipes feature enabled." ); } } else { - // TCP transport - let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); - let listener = tokio::net::TcpListener::bind(&addr).await?; - + // TCP-only transport Self::serve_tcp( - listener, + tcp_listener, service, trace_flusher_handle, stats_flusher_handle, @@ -345,6 +367,143 @@ impl MiniAgent { } } + /// TCP accept loop without flusher monitoring, for use when running alongside named pipes. + #[cfg(any(all(windows, feature = "windows-pipes"), test))] + async fn serve_accept_loop_tcp( + listener: tokio::net::TcpListener, + service: S, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { + let server = hyper::server::conn::http1::Builder::new(); + let mut joinset = tokio::task::JoinSet::new(); + + loop { + let conn = tokio::select! { + con_res = listener.accept() => match con_res { + Err(e) + if matches!( + e.kind(), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionRefused + ) => + { + continue; + } + Err(e) => { + error!("TCP server error: {e}"); + return Err(e.into()); + } + Ok((conn, _)) => conn, + }, + finished = async { + match joinset.join_next().await { + Some(finished) => finished, + None => std::future::pending().await, + } + } => match finished { + Err(e) if e.is_panic() => { + std::panic::resume_unwind(e.into_panic()); + }, + Ok(()) | Err(_) => continue, + }, + }; + let conn = hyper_util::rt::TokioIo::new(conn); + let server = server.clone(); + let service = service.clone(); + joinset.spawn(async move { + if let Err(e) = server.serve_connection(conn, service).await { + error!("TCP connection error: {e}"); + } + }); + } + } + + /// Named pipe accept loop without flusher monitoring, for use when running alongside TCP. + #[cfg(all(windows, feature = "windows-pipes"))] + async fn serve_accept_loop_named_pipe( + pipe_name: String, + service: S, + ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { + let server = hyper::server::conn::http1::Builder::new(); + let mut joinset = tokio::task::JoinSet::new(); + + loop { + let pipe = match ServerOptions::new().create(&pipe_name) { + Ok(pipe) => { + debug!("Created pipe server instance '{}' in byte mode", pipe_name); + pipe + } + Err(e) => { + error!("Failed to create named pipe: {e}"); + return Err(e.into()); + } + }; + + let conn = tokio::select! { + connect_res = pipe.connect() => match connect_res { + Err(e) + if matches!( + e.kind(), + io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionRefused + ) => + { + continue; + } + Err(e) => { + error!("Named pipe connection error: {e}"); + return Err(e.into()); + } + Ok(()) => { + debug!("Client connected to '{}'", pipe_name); + pipe + } + }, + finished = async { + match joinset.join_next().await { + Some(finished) => finished, + None => std::future::pending().await, + } + } => match finished { + Err(e) if e.is_panic() => { + std::panic::resume_unwind(e.into_panic()); + }, + Ok(()) | Err(_) => continue, + }, + }; + + let conn = hyper_util::rt::TokioIo::new(conn); + let server = server.clone(); + let service = service.clone(); + joinset.spawn(async move { + if let Err(e) = server.serve_connection(conn, service).await { + error!("Named pipe connection error: {e}"); + } + }); + } + } + #[allow(clippy::too_many_arguments)] async fn trace_endpoint_handler( config: Arc, From 131147e79ef5399ad5802be7d64e92bb000cf575 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 6 Mar 2026 10:40:59 -0500 Subject: [PATCH 2/3] Deduplicate accept loop code in serve_tcp and remove dead serve_named_pipe serve_tcp now delegates to serve_accept_loop_tcp instead of duplicating the accept loop logic. serve_named_pipe is removed as it was replaced by serve_accept_loop_named_pipe in the prior commit. Co-Authored-By: Claude Opus 4.6 --- crates/datadog-trace-agent/src/mini_agent.rs | 162 ++----------------- 1 file changed, 16 insertions(+), 146 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index bb59b62a..2d9f65df 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -156,10 +156,8 @@ impl MiniAgent { let tcp_service = service.clone(); let pipe_service = service; - let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp( - tcp_listener, - tcp_service, - )); + let mut tcp_handle = + tokio::spawn(Self::serve_accept_loop_tcp(tcp_listener, tcp_service)); let mut pipe_handle = tokio::spawn(Self::serve_accept_loop_named_pipe( pipe_name.clone(), @@ -223,152 +221,24 @@ impl MiniAgent { S::Future: Send, S::Error: std::error::Error + Send + Sync + 'static, { - let server = hyper::server::conn::http1::Builder::new(); - let mut joinset = tokio::task::JoinSet::new(); - - loop { - let conn = tokio::select! { - con_res = listener.accept() => match con_res { - Err(e) - if matches!( - e.kind(), - io::ErrorKind::ConnectionAborted - | io::ErrorKind::ConnectionReset - | io::ErrorKind::ConnectionRefused - ) => - { - continue; - } - Err(e) => { - error!("Server error: {e}"); - return Err(e.into()); - } - Ok((conn, _)) => conn, - }, - finished = async { - match joinset.join_next().await { - Some(finished) => finished, - None => std::future::pending().await, - } - } => match finished { - Err(e) if e.is_panic() => { - std::panic::resume_unwind(e.into_panic()); - }, - Ok(()) | Err(_) => continue, - }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - error!("Trace flusher task died: {:?}", result); - return Err("Trace flusher task terminated unexpectedly".into()); - }, - result = &mut stats_flusher_handle => { - error!("Stats flusher task died: {:?}", result); - return Err("Stats flusher task terminated unexpectedly".into()); - }, - }; - let conn = hyper_util::rt::TokioIo::new(conn); - let server = server.clone(); - let service = service.clone(); - joinset.spawn(async move { - if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); - } - }); - } - } - - #[cfg(all(windows, feature = "windows-pipes"))] - async fn serve_named_pipe( - pipe_name: &str, - service: S, - mut trace_flusher_handle: tokio::task::JoinHandle<()>, - mut stats_flusher_handle: tokio::task::JoinHandle<()>, - ) -> Result<(), Box> - where - S: hyper::service::Service< - hyper::Request, - Response = hyper::Response, - > + Clone - + Send - + 'static, - S::Future: Send, - S::Error: std::error::Error + Send + Sync + 'static, - { - let server = hyper::server::conn::http1::Builder::new(); - let mut joinset = tokio::task::JoinSet::new(); - - loop { - // Create a new pipe instance - // pipe_name already includes \\.\pipe\ prefix from config - let pipe = match ServerOptions::new().create(pipe_name) { - Ok(pipe) => { - debug!("Created pipe server instance '{}' in byte mode", pipe_name); - pipe - } - Err(e) => { - error!("Failed to create named pipe: {e}"); - return Err(e.into()); - } - }; - - // Wait for client connection - let conn = tokio::select! { - connect_res = pipe.connect() => match connect_res { - Err(e) - if matches!( - e.kind(), - io::ErrorKind::ConnectionAborted - | io::ErrorKind::ConnectionReset - | io::ErrorKind::ConnectionRefused - ) => - { - continue; - } - Err(e) => { - error!("Named pipe connection error: {e}"); - return Err(e.into()); - } - Ok(()) => { - debug!("Client connected to '{}'", pipe_name); - pipe - } - }, - finished = async { - match joinset.join_next().await { - Some(finished) => finished, - None => std::future::pending().await, - } - } => match finished { - Err(e) if e.is_panic() => { - std::panic::resume_unwind(e.into_panic()); - }, - Ok(()) | Err(_) => continue, - }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - error!("Trace flusher task died: {:?}", result); - return Err("Trace flusher task terminated unexpectedly".into()); - }, - result = &mut stats_flusher_handle => { - error!("Stats flusher task died: {:?}", result); - return Err("Stats flusher task terminated unexpectedly".into()); - }, - }; + let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp(listener, service)); - // Hyper http parser handles buffering pipe data - let conn = hyper_util::rt::TokioIo::new(conn); - let server = server.clone(); - let service = service.clone(); - joinset.spawn(async move { - if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); - } - }); + tokio::select! { + result = &mut tcp_handle => { + error!("TCP accept loop died: {:?}", result); + return Err("TCP accept loop terminated unexpectedly".into()); + }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, } } - /// TCP accept loop without flusher monitoring, for use when running alongside named pipes. - #[cfg(any(all(windows, feature = "windows-pipes"), test))] async fn serve_accept_loop_tcp( listener: tokio::net::TcpListener, service: S, From 2c71ea0742436904e6d1b3aea04625dc0227fbd1 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 6 Mar 2026 10:58:46 -0500 Subject: [PATCH 3/3] Fix Windows build: add mut to flusher handles with allow(unused_mut) On Windows, the flusher handles are borrowed as &mut in the tokio::select! block when both TCP and named pipe transports are active. On other platforms this branch is dead code, so allow(unused_mut) suppresses the warning. Co-Authored-By: Claude Opus 4.6 --- crates/datadog-trace-agent/src/mini_agent.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 2d9f65df..d8aad841 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -71,7 +71,8 @@ impl MiniAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); - let trace_flusher_handle = tokio::spawn(async move { + #[allow(unused_mut)] + let mut trace_flusher_handle = tokio::spawn(async move { trace_flusher.start_trace_flusher(trace_rx).await; }); @@ -84,7 +85,8 @@ impl MiniAgent { // start our stats flusher. let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); - let stats_flusher_handle = tokio::spawn(async move { + #[allow(unused_mut)] + let mut stats_flusher_handle = tokio::spawn(async move { stats_flusher .start_stats_flusher(stats_config, stats_rx) .await;