Skip to content

Commit ef89a32

Browse files
committed
remove use_stdin code paths for simplicity
1 parent 3646062 commit ef89a32

8 files changed

Lines changed: 32 additions & 44 deletions

File tree

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ profiles:
116116
queue: normal
117117
memory: "8 GB"
118118
walltime: "04:00"
119-
use_stdin: true
120119
script_prologue:
121120
- "module load java/11"
122121

@@ -142,7 +141,6 @@ profiles:
142141
| `extra_directives` | `[]` | Additional scheduler flags (directive prefix added automatically) |
143142
| `directives_skip` | `[]` | Substrings to filter out of directives |
144143
| `extra_args` | `[]` | Extra CLI args appended to the submit command (e.g. `bsub`) |
145-
| `use_stdin` | `false` | Submit via stdin (`bsub < script.sh`) |
146144
| `lsf_units` | `"MB"` | LSF memory units (`KB`, `MB`, `GB`) |
147145
| `suppress_job_email` | `true` | Set `LSB_JOB_REPORT_MAIL=N` |
148146
| `command_timeout` | `100.0` | Timeout in seconds for scheduler commands |

cluster_api/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class ClusterConfig:
5353
directives_skip: list[str] = field(default_factory=list)
5454
extra_args: list[str] = field(default_factory=list)
5555
lsf_units: str = "MB"
56-
use_stdin: bool = False
5756
job_name_prefix: str | None = None
5857
zombie_timeout_minutes: float = 30.0
5958
completed_retention_minutes: float = 10.0

cluster_api/core.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ async def _call(
268268
shell: bool = False,
269269
timeout: float = 100.0,
270270
env: dict[str, str] | None = None,
271-
stdin_data: str | None = None,
272271
) -> str:
273272
"""Run a subprocess and return stdout.
274273
@@ -291,12 +290,11 @@ async def _call(
291290
stdout=asyncio.subprocess.PIPE,
292291
stderr=asyncio.subprocess.PIPE,
293292
env=full_env,
294-
stdin=asyncio.subprocess.PIPE if stdin_data else None,
295293
)
296294

297295
try:
298296
stdout, stderr = await asyncio.wait_for(
299-
proc.communicate(stdin_data.encode() if stdin_data else None),
297+
proc.communicate(),
300298
timeout=timeout,
301299
)
302300
except asyncio.TimeoutError:

cluster_api/executors/lsf.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,27 +160,15 @@ def _collect_extra_args(self, resources: ResourceSpec | None = None) -> list[str
160160
return args
161161

162162
async def _bsub(
163-
self, script_path: str, content: str | None, env: dict[str, str] | None,
163+
self, script_path: str, env: dict[str, str] | None,
164164
extra_args: list[str] | None = None,
165165
) -> str:
166-
"""Run bsub via stdin or file and return raw output."""
166+
"""Run bsub with a script file and return raw output."""
167167
submit_env = self._build_submit_env(env)
168-
cmd = [self.submit_command, *(extra_args or [])]
169-
if self.config.use_stdin:
170-
if content is None:
171-
with open(script_path) as f:
172-
content = f.read()
173-
logger.debug("Running: %s (via stdin)", " ".join(cmd))
174-
return await self._call(
175-
cmd,
176-
env=submit_env,
177-
timeout=self.config.command_timeout,
178-
stdin_data=content,
179-
)
180-
full_cmd = [*cmd, script_path]
181-
logger.debug("Running: %s", " ".join(full_cmd))
168+
cmd = [self.submit_command, *(extra_args or []), script_path]
169+
logger.debug("Running: %s", " ".join(cmd))
182170
return await self._call(
183-
full_cmd,
171+
cmd,
184172
env=submit_env,
185173
timeout=self.config.command_timeout,
186174
)
@@ -202,7 +190,7 @@ async def _submit_job(
202190
script_path = write_script(resources.work_dir, script, name, next(self._script_counter))
203191

204192
extra_args = self._collect_extra_args(resources)
205-
out = await self._bsub(script_path, None, env, extra_args)
193+
out = await self._bsub(script_path, env, extra_args)
206194
return self._job_id_from_submit_output(out), script_path
207195

208196
async def _submit_array_job(
@@ -245,7 +233,7 @@ async def _submit_array_job(
245233
f.write(content)
246234

247235
extra_args = self._collect_extra_args(resources)
248-
out = await self._bsub(script_path, content, env, extra_args)
236+
out = await self._bsub(script_path, env, extra_args)
249237
return self._job_id_from_submit_output(out), script_path
250238

251239
def _build_status_args(self) -> list[str]:

docs/Development.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ Terminal jobs are purged from memory after `completed_retention_minutes` (once a
133133
### Key design decisions
134134

135135
- **Poll-based monitoring** — unlike dask-jobqueue (which relies on workers phoning home), this library actively polls the scheduler. This means it works with any executable, not just Python workers.
136-
- **Stdin submission** — LSF's `bsub < script.sh` mode avoids filesystem race conditions on shared storage. Controlled by `use_stdin` config.
136+
- **File-based submission** — jobs are submitted via `bsub script.sh`, passing the script file path directly. The script is always written to disk before submission.
137137
- **Job name prefixing** — all jobs get a `{prefix}-{name}` name. The prefix is either configured (`job_name_prefix`) or randomly generated, so concurrent sessions don't collide when polling by name.
138138
- **Array status aggregation** — parent array job status is computed from element statuses. Only transitions to terminal when ALL expected elements are terminal.
139139

tests/cluster_config.example.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
queue: normal
66
memory: "1 GB"
7-
use_stdin: true
87
lsf_units: MB
98
suppress_job_email: true
109

tests/conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ def lsf_config():
3737
walltime="04:00",
3838
poll_interval=0.5,
3939
command_timeout=10.0,
40-
use_stdin=True,
4140
suppress_job_email=True,
4241
lsf_units="MB",
4342
)

tests/test_lsf.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ def test_status_args(self, lsf_config):
270270

271271
class TestSubmission:
272272

273-
async def test_submit_stdin(self, lsf_config, work_dir):
273+
async def test_submit(self, lsf_config, work_dir):
274274
executor = LSFExecutor(lsf_config)
275275
with patch.object(
276276
executor, "_call",
@@ -285,9 +285,9 @@ async def test_submit_stdin(self, lsf_config, work_dir):
285285
assert job.job_id == "12345"
286286
assert job.name == "test-my-job"
287287
assert job.status == JobStatus.PENDING
288-
# Verify stdin submission was used
289-
call_args = mock_call.call_args
290-
assert call_args.kwargs.get("stdin_data") is not None
288+
# Verify file-based submission (script path in cmd args)
289+
cmd = mock_call.call_args[0][0]
290+
assert cmd[-1].endswith(".sh")
291291

292292

293293
async def test_submit_email_suppression(self, lsf_config, work_dir):
@@ -322,10 +322,11 @@ async def test_submit_array(self, lsf_config, work_dir):
322322
)
323323
assert job.job_id == "12345"
324324
assert job.metadata["array_range"] == (1, 50)
325-
# Verify stdin submission included array name
326-
call_args = mock_call.call_args
327-
stdin = call_args.kwargs.get("stdin_data", "")
328-
assert "[1-50]" in stdin
325+
# Verify script file contains array name
326+
script_path = mock_call.call_args[0][0][-1]
327+
with open(script_path) as f:
328+
script = f.read()
329+
assert "[1-50]" in script
329330

330331

331332
class TestArrayScriptRewriting:
@@ -343,9 +344,11 @@ async def test_percent_i_substitution(self, lsf_config, work_dir):
343344
array_range=(1, 10),
344345
resources=ResourceSpec(work_dir=work_dir),
345346
)
346-
stdin = mock_call.call_args.kwargs.get("stdin_data", "")
347-
assert "stdout.%J.%I.log" in stdin
348-
assert "stderr.%J.%I.log" in stdin
347+
script_path = mock_call.call_args[0][0][-1]
348+
with open(script_path) as f:
349+
script = f.read()
350+
assert "stdout.%J.%I.log" in script
351+
assert "stderr.%J.%I.log" in script
349352

350353

351354
class TestCancelByName:
@@ -412,8 +415,10 @@ async def test_with_max_concurrent(self, lsf_config, work_dir):
412415
)
413416
assert job.job_id == "12345"
414417
assert job.metadata["max_concurrent"] == 15
415-
stdin = mock_call.call_args.kwargs.get("stdin_data", "")
416-
assert "[1-100%15]" in stdin
418+
script_path = mock_call.call_args[0][0][-1]
419+
with open(script_path) as f:
420+
script = f.read()
421+
assert "[1-100%15]" in script
417422

418423

419424
async def test_without_max_concurrent(self, lsf_config, work_dir):
@@ -429,9 +434,11 @@ async def test_without_max_concurrent(self, lsf_config, work_dir):
429434
array_range=(1, 100),
430435
resources=ResourceSpec(work_dir=work_dir),
431436
)
432-
stdin = mock_call.call_args.kwargs.get("stdin_data", "")
433-
assert "[1-100]" in stdin
434-
j_line = [line for line in stdin.splitlines() if "-J " in line][0]
437+
script_path = mock_call.call_args[0][0][-1]
438+
with open(script_path) as f:
439+
script = f.read()
440+
assert "[1-100]" in script
441+
j_line = [line for line in script.splitlines() if "-J " in line][0]
435442
assert "%" not in j_line
436443
assert "max_concurrent" not in job.metadata
437444

0 commit comments

Comments
 (0)