diff --git a/src/dvsim/runtime/backend.py b/src/dvsim/runtime/backend.py index 8c62e279..cba5edcc 100644 --- a/src/dvsim/runtime/backend.py +++ b/src/dvsim/runtime/backend.py @@ -4,17 +4,46 @@ """Runtime backend abstract base class.""" +import os +import re from abc import ABC, abstractmethod -from collections.abc import Hashable, Iterable +from collections.abc import Hashable, Iterable, Sequence +from pathlib import Path -from dvsim.job.data import JobSpec +from dvsim.job.data import JobSpec, JobStatusInfo from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime from dvsim.logging import log from dvsim.runtime.data import ( CompletionCallback, JobCompletionEvent, JobHandle, ) +from dvsim.tool.utils import get_sim_tool_plugin +from dvsim.utils import clean_odirs + +__all__ = ("RuntimeBackend",) + +# A list of magic flags that are currently cleared. +# TODO: it would be good to find a nicer solution for this - perhaps a common configuration +# could just re-export it or define that it should not exist? Or it could be in a DVSim config. +MAGIC_VARS_TO_CLEAR = { + # This variable is used by recursive Make calls to pass variables from one level to the next. + # Even if our command here invokes Make, it should logically be a top-level invocation. We + # don't want to pollute the flow with Make variables from any wrapper that called DVSim. + "MAKEFLAGS", +} + +# Relative paths to files created in job output directories +ENV_DUMP_PATH = "env_vars" + + +# The number of lines to give as context when a failure pattern is parsed from a log file. +NUM_LOG_FAIL_CONTEXT_LINES = 4 +# The number of lines to give as context when pass patterns are missing from a log file. +NUM_LOG_PASS_CONTEXT_LINES = 10 +# The number of lines to give as context when a non-zero exit code is returned. 
+NUM_RETCODE_CONTEXT_LINES = 10
 
 
 class RuntimeBackend(ABC):
@@ -66,6 +95,9 @@ async def _emit_completion(self, events: Iterable[JobCompletionEvent]) -> None:
             raise RuntimeError("Backend not attached to the scheduler")
 
         for event in events:
+            # TODO: aim to refactor to remove these callbacks
+            event.spec.post_finish(event.status)
+
             log.debug(
                 "Job %s completed execution: %s", event.spec.qual_name, event.status.shorthand
             )
@@ -112,3 +144,228 @@ async def close(self) -> None:  # noqa: B027
 
         The default implementation just does nothing.
         """
+
+    def _build_job_env(
+        self,
+        job: JobSpec,
+        backend_env: dict[str, str] | None = None,
+        remove: Iterable[str] | None = None,
+    ) -> dict[str, str]:
+        """Build job environment configuration for a given job.
+
+        Arguments:
+            job: The job specification to get the environment from.
+                Exports defined on the job take precedence over all other sources.
+            backend_env: Any backend-specific environment overrides to use. Defaults to None.
+                Takes precedence over the base OS environment, but is overridden by the job itself.
+            remove: A list of variables to remove from the final environment variable list.
+                Defaults to None.
+
+        Returns the job environment as a mapping of env var names to values.
+
+        """
+        # Take the existing environment variables and update with any exports defined on the spec.
+        # TODO: consider adding some `--clean-env` CLI arg & flag to only use `job.exports` instead
+        # of also inheriting from `os.environ`?
+        env = dict(os.environ)
+        if backend_env:
+            env.update(backend_env)
+        env.update(job.exports)
+
+        # If the job is set to run in "interactive" mode, we set the `RUN_INTERACTIVE` environment
+        # variable to 1, and also make a note in the environment.
+        if job.interactive:
+            env["DVSIM_RUN_INTERACTIVE"] = "1"
+            # TODO: Legacy environment variable not prefixed with `DVSIM` - deprecate this.
+ env["RUN_INTERACTIVE"] = "1" + + # Clear any magic flags or `remove` entries from the environment variable export list + for key in remove or (): + env.pop(key, None) + for magic_var in MAGIC_VARS_TO_CLEAR: + env.pop(magic_var, None) + + # Dump the environment variables to their own file to make debugging easier. + if job.odir and job.odir.exists(): + dump = job.odir / ENV_DUMP_PATH + with dump.open("w", encoding="utf-8", errors="surrogateescape") as f: + f.writelines(f"{key}={value}\n" for key, value in sorted(env.items())) + + return env + + def _make_job_output_directory(self, job: JobSpec) -> None: + """Create the output directory for a job. + + Depending on the configured `renew_odir` setting, this will optionally clean or maintain + a list of previous output directories for this job. + + """ + if job.renew_odir: + clean_odirs(odir=job.odir, max_odirs=self.max_output_dirs) + + Path(job.odir).mkdir(exist_ok=True, parents=True) + + def _prepare_launch(self, job: JobSpec) -> None: + """Do any pre-launch activities, preparing the environment. + + This may include clearing old runs, creating the output directory, etc. + """ + if job.interactive and not self.supports_interactive: + msg = f"Interactive jobs are not supported by the '{self.name}' backend." + raise RuntimeError(msg) + + job.pre_launch() + self._make_job_output_directory(job) + + def _finish_job( + self, handle: JobHandle, exit_code: int, runtime: float | None + ) -> tuple[JobStatus, JobStatusInfo | None]: + """Determine the outcome of a job that ran to completion, and parse extra log info. + + Updates the handle with any extracted job runtime & simulation time info. + """ + if handle.spec.dry_run: + return JobStatus.PASSED, None + + log_results = LogResults(handle.spec) + + # Update time information on the handle. 
+ job_runtime, simulated_time = log_results.get_runtime_from_logs() + if job_runtime is None: + log.warning("%s: Using dvsim-maintained job_runtime instead.", handle.spec.full_name) + if runtime is not None: + handle.job_runtime.set(runtime, "s") + else: + handle.job_runtime.set(*job_runtime.get()) + if simulated_time is not None: + handle.simulated_time.set(*simulated_time.get()) + + # Determine the final status from the logs and exit code. + status, reason = log_results.get_status_from_logs() + if status is not None: + return status, reason + if exit_code != 0: + lines = log_results.get_lines() + return JobStatus.FAILED, JobStatusInfo( + message=f"Job returned a non-zero exit code: {exit_code}", + context=lines[-NUM_RETCODE_CONTEXT_LINES:], + ) + return JobStatus.PASSED, None + + +class LogResults: + """Wrapper for log result parsing which lazily loads the contents of the job log file.""" + + def __init__(self, job: JobSpec) -> None: + """Construct a LogResults object. Does not load the log file until needed.""" + self.spec = job + self._parsed = False + self._lines: list[str] | None = None + self._err_status: tuple[JobStatus, JobStatusInfo] | None = None + + def _ensure_log_parsed(self) -> None: + """Parse the log file into its lines if not already parsed.""" + if self._parsed: + return + + try: + with self.spec.log_path.open(encoding="utf-8", errors="surrogateescape") as f: + self._lines = f.readlines() + except OSError as e: + log.debug( + "%s: Error reading job log file %s: %s", + self.spec.full_name, + str(self.spec.log_path), + str(e), + ) + self._err_status = ( + JobStatus.FAILED, + JobStatusInfo(message=f"Error opening file {self.spec.log_path}:\n{e}"), + ) + finally: + self._parsed = True + + def get_lines(self) -> Sequence[str]: + """Get the sequence of lines in the log results, or an empty sequence if failed parsing.""" + self._ensure_log_parsed() + return () if self._lines is None else self._lines + + def get_status_from_logs(self) -> tuple[JobStatus 
| None, JobStatusInfo | None]: + """Determine the outcome of a completed job from its log file.""" + # Check we actually need to use the logs before loading them + use_log_check_strategy = bool(self.spec.fail_patterns) or bool(self.spec.pass_patterns) + if not use_log_check_strategy: + return None, None + + lines = self.get_lines() + if self._err_status: + return self._err_status + + fail_regex = None + if self.spec.fail_patterns: + fail_regex = re.compile("|".join(f"(?:{p})" for p in self.spec.fail_patterns)) + pass_regexes = {re.compile(pattern) for pattern in self.spec.pass_patterns} + + # TODO: does this need to be restricted to per-line patterns? It would complicate line + # number parsing, but it might be useful to make this more expressive? + for lineno, line in enumerate(lines, start=1): + # If the job matches ANY fail pattern, it fails. Provide some extra lines for context. + if fail_regex and fail_regex.search(line): + end = lineno + NUM_LOG_FAIL_CONTEXT_LINES + return JobStatus.FAILED, JobStatusInfo( + message=line.strip(), lines=[lineno], context=lines[lineno:end] + ) + + # The job must match ALL pass patterns to succeed. + matched = {regex for regex in pass_regexes if regex.search(line)} + pass_regexes -= matched + + if not pass_regexes and not fail_regex: + break # Early exit if possible + + if pass_regexes: + pass_patterns = [regex.pattern for regex in pass_regexes] + return JobStatus.FAILED, JobStatusInfo( + message=f"Some pass patterns missing: {pass_patterns}", + context=lines[-NUM_LOG_PASS_CONTEXT_LINES:], + ) + + return None, None + + def get_runtime_from_logs(self) -> tuple[JobTime | None, JobTime | None]: + """Try to determine a job's runtime from its log file, using specified extensions.""" + # TODO: rather than check the job type here, in the future the sim tool plugin should + # define the job types it supports. 
Even longer term, perhaps the job time and sim time + # should not be defined on the JobHandle/CompletedJobStatus and should be directly parsed + # out of the resulting log artifacts by the respective flows. + sim_job_types = ["CompileSim", "RunTest", "CovUnr", "CovMerge", "CovReport", "CovAnalyze"] + supports_log_info_ext = self.spec.job_type in sim_job_types + if not supports_log_info_ext: + return None, None + + lines = self.get_lines() + if self._err_status: + return None, None + + try: + plugin = get_sim_tool_plugin(tool=self.spec.tool.name) + except NotImplementedError as e: + log.error("%s: %s", self.spec.full_name, str(e)) + return None, None + + runtime = None + try: + time, unit = plugin.get_job_runtime(log_text=lines) + runtime = JobTime(time, unit) + except RuntimeError as e: + log.warning("%s: %s", self.spec.full_name, str(e)) + + simulated_time = None + if self.spec.job_type == "RunTest": + try: + time, unit = plugin.get_simulated_time(log_text=lines) + simulated_time = JobTime(time, unit) + except RuntimeError as e: + log.debug("%s: %s", self.spec.full_name, str(e)) + + return runtime, simulated_time diff --git a/src/dvsim/runtime/local.py b/src/dvsim/runtime/local.py new file mode 100644 index 00000000..71c98798 --- /dev/null +++ b/src/dvsim/runtime/local.py @@ -0,0 +1,334 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0
+
+"""Local runtime backend: launch jobs as subprocesses on the user's machine."""
+
+import asyncio
+import contextlib
+import shlex
+import signal
+import subprocess
+import time
+from collections.abc import Hashable, Iterable
+from dataclasses import dataclass
+from typing import TextIO
+
+import psutil
+
+from dvsim.job.data import JobSpec, JobStatusInfo
+from dvsim.job.status import JobStatus
+from dvsim.job.time import JobTime
+from dvsim.logging import log
+from dvsim.runtime.backend import RuntimeBackend
+from dvsim.runtime.data import JobCompletionEvent, JobHandle
+
+
+@dataclass(kw_only=True)
+class LocalJobHandle(JobHandle):
+    """Job handle for a job belonging to the local runtime backend."""
+
+    process: asyncio.subprocess.Process | None
+    log_file: TextIO | None
+    start_time: float
+    kill_requested: bool = False
+
+
+class LocalRuntimeBackend(RuntimeBackend):
+    """Launch jobs as subprocesses on the user's local machine."""
+
+    name = "local"
+    supports_interactive = True
+
+    DEFAULT_SIGTERM_TIMEOUT = 2.0  # in seconds
+    DEFAULT_SIGKILL_TIMEOUT = 2.0  # in seconds
+
+    def __init__(
+        self,
+        *,
+        max_parallelism: int | None = None,
+        sigterm_timeout: float | None = None,
+        sigkill_timeout: float | None = None,
+    ) -> None:
+        """Construct a local runtime backend.
+
+        Args:
+            max_parallelism: The maximum number of jobs that can be dispatched to this backend
+                at once. `0` means no limit, `None` means no override is applied to the default.
+            sigterm_timeout: The time to wait for a process to die after a SIGTERM when killing
+                it, before sending SIGKILL.
+            sigkill_timeout: The time to wait for a process to die after a SIGKILL when killing
+                it, before giving up (so the scheduler can progress).
+ + """ + super().__init__(max_parallelism=max_parallelism) + self.sigterm_timeout = ( + sigterm_timeout if sigterm_timeout is not None else self.DEFAULT_SIGTERM_TIMEOUT + ) + self.sigkill_timeout = ( + sigkill_timeout if sigkill_timeout is not None else self.DEFAULT_SIGKILL_TIMEOUT + ) + + # Retain references to created asyncio tasks so they don't get GC'd. + self._tasks: set[asyncio.Task] = set() + + async def _log_from_pipe( + self, handle: LocalJobHandle, stream: asyncio.StreamReader | None + ) -> None: + """Write piped asyncio subprocess stream contents to a job's log file.""" + if stream is None or not handle.log_file: + return + try: + async for line in stream: + decoded = line.decode("utf-8", errors="surrogateescape") + handle.log_file.write(decoded) + handle.log_file.flush() + except asyncio.CancelledError: + pass + + async def _monitor_job(self, handle: LocalJobHandle) -> None: + """Wait for subprocess completion and emit a completion event.""" + if handle.process is None: + return + + if handle.log_file: + handle.log_file.write(f"[Executing]:\n{handle.spec.cmd}\n\n") + handle.log_file.flush() + + reader_tasks = [ + asyncio.create_task(self._log_from_pipe(handle, handle.process.stdout)), + asyncio.create_task(self._log_from_pipe(handle, handle.process.stderr)), + ] + status = JobStatus.KILLED + reason = None + + try: + exit_code = await asyncio.wait_for( + handle.process.wait(), timeout=handle.spec.timeout_secs + ) + runtime = time.monotonic() - handle.start_time + status, reason = self._finish_job(handle, exit_code, runtime) + except asyncio.TimeoutError: + await self._kill_job(handle) + status = JobStatus.KILLED + timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes" + reason = JobStatusInfo(message=timeout_message) + finally: + # Explicitly cancel reader tasks and wait for them to finish before closing the log + # file. We first give them a second to finish naturally to reduce log loss. 
+ with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait(reader_tasks, timeout=1) + for task in reader_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*reader_tasks, return_exceptions=True) + + if handle.log_file: + handle.log_file.close() + if handle.kill_requested: + status = JobStatus.KILLED + reason = JobStatusInfo(message="Job killed!") + await self._emit_completion([JobCompletionEvent(handle.spec, status, reason)]) + + def _launch_interactive_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle, JobCompletionEvent | None]: + """Launch a job in interactive mode with transparent stdin and stdout.""" + start_time = time.monotonic() + exit_code = None + completion = None + + if log_file is not None: + try: + proc = subprocess.Popen( + shlex.split(job.cmd), + # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe. + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + env=env, + ) + if proc.stdout is not None: + for line in proc.stdout: + print(line, end="") # noqa: T201 + log_file.write(line) + log_file.flush() + + exit_code = proc.wait() + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + runtime = time.monotonic() - start_time + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=None, + log_file=log_file, + start_time=start_time, + ) + + if exit_code is not None: + status, reason = self._finish_job(handle, exit_code, runtime) + completion = JobCompletionEvent(job, status, reason) + + return handle, completion + + async def _launch_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle | 
None, JobCompletionEvent | None]: + """Launch a job (in non-interactive mode) as an async subprocess.""" + proc = None + completion = None + if log_file is not None: + try: + proc = await asyncio.create_subprocess_exec( + *shlex.split(job.cmd), + # TODO: currently we mux the stdout and stderr streams by default. It would be + # useful to make this behaviour optional on some global `IoPolicy`. + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + except BlockingIOError: + # Skip this job for now; the scheduler should re-try to launch it later. + log_file.close() + return None, None + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=proc, + log_file=log_file, + start_time=time.monotonic(), + ) + + # Create a task to asynchronously monitor the launched subprocess. + # We must store a reference in self._tasks to ensure the task is not GC'd. + if proc is not None: + task = asyncio.create_task(self._monitor_job(handle)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + return handle, completion + + async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]: + """Submit & launch multiple jobs. + + Returns: + mapping from job.id -> JobHandle. Entries are only present for jobs that successfully + launched; jobs that failed in a non-fatal way are missing, and should be retried. 
+ + """ + completions: list[JobCompletionEvent] = [] + handles: dict[Hashable, JobHandle] = {} + + for job in jobs: + env = self._build_job_env(job) + self._prepare_launch(job) + + log_file = None + try: + log_file = job.log_path.open("w", encoding="utf-8", errors="surrogateescape") + except BlockingIOError: + continue # Skip this job for now; the scheduler should re-try to launch it later. + except OSError as e: + log.exception("Error writing to job log file: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason)) + + if job.interactive: + handle, completion = self._launch_interactive_job(job, log_file, env) + else: + handle, completion = await self._launch_job(job, log_file, env) + if completion is not None: + completions.append(completion) + if handle is not None: + handles[job.id] = handle + + if completions: + await self._emit_completion(completions) + + return handles + + def _send_kill_signal(self, proc: asyncio.subprocess.Process, signal_num: int) -> None: + """Send a (kill) signal to a process and all its descendent processes.""" + # TODO: maybe this should use cgroups in the future to be thorough? 
+ for child in psutil.Process(proc.pid).children(recursive=True): + child.send_signal(signal_num) + proc.send_signal(signal_num) + + async def _kill_job(self, handle: LocalJobHandle) -> None: + """Kill the running local process, sending SIGTERM and then SIGKILL if that didn't work.""" + proc = handle.process + if proc is None: + return + + if proc.returncode is None: + handle.kill_requested = True + try: + self._send_kill_signal(proc, signal.SIGTERM) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigterm_timeout) + except asyncio.TimeoutError: + pass + else: + return + + if proc.returncode is None: + log.warning( + "Job '%s' was not killed with SIGTERM after %g seconds, sending SIGKILL.", + handle.spec.full_name, + self.sigterm_timeout, + ) + try: + self._send_kill_signal(proc, signal.SIGKILL) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigkill_timeout) + except asyncio.TimeoutError: + # proc.wait() completes only when the kernel reaps the process. If we sent SIGKILL + # and did not see this happen for a bit, the process is probably blocked in the + # kernel somewhere (e.g. NFS hang, slow or dead disk I/O). + log.error( + "Job '%s' was not killed with SIGKILL after %g seconds, so give up on it.", + handle.spec.full_name, + self.sigkill_timeout, + ) + + async def kill_many(self, handles: Iterable[JobHandle]) -> None: + """Cancel ongoing jobs via their handle. Killed jobs should still "complete".""" + tasks = [] + for handle in handles: + if not isinstance(handle, LocalJobHandle): + msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`." + raise TypeError(msg) + if handle.process and not handle.kill_requested and handle.process.returncode is None: + tasks.append(asyncio.create_task(self._kill_job(handle))) + + if tasks: + # Wait for all job subprocesses to be killed; `_monitor_job` handles the completions. + await asyncio.gather(*tasks)