Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

214 changes: 214 additions & 0 deletions crates/ironrdp-server/src/autodetect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
//! Server-side auto-detect (RTT measurement) per [MS-RDPBCGR 2.2.14].
//!
//! The server periodically sends RTT Measure Request PDUs and records the
//! round-trip time from the client's response. Results are exposed via
//! [`AutoDetectManager::snapshot()`].
//!
//! [MS-RDPBCGR 2.2.14]: https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-rdpbcgr/dc672839-4f4e-40b1-a71c-cd6a959baa38

use std::collections::VecDeque;
use std::time::Instant;

use ironrdp_pdu::rdp::autodetect::{AutoDetectRequest, AutoDetectResponse};

/// Number of RTT samples to retain for averaging.
const RTT_WINDOW_SIZE: usize = 8;

/// Probes older than this are discarded as unresponsive.
pub(crate) const RTT_PROBE_MAX_AGE: core::time::Duration = core::time::Duration::from_secs(30);

/// Server-side auto-detect state machine.
///
/// Tracks outstanding RTT probes and computes round-trip statistics from
/// client responses. Call [`send_rtt_request()`](Self::send_rtt_request) to
/// generate a probe, then [`handle_response()`](Self::handle_response) when
/// the client replies.
pub struct AutoDetectManager {
next_sequence: u16,
pending_probes: Vec<(u16, Instant)>,
rtt_samples: VecDeque<u32>,
}

impl AutoDetectManager {
pub fn new() -> Self {
Self {
next_sequence: 0,
pending_probes: Vec::new(),
rtt_samples: VecDeque::with_capacity(RTT_WINDOW_SIZE),
}
}

/// Generate an RTT Measure Request PDU for continuous detection.
///
/// The caller must encode and send the returned [`AutoDetectRequest`] as
/// a Share Data PDU on the IO channel. Timing information is tracked
/// internally by [`AutoDetectManager`].
pub fn send_rtt_request(&mut self) -> AutoDetectRequest {
let seq = self.next_sequence;
self.next_sequence = seq.wrapping_add(1);
self.pending_probes.push((seq, Instant::now()));
AutoDetectRequest::rtt_continuous(seq)
}

/// Process an RTT Measure Response from the client.
///
/// Returns the measured RTT in milliseconds if the sequence number
/// matches an outstanding probe, or `None` if it was unexpected.
#[expect(
clippy::as_conversions,
clippy::cast_possible_truncation,
reason = "RTT in ms fits in u32 for any plausible network latency"
)]
pub fn handle_response(&mut self, response: &AutoDetectResponse) -> Option<u32> {
let AutoDetectResponse::RttResponse { sequence_number } = response else {
return None;
};

let idx = self.pending_probes.iter().position(|(s, _)| *s == *sequence_number)?;
let (_, sent_at) = self.pending_probes.remove(idx);

let rtt_ms = sent_at.elapsed().as_millis() as u32;

if self.rtt_samples.len() >= RTT_WINDOW_SIZE {
self.rtt_samples.pop_front();
}
self.rtt_samples.push_back(rtt_ms);

Some(rtt_ms)
}

/// Get current RTT statistics, or `None` if no measurements yet.
pub fn snapshot(&self) -> Option<RttSnapshot> {
if self.rtt_samples.is_empty() {
return None;
}

let min = *self.rtt_samples.iter().min().unwrap_or(&0);
let max = *self.rtt_samples.iter().max().unwrap_or(&0);
let sum: u64 = self.rtt_samples.iter().map(|&v| u64::from(v)).sum();
#[expect(
clippy::as_conversions,
clippy::cast_possible_truncation,
reason = "average of u32 samples fits in u32"
)]
let avg = (sum / self.rtt_samples.len() as u64) as u32;

Some(RttSnapshot {
min_ms: min,
max_ms: max,
avg_ms: avg,
sample_count: self.rtt_samples.len(),
})
}

/// Number of outstanding probes awaiting response.
pub fn pending_count(&self) -> usize {
self.pending_probes.len()
}

/// Discard probes older than the given threshold to prevent unbounded growth.
pub fn expire_stale_probes(&mut self, max_age: core::time::Duration) {
self.pending_probes.retain(|(_, sent_at)| sent_at.elapsed() < max_age);
}
}

impl Default for AutoDetectManager {
fn default() -> Self {
Self::new()
}
}

/// Snapshot of RTT measurement results.
#[derive(Debug, Clone, Copy)]
pub struct RttSnapshot {
/// Minimum observed RTT in milliseconds.
pub min_ms: u32,
/// Maximum observed RTT in milliseconds.
pub max_ms: u32,
/// Average RTT in milliseconds over the sample window.
pub avg_ms: u32,
/// Number of samples in the current window.
pub sample_count: usize,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn rtt_request_increments_sequence() {
let mut mgr = AutoDetectManager::new();
let req1 = mgr.send_rtt_request();
let req2 = mgr.send_rtt_request();
assert_eq!(req1.sequence_number(), 0);
assert_eq!(req2.sequence_number(), 1);
assert_eq!(mgr.pending_count(), 2);
}

#[test]
fn rtt_response_computes_latency() {
let mut mgr = AutoDetectManager::new();
let req = mgr.send_rtt_request();

let response = AutoDetectResponse::RttResponse {
sequence_number: req.sequence_number(),
};
let rtt = mgr.handle_response(&response);
assert!(rtt.is_some(), "should match the outstanding probe");
assert_eq!(mgr.pending_count(), 0);
}

#[test]
fn unknown_sequence_returns_none() {
let mut mgr = AutoDetectManager::new();
let _ = mgr.send_rtt_request();

let response = AutoDetectResponse::RttResponse { sequence_number: 999 };
assert!(mgr.handle_response(&response).is_none());
assert_eq!(mgr.pending_count(), 1, "original probe should remain");
}

#[test]
fn snapshot_returns_none_without_data() {
let mgr = AutoDetectManager::new();
assert!(mgr.snapshot().is_none());
}

#[test]
fn snapshot_reflects_measurements() {
let mut mgr = AutoDetectManager::new();

for _ in 0..3 {
let req = mgr.send_rtt_request();
let response = AutoDetectResponse::RttResponse {
sequence_number: req.sequence_number(),
};
let _ = mgr.handle_response(&response);
}

let snap = mgr.snapshot().expect("should have data after 3 measurements");
assert_eq!(snap.sample_count, 3);
assert!(snap.avg_ms < 100);
}

#[test]
fn sequence_number_wraps() {
let mut mgr = AutoDetectManager::new();
mgr.next_sequence = u16::MAX;
let req = mgr.send_rtt_request();
assert_eq!(req.sequence_number(), u16::MAX);

let req2 = mgr.send_rtt_request();
assert_eq!(req2.sequence_number(), 0, "should wrap around");
}

#[test]
fn stale_probe_expiry() {
let mut mgr = AutoDetectManager::new();
let _ = mgr.send_rtt_request();
assert_eq!(mgr.pending_count(), 1);

mgr.expire_stale_probes(core::time::Duration::ZERO);
assert_eq!(mgr.pending_count(), 0);
}
}
1 change: 1 addition & 0 deletions crates/ironrdp-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use {tokio, tokio_rustls};

mod macros;

pub mod autodetect;
mod builder;
mod capabilities;
mod clipboard;
Expand Down
81 changes: 80 additions & 1 deletion crates/ironrdp-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio_rustls::TlsAcceptor;
use tracing::{debug, error, trace, warn};
use {ironrdp_dvc as dvc, ironrdp_rdpsnd as rdpsnd};

use crate::autodetect::{AutoDetectManager, RttSnapshot};
use crate::clipboard::CliprdrServerFactory;
use crate::display::{DisplayUpdate, RdpServerDisplay};
use crate::echo::{EchoDvcBridge, EchoServerHandle, EchoServerMessage, build_echo_request};
Expand Down Expand Up @@ -240,6 +241,7 @@ pub struct RdpServer {
ev_receiver: Arc<Mutex<mpsc::UnboundedReceiver<ServerEvent>>>,
creds: Option<Credentials>,
local_addr: Option<SocketAddr>,
autodetect: Option<AutoDetectManager>,
}

#[derive(Debug)]
Expand All @@ -252,6 +254,8 @@ pub enum ServerEvent {
GetLocalAddr(oneshot::Sender<Option<SocketAddr>>),
#[cfg(feature = "egfx")]
Egfx(EgfxServerMessage),
/// Trigger an RTT measurement probe (requires auto-detect enabled).
AutoDetectRttRequest,
}

pub trait ServerEventSender {
Expand Down Expand Up @@ -307,6 +311,7 @@ impl RdpServer {
ev_receiver: Arc::new(Mutex::new(ev_receiver)),
creds: None,
local_addr: None,
autodetect: None,
}
}

Expand All @@ -323,6 +328,26 @@ impl RdpServer {
&self.echo_handle
}

/// Enable protocol-level auto-detect ([MS-RDPBCGR 2.2.14]).
///
/// Auto-detect uses lightweight Share Data PDUs on the IO channel,
/// separate from the ECHO DVC. It supports bandwidth measurement
/// in addition to RTT and works even when DVC is unavailable.
///
/// Send probes via [`ServerEvent::AutoDetectRttRequest`] and
/// query results with [`rtt_snapshot()`](Self::rtt_snapshot).
pub fn enable_autodetect(&mut self) {
self.autodetect = Some(AutoDetectManager::new());
}

/// Get the latest auto-detect RTT snapshot.
///
/// Returns `None` if auto-detect is not enabled or no measurements
/// have been received yet.
pub fn rtt_snapshot(&self) -> Option<RttSnapshot> {
self.autodetect.as_ref().and_then(|ad| ad.snapshot())
}

/// Returns the shared EGFX server handle for proactive frame submission.
///
/// Available after `build_server_with_handle()` returns `Some` during
Expand Down Expand Up @@ -570,6 +595,7 @@ impl RdpServer {
&mut self,
events: &mut Vec<ServerEvent>,
writer: &mut impl FramedWrite,
io_channel_id: u16,
user_channel_id: u16,
) -> Result<RunState> {
// Avoid wave message queuing up and causing extra delays.
Expand Down Expand Up @@ -685,6 +711,18 @@ impl RdpServer {
writer.write_all(&data).await?;
}
},
ServerEvent::AutoDetectRttRequest => {
if let Some(ref mut ad) = self.autodetect {
ad.expire_stale_probes(crate::autodetect::RTT_PROBE_MAX_AGE);
let request = ad.send_rtt_request();
let data = encode_share_data_pdu(
rdp::headers::ShareDataPdu::AutoDetectReq(request),
io_channel_id,
user_channel_id,
)?;
writer.write_all(&data).await?;
}
Comment thread
glamberson marked this conversation as resolved.
}
}
}

Expand Down Expand Up @@ -776,7 +814,7 @@ impl RdpServer {
}
let mut this = this.lock().await;
match this
.dispatch_server_events(&mut events, &mut event_writer, user_channel_id)
.dispatch_server_events(&mut events, &mut event_writer, io_channel_id, user_channel_id)
.await?
{
RunState::Continue => continue,
Expand Down Expand Up @@ -996,6 +1034,16 @@ impl RdpServer {
return Ok(true);
}

rdp::headers::ShareDataPdu::AutoDetectRsp(response) => {
if let Some(ref mut ad) = self.autodetect {
if let Some(rtt_ms) = ad.handle_response(&response) {
debug!(rtt_ms, seq = response.sequence_number(), "RTT measured");
} else {
trace!(seq = response.sequence_number(), "Unmatched auto-detect response");
}
}
}

unexpected => {
warn!(?unexpected, "Unexpected share data pdu");
}
Expand Down Expand Up @@ -1122,6 +1170,37 @@ impl RdpServer {
}
}

/// Encode a server-initiated Share Data PDU for the IO channel.
///
/// `share_id` is hard-coded to 0, matching the existing convention in
/// `deactivate_all()`. In practice, RDP clients do not validate `share_id`
/// on server-initiated PDUs, but a future refactor could thread the
/// negotiated value from the Demand Active exchange if needed.
fn encode_share_data_pdu(
share_data_pdu: rdp::headers::ShareDataPdu,
io_channel_id: u16,
user_channel_id: u16,
) -> Result<Vec<u8>> {
let header = rdp::headers::ShareDataHeader {
share_data_pdu,
stream_priority: rdp::headers::StreamPriority::Medium,
compression_flags: rdp::headers::CompressionFlags::empty(),
compression_type: rdp::client_info::CompressionType::K8,
};
let pdu = rdp::headers::ShareControlHeader {
share_id: 0,
pdu_source: user_channel_id,
share_control_pdu: ShareControlPdu::Data(header),
Comment thread
glamberson marked this conversation as resolved.
};
let user_data = encode_vec(&pdu)?.into();
let mcs_pdu = SendDataIndication {
initiator_id: user_channel_id,
channel_id: io_channel_id,
user_data,
Comment thread
glamberson marked this conversation as resolved.
};
Ok(encode_vec(&X224(mcs_pdu))?)
}

async fn deactivate_all(
io_channel_id: u16,
user_channel_id: u16,
Expand Down
1 change: 1 addition & 0 deletions crates/ironrdp-testsuite-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ironrdp-str.path = "../ironrdp-str"
ironrdp-input.path = "../ironrdp-input"
ironrdp-rdcleanpath.path = "../ironrdp-rdcleanpath"
ironrdp-rdpsnd.path = "../ironrdp-rdpsnd"
ironrdp-server.path = "../ironrdp-server"
ironrdp-svc.path = "../ironrdp-svc"
ironrdp-session = { path = "../ironrdp-session", features = ["qoi"] }
ironrdp-propertyset.path = "../ironrdp-propertyset"
Expand Down
Loading
Loading