From b22f7a123398deed2e0635b6fcd8062e027be4b3 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 26 Mar 2026 13:58:43 +0000 Subject: [PATCH 1/2] feat: port core launcher base functionality to the `RuntimeBackend` base This commit fleshes out the abstract `RuntimeBackend` base class with a lot of core functionality that will be needed to implement new runtime backends (which aren't just the legacy launcher adapter). These mostly take the form of protected methods optionally called by backends, comprised of logic on the `Launcher` base class or that was previously duplicated across its various subclasses. Some key changes to note from the launchers: - A new `DVSIM_RUN_INTERACTIVE` env var is introduced intended to replace the `RUN_INTERACTIVE` env var long term, to avoid potential name collision. - Errors are raised if an interactive job tries to run on a backend that doesn't support running jobs interactively. - Log parsing functionality is extracted to a separate object; logs are always lazily loaded so that for jobs that don't need them (passing jobs without any fail or pass patterns), we don't waste time. - Efficiency of the log contents pass/fail regex pattern parsing is improved. Fail patterns are combined into a single regex check, and all regexes are compiled once instead of per-line. 
Signed-off-by: Alex Jones --- src/dvsim/runtime/backend.py | 261 ++++++++++++++++++++++++++++++++++- 1 file changed, 259 insertions(+), 2 deletions(-) diff --git a/src/dvsim/runtime/backend.py b/src/dvsim/runtime/backend.py index 8c62e279..cba5edcc 100644 --- a/src/dvsim/runtime/backend.py +++ b/src/dvsim/runtime/backend.py @@ -4,17 +4,46 @@ """Runtime backend abstract base class.""" +import os +import re from abc import ABC, abstractmethod -from collections.abc import Hashable, Iterable +from collections.abc import Hashable, Iterable, Sequence +from pathlib import Path -from dvsim.job.data import JobSpec +from dvsim.job.data import JobSpec, JobStatusInfo from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime from dvsim.logging import log from dvsim.runtime.data import ( CompletionCallback, JobCompletionEvent, JobHandle, ) +from dvsim.tool.utils import get_sim_tool_plugin +from dvsim.utils import clean_odirs + +__all__ = ("RuntimeBackend",) + +# A list of magic flags that are currently cleared. +# TODO: it would be good to find a nicer solution for this - perhaps a common configuration +# could just re-export it or define that it should not exist? Or it could be in a DVSim config. +MAGIC_VARS_TO_CLEAR = { + # This variable is used by recursive Make calls to pass variables from one level to the next. + # Even if our command here invokes Make, it should logically be a top-level invocation. We + # don't want to pollute the flow with Make variables from any wrapper that called DVSim. + "MAKEFLAGS", +} + +# Relative paths to files created in job output directories +ENV_DUMP_PATH = "env_vars" + + +# The number of lines to give as context when a failure pattern is parsed from a log file. +NUM_LOG_FAIL_CONTEXT_LINES = 4 +# The number of lines to give as context when pass patterns are missing from a log file. +NUM_LOG_PASS_CONTEXT_LINES = 10 +# The number of lines to give as context when a non-zero exit code is returned. 
+NUM_RETCODE_CONTEXT_LINES = 10 class RuntimeBackend(ABC): @@ -66,6 +95,9 @@ async def _emit_completion(self, events: Iterable[JobCompletionEvent]) -> None: raise RuntimeError("Backend not attached to the scheduler") for event in events: + # TODO: aim to refactor to remove these callbacks + event.spec.post_finish(event.status) + log.debug( "Job %s completed execution: %s", event.spec.qual_name, event.status.shorthand ) @@ -112,3 +144,228 @@ async def close(self) -> None: # noqa: B027 The default implementation just does nothing. """ + + def _build_job_env( + self, + job: JobSpec, + backend_env: dict[str, str] | None = None, + remove: Iterable[str] | None = None, + ) -> dict[str, str]: + """Build job environment configuration for a given job. + + Arguments: + job: The job specification to get the environment from. + context: The job execution context for this backend. + backend_env: Any backend-specific environment overrides to use. Defaults to None. + Takes precedence over the base OS environment, but is overridden by the job itself. + remove: A list of variables to remove from the final environment variable list. + Defaults to None. + + Returns the job environment as a mapping of env var names to values. + + """ + # Take the existing environment variables and update with any exports defined on the spec. + # TODO: consider adding some `--clean-env` CLI arg & flag to only use `job.exports` instead + # of also inheriting from `os.environ`? + env = dict(os.environ) + if backend_env: + env.update(backend_env) + env.update(job.exports) + + # If the job is set to run in "interactive" mode, we set the `RUN_INTERACTIVE` environment + # variable to 1, and also make a note in the environment. + if job.interactive: + env["DVSIM_RUN_INTERACTIVE"] = "1" + # TODO: Legacy environment variable not prefixed with `DVSIM` - deprecate this. 
+ env["RUN_INTERACTIVE"] = "1" + + # Clear any magic flags or `remove` entries from the environment variable export list + for key in remove or (): + env.pop(key, None) + for magic_var in MAGIC_VARS_TO_CLEAR: + env.pop(magic_var, None) + + # Dump the environment variables to their own file to make debugging easier. + if job.odir and job.odir.exists(): + dump = job.odir / ENV_DUMP_PATH + with dump.open("w", encoding="utf-8", errors="surrogateescape") as f: + f.writelines(f"{key}={value}\n" for key, value in sorted(env.items())) + + return env + + def _make_job_output_directory(self, job: JobSpec) -> None: + """Create the output directory for a job. + + Depending on the configured `renew_odir` setting, this will optionally clean or maintain + a list of previous output directories for this job. + + """ + if job.renew_odir: + clean_odirs(odir=job.odir, max_odirs=self.max_output_dirs) + + Path(job.odir).mkdir(exist_ok=True, parents=True) + + def _prepare_launch(self, job: JobSpec) -> None: + """Do any pre-launch activities, preparing the environment. + + This may include clearing old runs, creating the output directory, etc. + """ + if job.interactive and not self.supports_interactive: + msg = f"Interactive jobs are not supported by the '{self.name}' backend." + raise RuntimeError(msg) + + job.pre_launch() + self._make_job_output_directory(job) + + def _finish_job( + self, handle: JobHandle, exit_code: int, runtime: float | None + ) -> tuple[JobStatus, JobStatusInfo | None]: + """Determine the outcome of a job that ran to completion, and parse extra log info. + + Updates the handle with any extracted job runtime & simulation time info. + """ + if handle.spec.dry_run: + return JobStatus.PASSED, None + + log_results = LogResults(handle.spec) + + # Update time information on the handle. 
+ job_runtime, simulated_time = log_results.get_runtime_from_logs() + if job_runtime is None: + log.warning("%s: Using dvsim-maintained job_runtime instead.", handle.spec.full_name) + if runtime is not None: + handle.job_runtime.set(runtime, "s") + else: + handle.job_runtime.set(*job_runtime.get()) + if simulated_time is not None: + handle.simulated_time.set(*simulated_time.get()) + + # Determine the final status from the logs and exit code. + status, reason = log_results.get_status_from_logs() + if status is not None: + return status, reason + if exit_code != 0: + lines = log_results.get_lines() + return JobStatus.FAILED, JobStatusInfo( + message=f"Job returned a non-zero exit code: {exit_code}", + context=lines[-NUM_RETCODE_CONTEXT_LINES:], + ) + return JobStatus.PASSED, None + + +class LogResults: + """Wrapper for log result parsing which lazily loads the contents of the job log file.""" + + def __init__(self, job: JobSpec) -> None: + """Construct a LogResults object. Does not load the log file until needed.""" + self.spec = job + self._parsed = False + self._lines: list[str] | None = None + self._err_status: tuple[JobStatus, JobStatusInfo] | None = None + + def _ensure_log_parsed(self) -> None: + """Parse the log file into its lines if not already parsed.""" + if self._parsed: + return + + try: + with self.spec.log_path.open(encoding="utf-8", errors="surrogateescape") as f: + self._lines = f.readlines() + except OSError as e: + log.debug( + "%s: Error reading job log file %s: %s", + self.spec.full_name, + str(self.spec.log_path), + str(e), + ) + self._err_status = ( + JobStatus.FAILED, + JobStatusInfo(message=f"Error opening file {self.spec.log_path}:\n{e}"), + ) + finally: + self._parsed = True + + def get_lines(self) -> Sequence[str]: + """Get the sequence of lines in the log results, or an empty sequence if failed parsing.""" + self._ensure_log_parsed() + return () if self._lines is None else self._lines + + def get_status_from_logs(self) -> tuple[JobStatus 
| None, JobStatusInfo | None]: + """Determine the outcome of a completed job from its log file.""" + # Check we actually need to use the logs before loading them + use_log_check_strategy = bool(self.spec.fail_patterns) or bool(self.spec.pass_patterns) + if not use_log_check_strategy: + return None, None + + lines = self.get_lines() + if self._err_status: + return self._err_status + + fail_regex = None + if self.spec.fail_patterns: + fail_regex = re.compile("|".join(f"(?:{p})" for p in self.spec.fail_patterns)) + pass_regexes = {re.compile(pattern) for pattern in self.spec.pass_patterns} + + # TODO: does this need to be restricted to per-line patterns? It would complicate line + # number parsing, but it might be useful to make this more expressive? + for lineno, line in enumerate(lines, start=1): + # If the job matches ANY fail pattern, it fails. Provide some extra lines for context. + if fail_regex and fail_regex.search(line): + end = lineno + NUM_LOG_FAIL_CONTEXT_LINES + return JobStatus.FAILED, JobStatusInfo( + message=line.strip(), lines=[lineno], context=lines[lineno:end] + ) + + # The job must match ALL pass patterns to succeed. + matched = {regex for regex in pass_regexes if regex.search(line)} + pass_regexes -= matched + + if not pass_regexes and not fail_regex: + break # Early exit if possible + + if pass_regexes: + pass_patterns = [regex.pattern for regex in pass_regexes] + return JobStatus.FAILED, JobStatusInfo( + message=f"Some pass patterns missing: {pass_patterns}", + context=lines[-NUM_LOG_PASS_CONTEXT_LINES:], + ) + + return None, None + + def get_runtime_from_logs(self) -> tuple[JobTime | None, JobTime | None]: + """Try to determine a job's runtime from its log file, using specified extensions.""" + # TODO: rather than check the job type here, in the future the sim tool plugin should + # define the job types it supports. 
Even longer term, perhaps the job time and sim time + should not be defined on the JobHandle/CompletedJobStatus and should be directly parsed + out of the resulting log artifacts by the respective flows. + sim_job_types = ["CompileSim", "RunTest", "CovUnr", "CovMerge", "CovReport", "CovAnalyze"] + supports_log_info_ext = self.spec.job_type in sim_job_types + if not supports_log_info_ext: + return None, None + + lines = self.get_lines() + if self._err_status: + return None, None + + try: + plugin = get_sim_tool_plugin(tool=self.spec.tool.name) + except NotImplementedError as e: + log.error("%s: %s", self.spec.full_name, str(e)) + return None, None + + runtime = None + try: + time, unit = plugin.get_job_runtime(log_text=lines) + runtime = JobTime(time, unit) + except RuntimeError as e: + log.warning("%s: %s", self.spec.full_name, str(e)) + + simulated_time = None + if self.spec.job_type == "RunTest": + try: + time, unit = plugin.get_simulated_time(log_text=lines) + simulated_time = JobTime(time, unit) + except RuntimeError as e: + log.debug("%s: %s", self.spec.full_name, str(e)) + + return runtime, simulated_time From d14abc6aa46a1617bbd372f1c6fd835e0610af8f Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 26 Mar 2026 14:40:46 +0000 Subject: [PATCH 2/2] feat: add `LocalRuntimeBackend` backend This is the async `RuntimeBackend` replacement of the `LocalLauncher`, which will eventually be removed in favor of this new backend. Some behavioural differences to note: - We now try to await() after a SIGKILL to be sure the process ended, bounded by a short timeout in case blocked at the kernel level. - We now use psutil to enumerate and kill descendant processes in addition to the created subprocess. This won't catch orphaned processes (needs e.g. cgroups), but should cover most sane usage. - The backend does _not_ link the output directories based on status (the `JobSpec.links`, e.g. "passing/", "failed/", "killed/").
The intention is that this detail is not core functionality for either the scheduler or the backends - instead, it will be implemented as an observer on the new async scheduler callbacks when introduced. By using async subprocesses and launching/killing jobs in batch, we are able to more efficiently launch jobs in parallel via async coroutines. We likewise avoid the need to poll jobs - instead we have an async task awaiting the subprocess' completion, which we then forward to notify the (to be added) scheduler of the job's completion. Note that interactive jobs are still basically handled synchronously as before - it is assumed that there is only 1 interactive job running at a time. Signed-off-by: Alex Jones --- src/dvsim/runtime/local.py | 334 +++++++++++++++++++++++++++++++++++++ 1 file changed, 334 insertions(+) create mode 100644 src/dvsim/runtime/local.py diff --git a/src/dvsim/runtime/local.py b/src/dvsim/runtime/local.py new file mode 100644 index 00000000..71c98798 --- /dev/null +++ b/src/dvsim/runtime/local.py @@ -0,0 +1,334 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0 + +"""Legacy launcher adapter interface for the new async scheduler design.""" + +import asyncio +import contextlib +import shlex +import signal +import subprocess +import time +from collections.abc import Hashable, Iterable +from dataclasses import dataclass +from typing import TextIO + +import psutil + +from dvsim.job.data import JobSpec, JobStatusInfo +from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + + +@dataclass(kw_only=True) +class LocalJobHandle(JobHandle): + """Job handle for a job belonging to a legacy launcher adapter runtime backend.""" + + process: asyncio.subprocess.Process | None + log_file: TextIO | None + start_time: float + kill_requested: bool = False + + +class LocalRuntimeBackend(RuntimeBackend): + """Launch jobs as subprocesses on the user's local machine.""" + + name = "local" + supports_interactive = True + + DEFAULT_SIGTERM_TIMEOUT = 2.0 # in seconds + DEFAULT_SIGKILL_TIMEOUT = 2.0 # in seconds + + def __init__( + self, + *, + max_parallelism: int | None = None, + sigterm_timeout: float | None = None, + sigkill_timeout: float | None = None, + ) -> None: + """Construct a local runtime backend. + + Args: + max_parallelism: The maximum number of jobs that can be dispatched to this backend + at once. `0` means no limit, `None` means no override is applied to the default. + sigterm_timeout: The time to wait for a process to die after a SIGTERM when killing + it, before sending SIGKILL. + sigkill_timeout: The time to wait for a process to die after a SIGKILL when killing + it, before giving up (so the scheduler can progress). 
+ + """ + super().__init__(max_parallelism=max_parallelism) + self.sigterm_timeout = ( + sigterm_timeout if sigterm_timeout is not None else self.DEFAULT_SIGTERM_TIMEOUT + ) + self.sigkill_timeout = ( + sigkill_timeout if sigkill_timeout is not None else self.DEFAULT_SIGKILL_TIMEOUT + ) + + # Retain references to created asyncio tasks so they don't get GC'd. + self._tasks: set[asyncio.Task] = set() + + async def _log_from_pipe( + self, handle: LocalJobHandle, stream: asyncio.StreamReader | None + ) -> None: + """Write piped asyncio subprocess stream contents to a job's log file.""" + if stream is None or not handle.log_file: + return + try: + async for line in stream: + decoded = line.decode("utf-8", errors="surrogateescape") + handle.log_file.write(decoded) + handle.log_file.flush() + except asyncio.CancelledError: + pass + + async def _monitor_job(self, handle: LocalJobHandle) -> None: + """Wait for subprocess completion and emit a completion event.""" + if handle.process is None: + return + + if handle.log_file: + handle.log_file.write(f"[Executing]:\n{handle.spec.cmd}\n\n") + handle.log_file.flush() + + reader_tasks = [ + asyncio.create_task(self._log_from_pipe(handle, handle.process.stdout)), + asyncio.create_task(self._log_from_pipe(handle, handle.process.stderr)), + ] + status = JobStatus.KILLED + reason = None + + try: + exit_code = await asyncio.wait_for( + handle.process.wait(), timeout=handle.spec.timeout_secs + ) + runtime = time.monotonic() - handle.start_time + status, reason = self._finish_job(handle, exit_code, runtime) + except asyncio.TimeoutError: + await self._kill_job(handle) + status = JobStatus.KILLED + timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes" + reason = JobStatusInfo(message=timeout_message) + finally: + # Explicitly cancel reader tasks and wait for them to finish before closing the log + # file. We first give them a second to finish naturally to reduce log loss. 
+ with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait(reader_tasks, timeout=1) + for task in reader_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*reader_tasks, return_exceptions=True) + + if handle.log_file: + handle.log_file.close() + if handle.kill_requested: + status = JobStatus.KILLED + reason = JobStatusInfo(message="Job killed!") + await self._emit_completion([JobCompletionEvent(handle.spec, status, reason)]) + + def _launch_interactive_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle, JobCompletionEvent | None]: + """Launch a job in interactive mode with transparent stdin and stdout.""" + start_time = time.monotonic() + exit_code = None + completion = None + + if log_file is not None: + try: + proc = subprocess.Popen( + shlex.split(job.cmd), + # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe. + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + env=env, + ) + if proc.stdout is not None: + for line in proc.stdout: + print(line, end="") # noqa: T201 + log_file.write(line) + log_file.flush() + + exit_code = proc.wait() + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + runtime = time.monotonic() - start_time + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=None, + log_file=log_file, + start_time=start_time, + ) + + if exit_code is not None: + status, reason = self._finish_job(handle, exit_code, runtime) + completion = JobCompletionEvent(job, status, reason) + + return handle, completion + + async def _launch_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle | 
None, JobCompletionEvent | None]: + """Launch a job (in non-interactive mode) as an async subprocess.""" + proc = None + completion = None + if log_file is not None: + try: + proc = await asyncio.create_subprocess_exec( + *shlex.split(job.cmd), + # TODO: currently we mux the stdout and stderr streams by default. It would be + # useful to make this behaviour optional on some global `IoPolicy`. + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + except BlockingIOError: + # Skip this job for now; the scheduler should re-try to launch it later. + log_file.close() + return None, None + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=proc, + log_file=log_file, + start_time=time.monotonic(), + ) + + # Create a task to asynchronously monitor the launched subprocess. + # We must store a reference in self._tasks to ensure the task is not GC'd. + if proc is not None: + task = asyncio.create_task(self._monitor_job(handle)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + return handle, completion + + async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]: + """Submit & launch multiple jobs. + + Returns: + mapping from job.id -> JobHandle. Entries are only present for jobs that successfully + launched; jobs that failed in a non-fatal way are missing, and should be retried. 
+ + """ + completions: list[JobCompletionEvent] = [] + handles: dict[Hashable, JobHandle] = {} + + for job in jobs: + env = self._build_job_env(job) + self._prepare_launch(job) + + log_file = None + try: + log_file = job.log_path.open("w", encoding="utf-8", errors="surrogateescape") + except BlockingIOError: + continue # Skip this job for now; the scheduler should re-try to launch it later. + except OSError as e: + log.exception("Error writing to job log file: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason)) + + if job.interactive: + handle, completion = self._launch_interactive_job(job, log_file, env) + else: + handle, completion = await self._launch_job(job, log_file, env) + if completion is not None: + completions.append(completion) + if handle is not None: + handles[job.id] = handle + + if completions: + await self._emit_completion(completions) + + return handles + + def _send_kill_signal(self, proc: asyncio.subprocess.Process, signal_num: int) -> None: + """Send a (kill) signal to a process and all its descendent processes.""" + # TODO: maybe this should use cgroups in the future to be thorough? 
+ for child in psutil.Process(proc.pid).children(recursive=True): + child.send_signal(signal_num) + proc.send_signal(signal_num) + + async def _kill_job(self, handle: LocalJobHandle) -> None: + """Kill the running local process, sending SIGTERM and then SIGKILL if that didn't work.""" + proc = handle.process + if proc is None: + return + + if proc.returncode is None: + handle.kill_requested = True + try: + self._send_kill_signal(proc, signal.SIGTERM) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigterm_timeout) + except asyncio.TimeoutError: + pass + else: + return + + if proc.returncode is None: + log.warning( + "Job '%s' was not killed with SIGTERM after %g seconds, sending SIGKILL.", + handle.spec.full_name, + self.sigterm_timeout, + ) + try: + self._send_kill_signal(proc, signal.SIGKILL) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigkill_timeout) + except asyncio.TimeoutError: + # proc.wait() completes only when the kernel reaps the process. If we sent SIGKILL + # and did not see this happen for a bit, the process is probably blocked in the + # kernel somewhere (e.g. NFS hang, slow or dead disk I/O). + log.error( + "Job '%s' was not killed with SIGKILL after %g seconds, so give up on it.", + handle.spec.full_name, + self.sigkill_timeout, + ) + + async def kill_many(self, handles: Iterable[JobHandle]) -> None: + """Cancel ongoing jobs via their handle. Killed jobs should still "complete".""" + tasks = [] + for handle in handles: + if not isinstance(handle, LocalJobHandle): + msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`." + raise TypeError(msg) + if handle.process and not handle.kill_requested and handle.process.returncode is None: + tasks.append(asyncio.create_task(self._kill_job(handle))) + + if tasks: + # Wait for all job subprocesses to be killed; `_monitor_job` handles the completions. + await asyncio.gather(*tasks)