Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions cluster_api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,15 @@ async def reconnect(self) -> list[JobRecord]:
async def cancel_all(self, *, done: bool = False) -> None:
    """Cancel all tracked non-terminal jobs.

    Args:
        done: forwarded to ``self.cancel`` for each job.

    Individual cancellation failures are logged rather than raised, so
    one bad job ID cannot abort the remaining cancellations.
    """
    to_cancel = [jid for jid, r in self._jobs.items() if not r.is_terminal]
    # return_exceptions=True keeps gather() from cancelling the sibling
    # tasks when one cancellation raises; failures come back as values.
    results = await asyncio.gather(
        *(self.cancel(jid, done=done) for jid in to_cancel),
        return_exceptions=True,
    )
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        logger.warning("cancel_all: %d/%d cancellations failed", len(errors), len(to_cancel))
        for err in errors:
            logger.debug("cancel_all error: %s", err)

# --- Status polling ---

Expand All @@ -257,8 +265,18 @@ async def poll(self) -> dict[str, JobStatus]:
args = self._build_status_args()
try:
out = await self._call(args, timeout=self.config.command_timeout)
except CommandFailedError as e:
# bjobs exits non-zero when some job IDs are gone, but still
# writes valid JSON for the found jobs to stdout. Try to
# parse whatever we got before giving up.
if e.stdout:
logger.debug("Status query returned non-zero but produced output, parsing partial results")
out = e.stdout
else:
logger.warning("Status query failed, skipping poll cycle: %s", e)
return {jid: r.status for jid, r in self._jobs.items()}
except (ClusterAPIError, OSError) as e:
logger.warning("Status query failed, skipping poll cycle: %s", e, exc_info=True)
logger.warning("Status query failed, skipping poll cycle: %s", e)
return {jid: r.status for jid, r in self._jobs.items()}

statuses = self._parse_job_statuses(out)
Expand Down Expand Up @@ -316,6 +334,12 @@ async def _call(
if env:
full_env = {**os.environ, **env}

cmd_str = " ".join(cmd)
if stdin_file:
logger.debug("Running: %s < %s", cmd_str, stdin_file)
else:
logger.debug("Running: %s", cmd_str)

stdin_fh = None
try:
if stdin_file:
Expand Down Expand Up @@ -348,7 +372,8 @@ async def _call(

if proc.returncode != 0:
raise CommandFailedError(
f"Command failed (exit {proc.returncode}): {cmd}\nstderr: {err}"
f"Command failed (exit {proc.returncode}): {cmd}\nstderr: {err}",
stdout=out,
)

return out
Expand Down
4 changes: 4 additions & 0 deletions cluster_api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class CommandTimeoutError(ClusterAPIError):
class CommandFailedError(ClusterAPIError):
    """A subprocess command returned a non-zero exit code.

    Attributes:
        stdout: captured standard output of the failed command (may be
            empty). Callers can use it to salvage partial results from a
            command that failed but still produced parseable output.
    """

    def __init__(self, message: str, stdout: str = "") -> None:
        # Keep the captured output on the instance, then delegate the
        # message to the base exception.
        self.stdout = stdout
        super().__init__(message)


class SubmitError(ClusterAPIError):
"""Failed to submit a job or parse its ID from scheduler output."""
19 changes: 6 additions & 13 deletions cluster_api/executors/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,12 @@
"UNKWN": JobStatus.UNKNOWN,
"WAIT": JobStatus.PENDING,
"PROV": JobStatus.PENDING,
"USUSP": JobStatus.PENDING,
"USUSP": JobStatus.RUNNING,
"PSUSP": JobStatus.PENDING,
"SSUSP": JobStatus.PENDING,
"SSUSP": JobStatus.RUNNING,
}

_BJOBS_FIELDS = (
"jobid stat exit_code exec_host max_mem "
"submit_time start_time finish_time"
)

_BJOBS_RECONNECT_FIELDS = (
"jobid job_name stat exit_code exec_host max_mem "
"submit_time start_time finish_time"
)
Expand Down Expand Up @@ -146,7 +141,6 @@ async def _bsub(
"""Run bsub with a script file and return raw output."""
submit_env = self._build_submit_env(env)
cmd = [self.submit_command, *(extra_args or [])]
logger.debug("Running: %s < %s", cmd, script_path)
return await self._call(
cmd,
stdin_file=script_path,
Expand Down Expand Up @@ -204,8 +198,6 @@ async def _submit_array_job(
for line in lines:
if line.startswith(self.directive_prefix):
line = line.replace(f"-J {name}", f"-J {array_name}")
line = line.replace(f"{name}.out", f"{name}.%I.out")
line = line.replace(f"{name}.err", f"{name}.%I.err")
line = line.replace("stdout.%J.log", "stdout.%J.%I.log")
line = line.replace("stderr.%J.log", "stderr.%J.%I.log")
new_lines.append(line)
Expand All @@ -225,6 +217,9 @@ def _build_status_args(self) -> list[str]:
if self._prefix:
args.extend(["-J", f"{self._prefix}-*"])
args.extend(["-a", "-o", _BJOBS_FIELDS, "-json"])
if not self._prefix:
active_ids = [jid for jid, r in self._jobs.items() if not r.is_terminal]
args.extend(active_ids)
return args

def _parse_job_statuses(
Expand Down Expand Up @@ -285,13 +280,11 @@ async def _cancel_job(self, job_id: str, *, done: bool = False) -> None:
if done:
cmd.append("-d")
cmd.append(job_id)
logger.debug("Running: %s", " ".join(cmd))
await self._call(cmd, timeout=self.config.command_timeout)

async def cancel_by_name(self, name_pattern: str) -> None:
"""Cancel jobs matching name pattern via bkill -J."""
cmd = [self.cancel_command, "-J", name_pattern]
logger.debug("Running: %s", " ".join(cmd))
try:
await self._call(cmd, timeout=self.config.command_timeout)
except CommandFailedError as e:
Expand Down Expand Up @@ -320,7 +313,7 @@ def _build_reconnect_args(self) -> list[str]:
args.extend([
"-J", f"{self._prefix}-*",
"-a",
"-o", _BJOBS_RECONNECT_FIELDS,
"-o", _BJOBS_FIELDS,
"-json",
])
return args
Expand Down
4 changes: 2 additions & 2 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "py-cluster-api"
version = "0.4.0"
version = "0.5.0"
description = "Generic Python library for running jobs on HPC clusters"
readme = "README.md"
license = { file = "LICENSE" }
Expand Down
99 changes: 97 additions & 2 deletions tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,23 @@ def test_multiple_jobs(self, lsf_config):

def test_suspended_states(self, lsf_config):
    """Suspended LSF states map according to whether the job had started:
    USUSP/SSUSP -> RUNNING, PSUSP -> PENDING."""
    executor = LSFExecutor(lsf_config)
    # USUSP/SSUSP map to RUNNING (job had already started)
    for lsf_stat in ("USUSP", "SSUSP"):
        output = self._make_bjobs_json([
            {"JOBID": "200", "STAT": lsf_stat, "EXIT_CODE": "-",
             "EXEC_HOST": "-", "MAX_MEM": "-",
             "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
        ])
        result = executor._parse_job_statuses(output)
        assert result["200"][0] == JobStatus.RUNNING
    # PSUSP maps to PENDING (job was suspended before running)
    output = self._make_bjobs_json([
        {"JOBID": "200", "STAT": "PSUSP", "EXIT_CODE": "-",
         "EXEC_HOST": "-", "MAX_MEM": "-",
         "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
    ])
    result = executor._parse_job_statuses(output)
    assert result["200"][0] == JobStatus.PENDING


class TestBuildStatusArgs:
Expand Down Expand Up @@ -730,3 +739,89 @@ async def test_poll_partial_failure(self, lsf_config, work_dir):
assert job.status == JobStatus.FAILED
assert job.failed_element_indices == [2]
assert job.array_elements[2].exit_code == 1


class TestPollPartialBjobsFailure:
    """bjobs exits non-zero when some job IDs are gone, but still returns
    valid JSON for the remaining jobs. poll() should parse those results
    instead of skipping the entire cycle."""

    async def test_poll_uses_stdout_from_failed_bjobs(self, work_dir):
        """Two tracked jobs; bjobs fails because one ID is gone, but returns
        valid JSON for the other. The surviving job should still be updated."""
        from cluster_api.config import ClusterConfig

        config = ClusterConfig(
            executor="lsf", lsf_units="MB",
            command_timeout=10.0, poll_interval=0.5,
        )
        executor = LSFExecutor(config)

        # Submit two jobs
        with patch.object(
            executor, "_call", new_callable=AsyncMock,
            return_value="Job <100> is submitted to queue <normal>.",
        ):
            job1 = await executor.submit(
                command="echo a", name="a",
                resources=ResourceSpec(work_dir=work_dir),
            )
        with patch.object(
            executor, "_call", new_callable=AsyncMock,
            return_value="Job <200> is submitted to queue <normal>.",
        ):
            job2 = await executor.submit(
                command="echo b", name="b",
                resources=ResourceSpec(work_dir=work_dir),
            )

        # bjobs returns DONE for job 100 but exits non-zero because job 200
        # has been cleaned from LSF's history.
        bjobs_stdout = json.dumps({"RECORDS": [
            {"JOBID": "100", "STAT": "DONE", "EXIT_CODE": "0",
             "EXEC_HOST": "node01", "MAX_MEM": "128 MB",
             "SUBMIT_TIME": "-", "START_TIME": "-", "FINISH_TIME": "-"},
        ]})

        # The error carries the command's stdout; poll() is expected to
        # fall back to parsing it rather than skipping the whole cycle.
        with patch.object(
            executor, "_call", new_callable=AsyncMock,
            side_effect=CommandFailedError(
                "Command failed (exit 255): ['bjobs', ...]\nstderr: Job <200> is not found",
                stdout=bjobs_stdout,
            ),
        ):
            statuses = await executor.poll()

        assert job1.status == JobStatus.DONE
        assert statuses["100"] == JobStatus.DONE
        # job2 stays PENDING (not seen, but not incorrectly skipped either)
        assert job2.status == JobStatus.PENDING

    async def test_poll_skips_when_no_stdout(self, work_dir):
        """If bjobs fails and produces no stdout, poll skips as before."""
        from cluster_api.config import ClusterConfig

        config = ClusterConfig(
            executor="lsf", lsf_units="MB",
            command_timeout=10.0, poll_interval=0.5,
        )
        executor = LSFExecutor(config)

        with patch.object(
            executor, "_call", new_callable=AsyncMock,
            return_value="Job <100> is submitted to queue <normal>.",
        ):
            job = await executor.submit(
                command="echo a", name="a",
                resources=ResourceSpec(work_dir=work_dir),
            )

        # Failure with empty stdout: nothing to parse, so poll() must
        # return last-known statuses instead of raising.
        with patch.object(
            executor, "_call", new_callable=AsyncMock,
            side_effect=CommandFailedError("Command failed", stdout=""),
        ):
            statuses = await executor.poll()

        # Should skip gracefully, job unchanged
        assert job.status == JobStatus.PENDING
        assert statuses["100"] == JobStatus.PENDING