From f483343b9fd491b53953c0f4e4042fe9853588e2 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Sat, 7 Feb 2026 11:38:16 +0100 Subject: [PATCH 1/5] chore: update moq --- Cargo.lock | 39 ++-- apps/skit/Cargo.toml | 4 +- apps/skit/src/moq_gateway.rs | 6 +- apps/skit/src/server.rs | 257 +++++++++++-------------- crates/nodes/Cargo.toml | 8 +- crates/nodes/src/transport/moq/mod.rs | 5 + crates/nodes/src/transport/moq/peer.rs | 67 +++---- crates/nodes/src/transport/moq/pull.rs | 14 +- crates/nodes/src/transport/moq/push.rs | 17 +- ui/bun.lock | 12 +- ui/package.json | 2 +- 11 files changed, 209 insertions(+), 222 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58516420..d582bb92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1690,9 +1690,9 @@ checksum = "ab2f85be813ce08f0569fcd816f256c6f4287c975d69a9a14ceacc309f1de967" [[package]] name = "hang" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48979817ce00fbfeb6e65787fbb5b540ed221566272f85730dfde142aec6a0dc" +checksum = "be0cb0bfb07c0d8d184a5553fbe19f512957597d09f8a99266453c85f350b37b" dependencies = [ "anyhow", "buf-list", @@ -1898,7 +1898,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.4", ] [[package]] @@ -2524,9 +2524,9 @@ dependencies = [ [[package]] name = "moq-lite" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25d751c1058cd3293fc22633ec84c6e03a5d31dd33e2fa4c700b01094384ac00" +checksum = "bac018460d2ad6ac4f17df4a9388757ba4053a9c2c229379ad7d9415902250b5" dependencies = [ "async-channel", "bytes", @@ -2544,9 +2544,9 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a8d037495f71647c80e1c69a39f118f0647bbcb6ac1d6a60cc79d45b0ea863f" +checksum = "b31ad8aab5956298fa8d9e798a0025bdcfc180bacf4333f2c7371dc988ec4f85" dependencies = [ "anyhow", "clap", @@ -2577,9 +2577,9 @@ dependencies = [ [[package]] name = "moq-transport" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49db4a29e1c6eafe9455a23df6a510310415349038b1b30fc6bff6f9d2a7a853" +checksum = "50d2f896962af0634a5b71f274a07590fbbe21f30c89d986066479078644b477" dependencies = [ "bytes", "futures", @@ -2596,9 +2596,9 @@ dependencies = [ [[package]] name = "mp4-atom" -version = "0.9.2" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b9fcf396d53fdf1c43a9afd38953412b9d782d11391807b473927317bb28f9" +checksum = "5e8e949244bbd26ea7eb6d936af3a6a0202be68bcfc9afce700f3c9026860ff7" dependencies = [ "bytes", "derive_more", @@ -3743,7 +3743,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.4", ] [[package]] @@ -5317,8 +5317,12 @@ checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", "log", + "rustls", + "rustls-pki-types", "tokio", + "tokio-rustls", "tungstenite 0.24.0", + "webpki-roots 0.26.11", ] [[package]] @@ -5713,6 +5717,8 @@ dependencies = [ "httparse", "log", "rand 0.8.5", + "rustls", + "rustls-pki-types", "sha1", "thiserror 1.0.69", "utf-8", @@ -6542,6 +6548,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + [[package]] name = "webpki-roots" version = "1.0.4" diff --git a/apps/skit/Cargo.toml b/apps/skit/Cargo.toml index 8dca0def..a1e83b2a 100644 --- a/apps/skit/Cargo.toml +++ b/apps/skit/Cargo.toml @@ -115,7 +115,7 @@ jemalloc_pprof = { version = "0.8", features = ["symbolize"], optional = true } dhat = { version = "0.3", optional = true } # MoQ support (optional) -moq-native = { version = "0.11.0", optional = true } +moq-native = { version = "0.12.0", optional = true } async-trait = { workspace = true } # For glob pattern matching in permissions @@ -130,7 +130,7 @@ getrandom = "0.3" aws-lc-rs = "1" # For MoQ auth path matching (optional, with moq feature) -moq-lite = { version = "0.11.0", optional = true } +moq-lite = { version = "0.12.0", optional = true } blake2 = "0.10.6" [features] diff --git a/apps/skit/src/moq_gateway.rs b/apps/skit/src/moq_gateway.rs index a81a78cf..f9c433dd 100644 --- a/apps/skit/src/moq_gateway.rs +++ b/apps/skit/src/moq_gateway.rs @@ -84,7 +84,7 @@ impl MoqGateway { #[allow(clippy::cognitive_complexity)] pub async fn accept_connection( &self, - session: moq_native::web_transport_quinn::Session, + request: moq_native::Request, path: String, auth: Option>, ) -> Result<(), String> { @@ -123,8 +123,8 @@ impl MoqGateway { if let Some(connection_tx) = connection_tx { let (response_tx, response_rx) = oneshot::channel(); - // Type-erase the WebTransport session - let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(session); + // Type-erase the moq-native Request + let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(request); let conn = MoqConnection { path: path.clone(), session: session_boxed, response_tx, auth }; diff --git a/apps/skit/src/server.rs b/apps/skit/src/server.rs index d3e4ab98..f914046e 100644 --- a/apps/skit/src/server.rs +++ b/apps/skit/src/server.rs @@ -3214,17 +3214,20 @@ fn start_moq_webtransport_acceptor( key_path = %config.server.key_path, "Using provided TLS certificates for MoQ WebTransport" ); - ServerTlsConfig { - cert: vec![std::path::PathBuf::from(&config.server.cert_path)], - key: vec![std::path::PathBuf::from(&config.server.key_path)], - generate: vec![], - } + let mut tls = ServerTlsConfig::default(); + tls.cert = vec![std::path::PathBuf::from(&config.server.cert_path)]; + tls.key = vec![std::path::PathBuf::from(&config.server.key_path)]; + tls } else { info!("Auto-generating self-signed certificate for MoQ WebTransport (14-day validity for local development)"); - ServerTlsConfig { cert: vec![], key: vec![], generate: vec!["localhost".to_string()] } + let mut tls = ServerTlsConfig::default(); + tls.generate = vec!["localhost".to_string()]; + tls }; - let moq_config = MoqServerConfig { bind: Some(addr), tls }; + let mut moq_config = MoqServerConfig::default(); + moq_config.bind = Some(addr); + moq_config.tls = tls; info!( address = %addr, @@ -3256,142 +3259,41 @@ fn start_moq_webtransport_acceptor( let auth_state = Arc::clone(&auth_state); tokio::spawn(async move { - match request { - moq_native::Request::WebTransport(wt_request) => { - let url = wt_request.url(); - let path = url.path().to_string(); - - // SECURITY: Never log the full URL (may contain jwt) - debug!(path = %path, "Received WebTransport connection request"); - - // Validate MoQ auth if enabled - let moq_auth = if auth_state.is_enabled() { - // Extract jwt from query params - let jwt = url - .query_pairs() - .find(|(k, _)| k == "jwt") - .map(|(_, v)| v.to_string()); - - let Some(jwt) = jwt else { - warn!(path = %path, "MoQ auth failed: missing jwt parameter"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - }; - - // Validate JWT - let claims = match auth_state.validate_moq_token(&jwt) { - Ok(c) => c, - Err(e) => { - warn!(path = %path, error = %e, "MoQ JWT validation failed"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - }, - }; - - // Check audience - if claims.aud != crate::auth::AUD_MOQ { - warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - } - - let token_hash = crate::auth::hash_token(&jwt); - - // Enforce "tokens we mint" policy (parity with HTTP API auth). - let metadata_store = auth_state.token_metadata_store().cloned(); - let Some(metadata_store) = metadata_store else { - warn!(path = %path, "MoQ auth failed: token metadata store not available"); - let _ = wt_request - .close(axum::http::StatusCode::SERVICE_UNAVAILABLE) - .await; - return; - }; - - let meta = match metadata_store.get(&claims.jti).await { - Ok(Some(meta)) => meta, - Ok(None) => { - warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - }, - Err(e) => { - warn!(path = %path, error = %e, "MoQ auth failed: metadata store error"); - let _ = wt_request - .close(axum::http::StatusCode::SERVICE_UNAVAILABLE) - .await; - return; - }, - }; - - // Extra robustness: ensure the presented token matches the stored hash. - if meta.token_hash != token_hash { - warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - } - - if meta.revoked { - warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - } - - // Check revocation - if auth_state.is_revoked(&token_hash) { - warn!(path = %path, "MoQ auth failed: token revoked"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - } - - // Verify root matches path and reduce permissions - match crate::auth::verify_moq_token(&claims, &path) { - Ok(ctx) => Some(Arc::new(ctx) - as Arc< - dyn streamkit_core::moq_gateway::MoqAuthChecker, - >), - Err(e) => { - warn!(path = %path, error = %e, "MoQ path verification failed"); - let _ = wt_request - .close(axum::http::StatusCode::UNAUTHORIZED) - .await; - return; - }, - } - } else { - None - }; - - match wt_request.ok().await { - Ok(session) => { - if let Err(e) = gateway - .accept_connection(session, path.clone(), moq_auth) - .await - { - warn!(path = %path, error = %e, "Failed to route WebTransport connection"); - } - }, - Err(e) => { - warn!(path = %path, error = %e, "Failed to accept WebTransport session"); - }, - } - }, - moq_native::Request::Quic(_quic_request) => { - debug!("Received raw QUIC connection (not WebTransport), ignoring"); - }, + // Extract URL data before consuming the request. + // request.url() borrows, so we copy what we need first. + let (path, jwt_param) = { + let Some(url) = request.url() else { + debug!("Received MoQ connection without URL (raw QUIC), ignoring"); + return; + }; + let path = url.path().to_string(); + let jwt_param = url + .query_pairs() + .find(|(k, _)| k == "jwt") + .map(|(_, v)| v.to_string()); + (path, jwt_param) + }; + + // SECURITY: Never log the full URL (may contain jwt) + debug!(path = %path, "Received MoQ connection request"); + + // Validate MoQ auth if enabled + let moq_auth = if auth_state.is_enabled() { + match validate_moq_auth(&auth_state, &path, jwt_param).await { + Ok(ctx) => Some(ctx), + Err(status) => { + let _ = request.reject(status).await; + return; + }, + } + } else { + None + }; + + if let Err(e) = + gateway.accept_connection(request, path.clone(), moq_auth).await + { + warn!(path = %path, error = %e, "Failed to route MoQ connection"); } }); } @@ -3407,6 +3309,75 @@ fn start_moq_webtransport_acceptor( Ok(()) } +/// Validates MoQ auth for an incoming connection, returning the auth context on success +/// or the HTTP status code to reject with on failure. +#[cfg(feature = "moq")] +async fn validate_moq_auth( + auth_state: &crate::auth::AuthState, + path: &str, + jwt_param: Option, +) -> Result, axum::http::StatusCode> { + let Some(jwt) = jwt_param else { + warn!(path = %path, "MoQ auth failed: missing jwt parameter"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + }; + + // Validate JWT + let claims = auth_state.validate_moq_token(&jwt).map_err(|e| { + warn!(path = %path, error = %e, "MoQ JWT validation failed"); + axum::http::StatusCode::UNAUTHORIZED + })?; + + // Check audience + if claims.aud != crate::auth::AUD_MOQ { + warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + } + + let token_hash = crate::auth::hash_token(&jwt); + + // Enforce "tokens we mint" policy (parity with HTTP API auth). + let metadata_store = auth_state.token_metadata_store().cloned().ok_or_else(|| { + warn!(path = %path, "MoQ auth failed: token metadata store not available"); + axum::http::StatusCode::SERVICE_UNAVAILABLE + })?; + + let meta = metadata_store.get(&claims.jti).await.map_err(|e| { + warn!(path = %path, error = %e, "MoQ auth failed: metadata store error"); + axum::http::StatusCode::SERVICE_UNAVAILABLE + })?; + + let Some(meta) = meta else { + warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + }; + + // Extra robustness: ensure the presented token matches the stored hash. + if meta.token_hash != token_hash { + warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + } + + if meta.revoked { + warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + } + + // Check revocation + if auth_state.is_revoked(&token_hash) { + warn!(path = %path, "MoQ auth failed: token revoked"); + return Err(axum::http::StatusCode::UNAUTHORIZED); + } + + // Verify root matches path and reduce permissions + crate::auth::verify_moq_token(&claims, path) + .map_err(|e| { + warn!(path = %path, error = %e, "MoQ path verification failed"); + axum::http::StatusCode::UNAUTHORIZED + }) + .map(|ctx| Arc::new(ctx) as Arc) +} + /// Starts the HTTP/HTTPS server and optional MoQ WebTransport acceptor. /// /// # Errors diff --git a/crates/nodes/Cargo.toml b/crates/nodes/Cargo.toml index e2f1ed32..63f2be00 100644 --- a/crates/nodes/Cargo.toml +++ b/crates/nodes/Cargo.toml @@ -46,10 +46,10 @@ url = { version = "2.5.8", optional = true, features = ["serde"] } rquickjs = { version = "0.11", features = ["array-buffer", "futures", "loader", "parallel"], optional = true } wildmatch = { version = "2.6", optional = true } -moq-transport = { version = "0.12.1", optional = true } -moq-native = { version = "0.11.0", optional = true } -moq-lite = { version = "0.11.0", optional = true } -hang = { version = "0.10.0", optional = true } +moq-transport = { version = "0.12.2", optional = true } +moq-native = { version = "0.12.0", optional = true } +moq-lite = { version = "0.12.0", optional = true } +hang = { version = "0.11.0", optional = true } # For local dev, debugging moq stuff # moq-transport = { version = "0.11.0", optional = true } diff --git a/crates/nodes/src/transport/moq/mod.rs b/crates/nodes/src/transport/moq/mod.rs index d8b52fa3..b225ecb2 100644 --- a/crates/nodes/src/transport/moq/mod.rs +++ b/crates/nodes/src/transport/moq/mod.rs @@ -31,6 +31,11 @@ use streamkit_core::{ static SHARED_INSECURE_CLIENT: OnceLock> = OnceLock::new(); +/// Returns a cached `moq_native::Client` with TLS verification disabled. +/// +/// In moq-native 0.12, publish/consume origins are set on the `Client` via builder methods +/// (`with_publish` / `with_consume`) before calling `connect()`. The cached client has +/// neither set, so callers must clone and configure it for each connection. fn shared_insecure_client() -> Result { let client = SHARED_INSECURE_CLIENT.get_or_init(|| { let mut client_config = moq_native::ClientConfig::default(); diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index abbdd59e..6f2da838 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -532,13 +532,11 @@ impl MoqPeerNode { ) -> Result>, StreamKitError> { let path = moq_connection.path.clone(); - // Extract the WebTransport session - let web_transport_session = *moq_connection + // Extract the moq-native Request + let request = *moq_connection .session - .downcast::() - .map_err(|_| { - StreamKitError::Runtime("Invalid WebTransport session type".to_string()) - })?; + .downcast::() + .map_err(|_| StreamKitError::Runtime("Invalid MoQ request type".to_string()))?; // Notify gateway that we accepted the connection let _ = moq_connection @@ -550,13 +548,11 @@ impl MoqPeerNode { let receive_origin = client_publish_origin.consumer.clone(); // Accept MoQ session (publisher only sends, no server publish needed) - let session = moq_lite::Session::accept( - web_transport_session, - None, // No server publish origin - publisher is receive-only - Some(client_publish_origin.producer), - ) - .await - .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; + let session = request + .with_consume(client_publish_origin.producer) + .accept() + .await + .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; let handle = tokio::spawn(async move { let _permit = permit; @@ -590,13 +586,11 @@ impl MoqPeerNode { ) -> Result, StreamKitError> { let path = moq_connection.path.clone(); - // Extract the WebTransport session - let web_transport_session = *moq_connection + // Extract the moq-native Request + let request = *moq_connection .session - .downcast::() - .map_err(|_| { - StreamKitError::Runtime("Invalid WebTransport session type".to_string()) - })?; + .downcast::() + .map_err(|_| StreamKitError::Runtime("Invalid MoQ request type".to_string()))?; // Notify gateway that we accepted the connection let _ = moq_connection @@ -610,13 +604,12 @@ impl MoqPeerNode { let client_publish_origin = moq_lite::Origin::produce(); let receive_origin = client_publish_origin.consumer.clone(); - let session = moq_lite::Session::accept( - web_transport_session, - Some(server_publish_origin.consumer), - Some(client_publish_origin.producer), - ) - .await - .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; + let session = request + .with_publish(server_publish_origin.consumer) + .with_consume(client_publish_origin.producer) + .accept() + .await + .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; let handle = tokio::spawn(async move { let mut publisher_shutdown_rx = config.shutdown_rx.resubscribe(); @@ -982,13 +975,11 @@ impl MoqPeerNode { output_initial_delay_ms: u64, stats_delta_tx: mpsc::Sender, ) -> Result, StreamKitError> { - // Extract the WebTransport session - let web_transport_session = *moq_connection + // Extract the moq-native Request + let request = *moq_connection .session - .downcast::() - .map_err(|_| { - StreamKitError::Runtime("Invalid WebTransport session type".to_string()) - })?; + .downcast::() + .map_err(|_| StreamKitError::Runtime("Invalid MoQ request type".to_string()))?; // Notify gateway that we accepted the connection let _ = moq_connection @@ -1000,13 +991,11 @@ impl MoqPeerNode { let send_origin = server_publish_origin.producer.clone(); // Accept MoQ session (subscriber only receives, no client publish needed) - let session = moq_lite::Session::accept( - web_transport_session, - Some(server_publish_origin.consumer), - None, // No client publish origin - subscriber is send-only - ) - .await - .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; + let session = request + .with_publish(server_publish_origin.consumer) + .accept() + .await + .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; let handle = tokio::spawn(async move { let result = Self::subscriber_send_loop( diff --git a/crates/nodes/src/transport/moq/pull.rs b/crates/nodes/src/transport/moq/pull.rs index 58b1f22a..ca7a0c27 100644 --- a/crates/nodes/src/transport/moq/pull.rs +++ b/crates/nodes/src/transport/moq/pull.rs @@ -296,9 +296,10 @@ impl MoqPullNode { let client = super::shared_insecure_client()?; let origin = moq_lite::Origin::produce(); - let _consumer_session = client.connect(url, None, origin.producer).await.map_err(|e| { - StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) - })?; + let _consumer_session = + client.clone().with_consume(origin.producer).connect(url).await.map_err(|e| { + StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) + })?; // Subscribe to the specified broadcast. // @@ -429,9 +430,10 @@ impl MoqPullNode { // Create origin for consuming broadcasts only (no publishing to avoid cycles) let origin = moq_lite::Origin::produce(); - let _consumer_session = client.connect(url, None, origin.producer).await.map_err(|e| { - StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) - })?; + let _consumer_session = + client.clone().with_consume(origin.producer).connect(url).await.map_err(|e| { + StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) + })?; // Wait for broadcast to become available // Note: consume_broadcast() only works after announcement, so we primarily rely on announcements diff --git a/crates/nodes/src/transport/moq/push.rs b/crates/nodes/src/transport/moq/push.rs index 13fa7628..e37738bc 100644 --- a/crates/nodes/src/transport/moq/push.rs +++ b/crates/nodes/src/transport/moq/push.rs @@ -121,14 +121,15 @@ impl ProcessorNode for MoqPushNode { }; let publisher_origin = moq_lite::Origin::produce(); - let _publisher_session = match client.connect(url, publisher_origin.consumer, None).await { - Ok(session) => session, - Err(e) => { - let err_msg = format!("Failed to create publisher session: {e}"); - state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg); - return Err(StreamKitError::Runtime(err_msg)); - }, - }; + let _publisher_session = + match client.clone().with_publish(publisher_origin.consumer).connect(url).await { + Ok(session) => session, + Err(e) => { + let err_msg = format!("Failed to create publisher session: {e}"); + state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg); + return Err(StreamKitError::Runtime(err_msg)); + }, + }; // Create a transcoded broadcast and publish it let transcoded_broadcast = moq_lite::Broadcast::produce(); diff --git a/ui/bun.lock b/ui/bun.lock index e119e48f..f9a9604c 100644 --- a/ui/bun.lock +++ b/ui/bun.lock @@ -10,7 +10,7 @@ "@emotion/styled": "^11.14.1", "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", - "@moq/hang": "^0.1.1", + "@moq/hang": "^0.1.2", "@radix-ui/react-accordion": "^1.2.12", "@radix-ui/react-alert-dialog": "^1.1.15", "@radix-ui/react-checkbox": "^1.3.3", @@ -337,11 +337,11 @@ "@marijn/find-cluster-break": ["@marijn/find-cluster-break@1.0.2", "", {}, "sha512-l0h88YhZFyKdXIFNfSWpyjStDjGHwZ/U7iobcK1cQQD8sejsONdQtTVU+1wVN1PBw40PiiHB1vA5S7VTfQiP9g=="], - "@moq/hang": ["@moq/hang@0.1.1", "", { "dependencies": { "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", "@moq/lite": "^0.1.1", "@moq/signals": "^0.1.0", "async-mutex": "^0.5.0", "comlink": "^4.4.2", "zod": "^4.1.5" } }, "sha512-m2gMpSVZqoiPdJJoEURnsP/BC2wwMcST5ywRHC2r4WReJd1rWts8C/Qc9CgR/jzfZUJyZNYCtuXVvo0H5WBBiA=="], + "@moq/hang": ["@moq/hang@0.1.2", "", { "dependencies": { "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", "@moq/lite": "^0.1.2", "@moq/signals": "^0.1.1", "@svta/cml-iso-bmff": "^1.0.0-alpha.9", "async-mutex": "^0.5.0", "comlink": "^4.4.2", "zod": "^4.1.5" } }, "sha512-OGwHifmIsko7K0Yzms5kdo9tEgBkx3cC0XAyPuGtWLPHea8OgvshEN/7S6iETz+ELrHIIuAkjC6tWHH925GH+A=="], - "@moq/lite": ["@moq/lite@0.1.1", "", { "dependencies": { "@moq/signals": "^0.1.0", "@moq/web-transport-ws": "^0.1.2", "async-mutex": "^0.5.0" }, "peerDependencies": { "zod": "^4.1.0" } }, "sha512-72zQoc8yrk4W9Y8WZK1aAMnS+7J8+ZIIiON+jhaZ/ifNrcabyQrnSOY8SiEz7vPrvAuIcyOylza8T9R6qRfQbw=="], + "@moq/lite": ["@moq/lite@0.1.2", "", { "dependencies": { "@moq/signals": "^0.1.1", "@moq/web-transport-ws": "^0.1.2", "async-mutex": "^0.5.0" }, "peerDependencies": { "zod": "^4.1.0" } }, "sha512-duz27+uFI4jxCe/w42rxlBwBIQg9/u/lFFqTplhCAZ5kyhkiokqacpsvD5OUA+j5OVLepnYZpyTYRqAzanL37g=="], - "@moq/signals": ["@moq/signals@0.1.0", "", { "dependencies": { "dequal": "^2.0.3" }, "peerDependencies": { "@types/react": "^19.1.8", "react": "^19.0.0", "solid-js": "^1.9.7" }, "optionalPeers": ["react", "solid-js"] }, "sha512-tXUJqiC1AHzGnDY0QfFlvO5yqryFyqmpz+YjJslqQwGKsArcWuRkQA0jeRL7qLpkRGzZOSBCXCZByeLTahuLZA=="], + "@moq/signals": ["@moq/signals@0.1.2", "", { "dependencies": { "dequal": "^2.0.3" }, "peerDependencies": { "@types/react": "^19.1.8", "react": "^19.0.0", "solid-js": "^1.9.7" }, "optionalPeers": ["react", "solid-js"] }, "sha512-mzDk7KvQLSU6ZjZp9jrK2ZRlq07zp+LFZvS6AbKFyEqOxawwsy4tRAPdK9zmFCfIupjqwia42Pq9upo/RgjrMA=="], "@moq/web-transport-ws": ["@moq/web-transport-ws@0.1.2", "", {}, "sha512-mYha+AkLNPT3uOGnTA5YWjpxc9LO/yriFSoWzKkR0zN3UMZb9RXbsD8Gbhg1pJZod6QD4tevHoOWTBADYN7yAQ=="], @@ -481,6 +481,10 @@ "@standard-schema/spec": ["@standard-schema/spec@1.0.0", "", {}, "sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA=="], + "@svta/cml-iso-bmff": ["@svta/cml-iso-bmff@1.0.0-beta.1", "", { "peerDependencies": { "@svta/cml-utils": "1.1.0" } }, "sha512-vacnGPHyg7Llz6pjuDxY10gzCqk4OOO0elPSio+VFokc1jdm5vraUKXNu9bzJftGf4FKJ2qUZJ2HguOSmywtZA=="], + + "@svta/cml-utils": ["@svta/cml-utils@1.1.0", "", {}, "sha512-5RyHD75RYbq0clUkb/L/+JklxAq+PZRAwKZTcmqUt/ciHm79HBq0/IgrDXYvTgIRGRv8gE4GNvUWQbvRZRxZpA=="], + "@tanstack/query-core": ["@tanstack/query-core@5.90.18", "", {}, "sha512-rbGx6bHgPNVzutP7BEr+53UPKohpckqlMAad+To9UxTbeaQ+kC/1SDRj+QzkwbQ7qhLT/1IKp34yS6thda6fzA=="], "@tanstack/react-query": ["@tanstack/react-query@5.90.18", "", { "dependencies": { "@tanstack/query-core": "5.90.18" }, "peerDependencies": { "react": "^18 || ^19" } }, "sha512-KqNZX0C5IFz4639zR1ilnQ288tQdJrMNLtzmlzyJ14xauBkhtLEy3mPU/V4KiHsr41eL1ILZbDP36TB12lYfCQ=="], diff --git a/ui/package.json b/ui/package.json index c9471a70..771b68c7 100644 --- a/ui/package.json +++ b/ui/package.json @@ -54,7 +54,7 @@ "@emotion/styled": "^11.14.1", "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", - "@moq/hang": "^0.1.1", + "@moq/hang": "^0.1.2", "@radix-ui/react-accordion": "^1.2.12", "@radix-ui/react-alert-dialog": "^1.1.15", "@radix-ui/react-checkbox": "^1.3.3", From 9a43fb027e44804e2cb6f0b4860d9901c7c84833 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 7 Feb 2026 14:30:36 +0000 Subject: [PATCH 2/5] chore: update MoQ dependencies to latest versions Co-Authored-By: Claudio Costa --- Cargo.lock | 153 +------------------------ apps/skit/Cargo.toml | 4 +- crates/nodes/Cargo.toml | 6 +- crates/nodes/src/transport/moq/peer.rs | 52 +++++---- crates/nodes/src/transport/moq/pull.rs | 45 ++++---- crates/nodes/src/transport/moq/push.rs | 37 +++--- 6 files changed, 80 insertions(+), 217 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d582bb92..704c3021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,19 +213,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-compression" -version = "0.4.36" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98ec5f6c2f8bc326c994cb9e241cc257ddaba9afa8555a43cffbb5dd86efaa37" -dependencies = [ - "compression-codecs", - "compression-core", - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "async-lock" version = "3.4.1" @@ -519,18 +506,6 @@ name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" -dependencies = [ - "serde", -] - -[[package]] -name = "bytestring" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "113b4343b5f6617e7ad401ced8de3cc8b012e73a594347c307b90db3e9271289" -dependencies = [ - "bytes", -] [[package]] name = "bzip2" @@ -754,23 +729,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "compression-codecs" -version = "0.4.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0f7ac3e5b97fdce45e8922fb05cae2c37f7bbd63d30dd94821dacfd8f3f2bf2" -dependencies = [ - "compression-core", - "flate2", - "memchr", -] - -[[package]] -name = "compression-core" -version = "0.4.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1682,33 +1640,20 @@ dependencies = [ "tracing", ] -[[package]] -name = "h264-parser" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2f85be813ce08f0569fcd816f256c6f4287c975d69a9a14ceacc309f1de967" - [[package]] name = "hang" -version = "0.11.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be0cb0bfb07c0d8d184a5553fbe19f512957597d09f8a99266453c85f350b37b" +checksum = "c05205d948a355a7b260b82a652d64bc502a4ae42d523e25a6931220fdd5f0a0" dependencies = [ - "anyhow", "buf-list", "bytes", "derive_more", "futures", - "h264-parser", "hex", "lazy_static", - "m3u8-rs", "moq-lite", - "mp4-atom", - "num_enum", "regex", - "reqwest 0.12.26", - "scuffle-h265", "serde", "serde_json", "serde_with", @@ -1898,7 +1843,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots 1.0.4", ] [[package]] @@ -2396,16 +2340,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" -[[package]] -name = "m3u8-rs" -version = "5.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1d7ba86f7ea62f17f4310c55e93244619ddc7dadfc7e565de1967e4e41e6e7" -dependencies = [ - "chrono", - "nom", -] - [[package]] name = "mach2" version = "0.4.3" @@ -2524,9 +2458,9 @@ dependencies = [ [[package]] name = "moq-lite" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bac018460d2ad6ac4f17df4a9388757ba4053a9c2c229379ad7d9415902250b5" +checksum = "4462d2c8aabd99cd8be3f6704ff92b37446cdd5c576997cae5d74fda41705834" dependencies = [ "async-channel", "bytes", @@ -2544,9 +2478,9 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b31ad8aab5956298fa8d9e798a0025bdcfc180bacf4333f2c7371dc988ec4f85" +checksum = "9d5a0aa1d9d584fd867f8e107d7e487cccc9eef47f27648baf2ebc3731c75706" dependencies = [ "anyhow", "clap", @@ -2594,22 +2528,6 @@ dependencies = [ "web-transport", ] -[[package]] -name = "mp4-atom" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e8e949244bbd26ea7eb6d936af3a6a0202be68bcfc9afce700f3c9026860ff7" -dependencies = [ - "bytes", - "derive_more", - "num", - "paste", - "serde", - "thiserror 1.0.69", - "tokio", - "tracing", -] - [[package]] name = "multer" version = "3.1.0" @@ -2827,15 +2745,6 @@ dependencies = [ "syn", ] -[[package]] -name = "nutype-enum" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e13adea6de269faa0724df58f43f6fe2a81af7094f1dcb8b5b968eb2103cb3" -dependencies = [ - "scuffle-workspace-hack", -] - [[package]] name = "objc2-core-foundation" version = "0.3.2" @@ -3721,21 +3630,16 @@ dependencies = [ "http-body", "http-body-util", "hyper", - "hyper-rustls", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", - "quinn", - "rustls", - "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls", "tower", "tower-http", "tower-service", @@ -3743,7 +3647,6 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots 1.0.4", ] [[package]] @@ -4181,49 +4084,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "scuffle-bytes-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0417748c2a42f4a08d4e634b68b1d64f22a8c24bef2e7ac93df33aa61202a45b" -dependencies = [ - "byteorder", - "bytes", - "bytestring", - "scuffle-workspace-hack", -] - -[[package]] -name = "scuffle-expgolomb" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48d21330974c941e4c0aedc1e7255ea809e8cbac51e135209f6d67843ad1b94d" -dependencies = [ - "scuffle-bytes-util", - "scuffle-workspace-hack", -] - -[[package]] -name = "scuffle-h265" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b04b276c2f79846b7968abe6f87cedf951e06fd2a2b72d99c457e85d7e40f3fb" -dependencies = [ - "bitflags 2.10.0", - "byteorder", - "bytes", - "nutype-enum", - "scuffle-bytes-util", - "scuffle-expgolomb", - "scuffle-workspace-hack", -] - -[[package]] -name = "scuffle-workspace-hack" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8028ded836a0d9fabdfa4d713389b76a2098b5153f50a135c8faed7e3a3d5ae2" - [[package]] name = "security-framework" version = "2.11.1" @@ -5524,7 +5384,6 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "async-compression", "bitflags 2.10.0", "bytes", "futures-core", diff --git a/apps/skit/Cargo.toml b/apps/skit/Cargo.toml index a1e83b2a..67b4a14b 100644 --- a/apps/skit/Cargo.toml +++ b/apps/skit/Cargo.toml @@ -115,7 +115,7 @@ jemalloc_pprof = { version = "0.8", features = ["symbolize"], optional = true } dhat = { version = "0.3", optional = true } # MoQ support (optional) -moq-native = { version = "0.12.0", optional = true } +moq-native = { version = "0.12.1", optional = true } async-trait = { workspace = true } # For glob pattern matching in permissions @@ -130,7 +130,7 @@ getrandom = "0.3" aws-lc-rs = "1" # For MoQ auth path matching (optional, with moq feature) -moq-lite = { version = "0.12.0", optional = true } +moq-lite = { version = "0.13.0", optional = true } blake2 = "0.10.6" [features] diff --git a/crates/nodes/Cargo.toml b/crates/nodes/Cargo.toml index 63f2be00..c6eb5944 100644 --- a/crates/nodes/Cargo.toml +++ b/crates/nodes/Cargo.toml @@ -47,9 +47,9 @@ rquickjs = { version = "0.11", features = ["array-buffer", "futures", "loader", wildmatch = { version = "2.6", optional = true } moq-transport = { version = "0.12.2", optional = true } -moq-native = { version = "0.12.0", optional = true } -moq-lite = { version = "0.12.0", optional = true } -hang = { version = "0.11.0", optional = true } +moq-native = { version = "0.12.1", optional = true } +moq-lite = { version = "0.13.0", optional = true } +hang = { version = "0.13.0", optional = true } # For local dev, debugging moq stuff # moq-transport = { version = "0.11.0", optional = true } diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index 6f2da838..1aa50c0a 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -545,11 +545,11 @@ impl MoqPeerNode { // Create origin for receiving from client let client_publish_origin = moq_lite::Origin::produce(); - let receive_origin = client_publish_origin.consumer.clone(); + let receive_origin = client_publish_origin.consume(); // Accept MoQ session (publisher only sends, no server publish needed) let session = request - .with_consume(client_publish_origin.producer) + .with_consume(client_publish_origin) .accept() .await .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; @@ -599,14 +599,14 @@ impl MoqPeerNode { // Create origins for full bidirectional MoQ let server_publish_origin = moq_lite::Origin::produce(); - let send_origin = server_publish_origin.producer.clone(); + let send_origin = server_publish_origin.clone(); let client_publish_origin = moq_lite::Origin::produce(); - let receive_origin = client_publish_origin.consumer.clone(); + let receive_origin = client_publish_origin.consume(); let session = request - .with_publish(server_publish_origin.consumer) - .with_consume(client_publish_origin.producer) + .with_publish(server_publish_origin.consume()) + .with_consume(client_publish_origin) .accept() .await .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; @@ -821,10 +821,11 @@ impl MoqPeerNode { tracing::info!("Received catalog from publisher: audio={:?}", catalog.audio); - if let Some(audio) = &catalog.audio { + { + let audio = &catalog.audio; if let Some(track_name) = audio.renditions.keys().next() { tracing::info!("Found audio track in catalog: {}", track_name); - return Ok(Some((track_name.clone(), audio.priority))); + return Ok(Some((track_name.clone(), 2))); } } tracing::debug!("Catalog has no audio yet, waiting for update..."); @@ -988,11 +989,11 @@ impl MoqPeerNode { // Create origin for sending to client let server_publish_origin = moq_lite::Origin::produce(); - let send_origin = server_publish_origin.producer.clone(); + let send_origin = server_publish_origin.clone(); // Accept MoQ session (subscriber only receives, no client publish needed) let session = request - .with_publish(server_publish_origin.consumer) + .with_publish(server_publish_origin.consume()) .accept() .await .map_err(|e| StreamKitError::Runtime(format!("Failed to accept session: {e}")))?; @@ -1056,7 +1057,7 @@ impl MoqPeerNode { ) .await?; - track_producer.inner.clone().close(); + track_producer.track.clone().close(); tracing::info!("Subscriber task finished after {} packets", packet_count); Ok(()) } @@ -1066,18 +1067,18 @@ impl MoqPeerNode { publish: &moq_lite::OriginProducer, broadcast_name: &str, ) -> Result< - (moq_lite::BroadcastProducer, hang::TrackProducer, moq_lite::TrackProducer), + (moq_lite::BroadcastProducer, hang::container::OrderedProducer, moq_lite::TrackProducer), StreamKitError, > { // Create broadcast - let broadcast_produce = moq_lite::Broadcast::produce(); - publish.publish_broadcast(broadcast_name, broadcast_produce.consumer); - let mut broadcast_producer = broadcast_produce.producer; + let mut broadcast_producer = publish.create_broadcast(broadcast_name).ok_or_else(|| { + StreamKitError::Runtime(format!("Failed to create broadcast '{broadcast_name}'")) + })?; // Create audio track let audio_track = moq_lite::Track { name: "audio/data".to_string(), priority: 80 }; let track_producer = broadcast_producer.create_track(audio_track.clone()); - let track_producer: hang::TrackProducer = track_producer.into(); + let track_producer: hang::container::OrderedProducer = track_producer.into(); // Create and publish catalog let catalog_producer = @@ -1100,11 +1101,13 @@ impl MoqPeerNode { channel_count: 1, bitrate: Some(64_000), description: None, + container: Default::default(), + jitter: None, }, ); let catalog = hang::catalog::Catalog { - audio: Some(hang::catalog::Audio { renditions: audio_renditions, priority: 80 }), + audio: hang::catalog::Audio { renditions: audio_renditions }, ..Default::default() }; @@ -1121,7 +1124,7 @@ impl MoqPeerNode { /// Run the main send loop, forwarding packets to the subscriber #[allow(clippy::too_many_arguments)] async fn run_subscriber_send_loop( - track_producer: &mut hang::TrackProducer, + track_producer: &mut hang::container::OrderedProducer, mut broadcast_rx: broadcast::Receiver, shutdown_rx: &mut broadcast::Receiver<()>, output_group_duration_ms: u64, @@ -1181,7 +1184,7 @@ impl MoqPeerNode { #[allow(clippy::too_many_arguments, clippy::cast_precision_loss)] fn handle_broadcast_recv( recv_result: Result, - track_producer: &mut hang::TrackProducer, + track_producer: &mut hang::container::OrderedProducer, packet_count: &mut u64, frame_count: &mut u64, last_log: &mut std::time::Instant, @@ -1213,14 +1216,15 @@ impl MoqPeerNode { } *last_ts_ms = Some(timestamp_ms); - let timestamp = hang::Timestamp::from_millis(timestamp_ms).map_err(|_| { - StreamKitError::Runtime("MoQ frame timestamp overflow".to_string()) - })?; + let timestamp = + hang::container::Timestamp::from_millis(timestamp_ms).map_err(|_| { + StreamKitError::Runtime("MoQ frame timestamp overflow".to_string()) + })?; - let mut payload = hang::BufList::new(); + let mut payload = hang::container::BufList::new(); payload.push_chunk(broadcast_frame.data); - let frame = hang::Frame { timestamp, keyframe, payload }; + let frame = hang::container::Frame { timestamp, keyframe, payload }; if let Err(e) = track_producer.write(frame) { tracing::warn!("Failed to write MoQ frame to subscriber: {e}"); diff --git a/crates/nodes/src/transport/moq/pull.rs b/crates/nodes/src/transport/moq/pull.rs index ca7a0c27..d2f1b09f 100644 --- a/crates/nodes/src/transport/moq/pull.rs +++ b/crates/nodes/src/transport/moq/pull.rs @@ -296,8 +296,9 @@ impl MoqPullNode { let client = super::shared_insecure_client()?; let origin = moq_lite::Origin::produce(); + let consumer = origin.consume(); let _consumer_session = - client.clone().with_consume(origin.producer).connect(url).await.map_err(|e| { + client.clone().with_consume(origin).connect(url).await.map_err(|e| { StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) })?; @@ -306,7 +307,7 @@ impl MoqPullNode { // During dynamic session initialization, the broadcast may not have been announced yet. // Treat this as "no tracks discovered" rather than a hard error: the runtime `run()` path // already waits for announcements and will connect once the broadcast appears. - let Some(broadcast) = origin.consumer.consume_broadcast(&self.config.broadcast) else { + let Some(broadcast) = consumer.consume_broadcast(&self.config.broadcast) else { tracing::debug!( broadcast = %self.config.broadcast, "Broadcast not available during catalog discovery; using default output pin" @@ -378,23 +379,20 @@ impl MoqPullNode { let mut tracks = Vec::new(); - if let Some(audio) = catalog.audio { - for (track_name, config) in audio.renditions { - match config.codec { - hang::catalog::AudioCodec::Opus => { - tracing::info!(track = %track_name, "found opus audio track"); - let track = - moq_lite::Track { name: track_name, priority: audio.priority }; - tracks.push(track); - }, - codec => { - tracing::debug!( - "skipping non-opus audio track: {} (codec: {})", - track_name, - codec - ); - }, - } + for (track_name, config) in catalog.audio.renditions { + match config.codec { + hang::catalog::AudioCodec::Opus => { + tracing::info!(track = %track_name, "found opus audio track"); + let track = moq_lite::Track { name: track_name, priority: 2 }; + tracks.push(track); + }, + codec => { + tracing::debug!( + "skipping non-opus audio track: {} (codec: {})", + track_name, + codec + ); + }, } } @@ -430,18 +428,19 @@ impl MoqPullNode { // Create origin for consuming broadcasts only (no publishing to avoid cycles) let origin = moq_lite::Origin::produce(); + let consumer = origin.consume(); let _consumer_session = - client.clone().with_consume(origin.producer).connect(url).await.map_err(|e| { + client.clone().with_consume(origin).connect(url).await.map_err(|e| { StreamKitError::Runtime(format!("Failed to create consumer session: {e}")) })?; // Wait for broadcast to become available // Note: consume_broadcast() only works after announcement, so we primarily rely on announcements let broadcast = { - let mut consumer = origin.consumer.clone(); + let mut announcements = consumer.clone(); // Try immediate consume first (works if broadcast already announced) - if let Some(broadcast) = origin.consumer.consume_broadcast(&self.config.broadcast) { + if let Some(broadcast) = consumer.consume_broadcast(&self.config.broadcast) { tracing::info!("Broadcast '{}' is immediately available", self.config.broadcast); broadcast } else { @@ -469,7 +468,7 @@ impl MoqPullNode { } } } - Some((path, maybe_broadcast)) = consumer.announced() => { + Some((path, maybe_broadcast)) = announcements.announced() => { if let Some(broadcast) = maybe_broadcast { // Compare paths without allocation - bind path to extend lifetime let announced_path = path.as_path(); diff --git a/crates/nodes/src/transport/moq/push.rs b/crates/nodes/src/transport/moq/push.rs index e37738bc..dcd1c31f 100644 --- a/crates/nodes/src/transport/moq/push.rs +++ b/crates/nodes/src/transport/moq/push.rs @@ -122,7 +122,7 @@ impl ProcessorNode for MoqPushNode { let publisher_origin = moq_lite::Origin::produce(); let _publisher_session = - match client.clone().with_publish(publisher_origin.consumer).connect(url).await { + match client.clone().with_publish(publisher_origin.consume()).connect(url).await { Ok(session) => session, Err(e) => { let err_msg = format!("Failed to create publisher session: {e}"); @@ -132,14 +132,13 @@ impl ProcessorNode for MoqPushNode { }; // Create a transcoded broadcast and publish it - let transcoded_broadcast = moq_lite::Broadcast::produce(); - - // Publish the transcoded broadcast via the publisher session - publisher_origin - .producer - .publish_broadcast(&self.config.broadcast, transcoded_broadcast.consumer); - - let mut broadcast = transcoded_broadcast.producer; + let mut broadcast = + publisher_origin.create_broadcast(&self.config.broadcast).ok_or_else(|| { + StreamKitError::Runtime(format!( + "Failed to create broadcast '{}'", + self.config.broadcast + )) + })?; tracing::info!("Publishing to broadcast '{}'", self.config.broadcast); @@ -148,7 +147,7 @@ impl ProcessorNode for MoqPushNode { let audio_track = moq_lite::Track { name: "audio/data".to_string(), priority: 80 }; let track_producer = broadcast.create_track(audio_track.clone()); - let mut track_producer: hang::TrackProducer = track_producer.into(); + let mut track_producer: hang::container::OrderedProducer = track_producer.into(); // Create and publish a catalog describing our audio track let mut audio_renditions = std::collections::BTreeMap::new(); @@ -156,15 +155,17 @@ impl ProcessorNode for MoqPushNode { audio_track.name.clone(), hang::catalog::AudioConfig { codec: hang::catalog::AudioCodec::Opus, - sample_rate: 48000, // Default opus sample rate - channel_count: self.config.channels, // From configuration - bitrate: Some(128_000), // Default bitrate + sample_rate: 48000, + channel_count: self.config.channels, + bitrate: Some(128_000), description: None, + container: Default::default(), + jitter: None, }, ); let catalog = hang::catalog::Catalog { - audio: Some(hang::catalog::Audio { renditions: audio_renditions, priority: 80 }), + audio: hang::catalog::Audio { renditions: audio_renditions }, ..Default::default() }; @@ -254,14 +255,14 @@ impl ProcessorNode for MoqPushNode { let keyframe = is_first || clock.is_group_boundary_ms(self.config.group_duration_ms); - let timestamp = hang::Timestamp::from_millis(timestamp_ms).map_err(|_| { + let timestamp = hang::container::Timestamp::from_millis(timestamp_ms).map_err(|_| { StreamKitError::Runtime("MoQ frame timestamp overflow".to_string()) })?; - let mut payload = hang::BufList::new(); + let mut payload = hang::container::BufList::new(); payload.push_chunk(data); - let frame = hang::Frame { timestamp, keyframe, payload }; + let frame = hang::container::Frame { timestamp, keyframe, payload }; if let Err(e) = track_producer.write(frame) { let err_msg = format!("Failed to write MoQ frame: {e}"); @@ -312,7 +313,7 @@ impl ProcessorNode for MoqPushNode { state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); // Close the track when done (best-effort) - track_producer.inner.clone().close(); + track_producer.track.clone().close(); tracing::info!("MoqPushNode finished after sending {} packets", packet_count); Ok(()) From 33cf29c3cbbb8efc883d570e4bd02f5103c07907 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 7 Feb 2026 14:54:33 +0000 Subject: [PATCH 3/5] fix: use explicit Container::default() for clippy pedantic Co-Authored-By: Claudio Costa --- crates/nodes/src/transport/moq/peer.rs | 2 +- crates/nodes/src/transport/moq/push.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index 1aa50c0a..4fe12b7d 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -1101,7 +1101,7 @@ impl MoqPeerNode { channel_count: 1, bitrate: Some(64_000), description: None, - container: Default::default(), + container: hang::catalog::Container::default(), jitter: None, }, ); diff --git a/crates/nodes/src/transport/moq/push.rs b/crates/nodes/src/transport/moq/push.rs index dcd1c31f..6032bdf9 100644 --- a/crates/nodes/src/transport/moq/push.rs +++ b/crates/nodes/src/transport/moq/push.rs @@ -159,7 +159,7 @@ impl ProcessorNode for MoqPushNode { channel_count: self.config.channels, bitrate: Some(128_000), description: None, - container: Default::default(), + container: hang::catalog::Container::default(), jitter: None, }, ); From 2fb307def63f193485b7f172482473a634cc4c4c Mon Sep 17 00:00:00 2001 From: Claudio Costa Date: Sat, 7 Feb 2026 19:58:02 +0000 Subject: [PATCH 4/5] fix: inject priority into catalog JSON for JS client compatibility Co-Authored-By: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com> --- crates/nodes/src/transport/moq/mod.rs | 23 +++++++++++++++++++++++ crates/nodes/src/transport/moq/peer.rs | 4 +--- crates/nodes/src/transport/moq/push.rs | 7 +++---- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/crates/nodes/src/transport/moq/mod.rs b/crates/nodes/src/transport/moq/mod.rs index b225ecb2..06020911 100644 --- a/crates/nodes/src/transport/moq/mod.rs +++ b/crates/nodes/src/transport/moq/mod.rs @@ -51,6 +51,29 @@ fn shared_insecure_client() -> Result { } } +/// Serialize a catalog to JSON with `priority` fields injected into `video` and `audio`. +/// +/// The published `@moq/hang` JS client (0.1.2) still requires `priority` in the catalog +/// schema, but the Rust `hang` 0.13.0 crate removed it from the structs. +/// The upstream JS source has already dropped the requirement, but a new npm release +/// hasn't been published yet. This shim keeps the two sides compatible. +pub(super) fn catalog_to_json( + catalog: &hang::catalog::Catalog, +) -> Result { + let mut value = serde_json::to_value(catalog) + .map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))?; + + if let Some(video) = value.get_mut("video").and_then(|v| v.as_object_mut()) { + video.entry("priority").or_insert(serde_json::json!(60)); + } + if let Some(audio) = value.get_mut("audio").and_then(|v| v.as_object_mut()) { + audio.entry("priority").or_insert(serde_json::json!(80)); + } + + serde_json::to_string(&value) + .map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}"))) +} + pub(super) fn redact_url_str_for_logs(raw: &str) -> String { raw.parse::().map_or_else( |_| raw.split(['?', '#']).next().unwrap_or(raw).to_string(), diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index 4fe12b7d..e28965fe 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -1113,9 +1113,7 @@ impl MoqPeerNode { let mut catalog_producer = broadcast_producer.create_track(hang::catalog::Catalog::default_track()); - let catalog_json = catalog - .to_string() - .map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))?; + let catalog_json = super::catalog_to_json(&catalog)?; catalog_producer.write_frame(catalog_json.into_bytes()); Ok(catalog_producer) diff --git a/crates/nodes/src/transport/moq/push.rs b/crates/nodes/src/transport/moq/push.rs index 6032bdf9..93187da9 100644 --- a/crates/nodes/src/transport/moq/push.rs +++ b/crates/nodes/src/transport/moq/push.rs @@ -171,12 +171,11 @@ impl ProcessorNode for MoqPushNode { // Create catalog track and publish the catalog data let mut catalog_producer = broadcast.create_track(hang::catalog::Catalog::default_track()); - let catalog_json = match catalog.to_string() { + let catalog_json = match super::catalog_to_json(&catalog) { Ok(json) => json, Err(e) => { - let err_msg = format!("Failed to serialize catalog: {e}"); - state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg); - return Err(StreamKitError::Runtime(err_msg)); + state_helpers::emit_failed(&context.state_tx, &node_name, e.to_string()); + return Err(e); }, }; let catalog_data = catalog_json.into_bytes(); // Avoid intermediate Vec allocation From 234a53191f0f0625c3e8c17a14e8b77ed3a256df Mon Sep 17 00:00:00 2001 From: Claudio Costa Date: Sat, 7 Feb 2026 19:58:16 +0000 Subject: [PATCH 5/5] style: format catalog_to_json function Co-Authored-By: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com> --- crates/nodes/src/transport/moq/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/nodes/src/transport/moq/mod.rs b/crates/nodes/src/transport/moq/mod.rs index 06020911..e187743a 100644 --- a/crates/nodes/src/transport/moq/mod.rs +++ b/crates/nodes/src/transport/moq/mod.rs @@ -57,9 +57,7 @@ fn shared_insecure_client() -> Result { /// schema, but the Rust `hang` 0.13.0 crate removed it from the structs. /// The upstream JS source has already dropped the requirement, but a new npm release /// hasn't been published yet. This shim keeps the two sides compatible. -pub(super) fn catalog_to_json( - catalog: &hang::catalog::Catalog, -) -> Result { +pub(super) fn catalog_to_json(catalog: &hang::catalog::Catalog) -> Result { let mut value = serde_json::to_value(catalog) .map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))?;