From d92e72e2387cfcd6b1834d67228a87959616e5be Mon Sep 17 00:00:00 2001 From: Derek <256792747+decofe@users.noreply.github.com> Date: Thu, 18 Jun 2026 02:09:05 +0000 Subject: [PATCH] Optimize differential coverage analysis --- analysis/analyze.py | 95 ++++++++++++++++++++++++++++----------- analysis/requirements.txt | 1 - 2 files changed, 69 insertions(+), 27 deletions(-) diff --git a/analysis/analyze.py b/analysis/analyze.py index 3702625..f76de81 100644 --- a/analysis/analyze.py +++ b/analysis/analyze.py @@ -8,14 +8,12 @@ import shutil import statistics import sys -from collections import defaultdict +from collections import Counter, defaultdict from dataclasses import dataclass, replace from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple -from differential_coverage import DifferentialCoverage - @dataclass(frozen=True) class LogFile: path: Path @@ -1599,21 +1597,20 @@ def parse_showmap_approach_dir(name: str) -> Tuple[str, Optional[str]]: def read_afl_showmap(path: Path) -> Set[str]: edges: Set[str] = set() - for line_number, raw_line in enumerate( - path.read_text(errors="ignore").splitlines(), 1 - ): - line = raw_line.strip() - if not line: - continue - edge_id, sep, count_text = line.partition(":") - if not sep: - raise ValueError(f"invalid AFL showmap line {path}:{line_number}: {line}") - try: - count = int(count_text.strip()) - except ValueError as exc: - raise ValueError(f"invalid AFL showmap count {path}:{line_number}: {line}") from exc - if count > 0: - edges.add(edge_id.strip()) + with path.open("r", errors="ignore") as handle: + for line_number, raw_line in enumerate(handle, 1): + line = raw_line.strip() + if not line: + continue + edge_id, sep, count_text = line.partition(":") + if not sep: + raise ValueError(f"invalid AFL showmap line {path}:{line_number}: {line}") + try: + count = int(count_text.strip()) + except ValueError as exc: + raise ValueError(f"invalid AFL showmap count {path}:{line_number}: {line}") from exc + if count > 0: + edges.add(edge_id.strip()) return edges @@ -1721,20 +1718,66 @@ def write_showmap_campaign_dir( def calculate_relscores( campaign: Dict[str, Dict[str, Set[str]]], ) -> Dict[str, float]: - return dict(DifferentialCoverage(campaign).relscores()) + approach_unions: Dict[str, Set[str]] = {} + approach_edge_hits: Dict[str, Counter[str]] = {} + approach_nonempty_trials: Dict[str, int] = {} + + for approach, trials in campaign.items(): + edge_hits: Counter[str] = Counter() + covered_edges: Set[str] = set() + nonempty_trials = 0 + for edges in trials.values(): + if not edges: + continue + nonempty_trials += 1 + covered_edges.update(edges) + edge_hits.update(edges) + approach_unions[approach] = covered_edges + approach_edge_hits[approach] = edge_hits + approach_nonempty_trials[approach] = nonempty_trials + + approach_count = len(approach_unions) + approach_hit_counts: Counter[str] = Counter() + for covered_edges in approach_unions.values(): + approach_hit_counts.update(covered_edges) + + scores: Dict[str, float] = {} + for approach, edge_hits in approach_edge_hits.items(): + nonempty_trials = approach_nonempty_trials[approach] + if nonempty_trials == 0: + scores[approach] = 0.0 + continue + score = 0.0 + for edge, trials_that_hit_edge in edge_hits.items(): + approaches_that_never_hit_edge = approach_count - approach_hit_counts[edge] + score += ( + approaches_that_never_hit_edge + * trials_that_hit_edge + / nonempty_trials + ) + scores[approach] = score + return scores def calculate_relcovs( campaign: Dict[str, Dict[str, Set[str]]], ) -> Dict[str, Dict[str, float]]: - dc = DifferentialCoverage(campaign) - return { - approach: { - reference: dc.approaches[approach].relcov(dc.approaches[reference]) - for reference in dc.approaches - } - for approach in dc.approaches + reference_unions = { + approach: set().union(*trials.values()) for approach, trials in campaign.items() } + relcovs: Dict[str, Dict[str, float]] = {} + for approach, trials in campaign.items(): + relcovs[approach] = {} + trial_edges = tuple(trials.values()) + for reference, reference_edges in reference_unions.items(): + if not reference_edges: + relcovs[approach][reference] = 0.0 + continue + relcovs[approach][reference] = statistics.median( + len(edges.intersection(reference_edges)) / len(reference_edges) + for edges in trial_edges + ) + return relcovs def showmap_campaign_summary( diff --git a/analysis/requirements.txt b/analysis/requirements.txt index 834cb40..3f1d497 100644 --- a/analysis/requirements.txt +++ b/analysis/requirements.txt @@ -2,4 +2,3 @@ matplotlib>=3.7.0 numpy>=1.24.0 pandas>=2.0.0 scipy>=1.10.0 -differential-coverage>=1.1.0