From c29e1e2d464fa48b5265fe37153c26b6d394cec7 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Thu, 16 Oct 2025 15:59:18 +0100
Subject: [PATCH 1/6] refactor: make cov_db_dirs deterministic

Sort the coverage database paths, while leaving the first path in
place. This should mean that the dumped deployment objects are now
deterministic.

At the moment, when comparing dumps, the only difference is often the
order of these paths, which adds unnecessary noise and makes it harder
to spot meaningful differences.

Signed-off-by: James McCorrie
---
 src/dvsim/job/deploy.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 5394422..300ce9f 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -711,6 +711,10 @@ def __init__(self, run_items, sim_cfg) -> None:
             else:
                 self.cov_db_dirs.append(run.cov_db_dir)
 
+        # Sort the cov_db_dirs, keeping the first directory in place
+        if len(self.cov_db_dirs) > 1:
+            self.cov_db_dirs = [self.cov_db_dirs[0], *sorted(self.cov_db_dirs[1:])]
+
         # Early lookup the cov_merge_db_dir, which is a mandatory misc
         # attribute anyway. We need it to compute additional cov db dirs.
         self.cov_merge_db_dir = subst_wildcards("{cov_merge_db_dir}", sim_cfg.__dict__)

From 6b56725e39a51747916346d9f763d74dba39f180 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Thu, 16 Oct 2025 18:33:45 +0100
Subject: [PATCH 2/6] refactor: rename model_dump -> dump

The pydantic model_dump method returns a lot more information, so
rename the existing method to dump. This allows the two methods to be
used in parallel until all the Deploy classes are migrated to pydantic.

Signed-off-by: James McCorrie
---
 src/dvsim/flow/base.py  | 2 +-
 src/dvsim/job/deploy.py | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py
index a84ae67..8bee9e7 100644
--- a/src/dvsim/flow/base.py
+++ b/src/dvsim/flow/base.py
@@ -422,7 +422,7 @@ def deploy_objects(self) -> Mapping[str, CompletedJobStatus]:
             json.dumps(
                 # Sort on full name to ensure consistent ordering
                 sorted(
-                    [d.model_dump() for d in deploy],
+                    [d.dump() for d in deploy],
                     key=lambda d: d["full_name"],
                 ),
                 indent=2,
diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 300ce9f..f7c4b58 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -328,12 +328,9 @@ def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None:
             log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.")
             self.job_runtime.set(job_runtime_secs, "s")
 
-    def model_dump(self) -> Mapping:
+    def dump(self) -> Mapping:
         """Dump the deployment object to mapping object.
 
-        This method matches the interface provided by pydantic models to dump a
-        subset of the class attributes
-
         Returns:
             Representation of a deployment object as a dict.
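A side note on PATCH 1: the head-pinned sort is a small, reusable pattern
for lists whose first element is significant (here the primary coverage
database) but whose tail order is arbitrary. A minimal standalone sketch
of the idea (the helper name is illustrative, not part of dvsim):

    def sort_keeping_head(paths: list[str]) -> list[str]:
        # Pin the first entry; sort the remainder so repeated dumps compare equal.
        if len(paths) > 1:
            return [paths[0], *sorted(paths[1:])]
        return list(paths)

    assert sort_keeping_head(["c", "b", "a"]) == ["c", "a", "b"]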
From 344fdde13da4368b501fc73e17ac95a791734360 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Thu, 16 Oct 2025 18:49:50 +0100
Subject: [PATCH 3/6] style: linting, docstrings and typing

Signed-off-by: James McCorrie
---
 src/dvsim/flow/base.py     | 11 ++++--
 src/dvsim/job/deploy.py    |  2 +-
 src/dvsim/launcher/base.py |  6 ++-
 src/dvsim/scheduler.py     | 78 ++++++++++++++++++++++++++++++--------
 src/dvsim/sim_utils.py     | 30 +++++++++------
 5 files changed, 94 insertions(+), 33 deletions(-)

diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py
index 8bee9e7..ff126f0 100644
--- a/src/dvsim/flow/base.py
+++ b/src/dvsim/flow/base.py
@@ -445,16 +445,19 @@ def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str:
         prints the full list of failures for debug / triage to the final
         report, which is in markdown format.
 
-        Results:
-            dictionary mapping deployed item names to job status.
+        Args:
+            results: dictionary mapping deployed item names to completed job status.
+
+        Returns:
+            Results as a formatted string.
 
         """
 
     def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None:
-        """Public facing API for _gen_results().
+        """Generate flow results.
 
         Args:
-            results: dictionary mapping deployed item names to job status.
+            results: dictionary mapping deployed item names to completed job status.
 
         """
         for item in self.cfgs:
diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index f7c4b58..5948867 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -293,7 +293,7 @@ def pre_launch(self, launcher: Launcher) -> None:
         This is invoked by launcher::_pre_launch().
         """
 
-    def post_finish(self, status) -> None:
+    def post_finish(self, status: str) -> None:
         """Perform additional post-finish activities (callback).
 
         This is invoked by launcher::_post_finish().
diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py
index f5bc73c..3614e78 100644
--- a/src/dvsim/launcher/base.py
+++ b/src/dvsim/launcher/base.py
@@ -67,7 +67,7 @@ class Launcher(ABC):
     poll_freq = 1
 
     # Points to the python virtual env area.
-    pyvenv = None
+    pyvenv: Path | None = None
 
     # If a history of previous invocations is to be maintained, then keep no
     # more than this many directories.
@@ -250,6 +250,10 @@ def poll(self) -> str | None:
         """Poll the launched job for completion.
 
         Invokes _check_status() and _post_finish() when the job completes.
+
+        Returns:
+            status of the job or None
+
         """
 
     @abstractmethod
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py
index dc5bf0d..de5f904 100644
--- a/src/dvsim/scheduler.py
+++ b/src/dvsim/scheduler.py
@@ -91,7 +91,14 @@ def __init__(
         *,
         interactive: bool,
     ) -> None:
-        """Initialise a job scheduler."""
+        """Initialise a job scheduler.
+
+        Args:
+            items: sequence of jobs to deploy.
+            launcher_cls: Launcher class to use to deploy the jobs.
+            interactive: launch the tools in interactive mode.
+
+        """
         self.items: Sequence[Deploy] = items
 
         # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
@@ -148,7 +155,7 @@ def __init__(
             msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
             self.status_printer.init_target(target=target, msg=msg)
 
-        # A map from the Deploy object names tracked by this class to their
+        # A map from the Deployment names tracked by this class to their
         # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
         # corresponding to membership in the dicts above. This is not
         # per-target.
@@ -271,8 +278,13 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
         them to _queued.
 
         """
         for next_item in self._get_successors(item):
-            assert next_item.full_name not in self.item_status
-            assert next_item not in self._queued[next_item.target]
+            if (
+                next_item.full_name in self.item_status
+                or next_item in self._queued[next_item.target]
+            ):
+                msg = f"Job {next_item.full_name} already scheduled"
+                raise RuntimeError(msg)
+
             self.item_status[next_item.full_name] = "Q"
             self._queued[next_item.target].append(next_item)
             self._unschedule_item(next_item)
@@ -281,6 +293,10 @@ def _cancel_successors(self, item: Deploy) -> None:
         """Cancel an item's successors.
 
         Recursively move them from _scheduled or _queued to _killed.
+
+        Args:
+            item: job whose successors are to be canceled.
+
         """
         items = list(self._get_successors(item))
         while items:
@@ -291,14 +307,17 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         """Find immediate successors of an item.
 
-        'item' is a job that has completed. We choose the target that follows
-        the 'item''s current target and find the list of successors whose
-        dependency list contains 'item'. If 'item' is None, we pick successors
-        from all cfgs, else we pick successors only from the cfg to which the
-        item belongs.
+        We choose the target that follows the 'item''s current target and find
+        the list of successors whose dependency list contains 'item'. If 'item'
+        is None, we pick successors from all cfgs, else we pick successors only
+        from the cfg to which the item belongs.
+
+        Args:
+            item: a job that has completed.
+
+        Returns:
+            list of item's successors, or an empty list if there are none.
 
-        Returns the list of item's successors, or an empty list if there are
-        none.
         """
         if item is None:
             target = next(iter(self._scheduled))
@@ -320,8 +339,10 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         while not found:
             if target == item.target:
                 found = True
+
             try:
                 target = next(target_iterator)
+
             except StopIteration:
                 return []
@@ -349,7 +370,15 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:
         return successors
 
     def _ok_to_enqueue(self, item: Deploy) -> bool:
-        """Return true if ALL dependencies of item are complete."""
+        """Check if all dependency jobs are completed.
+
+        Args:
+            item: a deployment job.
+
+        Returns:
+            true if ALL dependencies of item are complete.
+
+        """
         for dep in item.dependencies:
             # Ignore dependencies that were not scheduled to run.
             if dep not in self.items:
@@ -366,11 +395,18 @@ def _ok_to_enqueue(self, item: Deploy) -> bool:
         return True
 
     def _ok_to_run(self, item: Deploy) -> bool:
-        """Return true if the required dependencies have passed.
+        """Check if a job is ready to start.
 
         The item's needs_all_dependencies_passing setting is used to figure
         out whether we can run this item or not, based on its dependent jobs'
         statuses.
+
+        Args:
+            item: a deployment job.
+
+        Returns:
+            true if the required dependencies have passed.
+
         """
         # 'item' can run only if its dependencies have passed (their results
         # should already show up in the item to status map).
@@ -395,7 +431,9 @@ def _ok_to_run(self, item: Deploy) -> bool:
     def _poll(self, hms: str) -> bool:
         """Check for running items that have finished.
 
-        Returns True if something changed.
+        Returns:
+            True if something changed.
+
         """
         max_poll = min(
             self.launcher_cls.max_poll,
@@ -615,6 +653,11 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
 
         Supplied item may be in _scheduled list or the _queued list.
         From either, we move it straight to _killed.
+
+        Args:
+            item: a deployment job.
+            cancel_successors: if set, also cancel the item's successors (default True).
+
         """
         self.item_status[item.full_name] = "K"
         self._killed[item.target].add(item)
@@ -627,7 +670,12 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
             self._cancel_successors(item)
 
     def _kill_item(self, item: Deploy) -> None:
-        """Kill a running item and cancel all of its successors."""
+        """Kill a running item and cancel all of its successors.
+
+        Args:
+            item: a deployment job.
+
+        """
         self._launchers[item.full_name].kill()
         self.item_status[item.full_name] = "K"
         self._killed[item.target].add(item)
diff --git a/src/dvsim/sim_utils.py b/src/dvsim/sim_utils.py
index 1c4f314..48ee6c8 100644
--- a/src/dvsim/sim_utils.py
+++ b/src/dvsim/sim_utils.py
@@ -6,15 +6,19 @@
 
 import re
 from collections import OrderedDict
+from collections.abc import Sequence
+from io import TextIOWrapper
 from pathlib import Path
 
 
-def get_cov_summary_table(cov_report_txt: Path, tool: str):
+def get_cov_summary_table(
+    txt_cov_report: Path,
+    tool: str,
+) -> tuple[Sequence[Sequence[str]], str]:
     """Capture the summary results as a list of lists.
 
     The text coverage report is passed as input to the function, in addition to
-    the tool used. The tool returns a 2D list if the coverage report file was read
-    and the coverage was extracted successfully.
+    the tool used.
 
     Returns:
         tuple of, List of metrics and values, and final coverage total
 
         the appropriate exception if the coverage summary extraction fails.
 
     """
-    with Path(cov_report_txt).open() as f:
+    with Path(txt_cov_report).open() as f:
         if tool == "xcelium":
             return xcelium_cov_summary_table(f)
 
@@ -35,7 +39,8 @@
 
 # Same desc as above, but specific to Xcelium and takes an opened input stream.
-def xcelium_cov_summary_table(buf):
+def xcelium_cov_summary_table(buf: TextIOWrapper) -> tuple[Sequence[Sequence[str]], str]:
+    """Capture the summary results as a list of lists from Xcelium."""
     for line in buf:
         if "name" in line:
             # Strip the line and remove the unwanted "* Covered" string.
@@ -87,7 +92,8 @@
 
 # Same desc as above, but specific to VCS and takes an opened input stream.
-def vcs_cov_summary_table(buf):
+def vcs_cov_summary_table(buf: TextIOWrapper) -> tuple[Sequence[Sequence[str]], str]:
+    """Capture the summary results as a list of lists from VCS."""
     for line in buf:
         match = re.match("total coverage summary", line, re.IGNORECASE)
         if match:
@@ -124,7 +130,7 @@ def get_job_runtime(log_text: list, tool: str) -> tuple[float, str]:
         tool: is the EDA tool used to run the job.
 
     Returns:
-        the runtime, units as a tuple.
+        a tuple of (runtime, units).
 
     Raises:
         NotImplementedError: exception if the EDA tool is not supported.
@@ -184,8 +190,7 @@ def xcelium_job_runtime(log_text: list) -> tuple[float, str]:
     """
     pattern = r"^TOOL:\s*xrun.*: Exiting on .*\(total:\s*(\d+):(\d+):(\d+)\)\s*$"
     for line in reversed(log_text):
-        m = re.search(pattern, line)
-        if m:
+        if m := re.search(pattern, line):
             t = int(m.group(1)) * 3600 + int(m.group(2)) * 60 + int(m.group(3))
             return t, "s"
     msg = "Job runtime not found in the log."
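The `m := re.search(...)` rewrites in this patch all use the same idiom:
walk the log from the end and bind the match object inside the loop
condition. A minimal standalone sketch (the pattern and log lines are
invented for illustration, not dvsim's real formats):

    import re

    def last_elapsed_seconds(log_text: list[str]) -> int | None:
        # reversed() starts at the last line, so the first hit is the final
        # occurrence in the log; the walrus operator binds the match in the
        # same expression that tests it.
        for line in reversed(log_text):
            if m := re.search(r"elapsed:\s*(\d+)s", line):
                return int(m.group(1))
        return None

    assert last_elapsed_seconds(["boot", "elapsed: 12s", "elapsed: 34s"]) == 34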
@@ -265,9 +270,10 @@ def vcs_simulated_time(log_text: list) -> tuple[float, str]:
     next_line = ""
     for line in reversed(log_text):
-        if "V C S S i m u l a t i o n R e p o r t" in line and (
-            m := re.search(pattern, next_line)
-        ):
+        if "V C S S i m u l a t i o n R e p o r t" not in line:
+            continue
+
+        if m := re.search(pattern, next_line):
             return float(m.group(1)), m.group(2).lower()
 
         next_line = line

From 51fa0ef3db9fd6e3845df47ba4e404c1253d55b7 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Thu, 16 Oct 2025 19:09:02 +0100
Subject: [PATCH 4/6] refactor: add WorkspaceCfg

Reduce the dependency on sim_cfg by pulling some of its data out into a
new WorkspaceConfig pydantic object.

Signed-off-by: James McCorrie
---
 src/dvsim/job/deploy.py            |  24 ++++++
 src/dvsim/launcher/base.py         |  36 +++++----
 src/dvsim/launcher/fake.py         |  17 +++--
 src/dvsim/launcher/local.py        |  16 ++--
 src/dvsim/launcher/lsf.py          | 114 +++++++++++++++++++++++------
 src/dvsim/launcher/nc.py           |  20 +++--
 src/dvsim/launcher/sge/launcher.py |  18 +++--
 src/dvsim/launcher/slurm.py        |  18 +++--
 8 files changed, 186 insertions(+), 77 deletions(-)

diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 5948867..4e274ad 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -11,6 +11,8 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, ClassVar
 
+from pydantic import BaseModel
+from pydantic.config import ConfigDict
 from tabulate import tabulate
 
 from dvsim.job.time import JobTime
@@ -27,6 +29,20 @@
 if TYPE_CHECKING:
     from dvsim.flow.sim import SimCfg
 
+
+class WorkspaceConfig(BaseModel):
+    """Workspace configuration."""
+
+    model_config = ConfigDict(frozen=True, extra="forbid")
+
+    project: str
+    timestamp: str
+
+    project_root: Path
+    scratch_root: Path
+    scratch_path: Path
+
+
 __all__ = (
     "CompileSim",
     "Deploy",
@@ -101,6 +117,14 @@ def __init__(self, sim_cfg: "SimCfg") -> None:
         # Job's wall clock time (a.k.a CPU time, or runtime).
         self.job_runtime = JobTime()
 
+        self.workspace_cfg = WorkspaceConfig(
+            project=sim_cfg.name,
+            project_root=sim_cfg.proj_root,
+            scratch_root=Path(sim_cfg.scratch_root),
+            scratch_path=Path(sim_cfg.scratch_path),
+            timestamp=sim_cfg.args.timestamp,
+        )
+
     def _define_attrs(self) -> None:
         """Define the attributes this instance needs to have.
diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py
index 3614e78..39a0934 100644
--- a/src/dvsim/launcher/base.py
+++ b/src/dvsim/launcher/base.py
@@ -19,7 +19,7 @@
 from dvsim.utils import clean_odirs, mk_symlink, rm_path
 
 if TYPE_CHECKING:
-    from dvsim.job.deploy import Deploy
+    from dvsim.job.deploy import Deploy, WorkspaceConfig
 
 
 class LauncherError(Exception):
@@ -97,18 +97,18 @@ def __init__(self, deploy: "Deploy") -> None:
             deploy: deployment object that will be launched.
 
         """
-        cfg = deploy.sim_cfg
+        workspace_cfg = deploy.workspace_cfg
 
         # One-time preparation of the workspace.
         if not Launcher.workspace_prepared:
-            # TODO: CLI args should be processed far earlier than this
-            self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args)
+            self.prepare_workspace(workspace_cfg)
             Launcher.workspace_prepared = True
 
         # One-time preparation of the workspace, specific to the cfg.
-        if cfg not in Launcher.workspace_prepared_for_cfg:
-            self.prepare_workspace_for_cfg(cfg)
-            Launcher.workspace_prepared_for_cfg.add(cfg)
+        project = workspace_cfg.project
+        if project not in Launcher.workspace_prepared_for_cfg:
+            self.prepare_workspace_for_cfg(workspace_cfg)
+            Launcher.workspace_prepared_for_cfg.add(project)
 
         # Store the deploy object handle.
         self.deploy = deploy
@@ -155,34 +155,40 @@ def set_pyvenv(project: str) -> None:
         # The code below allows each launcher variant to set its own virtualenv
         # because the loading / activating mechanism could be different between
         # them.
-        Launcher.pyvenv = os.environ.get(
-            f"{project.upper()}_PYVENV_{Launcher.variant.upper()}",
-        )
+        common_venv = f"{project.upper()}_PYVENV"
+        variant = Launcher.variant.upper()
+
+        venv_path = os.environ.get(f"{common_venv}_{variant}")
+
+        if not venv_path:
+            venv_path = os.environ.get(common_venv)
 
         if not Launcher.pyvenv:
             Launcher.pyvenv = os.environ.get(f"{project.upper()}_PYVENV")
 
     @staticmethod
     @abstractmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
     @abstractmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """
 
     def __str__(self) -> str:
diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py
index 8bd3844..82e1da9 100644
--- a/src/dvsim/launcher/fake.py
+++ b/src/dvsim/launcher/fake.py
@@ -4,14 +4,14 @@
 
 """Fake Launcher that returns random results."""
 
-from collections.abc import Mapping
 from random import choice, random
 from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher
 
 if TYPE_CHECKING:
-    from dvsim.job.deploy import CovReport, Deploy, RunTest
+    from dvsim.job.deploy import CovReport, Deploy, RunTest, WorkspaceConfig
+
 
 __all__ = ("FakeLauncher",)
@@ -75,22 +75,23 @@ def kill(self) -> None:
         )
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """
diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py
index 8ea688c..bc703f4 100644
--- a/src/dvsim/launcher/local.py
+++ b/src/dvsim/launcher/local.py
@@ -8,14 +8,13 @@
 
 import os
 import shlex
 import subprocess
-from collections.abc import Mapping
 from pathlib import Path
 from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError
 
 if TYPE_CHECKING:
-    from dvsim.job.deploy import Deploy
+    from dvsim.job.deploy import Deploy, WorkspaceConfig
 
 
 class LocalLauncher(Launcher):
@@ -184,22 +183,23 @@ def _close_job_log_file(self) -> None:
         self._log_file.close()
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """
diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py
index e2933c7..19b8377 100644
--- a/src/dvsim/launcher/lsf.py
+++ b/src/dvsim/launcher/lsf.py
@@ -7,19 +7,27 @@
 import subprocess
 import tarfile
 from pathlib import Path
+from typing import TYPE_CHECKING, ClassVar
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import clean_odirs
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import Deploy, WorkspaceConfig
+
+__all__ = ("LsfLauncher",)
+
 
 class LsfLauncher(Launcher):
+    """Launcher for IBM Spectrum LSF."""
+
     # A hidden directory specific to a cfg, where we put individual 'job'
     # scripts.
-    jobs_dir = {}
+    jobs_dir: ClassVar = {}
 
     # All launcher instances available for lookup.
-    jobs = {}
+    jobs: ClassVar = {}
 
     # When the job completes, we try to read the job script output to determine
     # the outcome. It may not have been completely written the first time we
     # `DVSIM_LSF_CFG` environment variable.
 
     @staticmethod
-    def prepare_workspace(project, repo_top, args) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
+        """Prepare the workspace based on the chosen launcher's needs.
+
+        This is done once for the entire duration of the flow run.
+
+        Args:
+            cfg: workspace configuration
+
+        """
         # Since we dispatch to remote machines, a project specific python
         # virtualenv is exists, needs to be activated when launching the job.
         Launcher.set_pyvenv(project)
         if Launcher.pyvenv is None:
             return
 
         # If it is already a dir, then nothing to be done.
         if not path.is_dir():
             log.info("[prepare_workspace]: [pyvenv]: Extracting %s", Launcher.pyvenv)
             with tarfile.open(Launcher.pyvenv, mode="r") as tar:
-                tar.extractall(args.scratch_root)
+                tar.extractall(cfg.scratch_root)
+
+        log.info("[prepare_workspace]: [pyvenv]: Done: %s", path)
+
+        Launcher.pyvenv = path
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
+        """Prepare the workspace for a cfg.
+
+        This is invoked once for each cfg.
+
+        Args:
+            cfg: workspace configuration
+
+        """
         # Create the job dir.
         LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf", cfg.timestamp)
         clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2)
         os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True)
 
     @staticmethod
-    def make_job_script(cfg, job_name):
-        """Creates the job script.
+    def make_job_script(cfg: "WorkspaceConfig", job_name: str):
+        """Create the job script.
 
         Once all jobs in the array are launched, the job script can be created.
         It is a bash script that takes the job index as a single argument.
@@ -76,7 +104,13 @@
         creating individual scripts for each job which incurs additional file
         I/O overhead when the scratch area is on NFS, causing a slowdown.
 
-        Returns the path to the job script.
+        Args:
+            cfg: workspace configuration
+            job_name: name of the job to run
+
+        Returns:
+            the path to the job script.
+
         """
         lines = ["#!/usr/bin/env bash\nset -e\n"]
@@ -107,7 +141,13 @@
         log.verbose("[job_script]: %s", job_script)
         return job_script
 
-    def __init__(self, deploy) -> None:
+    def __init__(self, deploy: "Deploy") -> None:
+        """Initialise the LSF Launcher.
+
+        Args:
+            deploy: job to remotely deploy.
+
+        """
         super().__init__(deploy)
 
         # Maintain the job script output as an instance variable for polling
@@ -136,10 +176,11 @@
         self.index = len(job_name_list)
 
     def _do_launch(self) -> None:
+        """Launch the job."""
         # Add self to the list of jobs.
         job_name = self.deploy.job_name
-        cfg = self.deploy.sim_cfg
-        job_total = len(LsfLauncher.jobs[cfg][job_name])
+        cfg = self.deploy.workspace_cfg
+        job_total = len(LsfLauncher.jobs[cfg.project][job_name])
 
         # The actual launching of the bsub command cannot happen until the
         # Scheduler has dispatched ALL jobs in the array.
@@ -211,13 +252,15 @@
                 # Need to mark all jobs in this range with this fail pattern.
                 err_msg = e.stderr.decode("utf-8").strip()
                 self._post_finish_job_array(cfg, job_name, err_msg)
-                raise LauncherError(err_msg)
+
+                raise LauncherError(err_msg) from e
 
         # Extract the job ID.
         result = p.stdout.decode("utf-8").strip()
         job_id = result.split("Job <")[1].split(">")[0]
         if not job_id:
             self._post_finish_job_array(cfg, job_name, "Job ID not found!")
+            err_msg = f"job (id:{job_id}) not found"
             raise LauncherError(err_msg)
 
         for job in LsfLauncher.jobs[cfg][job_name]:
@@ -225,7 +268,13 @@
             job.job_id = f"{job_id}[{job.index}]"
             job._link_odir("D")
 
-    def poll(self):
+    def poll(self) -> str | None:
+        """Poll the status of the job.
+
+        Returns:
+            status of the job or None
+
+        """
         # It is possible we may have determined the status already.
         if self.status:
             return self.status
 
         try:
             if not self.bsub_out.stat().st_size:
                 return "D"
+
         except FileNotFoundError:
             return "D"
 
         # file for reading.
         try:
             self.bsub_out_fd = open(self.bsub_out)
+
         except OSError as e:
             self._post_finish(
                 "F",
 
         # will resume reading it again at the next poll. We will do this upto
         # max_poll_retries times before giving up and flagging an error.
         #
-        # TODO: Consider using the IBM Plarform LSF Python APIs instead.
+        # TODO: Consider using the IBM Platform LSF Python APIs instead.
         # (deferred due to shortage of time / resources).
         # TODO: Parse job telemetry data for performance insights.
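The retry logic described in the comment above is a generally useful
pattern whenever a file is written asynchronously by another process: a
partially written file is simply re-read on the next poll, up to a retry
budget. A minimal standalone sketch of the idea (the names and the
marker line are assumptions for illustration, not dvsim's actual code):

    from pathlib import Path

    MAX_POLL_RETRIES = 10

    def try_read_exit_code(out_file: Path) -> int | None:
        # Returns None if the writer has not flushed the marker line yet;
        # the caller counts misses and gives up after MAX_POLL_RETRIES.
        try:
            for line in out_file.read_text().splitlines():
                if line.startswith("exit code: "):
                    return int(line.removeprefix("exit code: "))
        except FileNotFoundError:
            pass
        return None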
@@ -306,15 +357,18 @@
         if self.num_poll_retries == LsfLauncher.max_poll_retries:
             self._post_finish(
                 "F",
-                "ERROR: Reached max retries while "
-                f"reading job script output {self.bsub_out} to determine"
-                " the outcome.",
+                ErrorMessage(
+                    message="ERROR: Reached max retries while "
+                    f"reading job script output {self.bsub_out} to determine"
+                    " the outcome.",
+                    context=[],
+                ),
             )
             return "F"
 
         return "D"
 
-    def _get_job_exit_code(self):
+    def _get_job_exit_code(self) -> int | None:
         """Read the job script output to retrieve the exit code.
 
         Also read the error message if any, which will appear at the beginning
@@ -337,7 +391,9 @@
         indicating whether it was successful or it failed with an exit code is
         used to return the exit code.
 
-        Returns the exit code if found, else None.
+        Returns:
+            the exit code if found, else None.
+
         """
         # Job script output must have been opened already.
         assert self.bsub_out_fd
@@ -364,6 +420,7 @@
         return None
 
     def kill(self) -> None:
+        """Kill the remote job."""
         if self.job_id:
             try:
                 subprocess.run(
@@ -376,9 +433,10 @@
         else:
             log.error("Job ID for %s not found", self.deploy.full_name)
 
-        self._post_finish("K", "Job killed!")
+        self._post_finish("K", ErrorMessage(message="Job killed!", context=[]))
 
-    def _post_finish(self, status, err_msg) -> None:
+    def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
+        """Tidy up after the job is complete."""
         if self.bsub_out_fd:
             self.bsub_out_fd.close()
         if self.exit_code is None:
@@ -386,12 +444,20 @@
         super()._post_finish(status, err_msg)
 
     @staticmethod
-    def _post_finish_job_array(cfg, job_name, err_msg) -> None:
+    def _post_finish_job_array(
+        cfg: "WorkspaceConfig",
+        job_name: str,
+        err_msg: str,
+    ) -> None:
         """On LSF error, mark all jobs in this array as killed.
 
-        err_msg is the error message indicating the cause of failure.
+        Args:
+            cfg: workspace configuration
+            job_name: name of the job to run
+            err_msg: the error message indicating the cause of failure.
+
         """
         for job in LsfLauncher.jobs[cfg.project][job_name]:
             job._post_finish(
                 "F",
                 ErrorMessage(line_number=None, message=err_msg, context=[err_msg]),
diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py
index 2a5af9f..ba7cc86 100644
--- a/src/dvsim/launcher/nc.py
+++ b/src/dvsim/launcher/nc.py
@@ -9,12 +9,15 @@
 import pathlib
 import subprocess
 import sys
-from collections.abc import Mapping
+from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import rm_path
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import WorkspaceConfig
+
 
 class NcLauncher(Launcher):
     """Implementation of Launcher to launch jobs using altair nc."""
@@ -45,7 +48,7 @@ def create_run_sh(self, full_path, cmd) -> None:
             "echo Launch end : `date`",
             "echo CPU time : $SECONDS sec",
         ]
-        with open(run_file, "w") as f:
+        with pathlib.Path(run_file).open("w") as f:
             f.write("\n".join(lines))
         pathlib.Path(run_file).chmod(0o755)
@@ -266,22 +269,23 @@ def _close_process(self) -> None:
             self.process.stdout.close()
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """
diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py
index 0d75976..3989d17 100644
--- a/src/dvsim/launcher/sge/launcher.py
+++ b/src/dvsim/launcher/sge/launcher.py
@@ -8,12 +8,15 @@
 import pathlib
 import shlex
 import subprocess
-from collections.abc import Mapping
 from subprocess import PIPE, Popen
+from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.launcher.sge.engine import *  # noqa: F403
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import WorkspaceConfig
+
 
 global job_name
 
 pid = os.getpid()
@@ -167,22 +170,23 @@ def _close_process(self) -> None:
             self.process.stdout.close()
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """
diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py
index e631a55..d246d31 100644
--- a/src/dvsim/launcher/slurm.py
+++ b/src/dvsim/launcher/slurm.py
@@ -7,11 +7,14 @@
 import shlex
 import shutil
 import subprocess
-from collections.abc import Mapping
+from typing import TYPE_CHECKING
 
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import WorkspaceConfig
+
 
 SLURM_QUEUE = os.environ.get("SLURM_QUEUE", "hw-m")
 SLURM_MEM = os.environ.get("SLURM_MEM", "16G")
 SLURM_MINCPUS = os.environ.get("SLURM_MINCPUS", "8")
@@ -149,22 +152,23 @@ def _close_process(self) -> None:
             self.process.stdout.close()
 
     @staticmethod
-    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+    def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace based on the chosen launcher's needs.
 
         This is done once for the entire duration for the flow run.
 
         Args:
-            project: the name of the project.
-            repo_top: the path to the repository.
-            args: command line args passed to dvsim.
+            cfg: workspace configuration
 
         """
 
     @staticmethod
-    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+    def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """Prepare the workspace for a cfg.
 
         This is invoked once for each cfg.
-        'cfg' is the flow configuration object.
+
+        Args:
+            cfg: workspace configuration
+
         """

From bd02e88ac9264fc5a9de1d9b3b602592ae76ed49 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Fri, 17 Oct 2025 08:55:12 +0100
Subject: [PATCH 5/6] test: add initial CompileSim unittest

Add a `new()` static method to allow tests to test against the desired
interface. The intention is to convert the deployment classes to
Pydantic typed data classes.

The deployment classes seem to be builders of fairly simple data
classes once the construction process has completed: the constructors
take data in other formats and then filter and transform it to produce
a new data class (the deployment object). Pydantic data class
constructors take in the final data, so we need a function to take the
original data and do the transform. Having a `new()` static method is a
fairly standard convention in languages such as Rust. For the moment
this method just delegates to the existing constructor.

The `CompileSim` unittests use this new method instead of calling the
constructor directly, which means they can exercise the new code as
well as the old code for A/B testing, without having to continually
modify the tests.

Signed-off-by: James McCorrie
---
 src/dvsim/job/deploy.py  |  15 +++
 tests/job/test_deploy.py | 219 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 234 insertions(+)
 create mode 100644 tests/job/test_deploy.py

diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 4e274ad..511f157 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -28,6 +28,7 @@
 if TYPE_CHECKING:
     from dvsim.flow.sim import SimCfg
+    from dvsim.modes import BuildMode
@@ -392,6 +393,20 @@ def __init__(self, build_mode, sim_cfg) -> None:
             self.build_timeout_mins,
         )
 
+    @staticmethod
+    def new(build_mode_obj: "BuildMode", sim_cfg: "SimCfg") -> "CompileSim":
+        """Create a new CompileSim object.
+
+        Args:
+            build_mode_obj: build mode instance
+            sim_cfg: Simulation config object
+
+        Returns:
+            new CompileSim object.
+
+        """
+        return CompileSim(build_mode=build_mode_obj, sim_cfg=sim_cfg)
+
     def _define_attrs(self) -> None:
         """Define attributes."""
         super()._define_attrs()
diff --git a/tests/job/test_deploy.py b/tests/job/test_deploy.py
new file mode 100644
index 0000000..7f3c825
--- /dev/null
+++ b/tests/job/test_deploy.py
@@ -0,0 +1,219 @@
+# Copyright lowRISC contributors (OpenTitan project).
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Test Job deployment models."""
+
+from collections.abc import Mapping
+
+import pytest
+from hamcrest import assert_that, equal_to
+
+from dvsim.job.deploy import CompileSim
+
+__all__ = ()
+
+
+class FakeCliArgs:
+    """Fake CLI args."""
+
+    def __init__(self) -> None:
+        """Initialise fake command line arguments."""
+        self.build_timeout_mins = None
+        self.timestamp = "timestamp"
+
+
+class FakeSimCfg:
+    """Fake sim configuration."""
+
+    def __init__(self) -> None:
+        """Initialise fake sim configuration."""
+        self.name = "flow_name"
+        self.variant = "variant"
+
+        self.args = FakeCliArgs()
+        self.dry_run = True
+        self.gui = False
+
+        self.scratch_path = "/scratch_path"
+        self.scratch_root = "/scratch_root"
+        self.proj_root = "/project"
+
+        self.exports = []
+
+        self.flow_makefile = "path/to/makefile"
+        self.build_cmd = "path/to/{build_mode}/build_cmd"
+        self.pre_build_cmds = ["A", "B"]
+        self.post_build_cmds = ["C", "D"]
+        self.build_dir = "build/dir"
+        self.build_pass_patterns = None
+        self.build_fail_patterns = None
+        self.build_seed = 123
+
+        self.sv_flist_gen_cmd = "gen_cmd"
+        self.sv_flist_gen_opts = []
+        self.sv_flist_gen_dir = "path/to/gen"
+
+        self.cov = True
+        self.cov_db_dir = "path"
+
+
+class FakeBuildMode:
+    """Fake BuildMode."""
+
+    def __init__(self) -> None:
+        """Initialise fake BuildMode."""
+        self.name = "build_name"
+        self.build_timeout_mins = 500
+        self.build_mode = "build_mode"
+        self.build_opts = ["-b path/here", '-a "Quoted"']
+
+
+def _build_compile_sim(
+    *,
+    build_overrides: Mapping | None = None,
+    sim_overrides: Mapping | None = None,
+    cli_args_overrides: Mapping | None = None,
+) -> CompileSim:
+    """Build CompileSim object.
+
+    Test helper that takes overrides to apply on top of the default values for
+    the BuildMode and SimCfg fake objects.
+
+    """
+    cli_args = FakeCliArgs()
+    if cli_args_overrides:
+        for arg, value in cli_args_overrides.items():
+            setattr(cli_args, arg, value)
+
+    build_mode_obj = FakeBuildMode()
+    if build_overrides:
+        for arg, value in build_overrides.items():
+            setattr(build_mode_obj, arg, value)
+
+    sim_cfg = FakeSimCfg()
+    if sim_overrides:
+        for arg, value in sim_overrides.items():
+            setattr(sim_cfg, arg, value)
+
+    # Override the cli args in the sim configuration
+    sim_cfg.args = cli_args
+
+    return CompileSim.new(
+        build_mode_obj=build_mode_obj,
+        sim_cfg=sim_cfg,
+    )
+
+
+class TestCompileSim:
+    """Test CompileSim."""
+
+    @staticmethod
+    @pytest.mark.parametrize(
+        ("build_overrides", "sim_overrides", "exp_cmd"),
+        [
+            (
+                {},
+                {"dry_run": True},
+                "make -f path/to/makefile build "
+                "-n "
+                "build_cmd=path/to/build_name/build_cmd "
+                "build_dir=build/dir "
+                "build_opts='-b path/here -a \"Quoted\"' "
+                "post_build_cmds='C && D' "
+                "pre_build_cmds='A && B' "
+                "proj_root=/project "
+                "sv_flist_gen_cmd=gen_cmd "
+                "sv_flist_gen_dir=path/to/gen "
+                "sv_flist_gen_opts=''",
+            ),
+            (
+                {},
+                {"dry_run": False},
+                "make -f path/to/makefile build "
+                "build_cmd=path/to/build_name/build_cmd "
+                "build_dir=build/dir "
+                "build_opts='-b path/here -a \"Quoted\"' "
+                "post_build_cmds='C && D' "
+                "pre_build_cmds='A && B' "
+                "proj_root=/project "
+                "sv_flist_gen_cmd=gen_cmd "
+                "sv_flist_gen_dir=path/to/gen "
+                "sv_flist_gen_opts=''",
+            ),
+        ],
+    )
+    def test_cmd(build_overrides: Mapping, sim_overrides: Mapping, exp_cmd: str) -> None:
+        """Test that a CompileSim has the expected cmd."""
+        job = _build_compile_sim(
+            build_overrides=build_overrides,
+            sim_overrides=sim_overrides,
+        )
+
+        assert_that(job.cmd, equal_to(exp_cmd))
+
+    @staticmethod
+    @pytest.mark.parametrize(
+        ("build_overrides", "sim_overrides", "name", "full_name"),
+        [
+            ({"name": "fred"}, {"variant": None}, "fred", "flow_name:fred"),
+            ({"name": "fred"}, {"variant": "v1"}, "fred", "flow_name_v1:fred"),
+            ({"name": "fred"}, {"name": "flow", "variant": None}, "fred", "flow:fred"),
+            ({"name": "george"}, {"variant": None}, "george", "flow_name:george"),
+            ({"name": "george"}, {"variant": "v2"}, "george", "flow_name_v2:george"),
+        ],
+    )
+    def test_names(
+        build_overrides: Mapping,
+        sim_overrides: Mapping,
+        name: str,
+        full_name: str,
+    ) -> None:
+        """Test that a CompileSim ends up with the expected names."""
+        job = _build_compile_sim(
+            build_overrides=build_overrides,
+            sim_overrides=sim_overrides,
+        )
+
+        assert_that(job.name, equal_to(name))
+        assert_that(job.qual_name, equal_to(name))
+        assert_that(job.full_name, equal_to(full_name))
+
+    @staticmethod
+    @pytest.mark.parametrize(
+        ("sim_overrides", "seed"),
+        [
+            ({"build_seed": 123}, 123),
+            ({"build_seed": 631}, 631),
+        ],
+    )
+    def test_seed(
+        sim_overrides: Mapping,
+        seed: int,
+    ) -> None:
+        """Test that a CompileSim ends up with the expected seed."""
+        job = _build_compile_sim(
+            sim_overrides=sim_overrides,
+        )
+
+        assert_that(job.seed, equal_to(seed))
+
+    @staticmethod
+    @pytest.mark.parametrize(
+        ("cli_args_overrides", "build_overrides", "timeout"),
+        [
+            ({"build_timeout_mins": 111}, {}, 111),
+            ({}, {"build_timeout_mins": 112}, 112),
+        ],
+    )
+    def test_timeout(
+        cli_args_overrides: Mapping,
+        build_overrides: Mapping,
+        timeout: int,
+    ) -> None:
+        """Test that a CompileSim ends up with the expected timeout."""
+        job = _build_compile_sim(
+            build_overrides=build_overrides,
+            cli_args_overrides=cli_args_overrides,
+        )
+
+        assert_that(job.build_timeout_mins, equal_to(timeout))

From c75629d3e91e17fbb3e89e67da701edebc84452b Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Fri, 17 Oct 2025 10:53:07 +0100
Subject: [PATCH 6/6] refactor: improvements in lsf launcher

Migrate a dictionary to use sim_cfg names as keys rather than the
config object itself; keying on the object assumes it is hashable and
creates unnecessary references.

Signed-off-by: James McCorrie
---
 src/dvsim/launcher/base.py |  4 ++--
 src/dvsim/launcher/lsf.py  | 24 ++++++++++++++----------
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py
index 39a0934..b852442 100644
--- a/src/dvsim/launcher/base.py
+++ b/src/dvsim/launcher/base.py
@@ -163,8 +163,8 @@ def set_pyvenv(project: str) -> None:
         if not venv_path:
             venv_path = os.environ.get(common_venv)
 
-        if not Launcher.pyvenv:
-            Launcher.pyvenv = os.environ.get(f"{project.upper()}_PYVENV")
+        if venv_path:
+            Launcher.pyvenv = Path(venv_path)
 
     @staticmethod
     @abstractmethod
diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py
index 19b8377..e8d5277 100644
--- a/src/dvsim/launcher/lsf.py
+++ b/src/dvsim/launcher/lsf.py
@@ -52,20 +69,31 @@ def prepare_workspace(cfg: "WorkspaceConfig") -> None:
         """
         # Since we dispatch to remote machines, a project specific python
         # virtualenv is exists, needs to be activated when launching the job.
-        Launcher.set_pyvenv(project)
-        if Launcher.pyvenv is None:
+        Launcher.set_pyvenv(cfg.project)
+
+        pyvenv = Launcher.pyvenv
+        if pyvenv is None:
             return
 
         # If it is already a dir, then nothing to be done.
-        if os.path.isdir(pyvenv):  # noqa: PTH112
+        if pyvenv.is_dir():
             return
 
         # If not, then it needs to be a valid tarball. Extract it in the
         # scratch area if it does not exist.
-        stem = Path(Launcher.pyvenv).stem
+        stem = pyvenv.stem
         if stem.endswith("tar"):
             stem = stem[:-4]
-        path = Path(args.scratch_root, stem)
+
+        path = cfg.scratch_root / stem
         if not path.is_dir():
-            log.info("[prepare_workspace]: [pyvenv]: Extracting %s", Launcher.pyvenv)
+            log.info("[prepare_workspace]: [pyvenv]: Extracting %s", pyvenv)
             with tarfile.open(Launcher.pyvenv, mode="r") as tar:
                 tar.extractall(cfg.scratch_root)
 
@@ -87,9 +90,9 @@ def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None:
         """
         # Create the job dir.
-        LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf", cfg.timestamp)
-        clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2)
-        os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True)
+        LsfLauncher.jobs_dir[cfg.project] = cfg.scratch_path / "lsf" / cfg.timestamp
+        clean_odirs(odir=LsfLauncher.jobs_dir[cfg.project], max_odirs=2)
+        os.makedirs(Path(LsfLauncher.jobs_dir[cfg.project]), exist_ok=True)
@@ -129,10 +132,11 @@ def make_job_script(cfg: "WorkspaceConfig", job_name: str):
         if Launcher.pyvenv:
             lines += ["deactivate\n"]
 
-        job_script = Path(LsfLauncher.jobs_dir[cfg], job_name)
+        job_script = LsfLauncher.jobs_dir[cfg.project] / job_name
         try:
             with open(job_script, "w", encoding="utf-8") as f:
                 f.writelines(lines)
+
         except OSError as e:
             err_msg = f"ERROR: Failed to write {job_script}:\n{e}"
             LsfLauncher._post_finish_job_array(cfg, job_name, err_msg)
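A closing note on PATCH 6: keying a class-level dict on the config object
requires that object to be hashable and keeps a reference to it alive for
the lifetime of the dict, whereas keying on the stable project name avoids
both. A minimal sketch of the chosen shape (the values are illustrative,
not dvsim's real paths):

    from pathlib import Path

    jobs_dir: dict[str, Path] = {}

    def register(project: str, scratch_path: Path, timestamp: str) -> None:
        # The project string is hashable and cheap; the config object
        # itself never needs to live in the dict.
        jobs_dir[project] = scratch_path / "lsf" / timestamp

    register("proj", Path("/scratch"), "20251017_105307")
    assert jobs_dir["proj"] == Path("/scratch/lsf/20251017_105307")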