def _flag_value(name: str, value: Any) -> str:
    """Render one sweep parameter as a fragment of a shell command line.

    Args:
        name: Flag name without the leading ``--``.
        value: Parameter value. ``bool`` values are treated as on/off
            switches; anything else is rendered with ``str()``.

    Returns:
        ``"--name"`` for ``True``, an empty string for ``False`` (the caller
        filters empty fragments out), otherwise ``"--name <value>"`` with the
        value quoted for a POSIX shell.
    """
    # Function-scope import keeps this change self-contained.
    import shlex

    if isinstance(value, bool):
        return f"--{name}" if value else ""

    # The fragment ends up in a command string executed with shell=True, so an
    # unquoted value containing spaces or shell metacharacters would be split
    # or interpreted by the shell. shlex.quote leaves plain tokens (numbers,
    # identifiers, POSIX paths) unchanged.
    return f"--{name} {shlex.quote(str(value))}"
def _run_single_openfold_command(
    base_command: str,
    params: Dict[str, Any],
    run_output_dir: Path,
) -> Tuple[subprocess.CompletedProcess[str], str]:
    """Launch one OpenFold run through the shell and capture its output.

    The executed command is ``base_command`` followed by ``--save_outputs``,
    ``--output_dir``, ``--attn_map_dir`` and one flag per entry of ``params``
    (sorted by name, rendered by ``_flag_value``).

    Args:
        base_command: Shell snippet that starts OpenFold. It is passed to the
            shell verbatim, so it may itself contain quoting.
        params: Sweep parameters for this run.
        run_output_dir: Directory for this run; an ``attention_maps``
            subdirectory is created inside it before the run starts.

    Returns:
        ``(completed, command)``: the ``subprocess.CompletedProcess`` with
        text-mode stdout/stderr captured, and the exact command string that
        was executed. A non-zero exit status does not raise; callers inspect
        ``completed.returncode``.
    """
    # Function-scope import keeps this change self-contained.
    import shlex

    attn_map_dir = run_output_dir / "attention_maps"
    attn_map_dir.mkdir(parents=True, exist_ok=True)

    rendered = (_flag_value(name, value) for name, value in sorted(params.items()))
    flags = [flag for flag in rendered if flag]  # False switches render as ""

    # Paths are quoted because the string is interpreted by a shell: a runs
    # root containing a space would otherwise be split into two arguments.
    # Quoting also keeps the command parseable with shlex.split afterwards.
    # NOTE(review): shlex.quote targets POSIX shells.
    command = " ".join(
        [
            base_command,
            "--save_outputs",
            f"--output_dir {shlex.quote(str(run_output_dir))}",
            f"--attn_map_dir {shlex.quote(str(attn_map_dir))}",
            *flags,
        ]
    )

    # shell=True is deliberate: base_command is a user-supplied shell snippet.
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)
    return completed, command
base_command: str, + param_grid: Dict[str, List[Any]], + runs_root: Path, + score_key: str, +) -> List[OpenFoldRunResult]: + results: List[OpenFoldRunResult] = [] + failures: List[Tuple[str, int, str, str]] = [] + + for idx, params in enumerate(expand_grid(param_grid), start=1): + run_id_parts = [f"{k}-{_normalize_for_id(v)}" for k, v in sorted(params.items())] + run_id = f"run-{idx:03d}__" + "__".join(run_id_parts) + + run_output_dir = runs_root / run_id + run_output_dir.mkdir(parents=True, exist_ok=True) + + completed, command = _run_single_openfold_command(base_command, params, run_output_dir) + if completed.returncode != 0: + failures.append( + ( + run_id, + completed.returncode, + completed.stderr.strip(), + completed.stdout.strip(), + ) + ) + continue + + results.append( + _collect_result( + run_id=run_id, + params=params, + run_output_dir=run_output_dir, + command=command, + score_key=score_key, + ) + ) + + if failures: + print(f"[openfold-sweep] failed runs: {len(failures)}") + for run_id, returncode, stderr, stdout in failures: + print(f"[openfold-sweep] run={run_id} returncode={returncode}") + if stderr: + print("[openfold-sweep] stderr:") + print(stderr[-2000:]) + elif stdout: + print("[openfold-sweep] stdout:") + print(stdout[-2000:]) + + if not results: + raise RuntimeError( + "All OpenFold sweep runs failed. Check the per-run stderr summaries above." 
def run_incremental_sweep(
    base_command: str,
    param_grid: Dict[str, List[Any]],
    runs_root: Path,
    score_key: str,
) -> Tuple[List[OpenFoldRunResult], List[OpenFoldRunResult], OpenFoldRunResult]:
    """Run a greedy one-parameter-at-a-time sweep.

    A baseline run uses the first value of every parameter. Parameters are
    then visited in sorted name order; for each one, every alternative value
    is tried on top of the current best parameter set. The best-scoring trial
    is adopted only if it strictly improves on the current best score.

    Args:
        base_command: Shell snippet that launches OpenFold.
        param_grid: Maps each parameter name to a non-empty list of candidate
            values; the first value is the baseline.
        runs_root: Directory under which one subdirectory per run is created.
        score_key: Key of the output dict used to score a run.

    Returns:
        ``(all_results, best_increment_entries, current_best)``: every run
        that produced a result (baseline first), the adopted improvements in
        order (each carrying its ``score_delta``), and the final best entry.

    Raises:
        ValueError: If ``param_grid`` is empty or a value is not a non-empty
            list.
        RuntimeError: If the baseline run exits with a non-zero status.
    """
    # Function-scope import keeps this change self-contained.
    from dataclasses import replace

    if not param_grid:
        raise ValueError("param_grid must contain at least one parameter")

    ordered_keys = sorted(param_grid)
    for key in ordered_keys:
        values = param_grid[key]
        if not isinstance(values, list) or not values:
            raise ValueError(f"Parameter '{key}' must map to a non-empty list")

    baseline_params = {key: param_grid[key][0] for key in ordered_keys}
    all_results: List[OpenFoldRunResult] = []
    best_increment_entries: List[OpenFoldRunResult] = []
    # (run_id, returncode, stderr-or-reason, stdout) for each discarded run.
    failures: List[Tuple[str, int, str, str]] = []

    baseline_run_id = "run-000__baseline"
    baseline_output_dir = runs_root / baseline_run_id
    baseline_output_dir.mkdir(parents=True, exist_ok=True)
    baseline_completed, baseline_command = _run_single_openfold_command(
        base_command,
        baseline_params,
        baseline_output_dir,
    )
    if baseline_completed.returncode != 0:
        raise RuntimeError(
            "Baseline incremental run failed. stderr:\n"
            + baseline_completed.stderr[-2000:]
        )

    # Baseline collection errors propagate on purpose: without a baseline
    # score there is nothing to compare candidates against.
    current_best = _collect_result(
        run_id=baseline_run_id,
        params=baseline_params,
        run_output_dir=baseline_output_dir,
        command=baseline_command,
        score_key=score_key,
        step_index=0,
    )
    all_results.append(current_best)

    run_counter = 1
    for step_index, key in enumerate(ordered_keys, start=1):
        current_value = current_best.params[key]
        candidates = [value for value in param_grid[key] if value != current_value]

        best_trial: OpenFoldRunResult | None = None
        for candidate in candidates:
            trial_params = dict(current_best.params)
            trial_params[key] = candidate

            run_id = f"run-{run_counter:03d}__step-{step_index:02d}__{key}-{_normalize_for_id(candidate)}"
            run_counter += 1

            run_output_dir = runs_root / run_id
            run_output_dir.mkdir(parents=True, exist_ok=True)

            completed, command = _run_single_openfold_command(base_command, trial_params, run_output_dir)
            if completed.returncode != 0:
                failures.append(
                    (
                        run_id,
                        completed.returncode,
                        completed.stderr.strip(),
                        completed.stdout.strip(),
                    )
                )
                continue

            try:
                trial_result = _collect_result(
                    run_id=run_id,
                    params=trial_params,
                    run_output_dir=run_output_dir,
                    command=command,
                    score_key=score_key,
                    changed_param=key,
                    from_value=current_value,
                    to_value=candidate,
                    step_index=step_index,
                )
            except (FileNotFoundError, KeyError, ValueError) as exc:
                # A candidate that exits 0 but leaves no usable result (no
                # output pickle, missing or empty score key) is treated like
                # any other failed run instead of aborting the sweep and
                # discarding the runs that already completed.
                failures.append(
                    (
                        run_id,
                        completed.returncode,
                        f"result collection failed: {exc}",
                        completed.stdout.strip(),
                    )
                )
                continue
            all_results.append(trial_result)

            # Strict '>' keeps the earliest candidate on ties.
            if best_trial is None or trial_result.score > best_trial.score:
                best_trial = trial_result

        if best_trial is None:
            continue

        score_delta = best_trial.score - current_best.score
        if score_delta > 0:
            # Same entry with the improvement recorded; replace() copies every
            # other field of the frozen dataclass unchanged.
            improved = replace(best_trial, score_delta=score_delta)
            best_increment_entries.append(improved)
            current_best = improved

    if failures:
        print(f"[openfold-sweep] failed runs: {len(failures)}")
        for run_id, returncode, stderr, stdout in failures:
            print(f"[openfold-sweep] run={run_id} returncode={returncode}")
            if stderr:
                print("[openfold-sweep] stderr:")
                print(stderr[-2000:])
            elif stdout:
                print("[openfold-sweep] stdout:")
                print(stdout[-2000:])

    if len(all_results) == 1 and not best_increment_entries:
        print("[openfold-sweep] no successful incremental candidate runs; baseline only")

    return all_results, best_increment_entries, current_best
ValueError("grid_json must contain an object mapping parameter names to value lists") + + runs_root = Path(args.runs_root) + runs_root.mkdir(parents=True, exist_ok=True) + + if args.sweep_strategy == "grid": + results = run_sweep( + base_command=args.base_command, + param_grid=param_grid, + runs_root=runs_root, + score_key=args.score_key, + ) + best_entries = select_best_entries(results, top_k=args.top_k) + else: + results, best_increment_entries, final_best = run_incremental_sweep( + base_command=args.base_command, + param_grid=param_grid, + runs_root=runs_root, + score_key=args.score_key, + ) + if best_increment_entries: + best_entries = best_increment_entries + else: + best_entries = [final_best] + + archive = OpenFoldZarrArchive(args.archive_path) + archive.root.attrs["sweep_strategy"] = args.sweep_strategy + archive.root.attrs["score_key"] = args.score_key + archive.append_best_entries(best_entries) + archive.write_best_log(best_entries, args.best_log_path) + + +if __name__ == "__main__": + main() diff --git a/archive/openfold_zarr_archive.py b/archive/openfold_zarr_archive.py new file mode 100644 index 00000000..dde879c3 --- /dev/null +++ b/archive/openfold_zarr_archive.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +import json +from pathlib import Path +import pickle +import re +import shlex +from typing import Any, Dict, Iterable, List + +import numpy as np + +try: + import zarr +except ImportError as exc: # pragma: no cover - exercised in environments missing zarr + raise ImportError( + "zarr is required for standardizedarchive. Install with `pip install zarr`." 
class OpenFoldZarrArchive:
    """Zarr-backed store for the best runs of an OpenFold parameter sweep.

    Each archived run gets its own group under ``best_entries/<run_id>``.
    Scalar facts about the run are stored as group attributes; arrays and
    file contents go under the run's ``artifacts`` subgroup.
    """

    def __init__(self, archive_path: str | Path):
        """Open the group at *archive_path* with ``mode="a"`` and stamp it.

        ``archive_type`` and ``created_at_utc`` are written with
        ``setdefault``, so values already stored are left untouched.
        """
        self.archive_path = Path(archive_path)
        self.root = zarr.open_group(str(self.archive_path), mode="a")
        root_attrs = self.root.attrs
        root_attrs.setdefault("archive_type", "openfold_parameter_sweep")
        root_attrs.setdefault("created_at_utc", datetime.now(timezone.utc).isoformat())

    def append_best_entries(self, entries: Iterable[OpenFoldRunResult]) -> None:
        """Store every entry under ``best_entries/<run_id>`` with its artifacts.

        Fields that are ``None`` on an entry are not written. ``from_value``
        and ``to_value`` are stored JSON-encoded under ``*_json`` names.
        """
        container = self.root.require_group("best_entries")
        # (attribute name, entry field, converter) for the optional fields,
        # in the order they are written.
        optional_attrs = (
            ("changed_param", "changed_param", None),
            ("from_value_json", "from_value", json.dumps),
            ("to_value_json", "to_value", json.dumps),
            ("score_delta", "score_delta", float),
            ("step_index", "step_index", int),
        )

        for entry in entries:
            run_group = container.require_group(entry.run_id)
            attrs = run_group.attrs
            attrs["score"] = float(entry.score)
            attrs["params_json"] = json.dumps(entry.params, sort_keys=True)
            attrs["output_dir"] = entry.output_dir
            attrs["command"] = entry.command
            attrs["model_output_path"] = entry.model_output_path
            for attr_name, field_name, convert in optional_attrs:
                raw = getattr(entry, field_name)
                if raw is not None:
                    attrs[attr_name] = raw if convert is None else convert(raw)
            attrs["saved_at_utc"] = datetime.now(timezone.utc).isoformat()
            self._archive_run_artifacts(run_group, entry)

    def write_best_log(self, entries: Iterable[OpenFoldRunResult], log_path: str | Path) -> None:
        """Write one key-sorted JSON object per entry to *log_path*.

        The file is opened in ``"w"`` mode, so previous content is replaced.
        Parent directories are created as needed.
        """
        destination = Path(log_path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        # (field name, converter) for fields emitted only when not None.
        optional_fields = (
            ("changed_param", None),
            ("from_value", None),
            ("to_value", None),
            ("score_delta", float),
            ("step_index", int),
        )

        with destination.open("w", encoding="utf-8") as sink:
            for entry in entries:
                record = {
                    "run_id": entry.run_id,
                    "score": float(entry.score),
                    "params": entry.params,
                    "output_dir": entry.output_dir,
                    "command": entry.command,
                    "model_output_path": entry.model_output_path,
                }
                for field_name, convert in optional_fields:
                    raw = getattr(entry, field_name)
                    if raw is not None:
                        record[field_name] = raw if convert is None else convert(raw)
                line = json.dumps(record, sort_keys=True)
                sink.write(f"{line}\n")

    @staticmethod
    def _sanitize_component(name: str) -> str:
        """Turn *name* into a safe group/array name.

        Runs of characters outside ``[0-9A-Za-z._-]`` become one underscore,
        leading/trailing dots and underscores are stripped, and an empty
        result falls back to ``"item"``.
        """
        collapsed = re.sub(r"[^0-9A-Za-z._-]+", "_", name).strip("._")
        return collapsed if collapsed else "item"

    @staticmethod
    def _extract_flag_value(command: str, flag: str) -> str | None:
        """Return the value given to *flag* in *command*, or ``None``.

        Both ``--flag value`` and ``--flag=value`` are recognised; when the
        flag occurs more than once the last occurrence wins. A command that
        ``shlex.split`` cannot tokenise yields ``None``.
        """
        try:
            tokens = shlex.split(command)
        except ValueError:
            return None

        prefix = f"{flag}="
        found: str | None = None
        for position, token in enumerate(tokens):
            if token == flag:
                # A bare flag at the very end has no value to report.
                if position + 1 < len(tokens):
                    found = tokens[position + 1]
            elif token.startswith(prefix):
                found = token.partition("=")[2]
        return found

    @staticmethod
    def _as_numpy_array(value: Any) -> np.ndarray | None:
        """Convert *value* to an array, or return ``None`` if it cannot be stored.

        ndarrays are returned as-is; Python/NumPy scalars become 0-d arrays;
        objects exposing ``detach``/``cpu``/``numpy`` are converted through
        that chain; lists and tuples are converted unless conversion fails or
        yields an object-dtype array. Everything else gives ``None``.
        """
        if isinstance(value, np.ndarray):
            return value

        if isinstance(value, (int, float, bool, np.number)):
            return np.asarray(value)

        if all(hasattr(value, attr) for attr in ("detach", "cpu", "numpy")):
            return np.asarray(value.detach().cpu().numpy())

        if not isinstance(value, (list, tuple)):
            return None

        try:
            converted = np.asarray(value)
        except Exception:
            return None
        return None if converted.dtype == object else converted

    def _write_array_dataset(self, parent_group: Any, name: str, array: np.ndarray) -> None:
        """Store *array* in *parent_group* under the sanitized *name*, replacing any existing node."""
        dataset_name = self._sanitize_component(name)
        if dataset_name in parent_group:
            del parent_group[dataset_name]
        parent_group.create_dataset(
            dataset_name,
            data=array,
            shape=array.shape,
            dtype=array.dtype,
            overwrite=True,
        )

    def _archive_output_dict_arrays(self, group: Any, obj: Any, depth: int = 0) -> None:
        """Recursively mirror *obj* into *group*.

        Dicts become one subgroup per key (visited in ``str(key)`` order).
        Non-empty lists/tuples containing a dict, list or tuple become
        ``idx_<n>`` subgroups. Any other value that converts to a non-object
        array is written as a dataset named ``values``. Values that do not
        convert, and anything nested deeper than 8 levels, are skipped.
        """
        if depth > 8:
            return

        if isinstance(obj, dict):
            for key, value in sorted(obj.items(), key=lambda pair: str(pair[0])):
                child = group.require_group(self._sanitize_component(str(key)))
                self._archive_output_dict_arrays(child, value, depth + 1)
            return

        is_nested_sequence = (
            isinstance(obj, (list, tuple))
            and obj
            and any(isinstance(element, (dict, list, tuple)) for element in obj)
        )
        if is_nested_sequence:
            for position, element in enumerate(obj):
                child = group.require_group(f"idx_{position}")
                self._archive_output_dict_arrays(child, element, depth + 1)
            return

        leaf = self._as_numpy_array(obj)
        if leaf is not None and leaf.dtype != object:
            self._write_array_dataset(group, "values", np.asarray(leaf))

    def _archive_file_bytes(self, files_group: Any, file_path: Path, relative_path: Path) -> None:
        """Store the raw bytes of *file_path* as a ``uint8`` dataset named ``bytes``.

        One nested group is created per sanitized component of
        *relative_path*, which keeps files sharing a basename apart.
        """
        node = files_group
        for part in relative_path.parts:
            node = node.require_group(self._sanitize_component(part))
        if "bytes" in node:
            del node["bytes"]

        payload = np.frombuffer(file_path.read_bytes(), dtype=np.uint8)
        node.create_dataset(
            "bytes",
            data=payload,
            shape=payload.shape,
            dtype=payload.dtype,
            overwrite=True,
        )
        node.attrs["source_path"] = str(file_path)
        node.attrs["relative_path"] = str(relative_path)
        node.attrs["size_bytes"] = int(file_path.stat().st_size)

    def _archive_directory(self, parent_group: Any, source_dir: Path, suffixes: Any = None) -> None:
        """Archive the files below *source_dir* into ``parent_group/files``.

        Does nothing when *source_dir* is not an existing directory. When
        *suffixes* is given, only files whose lower-cased suffix is in it are
        stored.
        """
        if not source_dir.exists() or not source_dir.is_dir():
            return

        files_group = parent_group.require_group("files")
        for candidate in sorted(source_dir.rglob("*")):
            if not candidate.is_file():
                continue
            if suffixes is not None and candidate.suffix.lower() not in suffixes:
                continue
            self._archive_file_bytes(files_group, candidate, candidate.relative_to(source_dir))

    def _archive_run_artifacts(self, run_group: Any, entry: OpenFoldRunResult) -> None:
        """Populate ``run_group/artifacts`` for one entry.

        Writes four subgroups: ``layer_wise_activations`` (arrays from the
        pickled output dict, only when the pickle exists and loads),
        ``attention_maps`` (files from the ``--attn_map_dir`` named in the
        command), ``structural_outputs`` (``.pdb``/``.cif``/``.mmcif`` files
        from the run output directory) and ``metadata``.
        """
        artifacts = run_group.require_group("artifacts")

        pickle_path = Path(entry.model_output_path)
        if pickle_path.exists():
            # Best effort: a failure here is recorded as an attribute rather
            # than raised, so the remaining artifacts are still archived.
            try:
                with pickle_path.open("rb") as stream:
                    output_dict = pickle.load(stream)

                self._archive_output_dict_arrays(
                    artifacts.require_group("layer_wise_activations"),
                    output_dict,
                )
            except Exception as exc:
                artifacts.attrs["layer_wise_activations_error"] = str(exc)

        attention_dir = self._extract_flag_value(entry.command, "--attn_map_dir")
        attention_group = artifacts.require_group("attention_maps")
        if attention_dir:
            attention_root = Path(attention_dir)
            attention_group.attrs["attention_dir"] = str(attention_root)
            self._archive_directory(attention_group, attention_root)

        structure_group = artifacts.require_group("structural_outputs")
        run_output_dir = Path(entry.output_dir)
        structure_group.attrs["run_output_dir"] = str(run_output_dir)
        self._archive_directory(
            structure_group,
            run_output_dir,
            suffixes={".pdb", ".cif", ".mmcif"},
        )

        metadata = artifacts.require_group("metadata").attrs
        metadata["saved_at_utc"] = datetime.now(timezone.utc).isoformat()
        metadata["run_id"] = entry.run_id
        metadata["score"] = float(entry.score)
        metadata["params_json"] = json.dumps(entry.params, sort_keys=True)
        metadata["command"] = entry.command

        # Model version and checkpoint are recovered from the recorded command.
        model_version = self._extract_flag_value(entry.command, "--config_preset")
        checkpoint_path = self._extract_flag_value(entry.command, "--openfold_checkpoint_path")
        if model_version is not None:
            metadata["model_version"] = model_version
        if checkpoint_path is not None:
            metadata["checkpoint_path"] = checkpoint_path
def load_pair_representation(archive_path: str, layer_index: int) -> np.ndarray:
    """
    Fetch the pairwise representation stored for one Evoformer layer.

    Archive location read:
        representations/pair/layer_NN, where NN is ``layer_index``
        zero-padded to two digits (for example ``layer_03``).

    Parameters
    ----------
    archive_path : str
        Root path to the Zarr archive; it is opened read-only.

    layer_index : int
        Zero-based Evoformer layer index to retrieve.

    Returns
    -------
    numpy.ndarray
        Pairwise representation, expected shape
        (num_residues, num_residues, pair_dim).

    Raises
    ------
    KeyError
        If the requested layer does not exist in the archive.
    """
    pair_group = zarr.open_group(archive_path, mode="r")["representations"]["pair"]
    layer_key = "layer_" + format(layer_index, "02d")
    if layer_key in pair_group:
        return np.asarray(pair_group[layer_key])
    raise KeyError(f"Layer not found: representations/pair/{layer_key}")