You will need Pixi to build this project.
```
pixi install
```

This creates two environments:
- default — development + testing (pytest, pytest-asyncio, pytest-cov, ruff)
- release — packaging + publishing (build, twine)
The library itself has only one runtime dependency: PyYAML.
Always use pixi run — never invoke pytest or ruff directly.
```
pixi run test       # run unit tests
pixi run test-v     # verbose output
pixi run test-cov   # with coverage report
pixi run lint       # ruff check
pixi run fmt        # ruff format
pixi run check      # lint + test together
```

| File | What it tests | Real subprocesses? |
|---|---|---|
| `test_config.py` | YAML loading, profiles, memory parsing | No |
| `test_core.py` | Base `Executor` logic (submit, poll, cancel, zombies, arrays) | No — mocks `_call()` |
| `test_lsf.py` | `LSFExecutor` header building, bsub submission, bjobs parsing, array rewriting | No — mocks `_call()` |
| `test_local.py` | `LocalExecutor` end-to-end (submit, poll, output files, callbacks, array jobs) | Yes — runs real bash subprocesses |
| `test_monitor.py` | `JobMonitor` polling loop, callback dispatch, zombie detection, purging | No — mocks `poll()` |
| `test_reconnect.py` | `LSFExecutor.reconnect()` — rediscovering running jobs after restart | No — mocks `_call()` |
| `test_integration.py` | Full LSF round-trips (submit, monitor, cancel, arrays, metadata) | Yes — requires a live LSF cluster |
- All async tests use pytest-asyncio with `asyncio_mode = "auto"` (set in `pyproject.toml`), so no `@pytest.mark.asyncio` decorator is needed.
- Mock `Executor._call()` with `unittest.mock.patch` + `AsyncMock` to avoid hitting a real scheduler. Example:

```python
with patch.object(executor, "_call", new_callable=AsyncMock, return_value="Job <123> ...") as mock:
    job = await executor.submit(command="echo hi", name="test", resources=ResourceSpec(work_dir=work_dir))
    mock.assert_called_once()
```

- Shared fixtures live in `tests/conftest.py`: `work_dir` (temp directory), `default_config` (local), and `lsf_config` (LSF with stdin mode).
- `test_local.py` runs real subprocesses — keep commands fast (`echo`, `pwd`, short `sleep`).
Integration tests submit real jobs to an LSF cluster and are skipped by default (via the integration pytest marker in pyproject.toml).
```
pixi run test-integration
```

Prerequisites:
- `bsub`, `bjobs`, `bkill` must be on `PATH`
- Optionally create `tests/cluster_config.yaml` (gitignored) with cluster-specific settings like `queue`, `lsf_units`, etc.
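A minimal sketch of such a file, assuming only the `queue` and `lsf_units` keys mentioned above (the values shown are placeholders for your cluster, not project defaults):

```yaml
# tests/cluster_config.yaml — gitignored, cluster-specific overrides
queue: short      # an LSF queue you can submit to
lsf_units: MB     # memory units your LSF installation expects
```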
The tests cover: single job success/failure, multiple concurrent jobs, cancellation, job arrays, rich metadata population, and submission without memory limits.
```
create_executor()           # factory: loads config → picks executor class
        ↓
Executor (core.py)          # abstract base: submit, poll, cancel
  ├── LSFExecutor           # bsub/bjobs/bkill via _call()
  └── LocalExecutor         # asyncio subprocesses
        ↓
JobMonitor (monitor.py)     # async polling loop → callbacks + zombie detection
```
- Configuration — `load_config()` reads YAML, merges base → profile → overrides into a `ClusterConfig` dataclass.
- Submission — `Executor.submit()` prefixes the job name, then calls the subclass `_submit_job()`, which renders a script (via `script.py`), writes it to disk, and invokes the scheduler CLI.
- Polling — `Executor.poll()` runs the status command (e.g. `bjobs -json`), parses output, and updates `JobRecord` fields. For array jobs, element-level statuses are tracked in `ArrayElement` instances and aggregated via `compute_array_status()`.
- Monitoring — `JobMonitor` drives `poll()` in a loop, dispatches callbacks on terminal jobs, detects zombies (jobs missing from scheduler output beyond `zombie_timeout_minutes`), and purges old records.
- Callbacks — registered on `JobRecord` via `on_success()`, `on_failure()`, or `on_exit()`. Both sync and async callables are supported. Fired once, then removed.
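The fire-once callback semantics can be sketched as follows. This is a standalone illustration of the behaviour described above, not the library's actual `JobRecord` code; `CallbackList` and its methods are hypothetical names:

```python
import asyncio
import inspect

class CallbackList:
    """Fire-once callback list supporting both sync and async callables."""

    def __init__(self):
        self._callbacks = []

    def register(self, fn):
        self._callbacks.append(fn)

    async def fire(self, job_id):
        # Drain the list first, so each callback runs at most once
        # even if fire() is called again later.
        callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:
            result = fn(job_id)
            if inspect.isawaitable(result):  # async callables return a coroutine
                await result

fired = []
cbs = CallbackList()
cbs.register(lambda job_id: fired.append(("sync", job_id)))

async def async_cb(job_id):
    fired.append(("async", job_id))

cbs.register(async_cb)
asyncio.run(cbs.fire("123"))
asyncio.run(cbs.fire("123"))  # no-op: callbacks were removed after the first fire
```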
`script.py` renders job scripts using a simple template:

```
{shebang}
{scheduler directives}
{prologue lines}
{command}
{epilogue lines}
```
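Filling that template amounts to joining the sections in order. A minimal sketch, using a hypothetical signature rather than the library's actual `render_script()`:

```python
def render_script(shebang, directives, prologue, command, epilogue):
    """Join the template sections top to bottom into one script body.

    Hypothetical sketch — the real render_script() in script.py may
    take different arguments.
    """
    lines = [shebang, *directives, *prologue, command, *epilogue]
    return "\n".join(lines) + "\n"

script = render_script(
    shebang="#!/bin/bash",
    directives=["#BSUB -q short", "#BSUB -n 4"],
    prologue=["set -euo pipefail"],
    command="echo hi",
    epilogue=["echo done"],
)
```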
`build_header()` (per executor) produces directive lines from `ResourceSpec` + config defaults. `extra_directives` has two levels with different behaviour:

- Config-level (`ClusterConfig.extra_directives`): appended verbatim to the script header — users must include the full prefix, e.g. `"#BSUB -P myproject"`.
- ResourceSpec-level (`ResourceSpec.extra_directives`): the directive prefix is added automatically, so users write `"-P myproject"` and the executor produces `"#BSUB -P myproject"`.

`extra_args` (config-level and per-job) append raw arguments to the submit command line, bypassing the script entirely. Both levels are merged at submit time: config-level args come first, then per-job (`ResourceSpec.extra_args`) args are appended. `directives_skip` filters out unwanted directive lines by substring match.

Scripts are written to `{work_dir}/{safe_name}.{counter}.sh` and made executable.
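The two-level `extra_directives` behaviour can be sketched as a small helper (hypothetical code, not the library's implementation — only the pass-through vs. auto-prefix rule is taken from the description above):

```python
def merge_extra_directives(config_extra, spec_extra, prefix="#BSUB"):
    """Config-level lines pass through verbatim (user supplies the full
    prefix); ResourceSpec-level lines get the scheduler prefix added."""
    lines = list(config_extra)                      # already full directives
    lines += [f"{prefix} {d}" for d in spec_extra]  # prefix added automatically
    return lines

header = merge_extra_directives(
    config_extra=["#BSUB -P myproject"],  # full directive, verbatim
    spec_extra=["-W 01:00"],              # prefix supplied by the executor
)
```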
Default log files include the job ID so each job gets unique output:
| Executor | stdout | stderr |
|---|---|---|
| LSF | `stdout.%J.log` | `stderr.%J.log` |
| LSF array | `stdout.%J.%I.log` | `stderr.%J.%I.log` |
| Local | `stdout.{job_id}.log` | `stderr.{job_id}.log` |
| Local array | `stdout.{job_id}.{index}.log` | `stderr.{job_id}.{index}.log` |
Setting `stdout_path` / `stderr_path` on `ResourceSpec` overrides these defaults.
```
PENDING → RUNNING → DONE    (exit 0)
                  → FAILED  (non-zero exit, or zombie timeout)
                  → KILLED  (cancel)
```
Terminal jobs are purged from memory after `completed_retention_minutes` (once all callbacks have fired).
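The lifecycle above has three terminal states. A sketch of that distinction, assuming the member names shown in the diagram (the library's actual `JobStatus` enum in `_types.py` may differ):

```python
from enum import Enum

class JobStatus(Enum):
    """Job lifecycle states from the diagram above (assumed names)."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"      # exit 0
    FAILED = "FAILED"  # non-zero exit, or zombie timeout
    KILLED = "KILLED"  # cancelled

TERMINAL = {JobStatus.DONE, JobStatus.FAILED, JobStatus.KILLED}

def is_terminal(status):
    """Terminal jobs are eligible for callback dispatch and, later, purging."""
    return status in TERMINAL
```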
- Poll-based monitoring — unlike dask-jobqueue (which relies on workers phoning home), this library actively polls the scheduler. This means it works with any executable, not just Python workers.
- Stdin-based submission — job scripts are written to disk, then submitted via stdin redirection (`bsub < script.sh`). The script file is kept on disk for debugging.
- Job name prefixing — when `job_name_prefix` is configured, all jobs get a `{prefix}-{name}` name and polling filters by that prefix. When unset, the user controls the full job name and polling queries all jobs. `reconnect()` requires a prefix to be set.
- Array status aggregation — parent array job status is computed from element statuses. Only transitions to terminal when ALL expected elements are terminal.
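The aggregation rule in the last point can be sketched as follows. This is a hypothetical implementation of the stated rule, not the library's actual `compute_array_status()`, and it uses plain status strings to stay self-contained:

```python
TERMINAL = {"DONE", "FAILED", "KILLED"}

def compute_array_status(element_statuses, expected_count):
    """Aggregate element statuses into a parent array status.

    Rule from the design notes: the parent only transitions to a
    terminal state once ALL expected elements are terminal.
    """
    if len(element_statuses) < expected_count or not all(
        s in TERMINAL for s in element_statuses
    ):
        # At least one element is missing or still active.
        return "RUNNING" if "RUNNING" in element_statuses else "PENDING"
    if all(s == "DONE" for s in element_statuses):
        return "DONE"
    return "KILLED" if "KILLED" in element_statuses else "FAILED"
```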
| Module | Purpose |
|---|---|
| `__init__.py` | Public API, `create_executor()` factory |
| `_types.py` | `JobStatus`, `JobRecord`, `ResourceSpec`, `JobExitCondition`, `ArrayElement` |
| `config.py` | `ClusterConfig` dataclass, `load_config()`, YAML search paths |
| `core.py` | Abstract `Executor` base class, `_call()` subprocess helper |
| `script.py` | `render_script()`, `write_script()` |
| `monitor.py` | `JobMonitor` — polling loop, callback dispatch, zombie/purge logic |
| `exceptions.py` | `ClusterAPIError`, `CommandTimeoutError`, `CommandFailedError`, `SubmitError` |
| `executors/lsf.py` | `LSFExecutor` — bsub/bjobs/bkill, header building, bjobs JSON parsing |
| `executors/local.py` | `LocalExecutor` — asyncio subprocesses, stdout/stderr capture, array job simulation |
First, increment the version in `pyproject.toml` and push the change to GitHub. Create a Release there, then publish it to PyPI as follows.
To create a Python source package (`.tar.gz`) and the binary package (`.whl`) in the `dist/` directory, run:

```
pixi run -e release pypi-build
```

To upload the package to PyPI, you'll need one of the project owners to add you as a collaborator. After setting up your access token, run:

```
pixi run -e release pypi-upload
```