Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
960 changes: 448 additions & 512 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions apps/skit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jemalloc_pprof = { version = "0.8", features = ["symbolize"], optional = true }
dhat = { version = "0.3", optional = true }

# MoQ support (optional)
moq-native = { version = "0.11.0", optional = true }
moq-native = { version = "0.12.1", optional = true }
async-trait = { workspace = true }

# For glob pattern matching in permissions
Expand All @@ -130,7 +130,7 @@ getrandom = "0.3"
aws-lc-rs = "1"

# For MoQ auth path matching (optional, with moq feature)
moq-lite = { version = "0.11.0", optional = true }
moq-lite = { version = "0.13.0", optional = true }
blake2 = "0.10.6"

[features]
Expand Down
6 changes: 3 additions & 3 deletions apps/skit/src/moq_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl MoqGateway {
#[allow(clippy::cognitive_complexity)]
pub async fn accept_connection(
&self,
session: moq_native::web_transport_quinn::Session,
request: moq_native::Request,
path: String,
auth: Option<Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>>,
) -> Result<(), String> {
Expand Down Expand Up @@ -123,8 +123,8 @@ impl MoqGateway {
if let Some(connection_tx) = connection_tx {
let (response_tx, response_rx) = oneshot::channel();

// Type-erase the WebTransport session
let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(session);
// Type-erase the moq-native Request
let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(request);

let conn =
MoqConnection { path: path.clone(), session: session_boxed, response_tx, auth };
Expand Down
257 changes: 114 additions & 143 deletions apps/skit/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3214,17 +3214,20 @@ fn start_moq_webtransport_acceptor(
key_path = %config.server.key_path,
"Using provided TLS certificates for MoQ WebTransport"
);
ServerTlsConfig {
cert: vec![std::path::PathBuf::from(&config.server.cert_path)],
key: vec![std::path::PathBuf::from(&config.server.key_path)],
generate: vec![],
}
let mut tls = ServerTlsConfig::default();
tls.cert = vec![std::path::PathBuf::from(&config.server.cert_path)];
tls.key = vec![std::path::PathBuf::from(&config.server.key_path)];
tls
} else {
info!("Auto-generating self-signed certificate for MoQ WebTransport (14-day validity for local development)");
ServerTlsConfig { cert: vec![], key: vec![], generate: vec!["localhost".to_string()] }
let mut tls = ServerTlsConfig::default();
tls.generate = vec!["localhost".to_string()];
tls
};

let moq_config = MoqServerConfig { bind: Some(addr), tls };
let mut moq_config = MoqServerConfig::default();
moq_config.bind = Some(addr);
moq_config.tls = tls;

info!(
address = %addr,
Expand Down Expand Up @@ -3256,142 +3259,41 @@ fn start_moq_webtransport_acceptor(
let auth_state = Arc::clone(&auth_state);

tokio::spawn(async move {
match request {
moq_native::Request::WebTransport(wt_request) => {
let url = wt_request.url();
let path = url.path().to_string();

// SECURITY: Never log the full URL (may contain jwt)
debug!(path = %path, "Received WebTransport connection request");

// Validate MoQ auth if enabled
let moq_auth = if auth_state.is_enabled() {
// Extract jwt from query params
let jwt = url
.query_pairs()
.find(|(k, _)| k == "jwt")
.map(|(_, v)| v.to_string());

let Some(jwt) = jwt else {
warn!(path = %path, "MoQ auth failed: missing jwt parameter");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
};

// Validate JWT
let claims = match auth_state.validate_moq_token(&jwt) {
Ok(c) => c,
Err(e) => {
warn!(path = %path, error = %e, "MoQ JWT validation failed");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
},
};

// Check audience
if claims.aud != crate::auth::AUD_MOQ {
warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
}

let token_hash = crate::auth::hash_token(&jwt);

// Enforce "tokens we mint" policy (parity with HTTP API auth).
let metadata_store = auth_state.token_metadata_store().cloned();
let Some(metadata_store) = metadata_store else {
warn!(path = %path, "MoQ auth failed: token metadata store not available");
let _ = wt_request
.close(axum::http::StatusCode::SERVICE_UNAVAILABLE)
.await;
return;
};

let meta = match metadata_store.get(&claims.jti).await {
Ok(Some(meta)) => meta,
Ok(None) => {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
},
Err(e) => {
warn!(path = %path, error = %e, "MoQ auth failed: metadata store error");
let _ = wt_request
.close(axum::http::StatusCode::SERVICE_UNAVAILABLE)
.await;
return;
},
};

// Extra robustness: ensure the presented token matches the stored hash.
if meta.token_hash != token_hash {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
}

if meta.revoked {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
}

// Check revocation
if auth_state.is_revoked(&token_hash) {
warn!(path = %path, "MoQ auth failed: token revoked");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
}

// Verify root matches path and reduce permissions
match crate::auth::verify_moq_token(&claims, &path) {
Ok(ctx) => Some(Arc::new(ctx)
as Arc<
dyn streamkit_core::moq_gateway::MoqAuthChecker,
>),
Err(e) => {
warn!(path = %path, error = %e, "MoQ path verification failed");
let _ = wt_request
.close(axum::http::StatusCode::UNAUTHORIZED)
.await;
return;
},
}
} else {
None
};

match wt_request.ok().await {
Ok(session) => {
if let Err(e) = gateway
.accept_connection(session, path.clone(), moq_auth)
.await
{
warn!(path = %path, error = %e, "Failed to route WebTransport connection");
}
},
Err(e) => {
warn!(path = %path, error = %e, "Failed to accept WebTransport session");
},
}
},
moq_native::Request::Quic(_quic_request) => {
debug!("Received raw QUIC connection (not WebTransport), ignoring");
},
// Extract URL data before consuming the request.
// request.url() borrows, so we copy what we need first.
let (path, jwt_param) = {
let Some(url) = request.url() else {
debug!("Received MoQ connection without URL (raw QUIC), ignoring");
return;
};
let path = url.path().to_string();
let jwt_param = url
.query_pairs()
.find(|(k, _)| k == "jwt")
.map(|(_, v)| v.to_string());
(path, jwt_param)
};

// SECURITY: Never log the full URL (may contain jwt)
debug!(path = %path, "Received MoQ connection request");

// Validate MoQ auth if enabled
let moq_auth = if auth_state.is_enabled() {
match validate_moq_auth(&auth_state, &path, jwt_param).await {
Ok(ctx) => Some(ctx),
Err(status) => {
let _ = request.reject(status).await;
return;
},
}
} else {
None
};

if let Err(e) =
gateway.accept_connection(request, path.clone(), moq_auth).await
{
warn!(path = %path, error = %e, "Failed to route MoQ connection");
}
});
}
Expand All @@ -3407,6 +3309,75 @@ fn start_moq_webtransport_acceptor(
Ok(())
}

/// Validates MoQ auth for an incoming connection, returning the auth context on success
/// or the HTTP status code to reject with on failure.
#[cfg(feature = "moq")]
async fn validate_moq_auth(
auth_state: &crate::auth::AuthState,
path: &str,
jwt_param: Option<String>,
) -> Result<Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>, axum::http::StatusCode> {
let Some(jwt) = jwt_param else {
warn!(path = %path, "MoQ auth failed: missing jwt parameter");
return Err(axum::http::StatusCode::UNAUTHORIZED);
};

// Validate JWT
let claims = auth_state.validate_moq_token(&jwt).map_err(|e| {
warn!(path = %path, error = %e, "MoQ JWT validation failed");
axum::http::StatusCode::UNAUTHORIZED
})?;

// Check audience
if claims.aud != crate::auth::AUD_MOQ {
warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience");
return Err(axum::http::StatusCode::UNAUTHORIZED);
}

let token_hash = crate::auth::hash_token(&jwt);

// Enforce "tokens we mint" policy (parity with HTTP API auth).
let metadata_store = auth_state.token_metadata_store().cloned().ok_or_else(|| {
warn!(path = %path, "MoQ auth failed: token metadata store not available");
axum::http::StatusCode::SERVICE_UNAVAILABLE
})?;

let meta = metadata_store.get(&claims.jti).await.map_err(|e| {
warn!(path = %path, error = %e, "MoQ auth failed: metadata store error");
axum::http::StatusCode::SERVICE_UNAVAILABLE
})?;

let Some(meta) = meta else {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)");
return Err(axum::http::StatusCode::UNAUTHORIZED);
};

// Extra robustness: ensure the presented token matches the stored hash.
if meta.token_hash != token_hash {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch");
return Err(axum::http::StatusCode::UNAUTHORIZED);
}

if meta.revoked {
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked");
return Err(axum::http::StatusCode::UNAUTHORIZED);
}

// Check revocation
if auth_state.is_revoked(&token_hash) {
warn!(path = %path, "MoQ auth failed: token revoked");
return Err(axum::http::StatusCode::UNAUTHORIZED);
}

// Verify root matches path and reduce permissions
crate::auth::verify_moq_token(&claims, path)
.map_err(|e| {
warn!(path = %path, error = %e, "MoQ path verification failed");
axum::http::StatusCode::UNAUTHORIZED
})
.map(|ctx| Arc::new(ctx) as Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>)
}

/// Starts the HTTP/HTTPS server and optional MoQ WebTransport acceptor.
///
/// # Errors
Expand Down
8 changes: 4 additions & 4 deletions crates/nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ url = { version = "2.5.8", optional = true, features = ["serde"] }
rquickjs = { version = "0.11", features = ["array-buffer", "futures", "loader", "parallel"], optional = true }
wildmatch = { version = "2.6", optional = true }

moq-transport = { version = "0.12.1", optional = true }
moq-native = { version = "0.11.0", optional = true }
moq-lite = { version = "0.11.0", optional = true }
hang = { version = "0.10.0", optional = true }
moq-transport = { version = "0.12.2", optional = true }
moq-native = { version = "0.12.1", optional = true }
moq-lite = { version = "0.13.0", optional = true }
hang = { version = "0.13.0", optional = true }

# For local dev, debugging moq stuff
# moq-transport = { version = "0.11.0", optional = true }
Expand Down
26 changes: 26 additions & 0 deletions crates/nodes/src/transport/moq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ use streamkit_core::{

static SHARED_INSECURE_CLIENT: OnceLock<Result<moq_native::Client, String>> = OnceLock::new();

/// Returns a cached `moq_native::Client` with TLS verification disabled.
///
/// In moq-native 0.12, publish/consume origins are set on the `Client` via builder methods
/// (`with_publish` / `with_consume`) before calling `connect()`. The cached client has
/// neither set, so callers must clone and configure it for each connection.
fn shared_insecure_client() -> Result<moq_native::Client, StreamKitError> {
let client = SHARED_INSECURE_CLIENT.get_or_init(|| {
let mut client_config = moq_native::ClientConfig::default();
Expand All @@ -46,6 +51,27 @@ fn shared_insecure_client() -> Result<moq_native::Client, StreamKitError> {
}
}

/// Serialize a catalog to JSON with `priority` fields injected into `video` and `audio`.
///
/// The published `@moq/hang` JS client (0.1.2) still requires `priority` in the catalog
/// schema, but the Rust `hang` 0.13.0 crate removed it from the structs.
/// The upstream JS source has already dropped the requirement, but a new npm release
/// hasn't been published yet. This shim keeps the two sides compatible.
pub(super) fn catalog_to_json(catalog: &hang::catalog::Catalog) -> Result<String, StreamKitError> {
let mut value = serde_json::to_value(catalog)
.map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))?;

if let Some(video) = value.get_mut("video").and_then(|v| v.as_object_mut()) {
video.entry("priority").or_insert(serde_json::json!(60));
}
if let Some(audio) = value.get_mut("audio").and_then(|v| v.as_object_mut()) {
audio.entry("priority").or_insert(serde_json::json!(80));
}

serde_json::to_string(&value)
.map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))
}

pub(super) fn redact_url_str_for_logs(raw: &str) -> String {
raw.parse::<Url>().map_or_else(
|_| raw.split(['?', '#']).next().unwrap_or(raw).to_string(),
Expand Down
Loading
Loading