diff --git a/cluster_api/core.py b/cluster_api/core.py
index a7d13ca..9590c1e 100644
--- a/cluster_api/core.py
+++ b/cluster_api/core.py
@@ -232,7 +232,15 @@ async def reconnect(self) -> list[JobRecord]:
     async def cancel_all(self, *, done: bool = False) -> None:
         """Cancel all tracked jobs."""
         to_cancel = [jid for jid, r in self._jobs.items() if not r.is_terminal]
-        await asyncio.gather(*(self.cancel(jid, done=done) for jid in to_cancel))
+        results = await asyncio.gather(
+            *(self.cancel(jid, done=done) for jid in to_cancel),
+            return_exceptions=True,
+        )
+        errors = [r for r in results if isinstance(r, Exception)]
+        if errors:
+            logger.warning("cancel_all: %d/%d cancellations failed", len(errors), len(to_cancel))
+            for err in errors:
+                logger.debug("cancel_all error: %s", err)
 
     # --- Status polling ---
 
@@ -257,8 +265,18 @@ async def poll(self) -> dict[str, JobStatus]:
         args = self._build_status_args()
         try:
             out = await self._call(args, timeout=self.config.command_timeout)
+        except CommandFailedError as e:
+            # bjobs exits non-zero when some job IDs are gone, but still
+            # writes valid JSON for the found jobs to stdout. Try to
+            # parse whatever we got before giving up.
+            if e.stdout:
+                logger.debug("Status query returned non-zero but produced output, parsing partial results")
+                out = e.stdout
+            else:
+                logger.warning("Status query failed, skipping poll cycle: %s", e)
+                return {jid: r.status for jid, r in self._jobs.items()}
         except (ClusterAPIError, OSError) as e:
-            logger.warning("Status query failed, skipping poll cycle: %s", e, exc_info=True)
+            logger.warning("Status query failed, skipping poll cycle: %s", e)
             return {jid: r.status for jid, r in self._jobs.items()}
 
         statuses = self._parse_job_statuses(out)
@@ -316,6 +334,12 @@ async def _call(
         if env:
             full_env = {**os.environ, **env}
 
+        cmd_str = " ".join(cmd)
+        if stdin_file:
+            logger.debug("Running: %s < %s", cmd_str, stdin_file)
+        else:
+            logger.debug("Running: %s", cmd_str)
+
         stdin_fh = None
         try:
             if stdin_file:
@@ -348,7 +372,8 @@ async def _call(
 
         if proc.returncode != 0:
             raise CommandFailedError(
-                f"Command failed (exit {proc.returncode}): {cmd}\nstderr: {err}"
+                f"Command failed (exit {proc.returncode}): {cmd}\nstderr: {err}",
+                stdout=out,
             )
 
         return out
diff --git a/cluster_api/exceptions.py b/cluster_api/exceptions.py
index 369c408..3948520 100644
--- a/cluster_api/exceptions.py
+++ b/cluster_api/exceptions.py
@@ -12,6 +12,10 @@ class CommandTimeoutError(ClusterAPIError):
 class CommandFailedError(ClusterAPIError):
     """A subprocess command returned a non-zero exit code."""
 
+    def __init__(self, message: str, stdout: str = "") -> None:
+        super().__init__(message)
+        self.stdout = stdout
+
 
 class SubmitError(ClusterAPIError):
     """Failed to submit a job or parse its ID from scheduler output."""
diff --git a/cluster_api/executors/lsf.py b/cluster_api/executors/lsf.py
index cbee2cf..e20282c 100644
--- a/cluster_api/executors/lsf.py
+++ b/cluster_api/executors/lsf.py
@@ -29,17 +29,12 @@
     "UNKWN": JobStatus.UNKNOWN,
     "WAIT": JobStatus.PENDING,
     "PROV": JobStatus.PENDING,
-    "USUSP": JobStatus.PENDING,
+    "USUSP": JobStatus.RUNNING,
     "PSUSP": JobStatus.PENDING,
-    "SSUSP": JobStatus.PENDING,
+    "SSUSP": JobStatus.RUNNING,
 }
 
 _BJOBS_FIELDS = (
-    "jobid stat exit_code exec_host max_mem "
-    "submit_time start_time finish_time"
-)
-
-_BJOBS_RECONNECT_FIELDS = (
     "jobid job_name stat exit_code exec_host max_mem "
     "submit_time start_time finish_time"
 )
@@ -146,7 +141,6 @@ async def _bsub(
         """Run bsub with a script file and return raw output."""
         submit_env = self._build_submit_env(env)
         cmd = [self.submit_command, *(extra_args or [])]
-        logger.debug("Running: %s < %s", cmd, script_path)
         return await self._call(
             cmd,
             stdin_file=script_path,
@@ -204,8 +198,6 @@ async def _submit_array_job(
         for line in lines:
             if line.startswith(self.directive_prefix):
                 line = line.replace(f"-J {name}", f"-J {array_name}")
-                line = line.replace(f"{name}.out", f"{name}.%I.out")
-                line = line.replace(f"{name}.err", f"{name}.%I.err")
                 line = line.replace("stdout.%J.log", "stdout.%J.%I.log")
                 line = line.replace("stderr.%J.log", "stderr.%J.%I.log")
             new_lines.append(line)
@@ -225,6 +217,9 @@ def _build_status_args(self) -> list[str]:
         if self._prefix:
             args.extend(["-J", f"{self._prefix}-*"])
         args.extend(["-a", "-o", _BJOBS_FIELDS, "-json"])
+        if not self._prefix:
+            active_ids = [jid for jid, r in self._jobs.items() if not r.is_terminal]
+            args.extend(active_ids)
         return args
 
     def _parse_job_statuses(
@@ -285,13 +280,11 @@ async def _cancel_job(self, job_id: str, *, done: bool = False) -> None:
         if done:
             cmd.append("-d")
         cmd.append(job_id)
-        logger.debug("Running: %s", " ".join(cmd))
         await self._call(cmd, timeout=self.config.command_timeout)
 
     async def cancel_by_name(self, name_pattern: str) -> None:
         """Cancel jobs matching name pattern via bkill -J."""
         cmd = [self.cancel_command, "-J", name_pattern]
-        logger.debug("Running: %s", " ".join(cmd))
         try:
             await self._call(cmd, timeout=self.config.command_timeout)
         except CommandFailedError as e:
@@ -320,7 +313,7 @@ def _build_reconnect_args(self) -> list[str]:
         args.extend([
             "-J", f"{self._prefix}-*",
             "-a",
-            "-o", _BJOBS_RECONNECT_FIELDS,
+            "-o", _BJOBS_FIELDS,
             "-json",
         ])
         return args
diff --git a/pixi.lock b/pixi.lock
index b86eff6..8a9579b 100644
--- a/pixi.lock
+++ b/pixi.lock
@@ -845,8 +845,8 @@ packages:
   timestamp: 1764896838868
 - pypi: ./
   name: py-cluster-api
-  version: 0.4.0
-  sha256: 1dd95e2002e0e1b4908c3ea27e6c9b575ae0e2e514cf00a01289b554502ce15d
+  version: 0.5.0
+  sha256: 6a42ec3a63e266d02359e62e4e0fa4d54476083e606b2625beb97828b5de5e60
   requires_dist:
   - pyyaml
   - pytest ; extra == 'test'
diff --git a/pyproject.toml b/pyproject.toml
index bb0f39f..f95831d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "py-cluster-api"
-version = "0.4.0"
+version = "0.5.0"
 description = "Generic Python library for running jobs on HPC clusters"
 readme = "README.md"
 license = { file = "LICENSE" }
diff --git a/tests/test_lsf.py b/tests/test_lsf.py
index caa0960..fe3680d 100644
--- a/tests/test_lsf.py
+++ b/tests/test_lsf.py
@@ -248,14 +248,23 @@ def test_multiple_jobs(self, lsf_config):
 
     def test_suspended_states(self, lsf_config):
         executor = LSFExecutor(lsf_config)
-        for lsf_stat in ("USUSP", "PSUSP", "SSUSP"):
+        # USUSP/SSUSP map to RUNNING (job had already started)
+        for lsf_stat in ("USUSP", "SSUSP"):
             output = self._make_bjobs_json([
                 {"JOBID": "200", "STAT": lsf_stat, "EXIT_CODE": "-",
                  "EXEC_HOST": "-", "MAX_MEM": "-",
                  "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
             ])
             result = executor._parse_job_statuses(output)
-            assert result["200"][0] == JobStatus.PENDING
+            assert result["200"][0] == JobStatus.RUNNING
+        # PSUSP maps to PENDING (job was suspended before running)
+        output = self._make_bjobs_json([
+            {"JOBID": "200", "STAT": "PSUSP", "EXIT_CODE": "-",
+             "EXEC_HOST": "-", "MAX_MEM": "-",
+             "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
+        ])
+        result = executor._parse_job_statuses(output)
+        assert result["200"][0] == JobStatus.PENDING
 
 
 class TestBuildStatusArgs:
@@ -730,3 +739,89 @@ async def test_poll_partial_failure(self, lsf_config, work_dir):
         assert job.status == JobStatus.FAILED
         assert job.failed_element_indices == [2]
         assert job.array_elements[2].exit_code == 1
+
+
+class TestPollPartialBjobsFailure:
+    """bjobs exits non-zero when some job IDs are gone, but still returns
+    valid JSON for the remaining jobs. poll() should parse those results
+    instead of skipping the entire cycle."""
+
+    async def test_poll_uses_stdout_from_failed_bjobs(self, work_dir):
+        """Two tracked jobs; bjobs fails because one ID is gone, but returns
+        valid JSON for the other. The surviving job should still be updated."""
+        from cluster_api.config import ClusterConfig
+
+        config = ClusterConfig(
+            executor="lsf", lsf_units="MB",
+            command_timeout=10.0, poll_interval=0.5,
+        )
+        executor = LSFExecutor(config)
+
+        # Submit two jobs
+        with patch.object(
+            executor, "_call", new_callable=AsyncMock,
+            return_value="Job <100> is submitted to queue .",
+        ):
+            job1 = await executor.submit(
+                command="echo a", name="a",
+                resources=ResourceSpec(work_dir=work_dir),
+            )
+        with patch.object(
+            executor, "_call", new_callable=AsyncMock,
+            return_value="Job <200> is submitted to queue .",
+        ):
+            job2 = await executor.submit(
+                command="echo b", name="b",
+                resources=ResourceSpec(work_dir=work_dir),
+            )
+
+        # bjobs returns DONE for job 100 but exits non-zero because job 200
+        # has been cleaned from LSF's history.
+        bjobs_stdout = json.dumps({"RECORDS": [
+            {"JOBID": "100", "STAT": "DONE", "EXIT_CODE": "0",
+             "EXEC_HOST": "node01", "MAX_MEM": "128 MB",
+             "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
+        ]})
+
+        with patch.object(
+            executor, "_call", new_callable=AsyncMock,
+            side_effect=CommandFailedError(
+                "Command failed (exit 255): ['bjobs', ...]\nstderr: Job <200> is not found",
+                stdout=bjobs_stdout,
+            ),
+        ):
+            statuses = await executor.poll()
+
+        assert job1.status == JobStatus.DONE
+        assert statuses["100"] == JobStatus.DONE
+        # job2 stays PENDING (not seen, but not incorrectly skipped either)
+        assert job2.status == JobStatus.PENDING
+
+    async def test_poll_skips_when_no_stdout(self, work_dir):
+        """If bjobs fails and produces no stdout, poll skips as before."""
+        from cluster_api.config import ClusterConfig
+
+        config = ClusterConfig(
+            executor="lsf", lsf_units="MB",
+            command_timeout=10.0, poll_interval=0.5,
+        )
+        executor = LSFExecutor(config)
+
+        with patch.object(
+            executor, "_call", new_callable=AsyncMock,
+            return_value="Job <100> is submitted to queue .",
+        ):
+            job = await executor.submit(
+                command="echo a", name="a",
+                resources=ResourceSpec(work_dir=work_dir),
+            )
+
+        with patch.object(
+            executor, "_call", new_callable=AsyncMock,
+            side_effect=CommandFailedError("Command failed", stdout=""),
+        ):
+            statuses = await executor.poll()
+
+        # Should skip gracefully, job unchanged
+        assert job.status == JobStatus.PENDING
+        assert statuses["100"] == JobStatus.PENDING