feat(cwl): integration of CWL job submission and execution into DiracX#877
Draft
ryuwd wants to merge 67 commits intoDIRACGrid:mainfrom
Draft
feat(cwl): integration of CWL job submission and execution into DiracX#877ryuwd wants to merge 67 commits intoDIRACGrid:mainfrom
ryuwd wants to merge 67 commits intoDIRACGrid:mainfrom
Conversation
Introduce the `POST /api/jobs/` endpoint for submitting CWL workflows
with per-job input parameters, and `GET /api/workflows/{workflow_id}`
for retrieving stored workflows. CWL definitions are stored once in a
new `Workflows` table (content-addressed by SHA-256), with per-job
parameters and workflow references added as new columns on `Jobs`.
During the transition period, JDL is still produced via
`cwl_to_jdl()` for compatibility with existing DIRAC infrastructure.
Pydantic now validates schema_version at model construction instead of a runtime check in the logic layer. Users can omit schema_version to get the default "1.0", but invalid versions are rejected immediately.
Parse YAML to dict and serialize as sorted JSON before SHA-256 hashing, so whitespace, comments, and key ordering differences produce the same workflow ID. The original YAML is still stored as-is for readability.
Replace UploadFile multipart with a CWLJobSubmission pydantic model to fix autorest client generation. Add job_wrapper_template.py to diracx-logic for worker-side CWL execution via importlib.resources.
11d928f to
b1362d6
Compare
chore: regenerate client for gubbins
Move JobWrapper, JobReport, commands, and submission models into diracx-api and diracx-core so DIRAC does not need dirac-cwl installed.
fix: again
Rename _resolve_lfn to _resolve_path and update all methods (_abs, glob, open, exists, isfile, isdir, size) to handle both LFN: and SB: prefixed paths. SB: keys are stored with prefix in the replica map while LFN keys are stored without prefix.
Parse SB: prefixed paths in input sandbox downloads, cache per PFN, and inject sandbox entries into the replica map for CWL executor resolution.
…mmandLineTool 28 tests covering LFN extraction, replica map filtering/merging, path resolution in DiracPathMapper, and tool factory routing in DiracCommandLineTool. Loads executor modules by file path to bypass __init__.py's mypyc compat hook.
…lica map - Create test_executor_integration.py with three subprocess tests: test_basic_execution_with_replica_map (LFN resolved via replica map), test_execution_without_replica_map (plain local file), and test_sb_reference_in_replica_map (SB: key resolved via replica map) - Fix DiracPathMapper to convert file:// PFNs to local paths for MapperEnt.target so cwltool passes filesystem paths (not file:// URLs) to subcommands - Extend DiracPathMapper.visit to handle SB: URI scheme alongside LFN: - Add DiracPathMapper.mapper() override so SB: keys with '#' fragments are found directly in _pathmap without cwltool's fragment-stripping logic
Skip non-SB: paths in input_sandbox and non-LFN: paths in input_data with a warning instead of silently handling them. This JobWrapper only supports CWL jobs where sandboxes are always SB: prefixed and input data is always LFN: prefixed.
Add integration test exercising the complete run_job() lifecycle: hint parsing, sandbox download, LFN download, replica map building, SB injection, real dirac-cwl-run execution, and output parsing. Also fix dirac-cwl-run logging to use stderr so that only the cwltool JSON output is written to stdout, allowing job_wrapper.py to reliably parse it with json.loads(). test(api): add JobWrapper integration test for full CWL execution chain Add integration test exercising the complete run_job() lifecycle: hint parsing, sandbox download, LFN download, replica map building, SB injection, real dirac-cwl-run execution, and output parsing. Add conftest.py to pre-load real cwl_utils/ruamel.yaml before other test files mock them, ensuring the integration test can access real types. Also fix dirac-cwl-run logging to use stderr so that only the cwltool JSON output is written to stdout, allowing job_wrapper.py to reliably parse it with json.loads().
All dependencies (cwltool, cwl_utils, ruamel.yaml, DIRACCommon, diracx.client.aio, diracx.api.job_report, diracx.api.jobs) are now installed in the test environment, so replace the sys.modules mock scaffolding and importlib file-loading with direct imports. Delete conftest.py which existed solely to save real module references before mocking occurred.
The executor is a CLI tool (dirac-cwl-run entry point), not an API component. Moves source and tests to diracx-cli, updates cwltool dependency location, and rewrites test_no_cwltool_import to verify the mypyc compatibility patch is installed before cwltool loads. chore: update lockfile chore: update .gitignore
fix: cwl-utils
Shows scalar key=value pairs from the input dict in the job name, e.g. "hello-world (seed=42)" so parametric jobs are distinguishable in search results and monitoring.
DIRAC's wrapper script passes only the jobID (not a json-file), so accept 1 or 2 args and always read the jobID from the last argument.
Report meaningful ApplicationStatus using the CWL task label instead of leaving it as "Unknown". Shows e.g. "parametric-echo completed" or "parametric-echo failed (exit 1)" in job search results. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace subprocess.run with Popen to read stderr line-by-line, re-emit each line to the wrapper's stderr, and store it as ApplicationStatus with immediate commit for real-time visibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Status Read stdout in a background thread to prevent pipe deadlock when both stdout and stderr use subprocess.PIPE. Add explicit application_status to the failure path so the final status is not just the last cwltool line. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace subprocess.Popen + threading with asyncio.create_subprocess_exec for proper async cooperation. Stderr is streamed line-by-line without blocking the event loop, stdout collected concurrently via asyncio task. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add stdout/stderr capture fields to generate_cwl() so cwltool writes the tool's output to stdout.log and stderr.log in the output sandbox, preventing job output from being silently discarded.
Avoids naming collision with DIRAC's conventional std.out filename. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Batch ApplicationStatus commits to at most once per 2 seconds instead of per stderr line, reducing HTTP traffic. All lines are still stored via set_job_status — only the commit frequency changes. Commit failures are logged as warnings instead of killing the job. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Only report step/job/workflow lifecycle lines (start, completed, failed, skipped) as ApplicationStatus. Noise lines (validation, file staging, timing) still go to stderr for Watchdog peek but are not sent to the server. Log level prefixes (INFO/WARNING) are stripped — ApplicationStatus shows clean cwltool output like "[job X] completed success". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Require INFO/WARNING/ERROR prefix before cwltool lifecycle patterns to prevent false positives from user application output. Use group(1) to extract the bracket-onward portion, stripping the level prefix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The server checks sandbox ownership using the PFN without the #fragment. The fragment is only needed by the worker to extract the correct file from the tar archive.
Also rename fake_pfn → fake_sb_ref in submission integration tests and fix assertion to expect #filename fragment.
Adds list, peek, and get subcommands: - list: show files and sizes in the output sandbox - peek: display file contents with syntax highlighting - get: download sandbox files to a local directory
The generated client regex has a typo ((:? vs (?:) that rejects the SB: prefix. The server accepts the bare /S3/... path.
Path() mangles the SB: reference. Also filter None values from get_job_sandbox response in CLI.
ryuwd
commented
Apr 10, 2026
| raise RuntimeError(f"Could not set job statuses: {ret}") | ||
|
|
||
| async def commit(self): | ||
| """Send all the accumulated information.""" |
Contributor
Author
There was a problem hiding this comment.
This should be debounced or use some rate limiter in the JobReport class itself
Contributor
Author
There was a problem hiding this comment.
This would replace the rate limiting logic in the JobWrapper which would be better here
ryuwd
commented
Apr 10, 2026
for more information, see https://pre-commit.ci
ryuwd
commented
Apr 10, 2026
| This class extends StdFsAccess to handle LFN: and SB: prefixed paths by | ||
| looking them up in the replica map and using the physical file path instead. | ||
|
|
||
| Key difference: LFN keys are stored WITHOUT prefix, SB keys are stored WITH prefix. |
ryuwd
commented
Apr 10, 2026
| Services: ServicesConfig = ServicesConfig() | ||
| """Configuration for various DIRAC services.""" | ||
| SoftwareDistModule: str = "LocalSoftwareDist" | ||
| SoftwareDistModule: str = "" |
Contributor
Author
There was a problem hiding this comment.
Was causing errors in the Pilot
to be checked with @chaen
ryuwd
commented
Apr 10, 2026
Comment on lines
+75
to
+81
| # TODO: Compute Adler32 checksum before upload | ||
| # TODO: Extract POOL/ROOT GUID if applicable | ||
| # TODO: Prefer local SEs (getSEsForSite) before remote ones | ||
| # TODO: Implement retry with exponential backoff on transient failures | ||
| # TODO: On complete failure, create a failover Request (RMS) | ||
| # for async recovery instead of raising immediately | ||
| # TODO: Report upload progress via job status updates |
Contributor
Author
There was a problem hiding this comment.
This command is still untested in cert. StoreOutputData still needs fuller implementation, discussion, and testing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
End-to-end CWL job submission and execution for DiracX — from CLI to worker node and back.
Follows the plan in #858.
Goes with DIRACGrid/DIRAC#8506
CLI (
diracx-cli)dirac job submit cwl <workflow> [inputs...]— submit CWL jobs with local file sandbox upload, LFN references, and parametric--rangeexpansiondirac job submit cmd -- <command>— quick submission with auto-generated CWL (captures stdout/stderr to log files)dirac job search— search jobs with conditions and rich table outputdirac job sandbox list|peek|get <job_id>— explore and retrieve output sandbox filesdirac-cwl-run): custom cwltool executor with DIRAC-awareFsAccessandPathMapperfor LFN resolution via replica mapsWorker Node (
diracx-api)[job echo-tool] completed success,[workflow ] starting step greet) streamed as ApplicationStatus with rate-limited commitsSB:<se>|<s3_path>#<filename>URI scheme —#fragmentidentifies the file inside the tar archive; sandbox reference preserved withSB:prefix throughout the systemServer (
diracx-logic,diracx-routers,diracx-core)cwl_to_jdl()extracts dirac:Job hints from CWL, maps to JDL fields (CPUTime, Sites, Tags, I/O sandboxes, InputData, OutputData)stdout:/stderr:fields automatically added to OutputSandbox in JDL#fragmentstripping: JDL InputSandbox contains bare SB: refs (no fragment) for server ownership checks; full URI with#filenamepreserved in CWL inputs for worker extraction--rangespecJobHint,IOSource,OutputDataEntry,ReplicaMap, pre/post-process command frameworkClient (
diracx-client)Key design decisions
SB:URI scheme:SB:<se>|<s3_path>#<relative_path>— logical reference (not PFN), server resolves to presigned URLTest coverage
Status
Under certification testing on
diracx-cert.app.cern.ch. Actively fixing issues found during grid execution.cc @aldbr