From f8420c7fe2c15524ccb4d67e587bd6e205656a6d Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 11:40:33 +0800 Subject: [PATCH 01/11] feat: add `CancellationToken` support for `spawn_inherited` and `spawn_with_tracking` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Accept a `tokio_util::sync::CancellationToken` in the spawn pipeline so in-flight child processes can be killed when cancellation is signalled. For fspy-tracked processes, the token is passed into fspy's background task which selects between `child.wait()` and `token.cancelled()`. For plain tokio processes, `spawn_with_tracking` spawns its own background task with the same pattern. Both kill the child via `Child::start_kill()` then await normal exit — no PID-based killing. The read loop needs no cancellation branch: killing the child closes its pipes and reads return EOF naturally. `spawn_inherited` uses the select pattern directly since it has no read loop. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 3 + crates/fspy/Cargo.toml | 1 + crates/fspy/examples/cli.rs | 2 +- crates/fspy/src/command.rs | 8 ++- crates/fspy/src/unix/mod.rs | 15 +++- crates/fspy/src/windows/mod.rs | 15 +++- crates/fspy/tests/cancellation.rs | 32 +++++++++ crates/fspy/tests/node_fs.rs | 2 +- crates/fspy/tests/oxlint.rs | 2 +- crates/fspy/tests/static_executable.rs | 2 +- crates/fspy/tests/test_utils/mod.rs | 6 +- crates/fspy_e2e/Cargo.toml | 1 + crates/fspy_e2e/src/main.rs | 3 +- crates/vite_task/Cargo.toml | 1 + crates/vite_task/src/session/execute/mod.rs | 32 +++++++-- crates/vite_task/src/session/execute/spawn.rs | 69 ++++++++++++------- crates/vite_task/src/session/mod.rs | 1 + 17 files changed, 152 insertions(+), 43 deletions(-) create mode 100644 crates/fspy/tests/cancellation.rs diff --git a/Cargo.lock b/Cargo.lock index cc0b85be..0dbe53ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1176,6 +1176,7 @@ dependencies = [ "test-log", "thiserror 2.0.18", "tokio", + "tokio-util", "which", "winapi", "winsafe 0.0.24", @@ -1200,6 +1201,7 @@ dependencies = [ "rustc-hash", "serde", "tokio", + "tokio-util", "toml", ] @@ -3883,6 +3885,7 @@ dependencies = [ "tempfile", "thiserror 2.0.18", "tokio", + "tokio-util", "tracing", "twox-hash", "vite_path", diff --git a/crates/fspy/Cargo.toml b/crates/fspy/Cargo.toml index abcc4b4c..7269f0f9 100644 --- a/crates/fspy/Cargo.toml +++ b/crates/fspy/Cargo.toml @@ -20,6 +20,7 @@ rustc-hash = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["net", "process", "io-util", "sync", "rt"] } +tokio-util = { workspace = true } which = { workspace = true, features = ["tracing"] } xxhash-rust = { workspace = true } diff --git a/crates/fspy/examples/cli.rs b/crates/fspy/examples/cli.rs index 1de519c0..4ae34790 100644 --- a/crates/fspy/examples/cli.rs +++ b/crates/fspy/examples/cli.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let mut command = fspy::Command::new(program); command.envs(std::env::vars_os()).args(args); - let child = command.spawn().await?; + let child = command.spawn(tokio_util::sync::CancellationToken::new()).await?; let termination = child.wait_handle.await?; let mut path_count = 0usize; diff --git a/crates/fspy/src/command.rs b/crates/fspy/src/command.rs index 5e9900c9..2a72c67d 100644 --- a/crates/fspy/src/command.rs +++ b/crates/fspy/src/command.rs @@ -8,6 +8,7 @@ use std::{ use fspy_shared_unix::exec::Exec; use rustc_hash::FxHashMap; use tokio::process::Command as TokioCommand; +use tokio_util::sync::CancellationToken; use crate::{SPY_IMPL, TrackedChild, error::SpawnError}; @@ -167,9 +168,12 @@ impl Command { /// # Errors /// /// Returns [`SpawnError`] if program resolution fails or the process cannot be spawned. - pub async fn spawn(mut self) -> Result { + pub async fn spawn( + mut self, + cancellation_token: CancellationToken, + ) -> Result { self.resolve_program()?; - SPY_IMPL.spawn(self).await + SPY_IMPL.spawn(self, cancellation_token).await } /// Resolve program name to full path using `PATH` and cwd. diff --git a/crates/fspy/src/unix/mod.rs b/crates/fspy/src/unix/mod.rs index 6e77f9cb..7f946005 100644 --- a/crates/fspy/src/unix/mod.rs +++ b/crates/fspy/src/unix/mod.rs @@ -22,6 +22,7 @@ use futures_util::FutureExt; #[cfg(target_os = "linux")] use syscall_handler::SyscallHandler; use tokio::task::spawn_blocking; +use tokio_util::sync::CancellationToken; use crate::{ ChildTermination, Command, TrackedChild, @@ -80,7 +81,11 @@ impl SpyImpl { }) } - pub(crate) async fn spawn(&self, mut command: Command) -> Result { + pub(crate) async fn spawn( + &self, + mut command: Command, + cancellation_token: CancellationToken, + ) -> Result { #[cfg(target_os = "linux")] let supervisor = supervise::().map_err(SpawnError::Supervisor)?; @@ -143,7 +148,13 @@ impl SpyImpl { // Keep polling for the child to exit in the background even if `wait_handle` is not awaited, // because we need to stop the supervisor and lock the channel as soon as the child exits. wait_handle: tokio::spawn(async move { - let status = child.wait().await?; + let status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; let arenas = std::iter::once(exec_resolve_accesses); // Stop the supervisor and collect path accesses from it. diff --git a/crates/fspy/src/windows/mod.rs b/crates/fspy/src/windows/mod.rs index b4f1c75e..b882db63 100644 --- a/crates/fspy/src/windows/mod.rs +++ b/crates/fspy/src/windows/mod.rs @@ -13,6 +13,7 @@ use fspy_shared::{ windows::{PAYLOAD_ID, Payload}, }; use futures_util::FutureExt; +use tokio_util::sync::CancellationToken; use winapi::{ shared::minwindef::TRUE, um::{processthreadsapi::ResumeThread, winbase::CREATE_SUSPENDED}, @@ -73,7 +74,11 @@ impl SpyImpl { } #[expect(clippy::unused_async, reason = "async signature required by SpyImpl trait")] - pub(crate) async fn spawn(&self, mut command: Command) -> Result { + pub(crate) async fn spawn( + &self, + mut command: Command, + cancellation_token: CancellationToken, + ) -> Result { let ansi_dll_path_with_nul = Arc::clone(&self.ansi_dll_path_with_nul); command.env("FSPY", "1"); let mut command = command.into_tokio_command(); @@ -142,7 +147,13 @@ impl SpyImpl { // Keep polling for the child to exit in the background even if `wait_handle` is not awaited, // because we need to stop the supervisor and lock the channel as soon as the child exits. wait_handle: tokio::spawn(async move { - let status = child.wait().await?; + let status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; // Lock the ipc channel after the child has exited. // We are not interested in path accesses from descendants after the main child has exited. let ipc_receiver_lock_guard = OwnedReceiverLockGuard::lock_async(receiver).await?; diff --git a/crates/fspy/tests/cancellation.rs b/crates/fspy/tests/cancellation.rs new file mode 100644 index 00000000..ff65cf2c --- /dev/null +++ b/crates/fspy/tests/cancellation.rs @@ -0,0 +1,32 @@ +use std::process::Stdio; + +use tokio::io::AsyncReadExt as _; +use tokio_util::sync::CancellationToken; + +#[test_log::test(tokio::test)] +async fn cancellation_kills_tracked_child() -> anyhow::Result<()> { + let cmd = subprocess_test::command_for_fn!((), |()| { + use std::io::Write as _; + // Signal readiness via stdout + std::io::stdout().write_all(b"ready\n").unwrap(); + std::io::stdout().flush().unwrap(); + // Block on stdin — will be killed by cancellation + let _ = std::io::stdin().read_line(&mut String::new()); + }); + let token = CancellationToken::new(); + let mut fspy_cmd = fspy::Command::from(cmd); + fspy_cmd.stdout(Stdio::piped()).stdin(Stdio::piped()); + let mut child = fspy_cmd.spawn(token.clone()).await?; + + // Wait for child to signal readiness + let mut stdout = child.stdout.take().unwrap(); + let mut buf = vec![0u8; 64]; + let n = stdout.read(&mut buf).await?; + assert!(std::str::from_utf8(&buf[..n])?.contains("ready")); + + // Cancel — fspy background task calls start_kill + token.cancel(); + let termination = child.wait_handle.await?; + assert!(!termination.status.success()); + Ok(()) +} diff --git a/crates/fspy/tests/node_fs.rs b/crates/fspy/tests/node_fs.rs index bbbb2bb5..6f2afb95 100644 --- a/crates/fspy/tests/node_fs.rs +++ b/crates/fspy/tests/node_fs.rs @@ -16,7 +16,7 @@ async fn track_node_script(script: &str, args: &[&OsStr]) -> anyhow::Result anyhow::Result) -> PathAccessIterable cmd.current_dir(cwd); } cmd.args(args); - let tracked_child = cmd.spawn().await.unwrap(); + let tracked_child = cmd.spawn(tokio_util::sync::CancellationToken::new()).await.unwrap(); let termination = tracked_child.wait_handle.await.unwrap(); assert!(termination.status.success()); diff --git a/crates/fspy/tests/test_utils/mod.rs b/crates/fspy/tests/test_utils/mod.rs index 20e7a2c4..cfa46c4a 100644 --- a/crates/fspy/tests/test_utils/mod.rs +++ b/crates/fspy/tests/test_utils/mod.rs @@ -78,7 +78,11 @@ macro_rules! track_fn { )] #[allow(dead_code, reason = "used by track_fn! macro; not all test files use this macro")] pub async fn spawn_command(cmd: subprocess_test::Command) -> anyhow::Result { - let termination = fspy::Command::from(cmd).spawn().await?.wait_handle.await?; + let termination = fspy::Command::from(cmd) + .spawn(tokio_util::sync::CancellationToken::new()) + .await? + .wait_handle + .await?; assert!(termination.status.success()); Ok(termination.path_accesses) } diff --git a/crates/fspy_e2e/Cargo.toml b/crates/fspy_e2e/Cargo.toml index db8833ee..90ece05e 100644 --- a/crates/fspy_e2e/Cargo.toml +++ b/crates/fspy_e2e/Cargo.toml @@ -9,6 +9,7 @@ fspy = { workspace = true } rustc-hash = { workspace = true } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } toml = { workspace = true } [lints] diff --git a/crates/fspy_e2e/src/main.rs b/crates/fspy_e2e/src/main.rs index 8e9f1c59..9d6a3052 100644 --- a/crates/fspy_e2e/src/main.rs +++ b/crates/fspy_e2e/src/main.rs @@ -84,7 +84,8 @@ async fn main() { .stderr(Stdio::piped()) .current_dir(&dir); - let mut tracked_child = cmd.spawn().await.unwrap(); + let mut tracked_child = + cmd.spawn(tokio_util::sync::CancellationToken::new()).await.unwrap(); let mut stdout_bytes = Vec::::new(); tracked_child.stdout.take().unwrap().read_to_end(&mut stdout_bytes).await.unwrap(); diff --git a/crates/vite_task/Cargo.toml b/crates/vite_task/Cargo.toml index 6fedfaa3..28ebbe3b 100644 --- a/crates/vite_task/Cargo.toml +++ b/crates/vite_task/Cargo.toml @@ -30,6 +30,7 @@ serde = { workspace = true, features = ["derive", "rc"] } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "io-util", "macros", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true } twox-hash = { workspace = true } vite_path = { workspace = true } diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index f520ed35..85b169d1 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -5,6 +5,7 @@ pub mod spawn; use std::{collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; use futures_util::FutureExt; +use tokio_util::sync::CancellationToken; use vite_path::AbsolutePath; use vite_task_plan::{ ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand, @@ -57,6 +58,8 @@ struct ExecutionContext<'a> { /// Base path for resolving relative paths in cache entries. /// Typically the workspace root. cache_base_path: &'a Arc, + /// Token for cancelling in-flight child processes. + cancellation_token: CancellationToken, } impl ExecutionContext<'_> { @@ -141,9 +144,14 @@ impl ExecutionContext<'_> { clippy::large_futures, reason = "spawn execution with cache management creates large futures" )] - let outcome = - execute_spawn(leaf_reporter, spawn_execution, self.cache, self.cache_base_path) - .await; + let outcome = execute_spawn( + leaf_reporter, + spawn_execution, + self.cache, + self.cache_base_path, + self.cancellation_token.clone(), + ) + .await; match outcome { SpawnOutcome::CacheHit => false, SpawnOutcome::Spawned(status) => !status.success(), @@ -178,6 +186,7 @@ pub async fn execute_spawn( spawn_execution: &SpawnExecution, cache: &ExecutionCache, cache_base_path: &Arc, + cancellation_token: CancellationToken, ) -> SpawnOutcome { let cache_metadata = spawn_execution.cache_metadata.as_ref(); @@ -270,7 +279,7 @@ pub async fn execute_spawn( // while the child also writes to the same FD. drop(stdio_config); - match spawn_inherited(&spawn_execution.spawn_command).await { + match spawn_inherited(&spawn_execution.spawn_command, cancellation_token).await { Ok(result) => { leaf_reporter.finish( Some(result.exit_status), @@ -341,6 +350,7 @@ pub async fn execute_spawn( std_outputs.as_mut(), path_accesses.as_mut(), &resolved_negatives, + cancellation_token, ) .await { @@ -438,7 +448,10 @@ pub async fn execute_spawn( /// The child process will see `is_terminal() == true` for stdout/stderr when the /// parent is running in a terminal. This is expected behavior. #[tracing::instrument(level = "debug", skip_all)] -async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result { +async fn spawn_inherited( + spawn_command: &SpawnCommand, + cancellation_token: CancellationToken, +) -> anyhow::Result { let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); cmd.envs(spawn_command.all_envs.iter()); @@ -480,7 +493,13 @@ async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; Ok(SpawnResult { exit_status, duration: start.elapsed() }) } @@ -518,6 +537,7 @@ impl Session<'_> { reporter: &mut *reporter, cache, cache_base_path: &self.workspace_path, + cancellation_token: CancellationToken::new(), }; // Execute the graph with fast-fail: if any task fails, remaining tasks diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 99ebae87..ad93a78f 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -12,6 +12,7 @@ use fspy::AccessMode; use rustc_hash::FxHashSet; use serde::Serialize; use tokio::io::AsyncReadExt as _; +use tokio_util::sync::CancellationToken; use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::SpawnCommand; use wax::Program as _; @@ -70,6 +71,10 @@ pub struct TrackedPathAccesses { clippy::too_many_lines, reason = "spawn logic is inherently sequential and splitting would reduce clarity" )] +#[expect( + clippy::too_many_arguments, + reason = "spawn parameters are all distinct concerns that don't form a natural group" +)] pub async fn spawn_with_tracking( spawn_command: &SpawnCommand, workspace_root: &AbsolutePath, @@ -78,15 +83,19 @@ pub async fn spawn_with_tracking( std_outputs: Option<&mut Vec>, path_accesses: Option<&mut TrackedPathAccesses>, resolved_negatives: &[wax::Glob<'static>], + cancellation_token: CancellationToken, ) -> anyhow::Result { - /// The tracking state of the spawned process. - /// Determined by whether `path_accesses` is `Some` (fspy enabled) or `None` (fspy disabled). - enum TrackingState { - /// fspy tracking is enabled + /// How the child process is awaited after stdout/stderr are drained. + /// + /// Both variants run a background task that monitors the cancellation token + /// and kills the child when cancelled. The read loop needs no cancellation + /// branch — killing the child closes its pipes, which makes reads return EOF. + enum WaitState { + /// fspy tracking enabled — background task managed by fspy. FspyEnabled(fspy::TrackedChild), - /// fspy tracking is disabled, using plain tokio process - FspyDisabled(tokio::process::Child), + /// Plain tokio process — we spawn our own cancellation-aware background task. + TokioChild(tokio::task::JoinHandle>), } let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); @@ -95,21 +104,27 @@ pub async fn spawn_with_tracking( cmd.current_dir(&*spawn_command.cwd); cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); - let mut tracking_state = if path_accesses.is_some() { - // path_accesses is Some, spawn with fspy tracking enabled - TrackingState::FspyEnabled(cmd.spawn().await?) + let (mut child_stdout, mut child_stderr, wait_state) = if path_accesses.is_some() { + // fspy tracking enabled — fspy's background task handles cancellation + let mut tracked_child = cmd.spawn(cancellation_token).await?; + let stdout = tracked_child.stdout.take().unwrap(); + let stderr = tracked_child.stderr.take().unwrap(); + (stdout, stderr, WaitState::FspyEnabled(tracked_child)) } else { - // path_accesses is None, spawn without fspy - TrackingState::FspyDisabled(cmd.into_tokio_command().spawn()?) - }; - - let mut child_stdout = match &mut tracking_state { - TrackingState::FspyEnabled(tracked_child) => tracked_child.stdout.take().unwrap(), - TrackingState::FspyDisabled(tokio_child) => tokio_child.stdout.take().unwrap(), - }; - let mut child_stderr = match &mut tracking_state { - TrackingState::FspyEnabled(tracked_child) => tracked_child.stderr.take().unwrap(), - TrackingState::FspyDisabled(tokio_child) => tokio_child.stderr.take().unwrap(), + // No fspy — spawn a background task that waits for exit or cancellation + let mut child = cmd.into_tokio_command().spawn()?; + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + let wait_handle = tokio::spawn(async move { + tokio::select! { + status = child.wait() => status, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await + } + } + }); + (stdout, stderr, WaitState::TokioChild(wait_handle)) }; // Output capturing is independent of fspy tracking @@ -122,6 +137,9 @@ pub async fn spawn_with_tracking( let start = Instant::now(); // Read from both stdout and stderr concurrently using select! + // No cancellation branch needed — both WaitState variants run a background task + // that kills the child on cancellation, which closes the pipes and makes reads + // return EOF naturally. loop { tokio::select! { result = child_stdout.read(&mut stdout_buf), if !stdout_done => { @@ -170,9 +188,10 @@ pub async fn spawn_with_tracking( } } - // Wait for process termination and process path accesses if fspy was enabled - let (termination, path_accesses) = match tracking_state { - TrackingState::FspyEnabled(tracked_child) => { + // Wait for process termination. Both variants' background tasks handle + // cancellation internally, so these awaits need no additional select. + let (termination, path_accesses) = match wait_state { + WaitState::FspyEnabled(tracked_child) => { let termination = tracked_child.wait_handle.await?; // path_accesses must be Some when fspy is enabled (they're set together) let path_accesses = path_accesses.ok_or_else(|| { @@ -180,8 +199,8 @@ pub async fn spawn_with_tracking( })?; (termination, path_accesses) } - TrackingState::FspyDisabled(mut tokio_child) => { - let exit_status = tokio_child.wait().await?; + WaitState::TokioChild(wait_handle) => { + let exit_status = wait_handle.await.map_err(|err| anyhow::anyhow!(err))??; return Ok(SpawnResult { exit_status, duration: start.elapsed() }); } }; diff --git a/crates/vite_task/src/session/mod.rs b/crates/vite_task/src/session/mod.rs index 1c0b58a7..c378794f 100644 --- a/crates/vite_task/src/session/mod.rs +++ b/crates/vite_task/src/session/mod.rs @@ -630,6 +630,7 @@ impl<'a> Session<'a> { &spawn_execution, cache, &self.workspace_path, + tokio_util::sync::CancellationToken::new(), ) .await; match outcome { From 3cfe69eb0b51b1d797137d152940396e0f56dffe Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 19:17:07 +0800 Subject: [PATCH 02/11] feat: concurrent task execution with DAG scheduler Replace the sequential execution loop with a concurrent DAG scheduler. Independent tasks now run in parallel, bounded by a per-graph semaphore (limit 10). Failure cancels all in-flight tasks via CancellationToken. - Use FuturesUnordered + tokio::sync::Semaphore for bounded concurrency - Each nested ExecutionGraph (Expanded items) gets its own semaphore - Wrap reporter in RefCell for shared access from concurrent futures - On failure, close semaphore so pending acquires fail immediately - Add `barrier` test tool (fs.watch-based cross-platform barrier) - Add e2e tests proving concurrency (barrier) and kill-on-failure - Stabilize existing e2e tests by adding deps between independent packages Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 1 + crates/vite_task/Cargo.toml | 1 + crates/vite_task/src/session/execute/mod.rs | 185 ++++++++++++------ .../concurrent-execution/package.json | 4 + .../packages/a/package.json | 7 + .../packages/b/package.json | 7 + .../concurrent-execution/pnpm-workspace.yaml | 2 + .../concurrent-execution/snapshots.toml | 16 ++ .../failure kills concurrent tasks.snap | 11 ++ .../independent tasks run concurrently.snap | 11 ++ .../concurrent-execution/vite-task.json | 3 + .../fixtures/grouped-stdio/package.json | 5 +- .../packages/rw-pkg/package.json | 2 +- .../packages/touch-pkg/package.json | 3 + .../fixtures/interleaved-stdio/package.json | 5 +- .../fixtures/labeled-stdio/package.json | 5 +- packages/tools/package.json | 1 + packages/tools/src/barrier.js | 68 +++++++ 18 files changed, 276 insertions(+), 61 deletions(-) create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json create mode 100755 packages/tools/src/barrier.js diff --git a/Cargo.lock b/Cargo.lock index 0dbe53ea..fb99639a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3876,6 +3876,7 @@ dependencies = [ "nix 0.30.1", "once_cell", "owo-colors", + "petgraph", "pty_terminal_test_client", "rayon", "rusqlite", diff --git a/crates/vite_task/Cargo.toml b/crates/vite_task/Cargo.toml index 28ebbe3b..1b4861ec 100644 --- a/crates/vite_task/Cargo.toml +++ b/crates/vite_task/Cargo.toml @@ -22,6 +22,7 @@ fspy = { workspace = true } futures-util = { workspace = true } once_cell = { workspace = true } owo-colors = { workspace = true } +petgraph = { workspace = true } pty_terminal_test_client = { workspace = true } rayon = { workspace = true } rusqlite = { workspace = true, features = ["bundled"] } diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 85b169d1..8dafbfb3 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -2,14 +2,17 @@ pub mod fingerprint; pub mod glob_inputs; pub mod spawn; -use std::{collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; +use std::{cell::RefCell, collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; -use futures_util::FutureExt; +use futures_util::{FutureExt, StreamExt, future::LocalBoxFuture, stream::FuturesUnordered}; +use petgraph::Direction; +use rustc_hash::FxHashMap; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use vite_path::AbsolutePath; use vite_task_plan::{ ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand, - SpawnExecution, + SpawnExecution, execution_graph::ExecutionNodeIndex, }; use self::{ @@ -46,13 +49,22 @@ pub enum SpawnOutcome { Failed, } -/// Holds mutable references needed during graph execution. +/// Maximum number of tasks that can execute concurrently within a single +/// execution graph level. +const CONCURRENCY_LIMIT: usize = 10; + +/// Holds shared references needed during graph execution. +/// +/// The `reporter` field is wrapped in `RefCell` because concurrent futures +/// (via `FuturesUnordered`) need shared access to create leaf reporters. +/// Since all futures run on a single thread (no `tokio::spawn`), `RefCell` +/// is sufficient for interior mutability. /// -/// The `reporter` field is used to create leaf reporters for individual executions. /// Cache fields are passed through to [`execute_spawn`] for cache-aware execution. struct ExecutionContext<'a> { /// The graph-level reporter, used to create leaf reporters via `new_leaf_execution()`. - reporter: &'a mut dyn GraphExecutionReporter, + /// Wrapped in `RefCell` for shared access from concurrent task futures. + reporter: &'a RefCell>, /// The execution cache for looking up and storing cached results. cache: &'a ExecutionCache, /// Base path for resolving relative paths in cache entries. @@ -63,65 +75,121 @@ struct ExecutionContext<'a> { } impl ExecutionContext<'_> { - /// Execute all tasks in an execution graph in dependency order. - /// - /// `ExecutionGraph` guarantees acyclicity at construction time. - /// We compute a topological order and iterate in reverse to get execution order - /// (dependencies before dependents). + /// Execute all tasks in an execution graph concurrently, respecting dependencies. /// - /// Fast-fail: if any task fails (non-zero exit or infrastructure error), remaining - /// tasks and `&&`-chained items are skipped. Leaf-level errors are reported through - /// the reporter. Cycle detection is handled at plan time. + /// Uses a DAG scheduler: tasks whose dependencies have all completed are scheduled + /// onto a `FuturesUnordered`, bounded by a per-graph `Semaphore` with + /// [`CONCURRENCY_LIMIT`] permits. Each recursive `Expanded` graph creates its own + /// semaphore, so nested graphs have independent concurrency limits. /// - /// Returns `true` if all tasks succeeded, `false` if any task failed. + /// Fast-fail: if any task fails, `execute_leaf` cancels the `CancellationToken` + /// (killing in-flight child processes). This method detects the cancellation, + /// closes the semaphore, drains remaining futures, and returns. #[tracing::instrument(level = "debug", skip_all)] - async fn execute_expanded_graph(&mut self, graph: &ExecutionGraph) -> bool { - // `compute_topological_order()` returns nodes in topological order: for every - // edge A→B, A appears before B. Since our edges mean "A depends on B", - // dependencies (B) appear after their dependents (A). We iterate in reverse - // to get execution order where dependencies run first. - - // Execute tasks in dependency-first order. Each task may have multiple items - // (from `&&`-split commands), which are executed sequentially. - // If any task fails, subsequent tasks and items are skipped (fast-fail). - let topo_order = graph.compute_topological_order(); - for &node_ix in topo_order.iter().rev() { - let task_execution = &graph[node_ix]; - - for item in &task_execution.items { - let failed = match &item.kind { - ExecutionItemKind::Leaf(leaf_kind) => { - self.execute_leaf(&item.execution_item_display, leaf_kind) - .boxed_local() - .await - } - ExecutionItemKind::Expanded(nested_graph) => { - !self.execute_expanded_graph(nested_graph).boxed_local().await - } - }; - if failed { - return false; + async fn execute_expanded_graph(&self, graph: &ExecutionGraph) { + if graph.node_count() == 0 { + return; + } + + let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); + + // Compute dependency count for each node. + // Edge A→B means "A depends on B", so A's dependency count = outgoing edge count. + let mut dep_count: FxHashMap = FxHashMap::default(); + for node_ix in graph.node_indices() { + dep_count.insert(node_ix, graph.neighbors(node_ix).count()); + } + + let mut futures = FuturesUnordered::new(); + + // Schedule initially ready nodes (no dependencies). + for (&node_ix, &count) in &dep_count { + if count == 0 { + futures.push(self.spawn_node(graph, node_ix, &semaphore)); + } + } + + // Process completions and schedule newly ready dependents. + // On failure, `execute_leaf` cancels the token — we detect it here, close + // the semaphore (so pending acquires fail immediately), and drain. + while let Some(completed_ix) = futures.next().await { + if self.cancellation_token.is_cancelled() { + semaphore.close(); + while futures.next().await.is_some() {} + return; + } + + // Find dependents of the completed node (nodes that depend on it). + // Edge X→completed means "X depends on completed", so X is a predecessor + // in graph direction = neighbor in Incoming direction. + for dependent in graph.neighbors_directed(completed_ix, Direction::Incoming) { + let count = dep_count.get_mut(&dependent).expect("all nodes are in dep_count"); + *count -= 1; + if *count == 0 { + futures.push(self.spawn_node(graph, dependent, &semaphore)); + } + } + } + } + + /// Create a future that acquires a semaphore permit, then executes a graph node. + /// + /// On failure, `execute_node` cancels the `CancellationToken` — the caller + /// detects this after the future completes. On semaphore closure or prior + /// cancellation, the node is skipped. + fn spawn_node<'a>( + &'a self, + graph: &'a ExecutionGraph, + node_ix: ExecutionNodeIndex, + semaphore: &Arc, + ) -> LocalBoxFuture<'a, ExecutionNodeIndex> { + let sem = semaphore.clone(); + async move { + if let Ok(_permit) = sem.acquire_owned().await { + if !self.cancellation_token.is_cancelled() { + self.execute_node(graph, node_ix).await; + } + } + node_ix + } + .boxed_local() + } + + /// Execute a single node's items sequentially. + /// + /// A node may have multiple items (from `&&`-split commands). Items are executed + /// in order; if any item fails, `execute_leaf` cancels the `CancellationToken` + /// and remaining items are skipped (preserving `&&` semantics). + async fn execute_node(&self, graph: &ExecutionGraph, node_ix: ExecutionNodeIndex) { + let task_execution = &graph[node_ix]; + + for item in &task_execution.items { + if self.cancellation_token.is_cancelled() { + return; + } + match &item.kind { + ExecutionItemKind::Leaf(leaf_kind) => { + self.execute_leaf(&item.execution_item_display, leaf_kind).boxed_local().await; + } + ExecutionItemKind::Expanded(nested_graph) => { + self.execute_expanded_graph(nested_graph).boxed_local().await; } } } - true } /// Execute a single leaf item (in-process command or spawned process). /// /// Creates a [`LeafExecutionReporter`] from the graph reporter and delegates - /// to the appropriate execution method. - /// - /// Returns `true` if the execution failed (non-zero exit or infrastructure error). + /// to the appropriate execution method. On failure (non-zero exit or + /// infrastructure error), cancels the `CancellationToken`. #[tracing::instrument(level = "debug", skip_all)] - async fn execute_leaf( - &mut self, - display: &ExecutionItemDisplay, - leaf_kind: &LeafExecutionKind, - ) -> bool { - let mut leaf_reporter = self.reporter.new_leaf_execution(display, leaf_kind); - - match leaf_kind { + async fn execute_leaf(&self, display: &ExecutionItemDisplay, leaf_kind: &LeafExecutionKind) { + // Borrow the reporter briefly to create the leaf reporter, then drop + // the RefCell guard before any `.await` point. + let mut leaf_reporter = self.reporter.borrow_mut().new_leaf_execution(display, leaf_kind); + + let failed = match leaf_kind { LeafExecutionKind::InProcess(in_process_execution) => { // In-process (built-in) commands: caching is disabled, execute synchronously let mut stdio_config = leaf_reporter @@ -158,6 +226,9 @@ impl ExecutionContext<'_> { SpawnOutcome::Failed => true, } } + }; + if failed { + self.cancellation_token.cancel(); } } } @@ -531,10 +602,10 @@ impl Session<'_> { } }; - let mut reporter = builder.build(); + let reporter = RefCell::new(builder.build()); - let mut execution_context = ExecutionContext { - reporter: &mut *reporter, + let execution_context = ExecutionContext { + reporter: &reporter, cache, cache_base_path: &self.workspace_path, cancellation_token: CancellationToken::new(), @@ -546,6 +617,6 @@ impl Session<'_> { // Leaf-level errors and non-zero exit statuses are tracked internally // by the reporter. - reporter.finish() + reporter.into_inner().finish() } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json new file mode 100644 index 00000000..e6a3b536 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/package.json @@ -0,0 +1,4 @@ +{ + "name": "concurrent-execution-test", + "private": true +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json new file mode 100644 index 00000000..c2ff92c4 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json @@ -0,0 +1,7 @@ +{ + "name": "@concurrent/a", + "scripts": { + "build": "barrier ../../.barrier sync 2", + "test": "barrier ../../.barrier test-sync 2 --exit=1" + } +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json new file mode 100644 index 00000000..0643ed5b --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json @@ -0,0 +1,7 @@ +{ + "name": "@concurrent/b", + "scripts": { + "build": "barrier ../../.barrier sync 2", + "test": "barrier ../../.barrier test-sync 2 --hang" + } +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml new file mode 100644 index 00000000..924b55f4 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/pnpm-workspace.yaml @@ -0,0 +1,2 @@ +packages: + - packages/* diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml new file mode 100644 index 00000000..1280b971 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml @@ -0,0 +1,16 @@ +# Tests that independent tasks execute concurrently. +# Packages a and b have no dependency relationship. +# Both use a barrier that requires 2 participants — if run sequentially, +# the first would wait forever and the test would timeout. + +[[e2e]] +name = "independent tasks run concurrently" +steps = ["vt run -r build"] + +# Both tasks use a single-command barrier (no && splitting). Task a exits with +# code 1 after the barrier, task b hangs on stdin after the barrier. Since both +# participate in the same barrier, b is guaranteed to be running when a fails. +# The test completing without timeout proves cancellation kills b. +[[e2e]] +name = "failure kills concurrent tasks" +steps = ["vt run -r test"] diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap new file mode 100644 index 00000000..6fd09500 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r test +~/packages/a$ barrier ../../.barrier test-sync 2 --exit=1 ⊘ cache disabled +~/packages/b$ barrier ../../.barrier test-sync 2 --hang ⊘ cache disabled + + +--- +vt run: 0/2 cache hit (0%), 2 failed. (Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap new file mode 100644 index 00000000..c8b10252 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/independent tasks run concurrently.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +> vt run -r build +~/packages/a$ barrier ../../.barrier sync 2 ⊘ cache disabled +~/packages/b$ barrier ../../.barrier sync 2 ⊘ cache disabled + + +--- +vt run: 0/2 cache hit (0%). (Run `vt run --last-details` for full details) diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json new file mode 100644 index 00000000..b39113d0 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/vite-task.json @@ -0,0 +1,3 @@ +{ + "cache": false +} diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json index 3850a3ac..21dde7e8 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/grouped-stdio/package.json @@ -1,4 +1,7 @@ { "name": "grouped-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json index 88a7b0e0..72ebe436 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/rw-pkg/package.json @@ -4,6 +4,6 @@ "task": "replace-file-content src/data.txt original modified" }, "dependencies": { - "@test/normal-pkg": "workspace:*" + "@test/touch-pkg": "workspace:*" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json index a34c6a22..ebf215e5 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/input-read-write-not-cached/packages/touch-pkg/package.json @@ -2,5 +2,8 @@ "name": "@test/touch-pkg", "scripts": { "task": "touch-file src/data.txt" + }, + "dependencies": { + "@test/normal-pkg": "workspace:*" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json index 59356dd6..85362482 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/interleaved-stdio/package.json @@ -1,4 +1,7 @@ { "name": "interleaved-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json index ee07db4a..8d842493 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/labeled-stdio/package.json @@ -1,4 +1,7 @@ { "name": "labeled-stdio-test", - "private": true + "private": true, + "dependencies": { + "other": "workspace:*" + } } diff --git a/packages/tools/package.json b/packages/tools/package.json index 90810f51..e1e6acc3 100644 --- a/packages/tools/package.json +++ b/packages/tools/package.json @@ -2,6 +2,7 @@ "name": "vite-task-tools", "private": true, "bin": { + "barrier": "./src/barrier.js", "check-tty": "./src/check-tty.js", "json-edit": "./src/json-edit.ts", "print": "./src/print.ts", diff --git a/packages/tools/src/barrier.js b/packages/tools/src/barrier.js new file mode 100755 index 00000000..7619e4bf --- /dev/null +++ b/packages/tools/src/barrier.js @@ -0,0 +1,68 @@ +#!/usr/bin/env node + +// barrier [--exit=] [--hang] +// +// Cross-platform concurrency barrier for testing. +// Creates /_, then waits (via fs.watch) for files +// matching _* to exist in . +// +// Options: +// --exit= Exit with the given code after the barrier is met. +// --hang Block on stdin after the barrier is met (for kill tests). +// +// If tasks run concurrently, all participants arrive and the barrier resolves. +// If tasks run sequentially, the first participant waits forever (test timeout). + +import fs from 'node:fs'; +import path from 'node:path'; + +const positional = []; +let exitCode = 0; +let hang = false; + +for (const arg of process.argv.slice(2)) { + if (arg.startsWith('--exit=')) { + exitCode = parseInt(arg.slice(7), 10); + } else if (arg === '--hang') { + hang = true; + } else { + positional.push(arg); + } +} + +const [dir, prefix, countStr] = positional; +const count = parseInt(countStr, 10); + +fs.mkdirSync(dir, { recursive: true }); + +// Create this participant's marker file. +const markerName = `${prefix}_${process.pid}`; +fs.writeFileSync(path.join(dir, markerName), ''); + +function countMatches() { + return fs.readdirSync(dir).filter((f) => f.startsWith(`${prefix}_`)).length; +} + +function onBarrierMet() { + if (hang) { + // Keep the process alive — killed via signal when the runner cancels. + process.stdin.resume(); + return; + } + process.exit(exitCode); +} + +// Start watching before the initial check to avoid missing events +// between the check and the watch setup. +const watcher = fs.watch(dir, () => { + if (countMatches() >= count) { + watcher.close(); + onBarrierMet(); + } +}); + +// Check immediately in case all participants already arrived. +if (countMatches() >= count) { + watcher.close(); + onBarrierMet(); +} From 4a5fbee0d2504d55c0858edd938b5d6e5608acc2 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 19:39:42 +0800 Subject: [PATCH 03/11] fix: clippy collapsible_if and Windows barrier hang - Collapse nested if-let + if in spawn_node (clippy::collapsible_if) - Use setInterval instead of stdin.resume() in barrier --hang for cross-platform reliability (stdin.resume may not keep process alive on Windows in PTY environments) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 8 ++++---- packages/tools/src/barrier.js | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 8dafbfb3..2db1b55a 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -145,10 +145,10 @@ impl ExecutionContext<'_> { ) -> LocalBoxFuture<'a, ExecutionNodeIndex> { let sem = semaphore.clone(); async move { - if let Ok(_permit) = sem.acquire_owned().await { - if !self.cancellation_token.is_cancelled() { - self.execute_node(graph, node_ix).await; - } + if let Ok(_permit) = sem.acquire_owned().await + && !self.cancellation_token.is_cancelled() + { + self.execute_node(graph, node_ix).await; } node_ix } diff --git a/packages/tools/src/barrier.js b/packages/tools/src/barrier.js index 7619e4bf..3c96f390 100755 --- a/packages/tools/src/barrier.js +++ b/packages/tools/src/barrier.js @@ -45,8 +45,9 @@ function countMatches() { function onBarrierMet() { if (hang) { - // Keep the process alive — killed via signal when the runner cancels. - process.stdin.resume(); + // Keep the process alive indefinitely — killed via signal when the runner cancels. + // Use setInterval rather than stdin.resume() for cross-platform reliability. + setInterval(() => {}, 1 << 30); return; } process.exit(exitCode); From 9338dbdeda22656ddeacd2f9c3b87cbdc584bb61 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 19:57:20 +0800 Subject: [PATCH 04/11] fix: kill process tree on Windows via Job Object On Windows, TerminateProcess only kills the direct child, leaving grandchildren (e.g., node.exe spawned by a .cmd shim) alive. This caused the "failure kills concurrent tasks" e2e test to timeout. Add a Win32 Job Object with JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE to spawn_inherited. The child process and all its descendants are assigned to the job; when the handle drops, the entire tree is killed. This makes the kill-on-failure test cross-platform (no platform split). Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 1 + crates/vite_task/Cargo.toml | 3 + crates/vite_task/src/session/execute/mod.rs | 102 ++++++++++++++++++ .../concurrent-execution/snapshots.toml | 2 +- 4 files changed, 107 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index fb99639a..ce0c3b03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3896,6 +3896,7 @@ dependencies = [ "vite_task_plan", "vite_workspace", "wax", + "winapi", ] [[package]] diff --git a/crates/vite_task/Cargo.toml b/crates/vite_task/Cargo.toml index 1b4861ec..6763b863 100644 --- a/crates/vite_task/Cargo.toml +++ b/crates/vite_task/Cargo.toml @@ -48,5 +48,8 @@ tempfile = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { workspace = true } +[target.'cfg(windows)'.dependencies] +winapi = { workspace = true, features = ["handleapi", "jobapi2", "processthreadsapi", "winnt"] } + [lib] doctest = false diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 2db1b55a..a47928e5 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -564,6 +564,14 @@ async fn spawn_inherited( } let mut child = tokio_cmd.spawn()?; + + // On Windows, assign the child to a Job Object with KILL_ON_JOB_CLOSE so that + // all descendant processes (e.g., node.exe spawned by a .cmd shim) are killed + // when the job handle is dropped. Without this, TerminateProcess only kills the + // direct child, leaving grandchildren alive. + #[cfg(windows)] + let _job = win_job::assign_child_to_kill_on_close_job(&child)?; + let exit_status = tokio::select! { status = child.wait() => status?, () = cancellation_token.cancelled() => { @@ -575,6 +583,100 @@ async fn spawn_inherited( Ok(SpawnResult { exit_status, duration: start.elapsed() }) } +/// Win32 Job Object utilities for process tree management. +/// +/// On Windows, `TerminateProcess` only kills the direct child process, not its +/// descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, +/// which automatically terminates all processes in the job when the handle is dropped. +#[cfg(windows)] +mod win_job { + use std::io; + + use winapi::{ + shared::minwindef::{DWORD, FALSE}, + um::{ + handleapi::CloseHandle, + jobapi2::{AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject}, + processthreadsapi::OpenProcess, + winnt::{ + HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, + PROCESS_SET_QUOTA, PROCESS_TERMINATE, + }, + }, + }; + + /// RAII wrapper around a Win32 `HANDLE` that closes it on drop. + struct OwnedHandle(HANDLE); + + impl Drop for OwnedHandle { + fn drop(&mut self) { + // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW or OpenProcess. + unsafe { CloseHandle(self.0) }; + } + } + + /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign the child process to it. + /// + /// Returns the job handle wrapped in an RAII guard. When dropped, all processes + /// in the job (the child and its descendants) are terminated. + pub fn assign_child_to_kill_on_close_job( + child: &tokio::process::Child, + ) -> io::Result { + let pid = child.id().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "child process has no PID (already exited?)") + })?; + + // SAFETY: Creating an anonymous job object with no security attributes. + let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; + if job.is_null() { + return Err(io::Error::last_os_error()); + } + let job = OwnedHandle(job); + + // Configure the job to kill all processes when the handle is closed. + let mut info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION { + BasicLimitInformation: winapi::um::winnt::JOBOBJECT_BASIC_LIMIT_INFORMATION { + LimitFlags: JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, + ..unsafe { std::mem::zeroed() } + }, + ..unsafe { std::mem::zeroed() } + }; + + // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. + let ok = unsafe { + SetInformationJobObject( + job.0, + // JobObjectExtendedLimitInformation = 9 + 9, + std::ptr::from_mut(&mut info).cast(), + std::mem::size_of::().try_into().unwrap(), + ) + }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + // Open a handle to the child process with permissions needed for job assignment. + // SAFETY: pid is the process ID of the just-spawned child. + let process_handle = + unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, FALSE, pid) }; + if process_handle.is_null() { + return Err(io::Error::last_os_error()); + } + let process_handle = OwnedHandle(process_handle); + + // SAFETY: Both handles are valid — job from CreateJobObjectW, process from OpenProcess. + let ok = unsafe { AssignProcessToJobObject(job.0, process_handle.0) }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + // process_handle is dropped here (we only needed it for assignment). + // job handle is returned — when it drops, all processes in the job are killed. + Ok(job) + } +} + impl Session<'_> { /// Execute an execution graph, reporting events through the provided reporter builder. /// diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml index 1280b971..2f95e3f1 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml @@ -8,7 +8,7 @@ name = "independent tasks run concurrently" steps = ["vt run -r build"] # Both tasks use a single-command barrier (no && splitting). Task a exits with -# code 1 after the barrier, task b hangs on stdin after the barrier. Since both +# code 1 after the barrier, task b hangs after the barrier. Since both # participate in the same barrier, b is guaranteed to be running when a fails. # The test completing without timeout proves cancellation kills b. [[e2e]] From fd2406a039cb9e8d14a81931016bb965f681f32b Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 20:24:42 +0800 Subject: [PATCH 05/11] fix: Windows compilation errors in win_job module - Remove unused DWORD import - Use pub(super) visibility for OwnedHandle and assign_child_to_kill_on_close_job Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index a47928e5..b96b2824 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -593,7 +593,7 @@ mod win_job { use std::io; use winapi::{ - shared::minwindef::{DWORD, FALSE}, + shared::minwindef::FALSE, um::{ handleapi::CloseHandle, jobapi2::{AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject}, @@ -606,7 +606,7 @@ mod win_job { }; /// RAII wrapper around a Win32 `HANDLE` that closes it on drop. - struct OwnedHandle(HANDLE); + pub(super) struct OwnedHandle(HANDLE); impl Drop for OwnedHandle { fn drop(&mut self) { @@ -619,7 +619,7 @@ mod win_job { /// /// Returns the job handle wrapped in an RAII guard. When dropped, all processes /// in the job (the child and its descendants) are terminated. - pub fn assign_child_to_kill_on_close_job( + pub(super) fn assign_child_to_kill_on_close_job( child: &tokio::process::Child, ) -> io::Result { let pid = child.id().ok_or_else(|| { From ca53eb734561655368ab2dabf3e6273299ecfdbb Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 21:27:57 +0800 Subject: [PATCH 06/11] refactor: use raw_handle() instead of OpenProcess via PID Use the child's existing process handle directly via AsRawHandle instead of re-opening it by PID. Simpler, removes the OpenProcess call and PROCESS_SET_QUOTA/PROCESS_TERMINATE permissions. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/Cargo.toml | 2 +- crates/vite_task/src/session/execute/mod.rs | 40 ++++++++------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/crates/vite_task/Cargo.toml b/crates/vite_task/Cargo.toml index 6763b863..6f6dc571 100644 --- a/crates/vite_task/Cargo.toml +++ b/crates/vite_task/Cargo.toml @@ -49,7 +49,7 @@ tempfile = { workspace = true } nix = { workspace = true } [target.'cfg(windows)'.dependencies] -winapi = { workspace = true, features = ["handleapi", "jobapi2", "processthreadsapi", "winnt"] } +winapi = { workspace = true, features = ["handleapi", "jobapi2", "winnt"] } [lib] doctest = false diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index b96b2824..98f69611 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -590,27 +590,25 @@ async fn spawn_inherited( /// which automatically terminates all processes in the job when the handle is dropped. #[cfg(windows)] mod win_job { - use std::io; + use std::{io, os::windows::io::AsRawHandle}; use winapi::{ shared::minwindef::FALSE, um::{ handleapi::CloseHandle, jobapi2::{AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject}, - processthreadsapi::OpenProcess, winnt::{ HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, - PROCESS_SET_QUOTA, PROCESS_TERMINATE, }, }, }; - /// RAII wrapper around a Win32 `HANDLE` that closes it on drop. - pub(super) struct OwnedHandle(HANDLE); + /// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. + pub(super) struct OwnedJobHandle(HANDLE); - impl Drop for OwnedHandle { + impl Drop for OwnedJobHandle { fn drop(&mut self) { - // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW or OpenProcess. + // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. unsafe { CloseHandle(self.0) }; } } @@ -621,17 +619,13 @@ mod win_job { /// in the job (the child and its descendants) are terminated. pub(super) fn assign_child_to_kill_on_close_job( child: &tokio::process::Child, - ) -> io::Result { - let pid = child.id().ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "child process has no PID (already exited?)") - })?; - + ) -> io::Result { // SAFETY: Creating an anonymous job object with no security attributes. let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; if job.is_null() { return Err(io::Error::last_os_error()); } - let job = OwnedHandle(job); + let job = OwnedJobHandle(job); // Configure the job to kill all processes when the handle is closed. let mut info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION { @@ -656,23 +650,19 @@ mod win_job { return Err(io::Error::last_os_error()); } - // Open a handle to the child process with permissions needed for job assignment. - // SAFETY: pid is the process ID of the just-spawned child. - let process_handle = - unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, FALSE, pid) }; - if process_handle.is_null() { - return Err(io::Error::last_os_error()); - } - let process_handle = OwnedHandle(process_handle); + // Use the child's raw process handle directly — no need to OpenProcess via PID. + let process_handle = child.raw_handle().ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "child process has no handle (already exited?)") + })?; - // SAFETY: Both handles are valid — job from CreateJobObjectW, process from OpenProcess. - let ok = unsafe { AssignProcessToJobObject(job.0, process_handle.0) }; + // SAFETY: Both handles are valid — job from CreateJobObjectW, process from the + // just-spawned child. The child's handle is borrowed (not consumed), so tokio + // retains ownership for wait/kill. + let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; if ok == FALSE { return Err(io::Error::last_os_error()); } - // process_handle is dropped here (we only needed it for assignment). - // job handle is returned — when it drops, all processes in the job are killed. Ok(job) } } From 08734908f2ae0ebaf068dc527007a1b16cac2a7b Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 21:39:58 +0800 Subject: [PATCH 07/11] fix: apply Job Object to all spawn paths and add fspy kill test - Apply assign_to_kill_on_close_job to spawn_with_tracking (piped/fspy path) in addition to spawn_inherited, covering all spawn modes on Windows - Expose duplicated process_handle on fspy::TrackedChild (Windows) so callers can assign it to a Job Object without modifying fspy internals - Use DuplicateHandle (via try_clone_to_owned) so the handle stays valid after tokio closes its copy when the process exits - Add "failure kills concurrent cached tasks" e2e test exercising the --cache (piped stdio / fspy) path Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/fspy/src/lib.rs | 7 ++++ crates/fspy/src/windows/mod.rs | 12 ++++++ crates/vite_task/src/session/execute/mod.rs | 39 ++++++++++--------- crates/vite_task/src/session/execute/spawn.rs | 21 ++++++++++ .../concurrent-execution/snapshots.toml | 6 +++ ...failure kills concurrent cached tasks.snap | 11 ++++++ 6 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap diff --git a/crates/fspy/src/lib.rs b/crates/fspy/src/lib.rs index 8a25ca06..70756003 100644 --- a/crates/fspy/src/lib.rs +++ b/crates/fspy/src/lib.rs @@ -54,6 +54,13 @@ pub struct TrackedChild { /// The future that resolves to exit status and path accesses when the process exits. pub wait_handle: BoxFuture<'static, io::Result>, + + /// A duplicated process handle of the child, captured before the tokio `Child` + /// is moved into the background wait task. This is an independently owned handle + /// (via `DuplicateHandle`) so it remains valid even after tokio closes its copy. + /// Callers can use this to assign the process to a Win32 Job Object. + #[cfg(windows)] + pub process_handle: std::os::windows::io::OwnedHandle, } pub(crate) static SPY_IMPL: LazyLock = LazyLock::new(|| { diff --git a/crates/fspy/src/windows/mod.rs b/crates/fspy/src/windows/mod.rs index b882db63..22560f40 100644 --- a/crates/fspy/src/windows/mod.rs +++ b/crates/fspy/src/windows/mod.rs @@ -140,10 +140,22 @@ impl SpyImpl { if *spawn_success { SpawnError::OsSpawn(err) } else { SpawnError::Injection(err) } })?; + // Duplicate the process handle before the child is moved into the background + // task. The duplicate is independently owned (its own ref count), so it stays + // valid even after tokio closes its copy when the process exits. + let process_handle = { + use std::os::windows::io::BorrowedHandle; + // SAFETY: The child was just spawned and hasn't been moved yet, so its + // raw handle is valid. `borrow_raw` creates a temporary borrow. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + borrowed.try_clone_to_owned().map_err(SpawnError::OsSpawn)? + }; + Ok(TrackedChild { stdin: child.stdin.take(), stdout: child.stdout.take(), stderr: child.stderr.take(), + process_handle, // Keep polling for the child to exit in the background even if `wait_handle` is not awaited, // because we need to stop the supervisor and lock the channel as soon as the child exits. wait_handle: tokio::spawn(async move { diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 98f69611..aa2c45f7 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -570,7 +570,14 @@ async fn spawn_inherited( // when the job handle is dropped. Without this, TerminateProcess only kills the // direct child, leaving grandchildren alive. #[cfg(windows)] - let _job = win_job::assign_child_to_kill_on_close_job(&child)?; + let _job = { + use std::os::windows::io::{AsRawHandle, BorrowedHandle}; + // Duplicate the process handle so the job outlives tokio's handle. + // SAFETY: The child was just spawned, so its raw handle is valid. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + let owned = borrowed.try_clone_to_owned()?; + win_job::assign_to_kill_on_close_job(owned.as_raw_handle())? + }; let exit_status = tokio::select! { status = child.wait() => status?, @@ -590,7 +597,7 @@ async fn spawn_inherited( /// which automatically terminates all processes in the job when the handle is dropped. #[cfg(windows)] mod win_job { - use std::{io, os::windows::io::AsRawHandle}; + use std::{io, os::windows::io::RawHandle}; use winapi::{ shared::minwindef::FALSE, @@ -613,12 +620,12 @@ mod win_job { } } - /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign the child process to it. + /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it. /// /// Returns the job handle wrapped in an RAII guard. When dropped, all processes /// in the job (the child and its descendants) are terminated. - pub(super) fn assign_child_to_kill_on_close_job( - child: &tokio::process::Child, + pub(super) fn assign_to_kill_on_close_job( + process_handle: RawHandle, ) -> io::Result { // SAFETY: Creating an anonymous job object with no security attributes. let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; @@ -628,12 +635,12 @@ mod win_job { let job = OwnedJobHandle(job); // Configure the job to kill all processes when the handle is closed. - let mut info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION { - BasicLimitInformation: winapi::um::winnt::JOBOBJECT_BASIC_LIMIT_INFORMATION { - LimitFlags: JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, - ..unsafe { std::mem::zeroed() } - }, - ..unsafe { std::mem::zeroed() } + // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers + // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern. + let mut info = unsafe { + let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); + info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + info }; // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. @@ -650,14 +657,8 @@ mod win_job { return Err(io::Error::last_os_error()); } - // Use the child's raw process handle directly — no need to OpenProcess via PID. - let process_handle = child.raw_handle().ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "child process has no handle (already exited?)") - })?; - - // SAFETY: Both handles are valid — job from CreateJobObjectW, process from the - // just-spawned child. The child's handle is borrowed (not consumed), so tokio - // retains ownership for wait/kill. + // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle + // from the caller. let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; if ok == FALSE { return Err(io::Error::last_os_error()); diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index ad93a78f..e1f761d4 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -104,17 +104,38 @@ pub async fn spawn_with_tracking( cmd.current_dir(&*spawn_command.cwd); cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); + // On Windows, assign the child to a Job Object so that killing the child also + // kills all descendant processes (e.g., node.exe spawned by a .cmd shim). + // Declared before the branch so it outlives both the fspy and non-fspy paths. + #[cfg(windows)] + let _job; + let (mut child_stdout, mut child_stderr, wait_state) = if path_accesses.is_some() { // fspy tracking enabled — fspy's background task handles cancellation let mut tracked_child = cmd.spawn(cancellation_token).await?; let stdout = tracked_child.stdout.take().unwrap(); let stderr = tracked_child.stderr.take().unwrap(); + #[cfg(windows)] + { + use std::os::windows::io::AsRawHandle; + _job = super::win_job::assign_to_kill_on_close_job( + tracked_child.process_handle.as_raw_handle(), + )?; + } (stdout, stderr, WaitState::FspyEnabled(tracked_child)) } else { // No fspy — spawn a background task that waits for exit or cancellation let mut child = cmd.into_tokio_command().spawn()?; let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); + #[cfg(windows)] + { + use std::os::windows::io::{AsRawHandle, BorrowedHandle}; + // SAFETY: The child was just spawned, so its raw handle is valid. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + let owned = borrowed.try_clone_to_owned()?; + _job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; + } let wait_handle = tokio::spawn(async move { tokio::select! { status = child.wait() => status, diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml index 2f95e3f1..b7f4a6df 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml @@ -14,3 +14,9 @@ steps = ["vt run -r build"] [[e2e]] name = "failure kills concurrent tasks" steps = ["vt run -r test"] + +# Same as above but with --cache to exercise the piped stdio / fspy path +# (spawn_with_tracking) instead of the inherited stdio path (spawn_inherited). +[[e2e]] +name = "failure kills concurrent cached tasks" +steps = ["vt run -r --cache test"] diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap new file mode 100644 index 00000000..71c7dc36 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills concurrent cached tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r --cache test +~/packages/a$ barrier ../../.barrier test-sync 2 --exit=1 +~/packages/b$ barrier ../../.barrier test-sync 2 --hang + + +--- +vt run: 0/2 cache hit (0%), 2 failed. (Run `vt run --last-details` for full details) From 2e83f7f959d1165ac0c6012e97ae4db19d3c9c79 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 21:50:50 +0800 Subject: [PATCH 08/11] fix: terminate job on cancellation to unblock pipe reads On Windows with piped stdio, killing the direct child (cmd.exe) doesn't close pipes held by grandchild processes (node.exe). The pipe read loop in spawn_with_tracking blocks forever waiting for EOF. Fix: add a cancellation branch to the pipe read loop that calls TerminateJobObject to kill the entire process tree, closing all pipes. Also add TerminateJobObject import and terminate() method on OwnedJobHandle. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 16 ++++++++++++- crates/vite_task/src/session/execute/spawn.rs | 24 ++++++++++++++----- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index aa2c45f7..74c90580 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -603,7 +603,10 @@ mod win_job { shared::minwindef::FALSE, um::{ handleapi::CloseHandle, - jobapi2::{AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject}, + jobapi2::{ + AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, + TerminateJobObject, + }, winnt::{ HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, }, @@ -613,6 +616,17 @@ mod win_job { /// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. pub(super) struct OwnedJobHandle(HANDLE); + impl OwnedJobHandle { + /// Immediately terminate all processes in the job. + /// + /// This is needed when pipes to a grandchild process must be closed before + /// the job handle is dropped (e.g., to unblock pipe reads in `spawn_with_tracking`). + pub(super) fn terminate(&self) { + // SAFETY: self.0 is a valid job handle from CreateJobObjectW. + unsafe { TerminateJobObject(self.0, 1) }; + } + } + impl Drop for OwnedJobHandle { fn drop(&mut self) { // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index e1f761d4..8d0d25ff 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -108,7 +108,12 @@ pub async fn spawn_with_tracking( // kills all descendant processes (e.g., node.exe spawned by a .cmd shim). // Declared before the branch so it outlives both the fspy and non-fspy paths. #[cfg(windows)] - let _job; + let job; + + // Clone the token before it's moved into the spawn branches. The clone is used + // in the pipe read loop on Windows to terminate the job (killing grandchild + // processes that hold pipes open). + let cancellation_for_pipes = cancellation_token.clone(); let (mut child_stdout, mut child_stderr, wait_state) = if path_accesses.is_some() { // fspy tracking enabled — fspy's background task handles cancellation @@ -118,7 +123,7 @@ pub async fn spawn_with_tracking( #[cfg(windows)] { use std::os::windows::io::AsRawHandle; - _job = super::win_job::assign_to_kill_on_close_job( + job = super::win_job::assign_to_kill_on_close_job( tracked_child.process_handle.as_raw_handle(), )?; } @@ -134,7 +139,7 @@ pub async fn spawn_with_tracking( // SAFETY: The child was just spawned, so its raw handle is valid. let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; let owned = borrowed.try_clone_to_owned()?; - _job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; + job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; } let wait_handle = tokio::spawn(async move { tokio::select! { @@ -158,9 +163,9 @@ pub async fn spawn_with_tracking( let start = Instant::now(); // Read from both stdout and stderr concurrently using select! - // No cancellation branch needed — both WaitState variants run a background task - // that kills the child on cancellation, which closes the pipes and makes reads - // return EOF naturally. + // On cancellation, the background task kills the direct child, but on Windows + // grandchild processes may keep pipes open. The cancellation branch terminates + // the entire job to close all pipe writers. loop { tokio::select! { result = child_stdout.read(&mut stdout_buf), if !stdout_done => { @@ -205,6 +210,13 @@ pub async fn spawn_with_tracking( } } } + // On Windows, kill the entire process tree so that grandchild processes + // release their pipe handles, allowing the reads above to reach EOF. + () = cancellation_for_pipes.cancelled(), if cfg!(windows) => { + #[cfg(windows)] + job.terminate(); + break; + } else => break, } } From 4bdb2f3dcb124f3717bddc518e6b75bbf794a624 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 22:14:40 +0800 Subject: [PATCH 09/11] fix: pipe read loop hangs on Windows due to pending cancelled() arm The `else => break` in tokio::select! only fires when ALL other arms are disabled. The `cancelled()` arm stays pending (not disabled) even when both pipes have EOF'd, preventing `else` from ever triggering. This caused every piped-stdio task to hang on Windows. Fix: check the exit condition (both pipes done) at the top of the loop instead of relying on the `else` arm. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/spawn.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 8d0d25ff..4a20b852 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -166,7 +166,14 @@ pub async fn spawn_with_tracking( // On cancellation, the background task kills the direct child, but on Windows // grandchild processes may keep pipes open. The cancellation branch terminates // the entire job to close all pipe writers. + // + // The exit condition is checked at the top of the loop (not via `else =>`), + // because the `cancelled()` arm stays pending even when pipes are done, + // which would prevent `else` from ever firing. loop { + if stdout_done && stderr_done { + break; + } tokio::select! { result = child_stdout.read(&mut stdout_buf), if !stdout_done => { match result? { @@ -217,7 +224,6 @@ pub async fn spawn_with_tracking( job.terminate(); break; } - else => break, } } From 43e983d34f2ca8420ea32482ecd40cbdec6d24a0 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 22:30:18 +0800 Subject: [PATCH 10/11] refactor: simplify spawn_with_tracking cancellation flow Remove the background tokio::spawn for the non-fspy path. Instead, handle cancellation directly in the pipe read loop alongside pipe reads. This eliminates: - The WaitState enum and background task indirection - The cancellation_for_pipes token clone - The need for Send on the Job Object handle The pipe read loop now has a unified cancellation arm (all platforms) that kills the direct child and terminates the Job Object on Windows. The exit condition is checked at the top of the loop to avoid the tokio::select! else-arm issue with the always-pending cancelled() future. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/spawn.rs | 197 ++++++++---------- 1 file changed, 90 insertions(+), 107 deletions(-) diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 4a20b852..23e9db33 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -57,6 +57,15 @@ pub struct TrackedPathAccesses { pub path_writes: FxHashSet, } +/// How the child process is awaited after stdout/stderr are drained. +enum ChildWait { + /// fspy tracking enabled — fspy manages cancellation internally. + Fspy(fspy::TrackedChild), + + /// Plain tokio process — cancellation is handled in the pipe read loop. + Tokio(tokio::process::Child), +} + /// Spawn a command with optional file system tracking via fspy, using piped stdio. /// /// Returns the execution result including exit status and duration. @@ -85,19 +94,6 @@ pub async fn spawn_with_tracking( resolved_negatives: &[wax::Glob<'static>], cancellation_token: CancellationToken, ) -> anyhow::Result { - /// How the child process is awaited after stdout/stderr are drained. - /// - /// Both variants run a background task that monitors the cancellation token - /// and kills the child when cancelled. The read loop needs no cancellation - /// branch — killing the child closes its pipes, which makes reads return EOF. - enum WaitState { - /// fspy tracking enabled — background task managed by fspy. - FspyEnabled(fspy::TrackedChild), - - /// Plain tokio process — we spawn our own cancellation-aware background task. - TokioChild(tokio::task::JoinHandle>), - } - let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); cmd.envs(spawn_command.all_envs.iter()); @@ -106,18 +102,13 @@ pub async fn spawn_with_tracking( // On Windows, assign the child to a Job Object so that killing the child also // kills all descendant processes (e.g., node.exe spawned by a .cmd shim). - // Declared before the branch so it outlives both the fspy and non-fspy paths. #[cfg(windows)] let job; - // Clone the token before it's moved into the spawn branches. The clone is used - // in the pipe read loop on Windows to terminate the job (killing grandchild - // processes that hold pipes open). - let cancellation_for_pipes = cancellation_token.clone(); - - let (mut child_stdout, mut child_stderr, wait_state) = if path_accesses.is_some() { - // fspy tracking enabled — fspy's background task handles cancellation - let mut tracked_child = cmd.spawn(cancellation_token).await?; + let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() { + // fspy tracking enabled — fspy manages cancellation internally via a clone + // of the token. We keep the original for the pipe read loop. + let mut tracked_child = cmd.spawn(cancellation_token.clone()).await?; let stdout = tracked_child.stdout.take().unwrap(); let stderr = tracked_child.stderr.take().unwrap(); #[cfg(windows)] @@ -127,9 +118,8 @@ pub async fn spawn_with_tracking( tracked_child.process_handle.as_raw_handle(), )?; } - (stdout, stderr, WaitState::FspyEnabled(tracked_child)) + (stdout, stderr, ChildWait::Fspy(tracked_child)) } else { - // No fspy — spawn a background task that waits for exit or cancellation let mut child = cmd.into_tokio_command().spawn()?; let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); @@ -141,16 +131,7 @@ pub async fn spawn_with_tracking( let owned = borrowed.try_clone_to_owned()?; job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; } - let wait_handle = tokio::spawn(async move { - tokio::select! { - status = child.wait() => status, - () = cancellation_token.cancelled() => { - child.start_kill()?; - child.wait().await - } - } - }); - (stdout, stderr, WaitState::TokioChild(wait_handle)) + (stdout, stderr, ChildWait::Tokio(child)) }; // Output capturing is independent of fspy tracking @@ -163,13 +144,8 @@ pub async fn spawn_with_tracking( let start = Instant::now(); // Read from both stdout and stderr concurrently using select! - // On cancellation, the background task kills the direct child, but on Windows - // grandchild processes may keep pipes open. The cancellation branch terminates - // the entire job to close all pipe writers. - // - // The exit condition is checked at the top of the loop (not via `else =>`), - // because the `cancelled()` arm stays pending even when pipes are done, - // which would prevent `else` from ever firing. + // Cancellation is handled directly in the loop: kill the child process (and + // on Windows, terminate the Job Object to kill grandchildren holding pipes). loop { if stdout_done && stderr_done { break; @@ -217,9 +193,13 @@ pub async fn spawn_with_tracking( } } } - // On Windows, kill the entire process tree so that grandchild processes - // release their pipe handles, allowing the reads above to reach EOF. - () = cancellation_for_pipes.cancelled(), if cfg!(windows) => { + () = cancellation_token.cancelled() => { + // Kill the direct child (no-op for fspy which handles it internally). + if let ChildWait::Tokio(ref mut child) = child_wait { + let _ = child.start_kill(); + } + // On Windows, terminate the entire process tree so grandchild + // processes release their pipe handles. #[cfg(windows)] job.terminate(); break; @@ -227,83 +207,86 @@ pub async fn spawn_with_tracking( } } - // Wait for process termination. Both variants' background tasks handle - // cancellation internally, so these awaits need no additional select. - let (termination, path_accesses) = match wait_state { - WaitState::FspyEnabled(tracked_child) => { + // Wait for process termination and collect results. + match child_wait { + ChildWait::Fspy(tracked_child) => { let termination = tracked_child.wait_handle.await?; + let duration = start.elapsed(); + // path_accesses must be Some when fspy is enabled (they're set together) let path_accesses = path_accesses.ok_or_else(|| { anyhow::anyhow!("internal error: fspy enabled but path_accesses is None") })?; - (termination, path_accesses) - } - WaitState::TokioChild(wait_handle) => { - let exit_status = wait_handle.await.map_err(|err| anyhow::anyhow!(err))??; - return Ok(SpawnResult { exit_status, duration: start.elapsed() }); - } - }; - let duration = start.elapsed(); - let path_reads = &mut path_accesses.path_reads; - let path_writes = &mut path_accesses.path_writes; + let path_reads = &mut path_accesses.path_reads; + let path_writes = &mut path_accesses.path_writes; - for access in termination.path_accesses.iter() { - // Strip workspace root, clean `..` components, and filter in one pass. - // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. - let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { - let Ok(stripped_path) = strip_result else { - return None; - }; - // On Windows, paths are possible to be still absolute after stripping the workspace root. - // For example: c:\workspace\subdir\c:\workspace\subdir - // Just ignore those accesses. - let relative = RelativePathBuf::new(stripped_path).ok()?; + for access in termination.path_accesses.iter() { + // Strip workspace root, clean `..` components, and filter in one pass. + // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. + let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { + let Ok(stripped_path) = strip_result else { + return None; + }; + // On Windows, paths are possible to be still absolute after stripping the workspace root. + // For example: c:\workspace\subdir\c:\workspace\subdir + // Just ignore those accesses. + let relative = RelativePathBuf::new(stripped_path).ok()?; - // Clean `..` components — fspy may report paths like - // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for - // consistent behavior across platforms and clean user-facing messages. - let relative = relative.clean(); + // Clean `..` components — fspy may report paths like + // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for + // consistent behavior across platforms and clean user-facing messages. + let relative = relative.clean(); - // Skip .git directory accesses (workaround for tools like oxlint) - if relative.as_path().strip_prefix(".git").is_ok() { - return None; - } + // Skip .git directory accesses (workaround for tools like oxlint) + if relative.as_path().strip_prefix(".git").is_ok() { + return None; + } - if !resolved_negatives.is_empty() - && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) - { - return None; - } + if !resolved_negatives.is_empty() + && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) + { + return None; + } - Some(relative) - }); + Some(relative) + }); - let Some(relative_path) = relative_path else { - continue; - }; + let Some(relative_path) = relative_path else { + continue; + }; - if access.mode.contains(AccessMode::READ) { - path_reads.entry(relative_path.clone()).or_insert(PathRead { read_dir_entries: false }); - } - if access.mode.contains(AccessMode::WRITE) { - path_writes.insert(relative_path.clone()); - } - if access.mode.contains(AccessMode::READ_DIR) { - match path_reads.entry(relative_path) { - Entry::Occupied(mut occupied) => occupied.get_mut().read_dir_entries = true, - Entry::Vacant(vacant) => { - vacant.insert(PathRead { read_dir_entries: true }); + if access.mode.contains(AccessMode::READ) { + path_reads + .entry(relative_path.clone()) + .or_insert(PathRead { read_dir_entries: false }); + } + if access.mode.contains(AccessMode::WRITE) { + path_writes.insert(relative_path.clone()); + } + if access.mode.contains(AccessMode::READ_DIR) { + match path_reads.entry(relative_path) { + Entry::Occupied(mut occupied) => { + occupied.get_mut().read_dir_entries = true; + } + Entry::Vacant(vacant) => { + vacant.insert(PathRead { read_dir_entries: true }); + } + } } } - } - } - tracing::debug!( - "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", - path_reads.len(), - path_writes.len(), - termination.status, - ); + tracing::debug!( + "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", + path_reads.len(), + path_writes.len(), + termination.status, + ); - Ok(SpawnResult { exit_status: termination.status, duration }) + Ok(SpawnResult { exit_status: termination.status, duration }) + } + ChildWait::Tokio(mut child) => { + let exit_status = child.wait().await?; + Ok(SpawnResult { exit_status, duration: start.elapsed() }) + } + } } From cccd0e0dc03a32088f7882583d74599c855c7f64 Mon Sep 17 00:00:00 2001 From: branchseer Date: Mon, 23 Mar 2026 22:33:28 +0800 Subject: [PATCH 11/11] fix: handle tasks that close stdio without exiting Add cancellation-aware wait after pipe reads in spawn_with_tracking. If a child closes stdout/stderr but stays alive (e.g., daemonizes), the pipe reads EOF but child.wait() would block forever without cancellation support. Add --daemonize flag to barrier test tool and e2e test verifying that daemonized concurrent tasks are properly killed on failure. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/vite_task/src/session/execute/spawn.rs | 12 +++++++++++- .../concurrent-execution/packages/a/package.json | 3 ++- .../concurrent-execution/packages/b/package.json | 3 ++- .../fixtures/concurrent-execution/snapshots.toml | 7 +++++++ ...ailure kills daemonized concurrent tasks.snap | 11 +++++++++++ packages/tools/src/barrier.js | 16 ++++++++++++++-- 6 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 23e9db33..03a055bb 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -208,8 +208,12 @@ pub async fn spawn_with_tracking( } // Wait for process termination and collect results. + // The child may have closed its pipes without exiting (e.g., daemonized), + // so we still need a cancellation arm here. match child_wait { ChildWait::Fspy(tracked_child) => { + // fspy's wait_handle already monitors the cancellation token internally, + // so no additional select! is needed here. let termination = tracked_child.wait_handle.await?; let duration = start.elapsed(); @@ -285,7 +289,13 @@ pub async fn spawn_with_tracking( Ok(SpawnResult { exit_status: termination.status, duration }) } ChildWait::Tokio(mut child) => { - let exit_status = child.wait().await?; + let exit_status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + child.wait().await? + } + }; Ok(SpawnResult { exit_status, duration: start.elapsed() }) } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json index c2ff92c4..6a46b3fa 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/a/package.json @@ -2,6 +2,7 @@ "name": "@concurrent/a", "scripts": { "build": "barrier ../../.barrier sync 2", - "test": "barrier ../../.barrier test-sync 2 --exit=1" + "test": "barrier ../../.barrier test-sync 2 --exit=1", + "daemon": "barrier ../../.barrier daemon-sync 2 --exit=1" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json index 0643ed5b..40e49f3c 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/packages/b/package.json @@ -2,6 +2,7 @@ "name": "@concurrent/b", "scripts": { "build": "barrier ../../.barrier sync 2", - "test": "barrier ../../.barrier test-sync 2 --hang" + "test": "barrier ../../.barrier test-sync 2 --hang", + "daemon": "barrier ../../.barrier daemon-sync 2 --daemonize" } } diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml index b7f4a6df..b9064498 100644 --- a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots.toml @@ -20,3 +20,10 @@ steps = ["vt run -r test"] [[e2e]] name = "failure kills concurrent cached tasks" steps = ["vt run -r --cache test"] + +# Task b closes stdout/stderr after the barrier but stays alive (daemonizes). +# The pipe reads EOF but the process doesn't exit. The runner must still be +# able to kill it via the cancellation token + Job Object. +[[e2e]] +name = "failure kills daemonized concurrent tasks" +steps = ["vt run -r --cache daemon"] diff --git a/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap new file mode 100644 index 00000000..4713ed55 --- /dev/null +++ b/crates/vite_task_bin/tests/e2e_snapshots/fixtures/concurrent-execution/snapshots/failure kills daemonized concurrent tasks.snap @@ -0,0 +1,11 @@ +--- +source: crates/vite_task_bin/tests/e2e_snapshots/main.rs +expression: e2e_outputs +--- +[1]> vt run -r --cache daemon +~/packages/a$ barrier ../../.barrier daemon-sync 2 --exit=1 +~/packages/b$ barrier ../../.barrier daemon-sync 2 --daemonize + + +--- +vt run: 0/2 cache hit (0%), 2 failed. (Run `vt run --last-details` for full details) diff --git a/packages/tools/src/barrier.js b/packages/tools/src/barrier.js index 3c96f390..87f06aa1 100755 --- a/packages/tools/src/barrier.js +++ b/packages/tools/src/barrier.js @@ -1,6 +1,6 @@ #!/usr/bin/env node -// barrier [--exit=] [--hang] +// barrier [--exit=] [--hang] [--daemonize] // // Cross-platform concurrency barrier for testing. // Creates /_, then waits (via fs.watch) for files @@ -8,7 +8,8 @@ // // Options: // --exit= Exit with the given code after the barrier is met. -// --hang Block on stdin after the barrier is met (for kill tests). +// --hang Keep process alive after the barrier (for kill tests). +// --daemonize Close stdout/stderr but keep process alive (for daemon kill tests). // // If tasks run concurrently, all participants arrive and the barrier resolves. // If tasks run sequentially, the first participant waits forever (test timeout). @@ -19,12 +20,15 @@ import path from 'node:path'; const positional = []; let exitCode = 0; let hang = false; +let daemonize = false; for (const arg of process.argv.slice(2)) { if (arg.startsWith('--exit=')) { exitCode = parseInt(arg.slice(7), 10); } else if (arg === '--hang') { hang = true; + } else if (arg === '--daemonize') { + daemonize = true; } else { positional.push(arg); } @@ -44,6 +48,14 @@ function countMatches() { } function onBarrierMet() { + if (daemonize) { + // Close stdout/stderr but keep the process alive. Simulates a daemon that + // detaches from stdio — tests that the runner can still kill such processes. + process.stdout.end(); + process.stderr.end(); + setInterval(() => {}, 1 << 30); + return; + } if (hang) { // Keep the process alive indefinitely — killed via signal when the runner cancels. // Use setInterval rather than stdin.resume() for cross-platform reliability.