From 625265b25cba204723ad5c7ed33de9a6044e8c82 Mon Sep 17 00:00:00 2001 From: davidcforbes Date: Sun, 22 Feb 2026 21:25:09 -0800 Subject: [PATCH 1/2] Add trunk registration module for inbound PSTN call delivery Telnyx credential-based SIP connections require the PBX to REGISTER with sip.telnyx.com for inbound call routing. This adds a new TrunkRegistrationModule that scans trunk configs for register=true, then spawns a background task per trunk that periodically sends REGISTER requests via rsipstack's Registration API. Co-Authored-By: Claude Opus 4.6 --- src/app.rs | 4 +- src/proxy/mod.rs | 1 + src/proxy/routing/mod.rs | 8 ++ src/proxy/trunk_register.rs | 172 ++++++++++++++++++++++++++++++++++++ 4 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 src/proxy/trunk_register.rs diff --git a/src/app.rs b/src/app.rs index 6d64e191..d99270a1 100644 --- a/src/app.rs +++ b/src/app.rs @@ -14,6 +14,7 @@ use crate::{ presence::PresenceModule, registrar::RegistrarModule, server::{SipServer, SipServerBuilder}, + trunk_register::TrunkRegistrationModule, ws::sip_ws_handler, }, }; @@ -348,7 +349,8 @@ impl AppStateBuilder { .register_module("auth", AuthModule::create) .register_module("presence", PresenceModule::create) .register_module("registrar", RegistrarModule::create) - .register_module("call", CallModule::create); + .register_module("call", CallModule::create) + .register_module("trunk_register", TrunkRegistrationModule::create); builder = addon_registry.apply_proxy_server_hooks(builder, core.clone()); builder.build().await diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 2d1a221f..2887c1a8 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -21,6 +21,7 @@ pub mod proxy_call; pub mod registrar; pub mod routing; pub mod server; +pub mod trunk_register; pub mod tests; pub mod user; pub mod user_db; diff --git a/src/proxy/routing/mod.rs b/src/proxy/routing/mod.rs index a9783735..7728c62e 100644 --- a/src/proxy/routing/mod.rs +++ b/src/proxy/routing/mod.rs @@ -78,6 +78,12 @@ pub struct TrunkConfig { pub country: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub policy: Option, + /// Enable upstream registration (for inbound call delivery) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub register: Option, + /// Registration expiry in seconds (default 3600) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub register_expires: Option, #[serde(skip)] pub origin: ConfigOrigin, } @@ -103,6 +109,8 @@ impl Default for TrunkConfig { incoming_to_user_prefix: None, country: None, policy: None, + register: None, + register_expires: None, origin: ConfigOrigin::embedded(), } } diff --git a/src/proxy/trunk_register.rs b/src/proxy/trunk_register.rs new file mode 100644 index 00000000..fbb1ee26 --- /dev/null +++ b/src/proxy/trunk_register.rs @@ -0,0 +1,172 @@ +use crate::config::ProxyConfig; +use crate::proxy::{ProxyModule, server::SipServerRef}; +use anyhow::Result; +use async_trait::async_trait; +use rsipstack::dialog::{authenticate::Credential, registration::Registration}; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; + +pub struct TrunkRegistrationModule { + server: SipServerRef, +} + +impl TrunkRegistrationModule { + pub fn create( + server: SipServerRef, + _config: Arc, + ) -> Result> { + Ok(Box::new(TrunkRegistrationModule { server })) + } +} + +#[async_trait] +impl ProxyModule for TrunkRegistrationModule { + fn name(&self) -> &str { + "trunk_register" + } + + async fn on_start(&mut self) -> Result<()> { + let trunks = self.server.data_context.trunks_snapshot(); + let endpoint_inner = self.server.endpoint.inner.clone(); + let cancel_token = self.server.cancel_token.clone(); + + for (name, trunk) in trunks { + if trunk.register != Some(true) { + continue; + } + + let (username, password) = match (&trunk.username, &trunk.password) { + (Some(u), Some(p)) => (u.clone(), p.clone()), + _ => { + warn!( + trunk = %name, + "trunk_register: skipping trunk without credentials" + ); + continue; + } + }; + + let dest_uri = match rsip::Uri::try_from(trunk.dest.as_str()) { + Ok(uri) => uri, + Err(e) => { + warn!( + trunk = %name, + dest = %trunk.dest, + error = %e, + "trunk_register: invalid dest URI" + ); + continue; + } + }; + + let expires = trunk.register_expires.unwrap_or(3600); + let token = cancel_token.child_token(); + let ep = endpoint_inner.clone(); + let trunk_name = name.clone(); + + info!( + trunk = %trunk_name, + dest = %trunk.dest, + expires = expires, + "trunk_register: starting registration loop" + ); + + crate::utils::spawn(async move { + register_loop(ep, trunk_name, dest_uri, username, password, expires, token).await; + }); + } + + Ok(()) + } + + async fn on_stop(&self) -> Result<()> { + debug!("trunk_register: module stopped"); + Ok(()) + } +} + +async fn register_loop( + endpoint: rsipstack::transaction::endpoint::EndpointInnerRef, + trunk_name: String, + dest_uri: rsip::Uri, + username: String, + password: String, + expires: u32, + cancel_token: CancellationToken, +) { + let credential = Credential { + username, + password, + realm: None, + }; + + let mut registration = Registration::new(endpoint, Some(credential)); + let mut retry_delay_secs: u64 = 30; + let max_retry_delay_secs: u64 = 300; + + loop { + if cancel_token.is_cancelled() { + info!(trunk = %trunk_name, "trunk_register: shutting down"); + return; + } + + match registration.register(dest_uri.clone(), Some(expires)).await { + Ok(resp) if resp.status_code == rsip::StatusCode::OK => { + let actual_expires = registration.expires(); + info!( + trunk = %trunk_name, + expires = actual_expires, + "trunk_register: registration successful" + ); + retry_delay_secs = 30; // reset backoff on success + + // Re-register at 85% of expiry + let sleep_secs = (actual_expires as u64) * 85 / 100; + let sleep_secs = sleep_secs.max(30); + + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)) => {} + _ = cancel_token.cancelled() => { + info!(trunk = %trunk_name, "trunk_register: shutting down"); + return; + } + } + } + Ok(resp) => { + warn!( + trunk = %trunk_name, + status = %resp.status_code, + "trunk_register: registration failed" + ); + + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(retry_delay_secs)) => {} + _ = cancel_token.cancelled() => { + info!(trunk = %trunk_name, "trunk_register: shutting down"); + return; + } + } + + retry_delay_secs = (retry_delay_secs * 2).min(max_retry_delay_secs); + } + Err(e) => { + warn!( + trunk = %trunk_name, + error = %e, + "trunk_register: registration error" + ); + + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(retry_delay_secs)) => {} + _ = cancel_token.cancelled() => { + info!(trunk = %trunk_name, "trunk_register: shutting down"); + return; + } + } + + retry_delay_secs = (retry_delay_secs * 2).min(max_retry_delay_secs); + } + } + } +} From 844840b5f50edc1f9195e45f2d6c9501bee3f268 Mon Sep 17 00:00:00 2001 From: yeoleobun Date: Thu, 26 Feb 2026 18:54:50 +0800 Subject: [PATCH 2/2] add mimssing field --- src/console/handlers/diagnostics.rs | 2 ++ src/proxy/data.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/console/handlers/diagnostics.rs b/src/console/handlers/diagnostics.rs index 787c5d61..03bf7d8a 100644 --- a/src/console/handlers/diagnostics.rs +++ b/src/console/handlers/diagnostics.rs @@ -621,6 +621,8 @@ fn trunk_config_from_model(model: &sip_trunk::Model) -> Option Option<(String, TrunkConfig)> { country: None, policy: None, origin: ConfigOrigin::embedded(), + register: None, + register_expires: None }; Some((model.name, trunk))