scripts to calculate DWPC directly #7
Conversation
Pull request overview
Adds a direct, local DWPC computation path (via hetmatpy) intended to run after null dataset generation, reducing reliance on Docker/API lookups and enabling caching/parallelism.
Changes:
- Introduces a new `src/dwpc_direct.py` module to compute/cache DWPC matrices and extract DWPC values for pair lists.
- Adds `scripts/compute_dwpc_direct.py` to run the direct DWPC computation across real/permuted/random datasets.
- Updates pipeline/tasking/docs and tweaks permutation validation output formatting.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| src/dwpc_direct.py | New DWPC computation/caching module + helpers for pairwise extraction and parallel runs. |
| scripts/compute_dwpc_direct.py | New executable script (jupytext export) that precomputes matrices and writes per-dataset DWPC outputs. |
| scripts/pipeline_production.py | Alters subprocess invocation for pipeline steps. |
| scripts/permutation_null_datasets.py | Minor formatting changes in permutation validation output. |
| pyproject.toml | Adds/adjusts Poe tasks and optional dev dependencies. |
| README.md | Updates workflow/task documentation. |
Comments suppressed due to low confidence (1)
pyproject.toml:60
`pipeline-publication` is referenced in README.md but is no longer defined in the Poe tasks (only `pipeline-production` / `pipeline-null` exist). Consider re-adding a `pipeline-publication` task (scripts/pipeline_publication.py still appears to exist) or removing the README reference to avoid broken docs.
```toml
# Grouped tasks
pipeline-production = "python scripts/pipeline_production.py"
pipeline-null = ["gen-permutation", "gen-random"]
```
```python
args_list = [
    (str(data_dir), mp, source_indices, target_indices, damping, transform)
    for mp in metapaths
]
```
compute_dwpc_parallel() builds args_list containing full source_indices/target_indices arrays and sends them to every process. For large datasets this multiplies memory usage and serialization overhead by n_workers. Consider chunking pairs, using shared memory, or parallelizing over chunks within a single process after precomputing matrices.
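One way to avoid shipping the full index arrays to every worker is to split the pair list into fixed-size chunks and hand each worker only its slice. A minimal sketch (the `chunk_pairs` helper and chunk size are illustrative, not part of this PR):

```python
from itertools import islice


def chunk_pairs(pairs, chunk_size):
    """Yield successive chunks of (source_idx, target_idx) pairs.

    Each worker then receives only its own chunk instead of the full
    source/target index arrays, so per-process serialization cost is
    proportional to chunk_size rather than the dataset size.
    """
    it = iter(pairs)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


# Example: 10 pairs split into chunks of 4 -> chunk sizes 4, 4, 2
pairs = [(i, i + 1) for i in range(10)]
chunks = list(chunk_pairs(pairs, 4))
```

Each chunk can then be submitted as one task to the process pool after the DWPC matrices have been precomputed.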
```python
existing = []
edge_files = list((Path(metagraph.get("_data_dir", ".")) / "edges").glob("*.npz"))
available_edges = {f.stem.replace(".sparse", "") for f in edge_files}

for mp in gene_bp_metapaths:
    existing.append(mp)

return existing
```
get_gene_bp_metapaths() computes available_edges but never uses it; it currently returns the full hard-coded list unconditionally and also relies on metagraph.get('_data_dir') which is not established elsewhere. Either implement the intended filtering/validation against available edge files (with an explicit data_dir argument) or remove the unused logic to avoid misleading behavior.
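A minimal sketch of the intended filtering, assuming an explicit `data_dir` argument; the `edges_of` hook is hypothetical and stands in for however the caller maps a metapath to the edge-file stems it traverses:

```python
from pathlib import Path


def filter_metapaths_by_edges(data_dir, metapaths, edges_of):
    """Keep only metapaths whose every required edge matrix exists on disk.

    edges_of(mp) returns the edge-file stems a metapath needs; it is a
    caller-supplied hook, since this sketch does not assume a particular
    metapath representation.
    """
    edge_files = (Path(data_dir) / "edges").glob("*.npz")
    available = {f.stem.replace(".sparse", "") for f in edge_files}
    return [mp for mp in metapaths if set(edges_of(mp)) <= available]
```

This also removes the reliance on a `_data_dir` key being smuggled into the metagraph dict.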
```shell
# Run individual steps
poe load-data
poe filter-change
poe go-hierarchy-analysis
poe filter-jaccard
poe gen-permutation
poe gen-random
```
This README section still references poe pipeline-publication, but pyproject.toml no longer defines that Poe task (only pipeline-production / pipeline-null are defined). Update the README to match the available tasks (or re-add pipeline-publication) so the documented commands work end-to-end.
```diff
 proc = subprocess.Popen(
-    cmd,
-    stdout=subprocess.PIPE,
-    stderr=subprocess.STDOUT,
-    text=True,
-    cwd=repo_root,
+    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
 )
```
PR description says the production pipeline runner was updated to run direct DWPC after null generation, but PIPELINE_STEPS in this file still ends at random_null_datasets.py and does not invoke scripts/compute_dwpc_direct.py. Either add the DWPC step here (if intended) or adjust the PR description/docs to reflect the actual pipeline behavior.
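If the DWPC step is meant to run in the pipeline, one option is to append it after the null-generation scripts. A sketch; the exact `PIPELINE_STEPS` entries below are illustrative of ordering, not copied from the file:

```python
# Ordered pipeline steps; compute_dwpc_direct.py is appended last so the
# direct DWPC computation runs after both null datasets are generated.
PIPELINE_STEPS = [
    "scripts/permutation_null_datasets.py",
    "scripts/random_null_datasets.py",
    "scripts/compute_dwpc_direct.py",
]
```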
```diff
-    stderr=subprocess.STDOUT,
-    text=True,
-    cwd=repo_root,
+    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
```
run_step() no longer sets cwd=repo_root for subprocesses. Several pipeline scripts derive repo_root from Path.cwd(), so running pipeline_production.py from outside the repo root (e.g., via an absolute path) will cause child steps to read/write the wrong directories. Pass cwd=repo_root (or update child scripts to derive repo_root from file instead of cwd).
```diff
-    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
+    cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=repo_root
```
```python
source_id_to_idx = dict(zip(source_nodes["identifier"], source_nodes.index))
target_id_to_idx = dict(zip(target_nodes["identifier"], target_nodes.index))
```
create_node_index_mapping() maps identifiers to source_nodes.index / target_nodes.index. HetMat node TSVs typically include an explicit matrix index/position column, and other code in this PR uses node_df['position']. Consider mapping via the explicit position column when present (falling back to index only if absent) to avoid off-by-one/misordered index bugs.
```diff
-source_id_to_idx = dict(zip(source_nodes["identifier"], source_nodes.index))
-target_id_to_idx = dict(zip(target_nodes["identifier"], target_nodes.index))
+# Prefer explicit matrix position column if available to avoid index/order bugs
+if "position" in source_nodes.columns:
+    source_positions = source_nodes["position"]
+else:
+    source_positions = source_nodes.index
+if "position" in target_nodes.columns:
+    target_positions = target_nodes["position"]
+else:
+    target_positions = target_nodes.index
+source_id_to_idx = dict(zip(source_nodes["identifier"], source_positions))
+target_id_to_idx = dict(zip(target_nodes["identifier"], target_positions))
```
```python
gene_id_to_idx = dict(zip(gene_nodes["identifier"], gene_nodes["position"]))
bp_id_to_idx = dict(zip(bp_nodes["identifier"], bp_nodes["position"]))
```
This script maps identifiers using the position column, but other pipeline code (e.g., scripts/load_data.py uses gene_nodes.loc[rows, 'identifier']) implicitly treats the node DataFrame index as the matrix index. To avoid mismatched indexing (or a KeyError if position is absent), consider standardizing on one approach (preferably the DataFrame index, or falling back to it when position isn’t present).
```diff
-gene_id_to_idx = dict(zip(gene_nodes["identifier"], gene_nodes["position"]))
-bp_id_to_idx = dict(zip(bp_nodes["identifier"], bp_nodes["position"]))
+# Prefer the explicit 'position' column when available; otherwise, fall back to the DataFrame index
+gene_positions = gene_nodes["position"] if "position" in gene_nodes.columns else gene_nodes.index
+bp_positions = bp_nodes["position"] if "position" in bp_nodes.columns else bp_nodes.index
+gene_id_to_idx = dict(zip(gene_nodes["identifier"], gene_positions))
+bp_id_to_idx = dict(zip(bp_nodes["identifier"], bp_positions))
```
```python
cache_path = data_dir / "metapath-dwpc-stats.tsv"

if not cache_path.exists():
    import urllib.request
    urllib.request.urlretrieve(METAPATH_STATS_URL, cache_path)

df = pd.read_csv(cache_path, sep="\t")
df = df.rename(columns={"dwpc-0.5_raw_mean": "dwpc_raw_mean"})
return df
```
load_metapath_stats() downloads to a shared cache_path with a non-atomic exists check. When used from multiple processes (e.g., compute_dwpc_parallel), concurrent workers can race and corrupt/partially write the TSV. Add a file lock or download to a temp file and atomically rename, or require the file be provisioned up-front.
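One race-free pattern, sketched below under the assumption of a POSIX-style filesystem where `os.replace` within one directory is atomic (the `fetch_atomic` name is illustrative): download into a unique temp file beside the destination, then atomically rename it into place.

```python
import os
import tempfile
import urllib.request
from pathlib import Path


def fetch_atomic(url: str, dest: Path) -> None:
    """Download url to dest without ever exposing a partial file.

    Concurrent workers may each perform a download, but os.replace
    guarantees readers only ever see a complete file; the exists()
    check merely avoids redundant work rather than guarding correctness.
    """
    if dest.exists():
        return
    fd, tmp = tempfile.mkstemp(dir=dest.parent, suffix=".part")
    os.close(fd)
    try:
        urllib.request.urlretrieve(url, tmp)
        os.replace(tmp, dest)  # atomic when tmp and dest share a filesystem
    finally:
        if os.path.exists(tmp):
            os.unlink(tmp)  # clean up only if the rename did not happen
```

Requiring the TSV to be provisioned up-front (and failing loudly if absent) is an even simpler alternative for a pipeline that already has an ordered setup step.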
```python
            pos += length - 1
            found = True
            break
        if not found:
            pos += 1
```
parse_metapath() advances pos by length - 1 after matching a candidate. For overlapping abbreviations like GpBPpG, this will re-scan from the wrong offset and can mis-parse or loop unexpectedly. The parser should advance in a way that aligns with metapath structure (node/edge/node...), or delegate parsing to hetmatpy’s metagraph/metapath utilities.
```diff
-            pos += length - 1
-            found = True
-            break
-        if not found:
-            pos += 1
+            # Advance to the next node position (last character of this match).
+            pos += length - 1
+            found = True
+            break
+        if not found:
+            raise ValueError(
+                f"Unable to parse metapath abbreviation '{metapath_abbrev}' at position "
+                f"{pos}: no matching metaedge for substring starting here."
+            )
```
```python
def _get_cache_path(self, metapath: str, damping: float) -> Path:
    """Get the disk cache path for a DWPC matrix."""
    return self.cache_dir / f"dwpc_{metapath}_d{damping:.2f}.npz"
```
Disk cache filenames use d{damping:.2f}. Different damping values that round to the same 2-decimal string will collide and reuse the wrong cache entry. Use a higher-precision format (or a sanitized full repr/hash) in the cache key/filename.
```diff
-        return self.cache_dir / f"dwpc_{metapath}_d{damping:.2f}.npz"
+        damping_str = repr(damping).replace(".", "p").replace("-", "m")
+        return self.cache_dir / f"dwpc_{metapath}_d{damping_str}.npz"
```
This PR focuses on calculating DWPC scores for metapaths after null generation.

- Updated the production pipeline runner to include direct DWPC computation after null generation
- Added a direct DWPC execution script
- Fixed permutation validation to iterate over existing
- Updated README and pyproject.toml to include instructions for the DWPC calculation tasks