-
Notifications
You must be signed in to change notification settings - Fork 35
feat(cwl): integration of CWL job submission and execution into DiracX #877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
ryuwd
wants to merge
67
commits into
DIRACGrid:main
Choose a base branch
from
ryuwd:feat/cwl-job-submission
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
67 commits
Select commit
Hold shift + click to select a range
80be181
feat: add CWL workflow submission endpoint and storage model
ryuwd 5d8ed95
fix: enforce schema_version as Literal["1.0"] with default in JobHint
ryuwd 001e8ff
fix: normalize CWL before hashing to prevent duplicate workflow rows
ryuwd 13f8c14
feat: use JSON body for CWL submission and add job wrapper template
ryuwd 93f3215
chore: regenerate diracx client
ryuwd 13bba67
refactor: move job wrapper and commands from dirac-cwl into diracx
ryuwd 0de1f46
fix: job wrapper uses jobs.search
ryuwd f649f63
feat: ReplicaMap integration and added dirac cwl executor
ryuwd 69bb5ae
refactor: cleaned executor significantly
ryuwd ddadf76
feat(core): extend replica map key validation to accept SB: sandbox r…
ryuwd e4416cf
refactor(core): remove path field from IOSource — tar structure deter…
ryuwd ebf46f5
feat(api): extend DiracReplicaMapFsAccess to resolve SB: sandbox paths
ryuwd bdad870
feat(api): add SB: path parsing and sandbox replica map injection
ryuwd b0ea2d5
feat(api): add cwltool as explicit dependency of diracx-api
ryuwd dd118a6
test(api): add unit tests for DiracExecutor, DiracPathMapper, DiracCo…
ryuwd 2c46728
test(api): extend FsAccess tests with LFN resolution coverage
ryuwd f1ce58d
test(api): add unit tests for JobWrapper commands, output parsing, re…
ryuwd c587a18
test(api): add integration test for dirac-cwl-run subprocess with rep…
ryuwd 53e3c55
refactor(api): remove backward-compat paths from CWL-only JobWrapper
ryuwd c97208a
test(api): add JobWrapper integration test for full CWL execution chain
ryuwd 836081f
refactor(api): remove unnecessary module mocking from test files
ryuwd 8d9635c
refactor(cli): move CWL executor package from diracx-api to diracx-cli
ryuwd 25c45be
fix: depend on cwl-utils
ryuwd d6c3341
chore: update lockfile
ryuwd 1db8f7b
feat: add CWL input parsing module (YAML, JSON, CLI args, range)
ryuwd bd85cab
feat: add sandbox scanning, grouping, and path rewriting
ryuwd fc6b398
test: add missing SB passthrough test for rewrite_sandbox_refs
ryuwd 6c7f8cd
feat: add submission confirmation prompt
ryuwd 6c6964d
feat: add shared CWL submission pipeline
ryuwd 2633bdd
feat: add dirac cwl submit command
ryuwd aea7e6b
feat: add dirac submit command (simple mode)
ryuwd 63763ce
feat: add dirac job command group with search
ryuwd d16bf1f
feat: add server-side range expansion for parametric CWL jobs
ryuwd 67e07a7
test: add integration tests for CWL submission pipeline
ryuwd 1f88445
refactor(cli): restructure to resource-based hierarchy
ryuwd a851b97
chore: regenerated client
ryuwd 636bc7d
chore: updated lockfile
ryuwd 32a33a2
test: no skipping if cwltool or dirac-cwl-run not available
ryuwd f1c3202
chore: update lockfile
ryuwd 763ead5
fix(core): SoftwareDistModule should be empty to not break the pilot
ryuwd 4234958
feat: wire up --range submission to server-side CWL model
ryuwd 26e3f35
fix: use explicit import for diracxTokenFromPEM in job wrapper
ryuwd 5f32354
fix: clean up temp CWL file and extract sandbox PFNs in job wrapper
ryuwd 20238d7
refactor: remove unnecessary sandbox PFN extraction from job wrapper
ryuwd 8c19b4d
feat: append input parameters to job name for parametric jobs
ryuwd 483ceb7
fix: accept jobID as last argument in job wrapper
ryuwd 3fd7812
feat: set ApplicationStatus from CWL task label
ryuwd 3ebc763
feat: stream stderr lines as ApplicationStatus via Popen
ryuwd 49a4bae
fix: drain stdout/stderr concurrently and restore failure Application…
ryuwd d09c0e3
refactor: use asyncio subprocess for non-blocking stderr streaming
ryuwd 76acbf8
feat(cli): capture stdout/stderr to log files in generated CWL
ryuwd 6f13421
fix: use stdout.log instead of std.out in integration test
ryuwd 28ea1ff
fix: rate-limit status commits and handle commit failures gracefully
ryuwd 424f524
feat: filter ApplicationStatus to cwltool lifecycle transitions only
ryuwd 42428ef
fix: anchor status regex to log-level prefix and use capture group
ryuwd 36fbae1
fix(cli): sandbox submit
ryuwd 582ac77
fix: strip #fragment from InputSandbox JDL field
ryuwd 85eea01
fix: no temp files
ryuwd 0beb974
fix: some things need uploading regardless
ryuwd 1f35e11
refactor: code around job wrapper sandbox handling
ryuwd b3cc06a
fix: resolve relative path before as_uri() in sandbox replica map
ryuwd de7349e
feat(cli): add dirac job sandbox commands for output sandbox exploration
ryuwd 82fdbfb
fix(cli): use SandboxType enum instead of string for get_job_sandbox
ryuwd caf9f89
fix(cli): strip SB:SE| prefix before get_sandbox_file client call
ryuwd e1ecb19
fix: output sandbox assignment — don't wrap SB: ref in Path()
ryuwd 933c6dc
fix: remove unnecessary doc
ryuwd eeb15b1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,8 @@ pip-log.txt | |
| .tox | ||
| .ruff_cache | ||
| .mypy_cache | ||
| workernode | ||
|
|
||
|
|
||
| # Eclipse | ||
| .project | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| """All classes related to job reports.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime, timezone | ||
|
|
||
| from diracx.client.aio import AsyncDiracClient # type: ignore[attr-defined] | ||
| from diracx.core.models.job import JobMinorStatus, JobStatus, JobStatusUpdate | ||
|
|
||
|
|
||
| class JobReport: | ||
| """JobReport.""" | ||
|
|
||
| def __init__(self, job_id: int, source: str, client: AsyncDiracClient) -> None: | ||
| """Initialize Job Report. | ||
|
|
||
| :param job_id: the job ID | ||
| :param source: source for the reports | ||
| :param client: DiracX client instance | ||
| """ | ||
| self.job_status_info: dict[ | ||
| str, dict[str, str] | ||
| ] = {} # where job status updates are cumulated | ||
| self.job_id = job_id | ||
| self.source = source | ||
| self._client = client | ||
|
|
||
| def set_job_status( | ||
| self, | ||
| status: JobStatus | None = None, | ||
| minor_status: JobMinorStatus | None = None, | ||
| application_status: str | None = None, | ||
| ) -> None: | ||
| """Add a new job status to the job report. | ||
|
|
||
| :param status: job status | ||
| :param minor_status: job minor status | ||
| :param application_status: application status | ||
| """ | ||
| timestamp = str(datetime.now(timezone.utc)) | ||
| # add job status record | ||
| self.job_status_info.update( | ||
| { | ||
| timestamp: JobStatusUpdate( | ||
| Status=status, | ||
| MinorStatus=minor_status, | ||
| ApplicationStatus=application_status, | ||
| Source=self.source, | ||
| ).model_dump() | ||
| } | ||
| ) | ||
|
|
||
| async def send_stored_status_info(self): | ||
| """Send all the accumulated job status information.""" | ||
| if not self.job_status_info: | ||
| return | ||
| body = {self.job_id: self.job_status_info} | ||
| ret = await self._client.jobs.set_job_statuses(body) | ||
| if ret.success: | ||
| self.job_status_info = {} | ||
| else: | ||
| raise RuntimeError(f"Could not set job statuses: {ret}") | ||
|
|
||
| async def commit(self): | ||
| """Send all the accumulated information.""" | ||
| await self.send_stored_status_info() | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be debounced or use some rate limiter in the JobReport class itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would replace the rate limiting logic in the JobWrapper which would be better here