diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 6af32b12..d8aad841 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -71,7 +71,8 @@ impl MiniAgent { // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); - let trace_flusher_handle = tokio::spawn(async move { + #[allow(unused_mut)] + let mut trace_flusher_handle = tokio::spawn(async move { trace_flusher.start_trace_flusher(trace_rx).await; }); @@ -84,7 +85,8 @@ impl MiniAgent { // start our stats flusher. let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); - let stats_flusher_handle = tokio::spawn(async move { + #[allow(unused_mut)] + let mut stats_flusher_handle = tokio::spawn(async move { stats_flusher .start_stats_flusher(stats_config, stats_rx) .await; @@ -128,53 +130,73 @@ impl MiniAgent { ) }); - // Determine which transport to use based on configuration + // Determine which transports to use based on configuration #[cfg(any(all(windows, feature = "windows-pipes"), test))] let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref(); #[cfg(not(any(all(windows, feature = "windows-pipes"), test)))] let pipe_name_opt: Option<&String> = None; - if let Some(pipe_name) = pipe_name_opt { - debug!("Mini Agent started: listening on named pipe {}", pipe_name); - } else { - debug!( - "Mini Agent started: listening on port {}", - self.config.dd_apm_receiver_port - ); - } debug!( "Time taken to start the Mini Agent: {} ms", now.elapsed().as_millis() ); + // Always start TCP listener + let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); + let tcp_listener = tokio::net::TcpListener::bind(&addr).await?; + debug!( + "Mini Agent listening on TCP port {}", + self.config.dd_apm_receiver_port + ); + if let Some(pipe_name) = pipe_name_opt { - // Windows named pipe transport + // Both TCP and named pipe transports + debug!("Mini Agent also listening on named pipe {}", pipe_name); + #[cfg(all(windows, feature = "windows-pipes"))] { - Self::serve_named_pipe( - pipe_name, - service, - trace_flusher_handle, - stats_flusher_handle, - ) - .await?; + let tcp_service = service.clone(); + let pipe_service = service; + + let mut tcp_handle = + tokio::spawn(Self::serve_accept_loop_tcp(tcp_listener, tcp_service)); + + let mut pipe_handle = tokio::spawn(Self::serve_accept_loop_named_pipe( + pipe_name.clone(), + pipe_service, + )); + + // Monitor all tasks — if any critical task dies, shut down + tokio::select! { + result = &mut tcp_handle => { + error!("TCP accept loop died: {:?}", result); + return Err("TCP accept loop terminated unexpectedly".into()); + }, + result = &mut pipe_handle => { + error!("Named pipe accept loop died: {:?}", result); + return Err("Named pipe accept loop terminated unexpectedly".into()); + }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, + } } #[cfg(not(all(windows, feature = "windows-pipes")))] { - let _ = pipe_name; // Suppress unused variable warning + let _ = pipe_name; unreachable!( - "Named pipes are only supported on Windows with the windows-pipes feature \ - enabled, cannot use pipe: {}.", - pipe_name + "Named pipes are only supported on Windows with the windows-pipes feature enabled." ); } } else { - // TCP transport - let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port)); - let listener = tokio::net::TcpListener::bind(&addr).await?; - + // TCP-only transport Self::serve_tcp( - listener, + tcp_listener, service, trace_flusher_handle, stats_flusher_handle, @@ -191,6 +213,38 @@ impl MiniAgent { mut trace_flusher_handle: tokio::task::JoinHandle<()>, mut stats_flusher_handle: tokio::task::JoinHandle<()>, ) -> Result<(), Box> + where + S: hyper::service::Service< + hyper::Request, + Response = hyper::Response, + > + Clone + + Send + + 'static, + S::Future: Send, + S::Error: std::error::Error + Send + Sync + 'static, + { + let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp(listener, service)); + + tokio::select! { + result = &mut tcp_handle => { + error!("TCP accept loop died: {:?}", result); + return Err("TCP accept loop terminated unexpectedly".into()); + }, + result = &mut trace_flusher_handle => { + error!("Trace flusher task died: {:?}", result); + return Err("Trace flusher task terminated unexpectedly".into()); + }, + result = &mut stats_flusher_handle => { + error!("Stats flusher task died: {:?}", result); + return Err("Stats flusher task terminated unexpectedly".into()); + }, + } + } + + async fn serve_accept_loop_tcp( + listener: tokio::net::TcpListener, + service: S, + ) -> Result<(), Box> where S: hyper::service::Service< hyper::Request, @@ -218,7 +272,7 @@ impl MiniAgent { continue; } Err(e) => { - error!("Server error: {e}"); + error!("TCP server error: {e}"); return Err(e.into()); } Ok((conn, _)) => conn, @@ -234,34 +288,24 @@ impl MiniAgent { }, Ok(()) | Err(_) => continue, }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - error!("Trace flusher task died: {:?}", result); - return Err("Trace flusher task terminated unexpectedly".into()); - }, - result = &mut stats_flusher_handle => { - error!("Stats flusher task died: {:?}", result); - return Err("Stats flusher task terminated unexpectedly".into()); - }, }; let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); let service = service.clone(); joinset.spawn(async move { if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); + error!("TCP connection error: {e}"); } }); } } + /// Named pipe accept loop without flusher monitoring, for use when running alongside TCP. #[cfg(all(windows, feature = "windows-pipes"))] - async fn serve_named_pipe( - pipe_name: &str, + async fn serve_accept_loop_named_pipe( + pipe_name: String, service: S, - mut trace_flusher_handle: tokio::task::JoinHandle<()>, - mut stats_flusher_handle: tokio::task::JoinHandle<()>, - ) -> Result<(), Box> + ) -> Result<(), Box> where S: hyper::service::Service< hyper::Request, @@ -276,9 +320,7 @@ impl MiniAgent { let mut joinset = tokio::task::JoinSet::new(); loop { - // Create a new pipe instance - // pipe_name already includes \\.\pipe\ prefix from config - let pipe = match ServerOptions::new().create(pipe_name) { + let pipe = match ServerOptions::new().create(&pipe_name) { Ok(pipe) => { debug!("Created pipe server instance '{}' in byte mode", pipe_name); pipe @@ -289,7 +331,6 @@ impl MiniAgent { } }; - // Wait for client connection let conn = tokio::select! { connect_res = pipe.connect() => match connect_res { Err(e) @@ -322,24 +363,14 @@ impl MiniAgent { }, Ok(()) | Err(_) => continue, }, - // If there's some error in the background tasks, we can't send data - result = &mut trace_flusher_handle => { - error!("Trace flusher task died: {:?}", result); - return Err("Trace flusher task terminated unexpectedly".into()); - }, - result = &mut stats_flusher_handle => { - error!("Stats flusher task died: {:?}", result); - return Err("Stats flusher task terminated unexpectedly".into()); - }, }; - // Hyper http parser handles buffering pipe data let conn = hyper_util::rt::TokioIo::new(conn); let server = server.clone(); let service = service.clone(); joinset.spawn(async move { if let Err(e) = server.serve_connection(conn, service).await { - error!("Connection error: {e}"); + error!("Named pipe connection error: {e}"); } }); }