diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index a84ae674..ff126f08 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -422,7 +422,7 @@ def deploy_objects(self) -> Mapping[str, CompletedJobStatus]: json.dumps( # Sort on full name to ensure consistent ordering sorted( - [d.model_dump() for d in deploy], + [d.dump() for d in deploy], key=lambda d: d["full_name"], ), indent=2, @@ -445,16 +445,19 @@ def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str: prints the full list of failures for debug / triage to the final report, which is in markdown format. - Results: - dictionary mapping deployed item names to job status. + Args: + results: dictionary mapping deployed item names to completed job status. + + Returns: + Results as a formatted string """ def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None: - """Public facing API for _gen_results(). + """Generate flow results. Args: - results: dictionary mapping deployed item names to job status. + results: dictionary mapping deployed item names to completed job status. """ for item in self.cfgs: diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 53944224..511f157a 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -11,6 +11,8 @@ from pathlib import Path from typing import TYPE_CHECKING, ClassVar +from pydantic import BaseModel +from pydantic.config import ConfigDict from tabulate import tabulate from dvsim.job.time import JobTime @@ -26,6 +28,21 @@ if TYPE_CHECKING: from dvsim.flow.sim import SimCfg + from dvsim.modes import BuildMode + + +class WorkspaceConfig(BaseModel): + """Workspace configuration.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + project: str + timestamp: str + + project_root: Path + scratch_root: Path + scratch_path: Path + __all__ = ( "CompileSim", @@ -101,6 +118,14 @@ def __init__(self, sim_cfg: "SimCfg") -> None: # Job's wall clock time (a.k.a CPU time, or runtime). self.job_runtime = JobTime() + self.workspace_cfg = WorkspaceConfig( + project=sim_cfg.name, + project_root=sim_cfg.proj_root, + scratch_root=Path(sim_cfg.scratch_root), + scratch_path=Path(sim_cfg.scratch_path), + timestamp=sim_cfg.args.timestamp, + ) + def _define_attrs(self) -> None: """Define the attributes this instance needs to have. @@ -293,7 +318,7 @@ def pre_launch(self, launcher: Launcher) -> None: This is invoked by launcher::_pre_launch(). """ - def post_finish(self, status) -> None: + def post_finish(self, status: str) -> None: """Perform additional post-finish activities (callback). This is invoked by launcher::_post_finish(). @@ -328,12 +353,9 @@ def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.") self.job_runtime.set(job_runtime_secs, "s") - def model_dump(self) -> Mapping: + def dump(self) -> Mapping: """Dump the deployment object to mapping object. - This method matches the interface provided by pydantic models to dump a - subset of the class attributes - Returns: Representation of a deployment object as a dict. @@ -371,6 +393,20 @@ def __init__(self, build_mode, sim_cfg) -> None: self.build_timeout_mins, ) + @staticmethod + def new(build_mode_obj: "BuildMode", sim_cfg: "SimCfg") -> "CompileSim": + """Create a new CompileSim object. + + Args: + build_mode_obj: build mode instance + sim_cfg: Simulation config object + + Returns: + new CompileSim object. 
+
+        """
+        return CompileSim(build_mode=build_mode_obj, sim_cfg=sim_cfg)
+
     def _define_attrs(self) -> None:
         """Define attributes."""
         super()._define_attrs()
@@ -711,6 +747,10 @@ def __init__(self, run_items, sim_cfg) -> None:
             else:
                 self.cov_db_dirs.append(run.cov_db_dir)
 
+        # Sort the cov_db_dirs, keeping the first directory in place
+        if len(self.cov_db_dirs) > 1:
+            self.cov_db_dirs = [self.cov_db_dirs[0], *sorted(self.cov_db_dirs[1:])]
+
         # Early lookup the cov_merge_db_dir, which is a mandatory misc
         # attribute anyway. We need it to compute additional cov db dirs.
         self.cov_merge_db_dir = subst_wildcards("{cov_merge_db_dir}", sim_cfg.__dict__)
diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py
index f5bc73c7..b852442f 100644
--- a/src/dvsim/launcher/base.py
+++ b/src/dvsim/launcher/base.py
@@ -19,7 +19,7 @@ from dvsim.utils import clean_odirs, mk_symlink, rm_path
 
 if TYPE_CHECKING:
-    from dvsim.job.deploy import Deploy
+    from dvsim.job.deploy import Deploy, WorkspaceConfig
 
 
 class LauncherError(Exception):
@@ -67,7 +67,7 @@ class Launcher(ABC):
     poll_freq = 1
 
     # Points to the python virtual env area.
-    pyvenv = None
+    pyvenv: Path | None = None
 
     # If a history of previous invocations is to be maintained, then keep no
     # more than this many directories.
@@ -97,18 +97,18 @@ def __init__(self, deploy: "Deploy") -> None:
             deploy: deployment object that will be launched.
 
         """
-        cfg = deploy.sim_cfg
+        workspace_cfg = deploy.workspace_cfg
 
         # One-time preparation of the workspace.
         if not Launcher.workspace_prepared:
-            # TODO: CLI args should be processed far earlier than this
-            self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args)
+            self.prepare_workspace(workspace_cfg)
             Launcher.workspace_prepared = True
 
         # One-time preparation of the workspace, specific to the cfg.
-        if cfg not in Launcher.workspace_prepared_for_cfg:
-            self.prepare_workspace_for_cfg(cfg)
-            Launcher.workspace_prepared_for_cfg.add(cfg)
+        project = workspace_cfg.project
+        if project not in Launcher.workspace_prepared_for_cfg:
+            self.prepare_workspace_for_cfg(workspace_cfg)
+            Launcher.workspace_prepared_for_cfg.add(project)
 
         # Store the deploy object handle.
         self.deploy = deploy
@@ -155,34 +155,40 @@ def set_pyvenv(project: str) -> None:
         # The code below allows each launcher variant to set its own virtualenv
        # because the loading / activating mechanism could be different between
         # them.
-        Launcher.pyvenv = os.environ.get(
-            f"{project.upper()}_PYVENV_{Launcher.variant.upper()}",
-        )
+        common_venv = f"{project.upper()}_PYVENV"
+        variant = Launcher.variant.upper()
+
+        venv_path = os.environ.get(f"{common_venv}_{variant}")
+
+        if not venv_path:
+            venv_path = os.environ.get(common_venv)
 
-        if not Launcher.pyvenv:
-            Launcher.pyvenv = os.environ.get(f"{project.upper()}_PYVENV")
+        if venv_path:
+            Launcher.pyvenv = Path(venv_path)
 
     @staticmethod
     @abstractmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
     @abstractmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+ + Args: + cfg: workspace configuration + """ def __str__(self) -> str: @@ -250,6 +256,10 @@ def poll(self) -> str | None: """Poll the launched job for completion. Invokes _check_status() and _post_finish() when the job completes. + + Returns: + status of the job or None + """ @abstractmethod diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py index 8bd38446..82e1da96 100644 --- a/src/dvsim/launcher/fake.py +++ b/src/dvsim/launcher/fake.py @@ -4,14 +4,14 @@ """Fake Launcher that returns random results.""" -from collections.abc import Mapping from random import choice, random from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher if TYPE_CHECKING: - from dvsim.job.deploy import CovReport, Deploy, RunTest + from dvsim.job.deploy import CovReport, Deploy, RunTest, WorkspaceConfig + __all__ = ("FakeLauncher",) @@ -75,22 +75,23 @@ def kill(self) -> None: ) @staticmethod - def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + def prepare_workspace(cfg: "WorkspaceConfig") -> None: """Prepare the workspace based on the chosen launcher's needs. This is done once for the entire duration for the flow run. Args: - project: the name of the project. - repo_top: the path to the repository. - args: command line args passed to dvsim. + cfg: workspace configuration """ @staticmethod - def prepare_workspace_for_cfg(cfg: Mapping) -> None: + def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. - 'cfg' is the flow configuration object. + + Args: + cfg: workspace configuration + """ diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index 8ea688ce..bc703f4d 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -8,14 +8,13 @@ import os import shlex import subprocess -from collections.abc import Mapping from pathlib import Path from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError if TYPE_CHECKING: - from dvsim.job.deploy import Deploy + from dvsim.job.deploy import Deploy, WorkspaceConfig class LocalLauncher(Launcher): @@ -184,22 +183,23 @@ def _close_job_log_file(self) -> None: self._log_file.close() @staticmethod - def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + def prepare_workspace(cfg: "WorkspaceConfig") -> None: """Prepare the workspace based on the chosen launcher's needs. This is done once for the entire duration for the flow run. Args: - project: the name of the project. - repo_top: the path to the repository. - args: command line args passed to dvsim. + cfg: workspace configuration """ @staticmethod - def prepare_workspace_for_cfg(cfg: Mapping) -> None: + def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. - 'cfg' is the flow configuration object. 
+
+        Args:
+            cfg: workspace configuration
+
+        """
diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py
index e2933c7a..e8d5277c 100644
--- a/src/dvsim/launcher/lsf.py
+++ b/src/dvsim/launcher/lsf.py
@@ -7,19 +7,27 @@
 import subprocess
 import tarfile
 from pathlib import Path
+from typing import TYPE_CHECKING, ClassVar
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import clean_odirs
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import Deploy, WorkspaceConfig
+
+__all__ = ("LsfLauncher",)
+
 
 class LsfLauncher(Launcher):
+    """Launcher for IBM Spectrum LSF."""
+
     # A hidden directory specific to a cfg, where we put individual 'job'
     # scripts.
-    jobs_dir = {}
+    jobs_dir: ClassVar = {}
 
     # All launcher instances available for lookup.
-    jobs = {}
+    jobs: ClassVar = {}
 
     # When the job completes, we try to read the job script output to determine
     # the outcome. It may not have been completely written the first time we
@@ -32,40 +40,63 @@ class LsfLauncher(Launcher):
     # `DVSIM_LSF_CFG` environment variable.
 
     @staticmethod
-    def prepare_workspace(project, repo_top, args) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
+        """Prepare the workspace based on the chosen launcher's needs.
+
+        This is done once for the entire duration of the flow run.
+
+        Args:
+            cfg: workspace configuration
+
+        """
         # Since we dispatch to remote machines, a project specific python
         # virtualenv is exists, needs to be activated when launching the job.
-        Launcher.set_pyvenv(project)
-        if Launcher.pyvenv is None:
+        Launcher.set_pyvenv(cfg.project)
+
+        pyvenv = Launcher.pyvenv
+        if pyvenv is None:
             return
 
         # If it is already a dir, then nothing to be done.
-        if os.path.isdir(Launcher.pyvenv):  # noqa: PTH112
+        if pyvenv.is_dir():
            return
 
         # If not, then it needs to be a valid tarball. Extract it in the
         # scratch area if it does not exist.
-        stem = Path(Launcher.pyvenv).stem
+        stem = pyvenv.stem
         if stem.endswith("tar"):
             stem = stem[:-4]
-        path = Path(args.scratch_root, stem)
+
+        path = cfg.scratch_root / stem
         if not path.is_dir():
-            log.info("[prepare_workspace]: [pyvenv]: Extracting %s", Launcher.pyvenv)
+            log.info("[prepare_workspace]: [pyvenv]: Extracting %s", pyvenv)
             with tarfile.open(Launcher.pyvenv, mode="r") as tar:
-                tar.extractall(args.scratch_root)
+                tar.extractall(cfg.scratch_root)
+
+            log.info("[prepare_workspace]: [pyvenv]: Done: %s", path)
+
         Launcher.pyvenv = path
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
+        """Prepare the workspace for a cfg.
+
+        This is invoked once for each cfg.
+
+        Args:
+            cfg: workspace configuration
+
+        """
         # Create the job dir.
-        LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf", cfg.timestamp)
-        clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2)
-        os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True)
+        LsfLauncher.jobs_dir[cfg.project] = cfg.scratch_path / "lsf" / cfg.timestamp
+        clean_odirs(odir=LsfLauncher.jobs_dir[cfg.project], max_odirs=2)
+        os.makedirs(Path(LsfLauncher.jobs_dir[cfg.project]), exist_ok=True)
 
     @staticmethod
-    def make_job_script(cfg, job_name):
-        """Creates the job script.
+    def make_job_script(cfg: "WorkspaceConfig", job_name: str) -> Path:
+        """Create the job script.
 
         Once all jobs in the array are launched, the job script can be created.
         It is a bash script that takes the job index as a single argument.
@@ -76,7 +107,13 @@ def make_job_script(cfg, job_name):
         creating individual scripts for each job which incurs additional file
         I/O overhead when the scratch area is on NFS, causing a slowdown.
 
-        Returns the path to the job script.
+        Args:
+            cfg: workspace configuration
+            job_name: name of the job to run
+
+        Returns:
+            the path to the job script.
+
         """
         lines = ["#!/usr/bin/env bash\nset -e\n"]
 
@@ -95,10 +132,11 @@ def make_job_script(cfg, job_name):
         if Launcher.pyvenv:
             lines += ["deactivate\n"]
 
-        job_script = Path(LsfLauncher.jobs_dir[cfg], job_name)
+        job_script = LsfLauncher.jobs_dir[cfg.project] / job_name
         try:
             with open(job_script, "w", encoding="utf-8") as f:
                 f.writelines(lines)
+
         except OSError as e:
             err_msg = f"ERROR: Failed to write {job_script}:\n{e}"
             LsfLauncher._post_finish_job_array(cfg, job_name, err_msg)
@@ -107,7 +145,13 @@ def make_job_script(cfg, job_name):
         log.verbose("[job_script]: %s", job_script)
         return job_script
 
-    def __init__(self, deploy) -> None:
+    def __init__(self, deploy: "Deploy") -> None:
+        """Initialise the LSF Launcher.
+
+        Args:
+            deploy: job to remotely deploy.
+
+        """
         super().__init__(deploy)
 
         # Maintain the job script output as an instance variable for polling
@@ -136,10 +180,11 @@ def __init__(self, deploy) -> None:
         self.index = len(job_name_list)
 
     def _do_launch(self) -> None:
+        """Launch the job."""
         # Add self to the list of jobs.
         job_name = self.deploy.job_name
-        cfg = self.deploy.sim_cfg
-        job_total = len(LsfLauncher.jobs[cfg][job_name])
+        cfg = self.deploy.workspace_cfg
+        job_total = len(LsfLauncher.jobs[cfg.project][job_name])
 
         # The actual launching of the bsub command cannot happen until the
         # Scheduler has dispatched ALL jobs in the array.
@@ -211,13 +256,15 @@ def _do_launch(self) -> None:
             # Need to mark all jobs in this range with this fail pattern.
             err_msg = e.stderr.decode("utf-8").strip()
             self._post_finish_job_array(cfg, job_name, err_msg)
-            raise LauncherError(err_msg)
+
+            raise LauncherError(err_msg) from e
 
         # Extract the job ID.
         result = p.stdout.decode("utf-8").strip()
         job_id = result.split("Job <")[1].split(">")[0]
         if not job_id:
             self._post_finish_job_array(cfg, job_name, "Job ID not found!")
+            err_msg = "Job ID not found!"
             raise LauncherError(err_msg)
 
-        for job in LsfLauncher.jobs[cfg][job_name]:
+        for job in LsfLauncher.jobs[cfg.project][job_name]:
@@ -225,7 +272,13 @@
             job.job_id = f"{job_id}[{job.index}]"
             job._link_odir("D")
 
-    def poll(self):
+    def poll(self) -> str | None:
+        """Poll the status of the job.
+
+        Returns:
+            status of the job or None
+
+        """
         # It is possible we may have determined the status already.
         if self.status:
             return self.status
@@ -243,6 +296,7 @@
         try:
             if not self.bsub_out.stat().st_size:
                 return "D"
+
         except FileNotFoundError:
             return "D"
@@ -250,6 +304,7 @@
         # file for reading.
         try:
             self.bsub_out_fd = open(self.bsub_out)
+
         except OSError as e:
             self._post_finish(
                 "F",
@@ -283,7 +338,7 @@
         # will resume reading it again at the next poll. We will do this upto
         # max_poll_retries times before giving up and flagging an error.
         #
-        # TODO: Consider using the IBM Plarform LSF Python APIs instead.
+        # TODO: Consider using the IBM Platform LSF Python APIs instead.
         # (deferred due to shortage of time / resources).
         # TODO: Parse job telemetry data for performance insights.
@@ -306,15 +361,18 @@
         if self.num_poll_retries == LsfLauncher.max_poll_retries:
             self._post_finish(
                 "F",
-                "ERROR: Reached max retries while "
-                f"reading job script output {self.bsub_out} to determine"
-                " the outcome.",
+                ErrorMessage(
+                    line_number=None,
+                    message="ERROR: Reached max retries while "
+                    f"reading job script output {self.bsub_out} to determine"
+                    " the outcome.",
+                    context=[],
+                ),
             )
             return "F"
 
         return "D"
 
-    def _get_job_exit_code(self):
+    def _get_job_exit_code(self) -> int | None:
         """Read the job script output to retrieve the exit code.
 
         Also read the error message if any, which will appear at the beginning
@@ -337,7 +395,9 @@ def _get_job_exit_code(self):
         indicating whether it was successful or it failed with an exit code
         is used to return the exit code.
 
-        Returns the exit code if found, else None.
+        Returns:
+            the exit code if found, else None.
+
         """
         # Job script output must have been opened already.
         assert self.bsub_out_fd
@@ -364,6 +424,7 @@ def _get_job_exit_code(self):
         return None
 
     def kill(self) -> None:
+        """Kill the remote job."""
         if self.job_id:
             try:
                 subprocess.run(
@@ -376,9 +437,10 @@ def kill(self) -> None:
         else:
             log.error("Job ID for %s not found", self.deploy.full_name)
 
-        self._post_finish("K", "Job killed!")
+        self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[]))
 
-    def _post_finish(self, status, err_msg) -> None:
+    def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
+        """Tidy up after the job is complete."""
         if self.bsub_out_fd:
             self.bsub_out_fd.close()
         if self.exit_code is None:
@@ -386,12 +448,20 @@ def _post_finish(self, status, err_msg) -> None:
         super()._post_finish(status, err_msg)
 
     @staticmethod
-    def _post_finish_job_array(cfg, job_name, err_msg) -> None:
+    def _post_finish_job_array(
+        cfg: "WorkspaceConfig",
+        job_name: str,
+        err_msg: str,
+    ) -> None:
         """On LSF error, mark all jobs in this array as killed.
 
-        err_msg is the error message indicating the cause of failure.
+        Args:
+            cfg: workspace configuration
+            job_name: name of the job to run
+            err_msg: the error message indicating the cause of failure.
+
         """
-        for job in LsfLauncher.jobs[cfg][job_name]:
+        for job in LsfLauncher.jobs[cfg.project][job_name]:
             job._post_finish(
                 "F",
                 ErrorMessage(line_number=None, message=err_msg, context=[err_msg]),
diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py
index 2a5af9f7..ba7cc865 100644
--- a/src/dvsim/launcher/nc.py
+++ b/src/dvsim/launcher/nc.py
@@ -9,12 +9,15 @@
 import pathlib
 import subprocess
 import sys
-from collections.abc import Mapping
+from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import rm_path
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import WorkspaceConfig
+
 
 class NcLauncher(Launcher):
     """Implementation of Launcher to launch jobs using altair nc."""
@@ -45,7 +48,7 @@ def create_run_sh(self, full_path, cmd) -> None:
             "echo Launch end : `date`",
             "echo CPU time : $SECONDS sec",
         ]
-        with open(run_file, "w") as f:
+        with pathlib.Path(run_file).open("w") as f:
             f.write("\n".join(lines))
         pathlib.Path(run_file).chmod(0o755)
@@ -266,22 +269,23 @@ def _close_process(self) -> None:
             self.process.stdout.close()
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
Args: - project: the name of the project. - repo_top: the path to the repository. - args: command line args passed to dvsim. + cfg: workspace configuration """ @staticmethod - def prepare_workspace_for_cfg(cfg: Mapping) -> None: + def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. - 'cfg' is the flow configuration object. + + Args: + cfg: workspace configuration + """ diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 0d759762..3989d175 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -8,12 +8,15 @@ import pathlib import shlex import subprocess -from collections.abc import Mapping from subprocess import PIPE, Popen +from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.launcher.sge.engine import * # noqa: F403 +if TYPE_CHECKING: + from dvsim.job.deploy import WorkspaceConfig + global job_name pid = os.getpid() @@ -167,22 +170,23 @@ def _close_process(self) -> None: self.process.stdout.close() @staticmethod - def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + def prepare_workspace(cfg: "WorkspaceConfig") -> None: """Prepare the workspace based on the chosen launcher's needs. This is done once for the entire duration for the flow run. Args: - project: the name of the project. - repo_top: the path to the repository. - args: command line args passed to dvsim. + cfg: workspace configuration """ @staticmethod - def prepare_workspace_for_cfg(cfg: Mapping) -> None: + def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. - 'cfg' is the flow configuration object. + + Args: + cfg: workspace configuration + """ diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index e631a556..d246d31d 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -7,11 +7,14 @@ import shlex import shutil import subprocess -from collections.abc import Mapping +from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log +if TYPE_CHECKING: + from dvsim.job.deploy import WorkspaceConfig + SLURM_QUEUE = os.environ.get("SLURM_QUEUE", "hw-m") SLURM_MEM = os.environ.get("SLURM_MEM", "16G") SLURM_MINCPUS = os.environ.get("SLURM_MINCPUS", "8") @@ -149,22 +152,23 @@ def _close_process(self) -> None: self.process.stdout.close() @staticmethod - def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + def prepare_workspace(cfg: "WorkspaceConfig") -> None: """Prepare the workspace based on the chosen launcher's needs. This is done once for the entire duration for the flow run. Args: - project: the name of the project. - repo_top: the path to the repository. - args: command line args passed to dvsim. + cfg: workspace configuration """ @staticmethod - def prepare_workspace_for_cfg(cfg: Mapping) -> None: + def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. - 'cfg' is the flow configuration object. 
+
+        Args:
+            cfg: workspace configuration
+
+        """
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py
index dc5bf0d2..de5f9041 100644
--- a/src/dvsim/scheduler.py
+++ b/src/dvsim/scheduler.py
@@ -91,7 +91,14 @@ def __init__(
         *,
         interactive: bool,
     ) -> None:
-        """Initialise a job scheduler."""
+        """Initialise a job scheduler.
+
+        Args:
+            items: sequence of jobs to deploy.
+            launcher_cls: Launcher class to use to deploy the jobs.
+            interactive: launch the tools in interactive mode.
+
+        """
         self.items: Sequence[Deploy] = items
 
         # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
@@ -148,7 +155,7 @@ def __init__(
             msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
             self.status_printer.init_target(target=target, msg=msg)
 
-        # A map from the Deploy object names tracked by this class to their
+        # A map from the deployment names tracked by this class to their
         # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
         # corresponding to membership in the dicts above. This is not
         # per-target.
@@ -271,8 +278,13 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
         them to _queued.
         """
         for next_item in self._get_successors(item):
-            assert next_item.full_name not in self.item_status
-            assert next_item not in self._queued[next_item.target]
+            if (
+                next_item.full_name in self.item_status
+                or next_item in self._queued[next_item.target]
+            ):
+                msg = f"Job {next_item.full_name} already scheduled"
+                raise RuntimeError(msg)
+
             self.item_status[next_item.full_name] = "Q"
             self._queued[next_item.target].append(next_item)
             self._unschedule_item(next_item)
@@ -281,6 +293,10 @@ def _cancel_successors(self, item: Deploy) -> None:
         """Cancel an item's successors.
 
         Recursively move them from _scheduled or _queued to _killed.
+
+        Args:
+            item: job whose successors are to be canceled.
+
         """
         items = list(self._get_successors(item))
         while items:
@@ -291,14 +307,17 @@ def _cancel_successors(self, item: Deploy) -> None:
     def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         """Find immediate successors of an item.
 
-        'item' is a job that has completed. We choose the target that follows
-        the 'item''s current target and find the list of successors whose
-        dependency list contains 'item'. If 'item' is None, we pick successors
-        from all cfgs, else we pick successors only from the cfg to which the
-        item belongs.
+        We choose the target that follows item's current target and find the
+        list of successors whose dependency list contains 'item'. If 'item' is
+        None, we pick successors from all cfgs, else we pick successors only
+        from the cfg to which the item belongs.
+
+        Args:
+            item: a job that has completed.
+
+        Returns:
+            list of item's successors, or an empty list if there are none.
 
-        Returns the list of item's successors, or an empty list if there are
-        none.
         """
         if item is None:
             target = next(iter(self._scheduled))
@@ -320,8 +339,10 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         while not found:
             if target == item.target:
                 found = True
+
             try:
                 target = next(target_iterator)
+
             except StopIteration:
                 return []
@@ -349,7 +370,15 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         return successors
 
     def _ok_to_enqueue(self, item: Deploy) -> bool:
-        """Return true if ALL dependencies of item are complete."""
+        """Check whether all of an item's dependency jobs are complete.
+
+        Args:
+            item: a deployment job.
+
+        Returns:
+            True if ALL dependencies of item are complete.
+
+        """
         for dep in item.dependencies:
             # Ignore dependencies that were not scheduled to run.
             if dep not in self.items:
@@ -366,11 +395,18 @@ def _ok_to_enqueue(self, item: Deploy) -> bool:
         return True
 
     def _ok_to_run(self, item: Deploy) -> bool:
-        """Return true if the required dependencies have passed.
+        """Check if a job is ready to start.
 
         The item's needs_all_dependencies_passing setting is used to figure
         out whether we can run this item or not, based on its dependent jobs'
         statuses.
+
+        Args:
+            item: a deployment job.
+
+        Returns:
+            True if the required dependencies have passed.
+
         """
         # 'item' can run only if its dependencies have passed (their results
         # should already show up in the item to status map).
@@ -395,7 +431,9 @@ def _ok_to_run(self, item: Deploy) -> bool:
     def _poll(self, hms: str) -> bool:
         """Check for running items that have finished.
 
-        Returns True if something changed.
+        Returns:
+            True if something changed.
+
         """
         max_poll = min(
             self.launcher_cls.max_poll,
@@ -615,6 +653,11 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
 
         Supplied item may be in _scheduled list or the _queued list. From
         either, we move it straight to _killed.
+
+        Args:
+            item: a deployment job.
+            cancel_successors: whether to also cancel the item's successors
+                (defaults to True).
+
         """
         self.item_status[item.full_name] = "K"
         self._killed[item.target].add(item)
@@ -627,7 +670,12 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
             self._cancel_successors(item)
 
     def _kill_item(self, item: Deploy) -> None:
-        """Kill a running item and cancel all of its successors."""
+        """Kill a running item and cancel all of its successors.
+
+        Args:
+            item: a deployment job.
+
+        """
         self._launchers[item.full_name].kill()
         self.item_status[item.full_name] = "K"
         self._killed[item.target].add(item)
diff --git a/src/dvsim/sim_utils.py b/src/dvsim/sim_utils.py
index 1c4f3149..48ee6c87 100644
--- a/src/dvsim/sim_utils.py
+++ b/src/dvsim/sim_utils.py
@@ -6,15 +6,19 @@
 
 import re
 from collections import OrderedDict
+from collections.abc import Sequence
+from io import TextIOWrapper
 from pathlib import Path
 
 
-def get_cov_summary_table(cov_report_txt: Path, tool: str):
+def get_cov_summary_table(
+    txt_cov_report: Path,
+    tool: str,
+) -> tuple[Sequence[Sequence[str]], str]:
     """Capture the summary results as a list of lists.
 
     The text coverage report is passed as input to the function, in addition to
-    the tool used. The tool returns a 2D list if the coverage report file was read
-    and the coverage was extracted successfully.
+    the tool used.
 
     Returns:
         tuple of, List of metrics and values, and final coverage total
@@ -23,7 +27,7 @@ def get_cov_summary_table(cov_report_txt: Path, tool: str):
         the appropriate exception if the coverage summary extraction fails.
 
     """
-    with Path(cov_report_txt).open() as f:
+    with Path(txt_cov_report).open() as f:
         if tool == "xcelium":
             return xcelium_cov_summary_table(f)
 
@@ -35,7 +39,8 @@ def get_cov_summary_table(cov_report_txt: Path, tool: str):
 
 
 # Same desc as above, but specific to Xcelium and takes an opened input stream.
-def xcelium_cov_summary_table(buf):
+def xcelium_cov_summary_table(buf: TextIOWrapper) -> tuple[Sequence[Sequence[str]], str]:
+    """Capture the summary results as a list of lists from Xcelium."""
     for line in buf:
         if "name" in line:
             # Strip the line and remove the unwanted "* Covered" string.
@@ -87,7 +92,8 @@ def xcelium_cov_summary_table(buf):
 
 
 # Same desc as above, but specific to VCS and takes an opened input stream.
-def vcs_cov_summary_table(buf):
+def vcs_cov_summary_table(buf: TextIOWrapper) -> tuple[Sequence[Sequence[str]], str]:
+    """Capture the summary results as a list of lists from VCS."""
     for line in buf:
         match = re.match("total coverage summary", line, re.IGNORECASE)
         if match:
@@ -124,7 +130,7 @@ def get_job_runtime(log_text: list, tool: str) -> tuple[float, str]:
         tool: is the EDA tool used to run the job.
 
     Returns:
-        the runtime, units as a tuple.
+        a tuple of (runtime, units).
 
     Raises:
         NotImplementedError: exception if the EDA tool is not supported.
@@ -184,8 +190,7 @@ def xcelium_job_runtime(log_text: list) -> tuple[float, str]:
     """
     pattern = r"^TOOL:\s*xrun.*: Exiting on .*\(total:\s*(\d+):(\d+):(\d+)\)\s*$"
     for line in reversed(log_text):
-        m = re.search(pattern, line)
-        if m:
+        if m := re.search(pattern, line):
            t = int(m.group(1)) * 3600 + int(m.group(2)) * 60 + int(m.group(3))
             return t, "s"
     msg = "Job runtime not found in the log."
@@ -265,9 +270,10 @@ def vcs_simulated_time(log_text: list) -> tuple[float, str]:
     next_line = ""
 
     for line in reversed(log_text):
-        if "V C S S i m u l a t i o n R e p o r t" in line and (
-            m := re.search(pattern, next_line)
-        ):
-            return float(m.group(1)), m.group(2).lower()
+        if "V C S S i m u l a t i o n R e p o r t" in line:
+            if m := re.search(pattern, next_line):
+                return float(m.group(1)), m.group(2).lower()
+
         next_line = line
diff --git a/tests/job/test_deploy.py b/tests/job/test_deploy.py
new file mode 100644
index 00000000..7f3c8254
--- /dev/null
+++ b/tests/job/test_deploy.py
@@ -0,0 +1,219 @@
+# Copyright lowRISC contributors (OpenTitan project).
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Test Job deployment models."""
+
+from collections.abc import Mapping
+
+import pytest
+from hamcrest import assert_that, equal_to
+
+from dvsim.job.deploy import CompileSim
+
+__all__ = ()
+
+
+class FakeCliArgs:
+    """Fake CLI args."""
+
+    def __init__(self) -> None:
+        """Initialise fake command line arguments."""
+        self.build_timeout_mins = None
+        self.timestamp = "timestamp"
+
+
+class FakeSimCfg:
+    """Fake sim configuration."""
+
+    def __init__(self) -> None:
+        """Initialise fake sim configuration."""
+        self.name = "flow_name"
+        self.variant = "variant"
+
+        self.args = FakeCliArgs()
+        self.dry_run = True
+        self.gui = False
+
+        self.scratch_path = "/scratch_path"
+        self.scratch_root = "/scratch_root"
+        self.proj_root = "/project"
+
+        self.exports = []
+
+        self.flow_makefile = "path/to/makefile"
+        self.build_cmd = "path/to/{build_mode}/build_cmd"
+        self.pre_build_cmds = ["A", "B"]
+        self.post_build_cmds = ["C", "D"]
+        self.build_dir = "build/dir"
+        self.build_pass_patterns = None
+        self.build_fail_patterns = None
+        self.build_seed = 123
+
+        self.sv_flist_gen_cmd = "gen_cmd"
+        self.sv_flist_gen_opts = []
+        self.sv_flist_gen_dir = "path/to/gen"
+
+        self.cov = True
+        self.cov_db_dir = "path"
+
+
+class FakeBuildMode:
+    """Fake BuildMode."""
+
+    def __init__(self) -> None:
+        """Initialise fake BuildMode."""
+        self.name = "build_name"
+        self.build_timeout_mins = 500
+        self.build_mode = "build_mode"
+        self.build_opts = ["-b path/here", '-a "Quoted"']
+
+
+def _build_compile_sim(
+    *,
+    build_overrides: Mapping | None = None,
+    sim_overrides: Mapping | None = None,
+    cli_args_overrides: Mapping | None = None,
+) -> CompileSim:
+    """Build a CompileSim object.
+
+    Test helper that takes overrides to apply on top of the default values for
+    the BuildMode and SimCfg fake objects.
+ """ + cli_args = FakeCliArgs() + if cli_args_overrides: + for arg, value in cli_args_overrides.items(): + setattr(cli_args, arg, value) + + build_mode_obj = FakeBuildMode() + if build_overrides: + for arg, value in build_overrides.items(): + setattr(build_mode_obj, arg, value) + + sim_cfg = FakeSimCfg() + if sim_overrides: + for arg, value in sim_overrides.items(): + setattr(sim_cfg, arg, value) + + # Override the cli args in the sim configuration + sim_cfg.args = cli_args + + return CompileSim.new( + build_mode_obj=build_mode_obj, + sim_cfg=sim_cfg, + ) + + +class TestCompileSim: + """Test CompileSim.""" + + @staticmethod + @pytest.mark.parametrize( + ("build_overrides", "sim_overrides", "exp_cmd"), + [ + ( + {"dry_run": True}, + {}, + "make -f path/to/makefile build " + "-n " + "build_cmd=path/to/build_name/build_cmd " + "build_dir=build/dir " + "build_opts='-b path/here -a \"Quoted\"' " + "post_build_cmds='C && D' " + "pre_build_cmds='A && B' " + "proj_root=/project " + "sv_flist_gen_cmd=gen_cmd " + "sv_flist_gen_dir=path/to/gen " + "sv_flist_gen_opts=''", + ), + ( + {"dry_run": False}, + {}, + "make -f path/to/makefile build " + "build_cmd=path/to/build_name/build_cmd " + "build_dir=build/dir " + "build_opts='-b path/here -a \"Quoted\"' " + "post_build_cmds='C && D' " + "pre_build_cmds='A && B' " + "proj_root=/project " + "sv_flist_gen_cmd=gen_cmd " + "sv_flist_gen_dir=path/to/gen " + "sv_flist_gen_opts=''", + ), + ], + ) + def test_cmd(build_overrides: Mapping, sim_overrides: Mapping, exp_cmd: str) -> None: + """Test that a CompileSim has the expected cmd.""" + job = _build_compile_sim( + build_overrides=build_overrides, + sim_overrides=sim_overrides, + ) + + assert_that(job.cmd, equal_to(exp_cmd)) + + @staticmethod + @pytest.mark.parametrize( + ("build_overrides", "sim_overrides", "name", "full_name"), + [ + ({"name": "fred"}, {"variant": None}, "fred", "flow_name:fred"), + ({"name": "fred"}, {"variant": "v1"}, "fred", "flow_name_v1:fred"), + ({"name": "fred"}, {"name": "flow", "variant": None}, "fred", "flow:fred"), + ({"name": "george"}, {"variant": None}, "george", "flow_name:george"), + ({"name": "george"}, {"variant": "v2"}, "george", "flow_name_v2:george"), + ], + ) + def test_names( + build_overrides: Mapping, + sim_overrides: Mapping, + name: str, + full_name: str, + ) -> None: + """Test that a CompileSim ends up with the expected names.""" + job = _build_compile_sim( + build_overrides=build_overrides, + sim_overrides=sim_overrides, + ) + + assert_that(job.name, equal_to(name)) + assert_that(job.qual_name, equal_to(name)) + assert_that(job.full_name, equal_to(full_name)) + + @staticmethod + @pytest.mark.parametrize( + ("sim_overrides", "seed"), + [ + ({"build_seed": 123}, 123), + ({"build_seed": 631}, 631), + ], + ) + def test_seed( + sim_overrides: Mapping, + seed: int, + ) -> None: + """Test that a CompileSim ends up with the expected seed.""" + job = _build_compile_sim( + sim_overrides=sim_overrides, + ) + + assert_that(job.seed, equal_to(seed)) + + @staticmethod + @pytest.mark.parametrize( + ("cli_args_overrides", "build_overrides", "timeout"), + [ + ({"build_timeout_mins": 111}, {}, 111), + ({}, {"build_timeout_mins": 112}, 112), + ], + ) + def test_timeout( + cli_args_overrides: Mapping, + build_overrides: Mapping, + timeout: int, + ) -> None: + """Test that a CompileSim ends up with the expected timeout.""" + job = _build_compile_sim( + build_overrides=build_overrides, + cli_args_overrides=cli_args_overrides, + ) + + assert_that(job.build_timeout_mins, 
equal_to(timeout))
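+
+
+class TestWorkspaceConfig:
+    """Sketch: exercise the WorkspaceConfig model added by this change.
+
+    Hypothetical test, not part of the original patch: it assumes pydantic
+    v2 semantics, where mutating a frozen model or passing an unknown field
+    raises a ValidationError.
+    """
+
+    @staticmethod
+    def test_frozen_and_forbids_extra_fields() -> None:
+        """Check that WorkspaceConfig is immutable and rejects extra fields."""
+        from pathlib import Path
+
+        from pydantic import ValidationError
+
+        from dvsim.job.deploy import WorkspaceConfig
+
+        cfg = WorkspaceConfig(
+            project="flow_name",
+            timestamp="timestamp",
+            project_root=Path("/project"),
+            scratch_root=Path("/scratch_root"),
+            scratch_path=Path("/scratch_path"),
+        )
+
+        # model_config = ConfigDict(frozen=True, ...): assignment must raise.
+        with pytest.raises(ValidationError):
+            cfg.project = "other_name"
+
+        # extra="forbid": unknown fields are rejected at construction time.
+        with pytest.raises(ValidationError):
+            WorkspaceConfig(
+                project="flow_name",
+                timestamp="timestamp",
+                project_root=Path("/project"),
+                scratch_root=Path("/scratch_root"),
+                scratch_path=Path("/scratch_path"),
+                unknown_field="oops",  # hypothetical extra field
+            )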