Skip to content

feat(cwl): integration of CWL job submission and execution into DiracX#877

Draft
ryuwd wants to merge 67 commits intoDIRACGrid:mainfrom
ryuwd:feat/cwl-job-submission
Draft

feat(cwl): integration of CWL job submission and execution into DiracX#877
ryuwd wants to merge 67 commits intoDIRACGrid:mainfrom
ryuwd:feat/cwl-job-submission

Conversation

@ryuwd
Copy link
Copy Markdown
Contributor

@ryuwd ryuwd commented Apr 2, 2026

End-to-end CWL job submission and execution for DiracX — from CLI to worker node and back.

Follows the plan in #858.

Goes with DIRACGrid/DIRAC#8506

CLI (diracx-cli)

  • dirac job submit cwl <workflow> [inputs...] — submit CWL jobs with local file sandbox upload, LFN references, and parametric --range expansion
  • dirac job submit cmd -- <command> — quick submission with auto-generated CWL (captures stdout/stderr to log files)
  • dirac job search — search jobs with conditions and rich table output
  • dirac job sandbox list|peek|get <job_id> — explore and retrieve output sandbox files
  • Submission pipeline: CWL/YAML parsing, input validation, sandbox scanning/grouping/upload, confirmation prompt, range expansion
  • CWL executor (dirac-cwl-run): custom cwltool executor with DIRAC-aware FsAccess and PathMapper for LFN resolution via replica maps

Worker Node (diracx-api)

  • JobWrapper: full CWL job lifecycle — pre-process (sandbox download, LFN resolution, replica map building), async subprocess execution with live stderr streaming, post-process (output sandbox upload, output data registration)
  • ApplicationStatus reporting: cwltool lifecycle transitions (e.g. [job echo-tool] completed success, [workflow ] starting step greet) streamed as ApplicationStatus with rate-limited commits
  • Sandbox handling: SB:<se>|<s3_path>#<filename> URI scheme — #fragment identifies the file inside the tar archive; sandbox reference preserved with SB: prefix throughout the system
  • JobReport: accumulates status updates, flushes via HTTP with rate limiting

Server (diracx-logic, diracx-routers, diracx-core)

  • CWL-to-JDL translation: cwl_to_jdl() extracts dirac:Job hints from CWL, maps to JDL fields (CPUTime, Sites, Tags, I/O sandboxes, InputData, OutputData)
  • Auto stdout/stderr collection: CWL stdout:/stderr: fields automatically added to OutputSandbox in JDL
  • InputSandbox #fragment stripping: JDL InputSandbox contains bare SB: refs (no fragment) for server ownership checks; full URI with #filename preserved in CWL inputs for worker extraction
  • Range expansion: server-side parametric job expansion from --range spec
  • Models: JobHint, IOSource, OutputDataEntry, ReplicaMap, pre/post-process command framework

Client (diracx-client)

  • Generated client extensions for CWL submission, workflow retrieval, and sandbox operations

Key design decisions

  • CWL-native: no JDL on the client side — CWL is the job description format, JDL is an internal detail
  • cwltool passthrough for status: ApplicationStatus shows verbatim cwltool lifecycle lines rather than a custom translation layer
  • SB: URI scheme: SB:<se>|<s3_path>#<relative_path> — logical reference (not PFN), server resolves to presigned URL
  • Replica map: JSON file passed to cwltool executor, maps LFN/SB paths to local files — decouples CWL execution from DIRAC data management

Test coverage

  • Unit tests: JobWrapper commands, output parsing, CWL hint extraction, sandbox path parsing, replica map injection, submission pipeline, input parsing, executor path mapping
  • Integration tests: full JobWrapper lifecycle with mocked services, real CWL execution, stderr streaming, ApplicationStatus filtering

Status

Under certification testing on diracx-cert.app.cern.ch. Actively fixing issues found during grid execution.

cc @aldbr

@read-the-docs-community
Copy link
Copy Markdown

read-the-docs-community bot commented Apr 2, 2026

Documentation build overview

📚 diracx | 🛠️ Build #32203433 | 📁 Comparing eeb15b1 against latest (63dc086)

  🔍 Preview build  

No files changed.

ryuwd added 4 commits April 7, 2026 13:22
Introduce the `POST /api/jobs/` endpoint for submitting CWL workflows
with per-job input parameters, and `GET /api/workflows/{workflow_id}`
for retrieving stored workflows. CWL definitions are stored once in a
new `Workflows` table (content-addressed by SHA-256), with per-job
parameters and workflow references added as new columns on `Jobs`.
During the transition period, JDL is still produced via
`cwl_to_jdl()` for compatibility with existing DIRAC infrastructure.
Pydantic now validates schema_version at model construction instead of
a runtime check in the logic layer. Users can omit schema_version to
get the default "1.0", but invalid versions are rejected immediately.
Parse YAML to dict and serialize as sorted JSON before SHA-256 hashing,
so whitespace, comments, and key ordering differences produce the same
workflow ID. The original YAML is still stored as-is for readability.
Replace UploadFile multipart with a CWLJobSubmission pydantic model
to fix autorest client generation. Add job_wrapper_template.py to
diracx-logic for worker-side CWL execution via importlib.resources.
@ryuwd ryuwd force-pushed the feat/cwl-job-submission branch from 11d928f to b1362d6 Compare April 7, 2026 11:26
ryuwd added 24 commits April 8, 2026 15:53
chore: regenerate client for gubbins
Move JobWrapper, JobReport, commands, and submission models into
diracx-api and diracx-core so DIRAC does not need dirac-cwl
installed.
Rename _resolve_lfn to _resolve_path and update all methods (_abs, glob,
open, exists, isfile, isdir, size) to handle both LFN: and SB: prefixed
paths. SB: keys are stored with prefix in the replica map while LFN keys
are stored without prefix.
Parse SB: prefixed paths in input sandbox downloads, cache per PFN,
and inject sandbox entries into the replica map for CWL executor resolution.
…mmandLineTool

28 tests covering LFN extraction, replica map filtering/merging, path
resolution in DiracPathMapper, and tool factory routing in DiracCommandLineTool.
Loads executor modules by file path to bypass __init__.py's mypyc compat hook.
…lica map

- Create test_executor_integration.py with three subprocess tests:
  test_basic_execution_with_replica_map (LFN resolved via replica map),
  test_execution_without_replica_map (plain local file), and
  test_sb_reference_in_replica_map (SB: key resolved via replica map)
- Fix DiracPathMapper to convert file:// PFNs to local paths for MapperEnt.target
  so cwltool passes filesystem paths (not file:// URLs) to subcommands
- Extend DiracPathMapper.visit to handle SB: URI scheme alongside LFN:
- Add DiracPathMapper.mapper() override so SB: keys with '#' fragments are
  found directly in _pathmap without cwltool's fragment-stripping logic
Skip non-SB: paths in input_sandbox and non-LFN: paths in input_data
with a warning instead of silently handling them. This JobWrapper only
supports CWL jobs where sandboxes are always SB: prefixed and input
data is always LFN: prefixed.
Add integration test exercising the complete run_job() lifecycle: hint
parsing, sandbox download, LFN download, replica map building, SB
injection, real dirac-cwl-run execution, and output parsing.

Also fix dirac-cwl-run logging to use stderr so that only the cwltool
JSON output is written to stdout, allowing job_wrapper.py to reliably
parse it with json.loads().

test(api): add JobWrapper integration test for full CWL execution chain

Add integration test exercising the complete run_job() lifecycle: hint
parsing, sandbox download, LFN download, replica map building, SB
injection, real dirac-cwl-run execution, and output parsing.

Add conftest.py to pre-load real cwl_utils/ruamel.yaml before other test
files mock them, ensuring the integration test can access real types.

Also fix dirac-cwl-run logging to use stderr so that only the cwltool
JSON output is written to stdout, allowing job_wrapper.py to reliably
parse it with json.loads().
All dependencies (cwltool, cwl_utils, ruamel.yaml, DIRACCommon, diracx.client.aio,
diracx.api.job_report, diracx.api.jobs) are now installed in the test environment,
so replace the sys.modules mock scaffolding and importlib file-loading with direct
imports. Delete conftest.py which existed solely to save real module references
before mocking occurred.
The executor is a CLI tool (dirac-cwl-run entry point), not an API
component. Moves source and tests to diracx-cli, updates cwltool
dependency location, and rewrites test_no_cwltool_import to verify
the mypyc compatibility patch is installed before cwltool loads.

chore: update lockfile

chore: update .gitignore
ryuwd and others added 21 commits April 9, 2026 18:26
Shows scalar key=value pairs from the input dict in the job name,
e.g. "hello-world (seed=42)" so parametric jobs are distinguishable
in search results and monitoring.
DIRAC's wrapper script passes only the jobID (not a json-file),
so accept 1 or 2 args and always read the jobID from the last
argument.
Report meaningful ApplicationStatus using the CWL task label instead of
leaving it as "Unknown". Shows e.g. "parametric-echo completed" or
"parametric-echo failed (exit 1)" in job search results.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace subprocess.run with Popen to read stderr line-by-line,
re-emit each line to the wrapper's stderr, and store it as
ApplicationStatus with immediate commit for real-time visibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Status

Read stdout in a background thread to prevent pipe deadlock when both
stdout and stderr use subprocess.PIPE. Add explicit application_status
to the failure path so the final status is not just the last cwltool line.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace subprocess.Popen + threading with asyncio.create_subprocess_exec
for proper async cooperation. Stderr is streamed line-by-line without
blocking the event loop, stdout collected concurrently via asyncio task.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add stdout/stderr capture fields to generate_cwl() so cwltool writes
the tool's output to stdout.log and stderr.log in the output sandbox,
preventing job output from being silently discarded.
Avoids naming collision with DIRAC's conventional std.out filename.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Batch ApplicationStatus commits to at most once per 2 seconds instead
of per stderr line, reducing HTTP traffic. All lines are still stored
via set_job_status — only the commit frequency changes. Commit failures
are logged as warnings instead of killing the job.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Only report step/job/workflow lifecycle lines (start, completed, failed,
skipped) as ApplicationStatus. Noise lines (validation, file staging,
timing) still go to stderr for Watchdog peek but are not sent to the
server. Log level prefixes (INFO/WARNING) are stripped — ApplicationStatus
shows clean cwltool output like "[job X] completed success".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Require INFO/WARNING/ERROR prefix before cwltool lifecycle patterns to
prevent false positives from user application output. Use group(1) to
extract the bracket-onward portion, stripping the level prefix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The server checks sandbox ownership using the PFN without the
#fragment. The fragment is only needed by the worker to extract
the correct file from the tar archive.
Also rename fake_pfn → fake_sb_ref in submission integration tests
and fix assertion to expect #filename fragment.
Adds list, peek, and get subcommands:
- list: show files and sizes in the output sandbox
- peek: display file contents with syntax highlighting
- get: download sandbox files to a local directory
The generated client regex has a typo ((:? vs (?:) that rejects
the SB: prefix. The server accepts the bare /S3/... path.
Path() mangles the SB: reference. Also filter None values
from get_job_sandbox response in CLI.
@ryuwd ryuwd changed the title feat(cwl): add CWL workflow submission endpoint and DB storage model feat(cwl): integration of CWL job submission and execution into DiracX Apr 10, 2026
raise RuntimeError(f"Could not set job statuses: {ret}")

async def commit(self):
"""Send all the accumulated information."""
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be debounced or use some rate limiter in the JobReport class itself

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would replace the rate limiting logic in the JobWrapper which would be better here

This class extends StdFsAccess to handle LFN: and SB: prefixed paths by
looking them up in the replica map and using the physical file path instead.

Key difference: LFN keys are stored WITHOUT prefix, SB keys are stored WITH prefix.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be harmonised

Services: ServicesConfig = ServicesConfig()
"""Configuration for various DIRAC services."""
SoftwareDistModule: str = "LocalSoftwareDist"
SoftwareDistModule: str = ""
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was causing errors in the Pilot

to be checked with @chaen

Comment on lines +75 to +81
# TODO: Compute Adler32 checksum before upload
# TODO: Extract POOL/ROOT GUID if applicable
# TODO: Prefer local SEs (getSEsForSite) before remote ones
# TODO: Implement retry with exponential backoff on transient failures
# TODO: On complete failure, create a failover Request (RMS)
# for async recovery instead of raising immediately
# TODO: Report upload progress via job status updates
Copy link
Copy Markdown
Contributor Author

@ryuwd ryuwd Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This command is still untested in cert. StoreOutputData still needs fuller implementation, discussion, and testing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant