diff --git a/Cargo.lock b/Cargo.lock index 4e1ea7b..3d5f0fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2388,6 +2388,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-src" +version = "300.5.5+3.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f1787d533e03597a7934fd0a765f0d28e94ecc5fb7789f8053b1e699a56f709" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.112" @@ -2396,6 +2405,7 @@ checksum = "57d55af3b3e226502be1526dfdba67ab0e9c96fc293004e79576b2b9edb0dbdb" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -3062,6 +3072,7 @@ dependencies = [ "futures", "libc", "log", + "openssl", "serde", "sha3", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 0180356..5d5000e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,15 @@ chrono = { version = "0.4", features = ["clock"] } # OS error codes used in the accept-loop backoff to distinguish EMFILE/ENFILE # (resource exhaustion → log error) from transient errors (log debug). libc = "0.2" +# Vendor OpenSSL source so the binary builds without system libssl-dev headers +# on Linux. native-tls (pulled transitively through arti-client → tor-rtcompat) +# links against OpenSSL on Linux; without this feature flag the build fails on +# any machine that lacks the -dev package. macOS and Windows do not need it +# (native-tls uses Security.framework and SChannel there). NOTE(review): as a +# direct, unconditional dependency this presumably still compiles OpenSSL from source on +# every target; consider a Linux-only `[target.'cfg(...)'.dependencies]` entry. +# Build-time cost is ~60 s on first compile; later builds reuse the cached objects. 
+openssl = { version = "0.10", features = ["vendored"] } [dev-dependencies] tempfile = "3" diff --git a/src/config/loader.rs b/src/config/loader.rs index c32aca6..a79358b 100644 --- a/src/config/loader.rs +++ b/src/config/loader.rs @@ -35,7 +35,15 @@ fn validate(cfg: &Config) -> Result<()> { // level: LogLevel — invalid levels are already rejected by serde at parse time (4.2). // [site] - if cfg.site.index_file.contains(std::path::MAIN_SEPARATOR) { + // `index_file` must be a bare filename, not a path. + // Use Path::components() rather than checking for MAIN_SEPARATOR: + // on Windows both `/` and `\` are valid separators, so a string-contains + // check on `\` alone misses "sub/index.html" written with forward slashes. + if std::path::Path::new(&cfg.site.index_file) + .components() + .count() + > 1 + { errors.push("[site] index_file must be a filename only, not a path".into()); } { @@ -49,7 +57,16 @@ fn validate(cfg: &Config) -> Result<()> { { errors.push("[site] directory must not contain '..' components".into()); } - if cfg.site.directory.contains(std::path::MAIN_SEPARATOR) { + // Count only Normal components so this check is independent from the + // is_absolute() guard above (a RootDir component would double-trigger). + // As with index_file, Path::components() handles both `/` and `\` on + // Windows, making the check correct on all platforms. 
+ if dir_path + .components() + .filter(|c| matches!(c, std::path::Component::Normal(_))) + .count() + > 1 + { errors.push("[site] directory must be a directory name only, not a path".into()); } } diff --git a/src/console/dashboard.rs b/src/console/dashboard.rs index 23f8964..7f84d16 100644 --- a/src/console/dashboard.rs +++ b/src/console/dashboard.rs @@ -71,7 +71,16 @@ pub fn render_dashboard(state: &AppState, requests: u64, errors: u64, config: &C || match &state.tor_status { TorStatus::Disabled => dim("(disabled)"), TorStatus::Starting => dim("(bootstrapping…)"), - TorStatus::Ready => dim("(reading…)"), + // fix 3.11 — this branch is unreachable in practice because + // set_onion() sets Ready and Some(addr) atomically. If it fires, + // an invariant has been violated; the honest label is "unavailable". + TorStatus::Ready => { + debug_assert!( + false, + "TorStatus::Ready with no onion_address — invariant violated" + ); + dim("(address unavailable)") + } TorStatus::Failed(_) => dim("(unavailable)"), }, |addr| format!("http://{addr}"), diff --git a/src/console/mod.rs b/src/console/mod.rs index ed1b4bc..0417867 100644 --- a/src/console/mod.rs +++ b/src/console/mod.rs @@ -65,6 +65,28 @@ pub fn start( metrics: SharedMetrics, mut shutdown: watch::Receiver, ) -> Result> { + // On Windows, the console host must have VT (Virtual Terminal) escape- + // sequence processing enabled before we write any ANSI colour codes. + // Windows Terminal and modern ConHost (Win 10 1903+) enable it + // automatically, but older ConHost versions (Windows Server 2016/2019 with + // default settings) do not. Without this, colour escape sequences appear + // as literal characters (e.g. "^[[32m") rather than being interpreted. + // + // Failure is non-fatal: the terminal is still functional, just monochrome. + // We warn so the operator knows why colours are missing rather than + // silently degrading. 
+ #[cfg(windows)] + if let Err(e) = execute!( + stdout(), + crossterm::terminal::EnableVirtualTerminalProcessing + ) { + log::warn!( + "Could not enable Windows VT processing: {e}. \ + ANSI colours may not render correctly. \ + Upgrade to Windows Terminal or Windows 10 1903+ for full colour support." + ); + } + // 4.1 — map crossterm io errors to AppError::Console. terminal::enable_raw_mode() .map_err(|e| AppError::Console(format!("Failed to enable raw mode: {e}")))?; diff --git a/src/error.rs b/src/error.rs index 57ff345..94ac159 100644 --- a/src/error.rs +++ b/src/error.rs @@ -37,10 +37,6 @@ pub enum AppError { #[error("Server startup error: {0}")] ServerStartup(String), - /// An error originating in the Tor / Arti subsystem. - #[error("Tor error: {0}")] - Tor(String), - /// Console / terminal I/O error (crossterm or raw-mode operations). #[error("Console error: {0}")] Console(String), diff --git a/src/runtime/lifecycle.rs b/src/runtime/lifecycle.rs index 67b6766..d3820fd 100644 --- a/src/runtime/lifecycle.rs +++ b/src/runtime/lifecycle.rs @@ -192,16 +192,23 @@ async fn normal_run(data_dir: PathBuf, settings_path: &Path) -> Result<()> { }; // 8. Start Tor (if enabled). - // tor::init() spawns a Tokio task and returns immediately. + // tor::init() spawns a Tokio task and returns its JoinHandle. + // fix 3.1 — we store the handle and await it during shutdown so active + // Tor circuits get a chance to close cleanly (max 5 s). + // fix 3.6 — pass config.server.bind so the local proxy connect uses the + // correct loopback address (e.g. ::1 on IPv6-only machines). // 2.10 — pass shutdown_rx so Tor's stream loop exits on clean shutdown. - if config.tor.enabled { - tor::init( + let tor_handle = if config.tor.enabled { + Some(tor::init( data_dir.clone(), bind_port, + config.server.bind, Arc::clone(&state), shutdown_rx.clone(), - ); - } + )) + } else { + None + }; // 9. Start console UI. 
let key_rx = start_console(&config, &state, &metrics, shutdown_rx.clone()).await?; @@ -223,6 +230,13 @@ async fn normal_run(data_dir: PathBuf, settings_path: &Path) -> Result<()> { // 2.10 — wait for the HTTP server to drain in-flight connections (max 5 s). let _ = tokio::time::timeout(Duration::from_secs(5), server_handle).await; + // fix 3.1 — await the Tor task so active circuits can send a clean + // RELAY_END cell before the runtime tears down. The shutdown watch + // channel was already signalled above; this just drains the task. + if let Some(handle) = tor_handle { + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; + } + log::info!("RustHost shut down cleanly."); logging::flush(); console::cleanup(); @@ -273,6 +287,58 @@ async fn start_console( } } +// ─── SIGTERM helper ─────────────────────────────────────────────────────────── +// +// `tokio::select!` does not support `#[cfg(...)]` on individual arms; the macro +// expands its arms textually before cfg evaluation, so a guarded arm produces a +// parse error. The solution is a cross-platform helper with identical call-site +// syntax on every target: +// +// • Unix — registers a SIGTERM handler and awaits the first delivery. +// • non-Unix — awaits `std::future::pending()`, which never resolves. +// +// Both variants share the name `next_sigterm()` and return `()`, so a single +// unconditional `select!` arm covers all platforms. The caller pins the +// returned future outside its loop so the Unix `Signal` handle (and its OS-level +// signal pipe) is created exactly once for the lifetime of the event loop. +// +// Failure to register the Unix handler (e.g. signal limit reached) is logged as +// a warning and the function falls back to `pending()` — the process remains +// functional, just without SIGTERM-triggered graceful shutdown. + +/// On Unix, resolve once when `SIGTERM` is delivered; fall back to pending +/// forever if the signal stream cannot be registered. 
+/// +/// See the "SIGTERM helper" section comment above for the cross-platform design rationale. +#[cfg(unix)] +async fn next_sigterm() { + use tokio::signal::unix::{signal, SignalKind}; + match signal(SignalKind::terminate()) { + Ok(mut stream) => { + // recv() returns Option<()>; None means no further signals can + // be received on this stream. Either way we return so the select! + // arm fires and the graceful shutdown path runs. + stream.recv().await; + } + Err(e) => { + log::warn!( + "Could not register SIGTERM handler: {e}. \ + Send Ctrl-C or use --signal-file to stop the process." + ); + std::future::pending::<()>().await; + } + } +} + +/// On non-Unix platforms, pend forever so the `select!` arm is always +/// present in the source but never fires. +/// +/// See the "SIGTERM helper" section comment above for the cross-platform design rationale. +#[cfg(not(unix))] +async fn next_sigterm() { + std::future::pending::<()>().await +} + async fn event_loop( key_rx: Option>, config: &Arc, @@ -288,6 +354,25 @@ async fn event_loop( let ctrl_c = tokio::signal::ctrl_c(); tokio::pin!(ctrl_c); + // SIGTERM handling — cross-platform design note + // ───────────────────────────────────────────── + // `tokio::select!` is a declarative macro that expands its arms textually; + // it does not honour `#[cfg(...)]` attributes placed on individual arms. + // Putting `#[cfg(unix)] _ = sigterm.recv() => { … }` inside the macro + // causes a parse error ("no rules expected `}`") on every platform. + // + // Solution: a platform-unified helper function `next_sigterm()` with + // identical call-site syntax on all targets: + // • Unix — awaits the next SIGTERM delivery from the OS. + // • non-Unix — awaits `std::future::pending()` (never resolves). + // Both branches return `()` so `select!` sees one unconditional arm. 
+ // + // The future is pinned here, outside the loop, so the Unix `Signal` handle + // (and its internal OS registration) is created exactly once and reused + // across every `select!` iteration — same pattern as `ctrl_c` above. + let sigterm = next_sigterm(); + tokio::pin!(sigterm); + loop { // Build a future that yields the next key, or pends forever once the // channel closes (avoids repeated None-match after input task death). @@ -326,6 +411,16 @@ async fn event_loop( } break; } + // Graceful shutdown on SIGTERM. + // On Unix this arm fires when the OS delivers SIGTERM, covering + // `systemctl stop`, `docker stop`, launchd unload, and any process + // supervisor that sends SIGTERM before SIGKILL. + // On non-Unix platforms `next_sigterm()` pends forever, so this + // arm is syntactically present but never selected. + () = &mut sigterm => { + log::info!("SIGTERM received — shutting down gracefully."); + break; + } } } Ok(()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 46c503e..56782bc 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -21,8 +21,16 @@ pub mod state; pub fn open_browser(url: &str) { #[cfg(target_os = "macos")] let _ = std::process::Command::new("open").arg(url).spawn(); + // `explorer.exe ` is unreliable — on some Windows configurations it + // opens File Explorer instead of the default browser. `cmd /c start` + // delegates to the Windows shell association table, which always picks the + // correct handler. The empty-string third argument is required to prevent + // `start` from treating the URL (which may contain special chars) as the + // window title. 
#[cfg(target_os = "windows")] - let _ = std::process::Command::new("explorer").arg(url).spawn(); + let _ = std::process::Command::new("cmd") + .args(["/c", "start", "", url]) + .spawn(); #[cfg(not(any(target_os = "macos", target_os = "windows")))] let _ = std::process::Command::new("xdg-open").arg(url).spawn(); } diff --git a/src/server/handler.rs b/src/server/handler.rs index 400dda8..fd555a3 100644 --- a/src/server/handler.rs +++ b/src/server/handler.rs @@ -174,13 +174,20 @@ pub async fn handle( } Resolved::Redirect(location) => { let body = format!("Redirecting to {location}"); - let mut hdr = String::new(); - let _ = std::fmt::write( - &mut hdr, - format_args!( - "HTTP/1.1 301 Moved Permanently\r\n Location: {location}\r\n Content-Type: text/plain\r\n Content-Length: {len}\r\n Connection: close\r\n \r\n", - len = body.len() - ), + // The original format_args! string used source indentation that + // injected leading spaces after each \r\n, producing obsolete + // "folded header" lines (obs-fold: deprecated by RFC 7230 §3.2.4, + // forbidden for senders by RFC 9112 §5.2). Strict HTTP clients reject them. + // Use an ordinary string literal with trailing-backslash (\r\n\) continuations: + // the backslash skips the newline and the next line's indentation in the output. + let body_len = body.len(); + let hdr = format!( + "HTTP/1.1 301 Moved Permanently\r\n\ + Location: {location}\r\n\ + Content-Type: text/plain\r\n\ + Content-Length: {body_len}\r\n\ + Connection: close\r\n\ + \r\n" ); stream.write_all(hdr.as_bytes()).await?; if !is_head { diff --git a/src/server/mod.rs b/src/server/mod.rs index 5400ef8..22d9397 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -231,21 +231,31 @@ fn bind_with_fallback(addr: IpAddr, port: u16, fallback: bool) -> Result<(TcpLis }) } -/// Return `true` when `e` represents file-descriptor exhaustion (`EMFILE` or -/// `ENFILE`) on Unix platforms. +/// Return `true` when `e` represents file-descriptor exhaustion. 
/// -/// On non-Unix targets (Windows) where these error codes have no equivalent, -/// always returns `false`. +/// On Unix this matches `EMFILE` (24, per-process FD limit) and `ENFILE` +/// (23, system-wide FD limit), both specified by POSIX and identical on +/// Linux, macOS, FreeBSD, and other POSIX-conformant systems. +/// +/// On Windows this matches `WSAEMFILE` (10024), the Winsock equivalent of +/// `EMFILE` — it fires when the per-process socket descriptor table is full. +/// +/// On all other targets the function always returns `false`. fn is_fd_exhaustion(e: &std::io::Error) -> bool { #[cfg(unix)] { // EMFILE (24): too many open files for the process. // ENFILE (23): too many open files system-wide. - // Both values are specified by POSIX and identical on Linux, macOS, - // FreeBSD, and other POSIX-conformant systems. matches!(e.raw_os_error(), Some(libc::EMFILE | libc::ENFILE)) } - #[cfg(not(unix))] + #[cfg(windows)] + { + // WSAEMFILE (10024): per-process socket handle limit reached. + // This is the Windows Sockets equivalent of POSIX EMFILE and fires + // when the process has exhausted its socket descriptor table. + matches!(e.raw_os_error(), Some(10_024)) + } + #[cfg(not(any(unix, windows)))] { let _ = e; false diff --git a/src/tor/mod.rs b/src/tor/mod.rs index ce13e81..08a49c7 100644 --- a/src/tor/mod.rs +++ b/src/tor/mod.rs @@ -16,10 +16,13 @@ //! //! ## Flow //! -//! 1. `init()` spawns a Tokio task (non-blocking, same public API as before). +//! 1. `init()` spawns a Tokio task and returns its `JoinHandle`. +//! The handle is awaited by lifecycle during graceful shutdown so active +//! Tor circuits get a chance to close cleanly (fix 3.1). //! 2. `TorClient::create_bootstrapped()` connects to the Tor network. -//! First run downloads ~2 MB of directory consensus (~30 s). Subsequent -//! runs reuse the cache in `rusthost-data/arti_cache/` and are fast. +//! A 120-second timeout prevents an indefinite hang on firewalled networks +//! (fix 3.3). 
First run downloads ~2 MB of directory consensus (~30 s). +//! Subsequent runs reuse the cache in `rusthost-data/arti_cache/` and are fast. //! 3. `tor_client.launch_onion_service()` registers the hidden service. //! The address is derived from the keypair and is available immediately. //! The keypair is persisted in `rusthost-data/arti_state/keys/` so the @@ -29,41 +32,144 @@ //! arriving on `HiddenServicePort 80 127.0.0.1:{port}`). //! 5. Each `StreamRequest` is accepted and bridged to the local HTTP server //! with `tokio::io::copy_bidirectional` in its own Tokio task. -//! 6. `kill()` is a no-op — the `TorClient` is dropped when the task exits +//! Both the local connect and the bidirectional copy carry timeouts to +//! bound the lifetime of stalled connections (fix 3.1, fix 3.2). +//! 6. When the `stream_requests` stream ends unexpectedly (transient network +//! disruption, Arti circuit reset) the module re-bootstraps with a linearly +//! increasing backoff (`RETRY_BASE_SECS` × attempt number) up to `MAX_RETRIES` times +//! before giving up (fix 3.4). +//! 7. No explicit `kill()` is needed — the `TorClient` is dropped when the task exits //! during normal Tokio runtime shutdown, which closes all circuits cleanly. +use std::net::IpAddr; use std::path::PathBuf; +use std::time::Duration; use arti_client::config::TorClientConfigBuilder; use arti_client::TorClient; use futures::StreamExt; -use tokio::{net::TcpStream, sync::watch}; +use tokio::{net::TcpStream, sync::watch, task::JoinHandle}; use tor_cell::relaycell::msg::Connected; use tor_hsservice::{config::OnionServiceConfigBuilder, handle_rend_requests, HsId, StreamRequest}; use crate::runtime::state::{SharedState, TorStatus}; -// ─── Public entry point ────────────────────────────────────────────────────── +// ─── Timeout / retry constants ──────────────────────────────────────────────── + +/// How long to wait for Arti to complete the initial directory bootstrap. +/// Covers first-run consensus download plus circuit establishment. 
+const BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(120); + +/// How long to wait for the local HTTP server to accept a proxied connection. +/// A hung local server should not hold a semaphore permit indefinitely. +const LOCAL_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); + +/// Maximum wall-clock lifetime of a single proxied Tor stream. +/// Prevents stalled or adversarially slow clients from exhausting the +/// semaphore. This is a hard cap: streams still transferring data are closed too. +const STREAM_MAX_LIFETIME: Duration = Duration::from_secs(300); + +/// Base delay between re-bootstrap attempts (multiplied by attempt count). +const RETRY_BASE_SECS: u64 = 30; + +/// Maximum number of automatic re-bootstrap attempts after an unexpected +/// stream-end before the module sets `TorStatus::Failed` permanently. +const MAX_RETRIES: u32 = 5; + +// ─── Public entry point ─────────────────────────────────────────────────────── /// Initialise Tor using the embedded Arti client. /// -/// Spawns a Tokio task and returns immediately. Tor status and the onion -/// address are written into `state` as things progress, exactly as before. +/// Spawns a Tokio task and returns its [`JoinHandle`]. The caller **must** +/// await the handle (with a timeout) during graceful shutdown so active Tor +/// circuits can close cleanly before the runtime exits (fix 3.1). /// -/// `shutdown` is a watch channel whose `true` value triggers a clean exit -/// from the stream-request loop (fix 2.10). +/// Tor status and the onion address are written into `state` as things +/// progress. `shutdown` is a watch channel whose `true` value triggers a +/// clean exit from the stream-request loop. +/// +/// `bind_addr` must match `config.server.bind` so the local proxy connect +/// uses the correct loopback address even when the server is bound to `::1` +/// rather than `127.0.0.1` (fix 3.6). 
pub fn init( data_dir: PathBuf, bind_port: u16, + bind_addr: IpAddr, state: SharedState, shutdown: watch::Receiver, -) { +) -> JoinHandle<()> { tokio::spawn(async move { - if let Err(e) = run(data_dir, bind_port, state.clone(), shutdown).await { - log::error!("Tor: fatal error: {e}"); - set_status(&state, TorStatus::Failed(e.to_string())).await; + let mut attempts = 0u32; + + loop { + // `run()` returns: + // Ok(true) — stream ended unexpectedly; caller should retry + // Ok(false) — clean shutdown signal received; caller should exit + // Err(e) — fatal, unrecoverable error + match run( + data_dir.clone(), + bind_port, + bind_addr, + state.clone(), + shutdown.clone(), + ) + .await + { + Ok(false) => { + // Shutdown signal — exit cleanly without touching status. + log::info!("Tor: task exiting cleanly."); + break; + } + Ok(true) => { + // Stream ended unexpectedly (transient network disruption). + // Use saturating_add to satisfy clippy::integer_arithmetic — + // in practice attempts never exceeds MAX_RETRIES (a small u32). + attempts = attempts.saturating_add(1); + if attempts > MAX_RETRIES { + log::error!( + "Tor: stream ended {MAX_RETRIES} consecutive times; giving up." + ); + set_status( + &state, + TorStatus::Failed("too many reconnect attempts".into()), + ) + .await; + break; + } + + let delay = + Duration::from_secs(RETRY_BASE_SECS.saturating_mul(u64::from(attempts))); + log::warn!( + "Tor: stream ended; re-bootstrapping in {delay:?} \ + (attempt {attempts}/{MAX_RETRIES})" + ); + + // Clear the displayed address while reconnecting. + state.write().await.onion_address = None; + set_status(&state, TorStatus::Starting).await; + + // Honour shutdown signals that arrive during the backoff sleep. + // The cloned receiver must be bound to a local variable so it + // lives for the full duration of the select! borrow (E0716). + let mut backoff_shutdown = shutdown.clone(); + tokio::select! 
{ + () = tokio::time::sleep(delay) => {} + _ = backoff_shutdown.changed() => { + if *backoff_shutdown.borrow() { + log::info!("Tor: shutdown during backoff — exiting."); + break; + } + } + } + } + Err(e) => { + log::error!("Tor: fatal error: {e}"); + set_status(&state, TorStatus::Failed(e.to_string())).await; + break; + } + } } - }); + }) } // `kill()` has been removed (fix 2.10): the `TorClient` is owned by the task @@ -73,12 +179,19 @@ pub fn init( // ─── Core async logic ───────────────────────────────────────────────────────── +/// Run the full Tor lifecycle (bootstrap → launch service → proxy streams). +/// +/// Returns: +/// - `Ok(false)` — shutdown signal received; caller should exit. +/// - `Ok(true)` — stream ended unexpectedly; caller should retry. +/// - `Err(e)` — unrecoverable error; caller should set `Failed` and exit. async fn run( data_dir: PathBuf, bind_port: u16, + bind_addr: IpAddr, state: SharedState, mut shutdown: watch::Receiver, -) -> Result<(), Box> { +) -> Result> { set_status(&state, TorStatus::Starting).await; // ── 1. Build TorClientConfig ────────────────────────────────────────── @@ -99,12 +212,19 @@ async fn run( // ── 2. Bootstrap ────────────────────────────────────────────────────── // - // Async-blocks until Tor has fetched enough directory info to open - // circuits safely. Subsequent runs reuse the cached consensus and - // finish in a few seconds. - let tor_client = TorClient::create_bootstrapped(config) - .await - .map_err(|e| format!("Tor bootstrap failed: {e}"))?; + // fix 3.3 — wrap in a timeout so a firewalled network (where Tor traffic + // is silently dropped) does not cause the task to stall indefinitely with + // TorStatus::Starting showing in the dashboard. + let tor_client = + tokio::time::timeout(BOOTSTRAP_TIMEOUT, TorClient::create_bootstrapped(config)) + .await + .map_err(|_| { + format!( + "Tor bootstrap timed out after {}s — check network connectivity", + BOOTSTRAP_TIMEOUT.as_secs() + ) + })? 
+ .map_err(|e| format!("Tor bootstrap failed: {e}"))?; log::info!("Tor: connected to the Tor network"); @@ -162,18 +282,29 @@ async fn run( // each other. Dropping the task naturally closes the Tor circuit. let mut stream_requests = handle_rend_requests(rend_requests); + // fix 3.2 — the semaphore bounds active concurrent proxied streams. + // If `acquire_owned` ever returns Err it means `semaphore` was explicitly + // closed, which we now do when exiting via the shutdown arm so the + // acquire branch is always reachable (fix 3.5). let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(256)); - // 2.10 — use select! so a shutdown signal can break the accept loop cleanly, - // instead of blocking indefinitely in stream_requests.next(). + // 2.10 — use select! so a shutdown signal can break the accept loop + // cleanly, instead of blocking indefinitely in stream_requests.next(). loop { tokio::select! { next = stream_requests.next() => { if let Some(stream_req) = next { - let local_addr = format!("127.0.0.1:{bind_port}"); - let Ok(permit) = std::sync::Arc::clone(&semaphore).acquire_owned().await else { - break; // semaphore closed - }; + // fix 3.6 — derive local address from the actual bind address, + // not a hardcoded "127.0.0.1", so IPv6 bind configs work. + let local_addr = format!("{bind_addr}:{bind_port}"); + + // fix 3.5 — propagate semaphore errors rather than silently + // breaking; `acquire_owned` only fails if closed explicitly. + let permit = std::sync::Arc::clone(&semaphore) + .acquire_owned() + .await + .map_err(|e| format!("semaphore closed unexpectedly: {e}"))?; + tokio::spawn(async move { let _permit = permit; if let Err(e) = proxy_stream(stream_req, &local_addr).await { @@ -184,20 +315,21 @@ async fn run( } else { // The onion service stream ended unexpectedly (Tor network // disruption, Arti internal error, resource exhaustion). 
- // Flip the dashboard to Failed so the operator sees a clear - // signal rather than a permanently green READY badge. + // Return Ok(true) so init()'s retry loop can re-bootstrap + // instead of dying permanently (fix 3.4). log::warn!( - "Tor: stream_requests stream ended — onion service is no longer active" + "Tor: stream_requests stream ended — will attempt re-bootstrap" ); - // 2.9 — use Failed(String) with a human-readable reason - set_status(&state, TorStatus::Failed("stream ended".into())).await; state.write().await.onion_address = None; - return Ok(()); + return Ok(true); // signal: retry } } _ = shutdown.changed() => { if *shutdown.borrow() { log::info!("Tor: shutdown signal received — stopping stream loop"); + // fix 3.5 — close the semaphore so no further permits are handed out. NOTE(review): + // this arm is not polled while the `next` arm's handler awaits acquire_owned(), so it cannot wake that wait — confirm intent. + semaphore.close(); break; } } @@ -206,7 +338,7 @@ async fn run( // Clean shutdown: clear the displayed onion address. state.write().await.onion_address = None; - Ok(()) + Ok(false) // signal: do not retry } // ─── Stream proxying ───────────────────────────────────────────────────────── @@ -224,13 +356,46 @@ async fn run( /// `DataStream` implements `tokio::io::AsyncRead + AsyncWrite` when the /// `tokio` feature is enabled on `arti-client`, so `copy_bidirectional` /// works with no adapter needed. +/// +/// fix 3.1 — both the local connect and the bidirectional copy are wrapped in +/// timeouts to prevent stalled connections from holding semaphore permits +/// indefinitely and exhausting the connection pool. async fn proxy_stream( stream_req: StreamRequest, local_addr: &str, ) -> Result<(), Box> { let mut tor_stream = stream_req.accept(Connected::new_empty()).await?; - let mut local = TcpStream::connect(local_addr).await?; - tokio::io::copy_bidirectional(&mut tor_stream, &mut local).await?; + + // fix 3.1a — bound the time spent waiting for the local server to accept. 
+ // If the HTTP server is wedged or still starting, we release the permit + // quickly rather than holding it until the OS TCP timeout fires. + let mut local = tokio::time::timeout(LOCAL_CONNECT_TIMEOUT, TcpStream::connect(local_addr)) + .await + .map_err(|_| { + format!( + "timed out connecting to local server at {local_addr} \ + after {}s", + LOCAL_CONNECT_TIMEOUT.as_secs() + ) + })? + .map_err(|e| format!("local connect to {local_addr} failed: {e}"))?; + + // fix 3.2 — cap the total wall-clock lifetime of a proxied stream. + // A slow or adversarially idle client cannot hold a permit forever; after + // STREAM_MAX_LIFETIME the connection is closed from our side. + tokio::time::timeout( + STREAM_MAX_LIFETIME, + tokio::io::copy_bidirectional(&mut tor_stream, &mut local), + ) + .await + .map_err(|_| { + format!( + "stream lifetime exceeded {}s — closing", + STREAM_MAX_LIFETIME.as_secs() + ) + })? // timeout → Err + .map_err(|e| format!("bidirectional copy failed: {e}"))?; // io::Error → Err + Ok(()) } @@ -274,12 +439,17 @@ pub(crate) fn onion_address_from_pubkey(pubkey: &[u8; 32]) -> String { // ADDRESS_BYTES = PUBKEY (32) || CHECKSUM (2) || VERSION (1) = 35 bytes let mut address_bytes = [0u8; 35]; address_bytes[..32].copy_from_slice(pubkey); - // Consume the first two checksum bytes via an iterator — clippy cannot - // prove at compile time that a GenericArray has >= 2 elements, so direct - // indexing triggers `indexing_slicing`. SHA3-256 always produces 32 bytes. - let mut hash_iter = hash.iter().copied(); - address_bytes[32] = hash_iter.next().unwrap_or(0); - address_bytes[33] = hash_iter.next().unwrap_or(0); + + // fix 3.8 — index hash directly rather than via a fallible iterator. + // SHA3-256 always produces exactly 32 bytes; the GenericArray is + // guaranteed to have indices 0 and 1. The `indexing_slicing` lint + // fires here because clippy cannot prove the length at compile time for + // GenericArray, so we suppress it with a targeted allow. 
+ #[allow(clippy::indexing_slicing)] + { + address_bytes[32] = hash[0]; + address_bytes[33] = hash[1]; + } address_bytes[34] = version; // RFC 4648 base32, no padding, lowercase → 56 characters @@ -327,11 +497,12 @@ mod tests { let mut bytes = [0u8; 35]; bytes[..32].copy_from_slice(pubkey); - // Use iterator instead of direct indexing to avoid clippy::indexing_slicing. - // SHA3-256 always produces 32 bytes, so next() will never return None. - let mut it = hash.iter().copied(); - bytes[32] = it.next().unwrap_or(0); - bytes[33] = it.next().unwrap_or(0); + // SHA3-256 always produces 32 bytes; direct indexing is safe. + #[allow(clippy::indexing_slicing)] + { + bytes[32] = hash[0]; + bytes[33] = hash[1]; + } bytes[34] = version; format!("{}.onion", BASE32_NOPAD.encode(&bytes).to_ascii_lowercase())