diff --git a/CHANGELOG.md b/CHANGELOG.md index a306585..96328c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ version numbers (so `0.1.0b1` is the first beta of the `0.1.0` line). ### Added +- feat(admin): new `prune_dataset()` module + scoring rubric (`corpus_forge/admin/prune.py`) — first step of `rfc-corpus-growth-controls`. CLI verb + GrowthConfig block land in follow-up RFC tasks. Postgres / SQLite dispatch goes through a small `_is_postgres_like` capability probe (`_paramstyle == "pyformat"` first, class-name `"postgres"` substring as fallback) so we don't lean on a single brittle name check; SQLite branch chunks the IN-list at `_SQLITE_BATCH_SIZE = 500` ids. `PruneReport.duplicate_density_available` exposes whether the MinHash quality signal ran (promoted off the head candidate's `sub_scores` so every element of `selected` is now shape-uniform). Named-but-unknown datasets raise `ValueError` before any candidate walk — critical safety guard under `apply=True` so a typo'd name can never delete from the wrong scope. 22 unit tests in `tests/unit/test_prune_scorer.py` (up from 17 in the initial round) lock the rubric, the dispatch heuristics, both delete paths, and the unknown-dataset refusal. - `tests/unit/test_cli_human_friendly.py` — first two tests against the human-friendly CLI testable properties: (1) doctor's `_check_config_present` pins the `corpus-forge setup` recovery diff --git a/corpus_forge/admin/__init__.py b/corpus_forge/admin/__init__.py index 081270d..0ad161a 100644 --- a/corpus_forge/admin/__init__.py +++ b/corpus_forge/admin/__init__.py @@ -24,4 +24,10 @@ from __future__ import annotations -__all__: list[str] = [] +from corpus_forge.admin.prune import PruneCandidate, PruneReport, prune_dataset + +__all__: list[str] = [ + "PruneCandidate", + "PruneReport", + "prune_dataset", +] diff --git a/corpus_forge/admin/prune.py b/corpus_forge/admin/prune.py new file mode 100644 index 0000000..71ca58b --- /dev/null +++ b/corpus_forge/admin/prune.py @@ -0,0 +1,594 @@ +"""Admin-level dataset pruning — first step of ``rfc-corpus-growth-controls``. + +The module exposes a single entry point — :func:`prune_dataset` — that +walks the configured candidate pool, scores every chunk under the +:data:`_PRUNE_WEIGHTS` rubric, and surfaces the bottom-percentile rows +as :class:`PruneCandidate` objects in a :class:`PruneReport`. + +Scoring **reuses** the existing curation primitives in +:mod:`corpus_forge.curation.selector` so the two surfaces stay aligned +on what "weak / interesting" looks like for a chunk: + +- :func:`_compute_confidence_deficit` — high when the classifier is + unsure (or never touched the chunk). +- :func:`_compute_missing_metadata` — fraction of the six well-known + metadata fields that are empty. +- :func:`_compute_freshness` — curation treats *fresh* as "needs help" + (positive signal); for pruning we **invert** it so newly-ingested + rows are *preserved* rather than swept. + +Two new prune-only sub-scores live here: + +- :func:`_duplicate_density` — fraction of the candidate pool considered + a near-duplicate by the MinHash-backed quality signals from + ``rfc-nlp-data-quality-signals.md``. Detection is import-driven: if + ``corpus_forge.quality.minhash`` can't be imported, every candidate + scores ``0.0`` and :attr:`PruneReport.duplicate_density_available` + is set to ``False`` so callers know the rubric ran in degraded mode. +- :func:`_feedback_drag` — ``1.0`` when the chunk has a feedback row + with ``kind == "rejected"`` or ``rating < 0``; ``0.0`` otherwise. + Reads via a best-effort backend hook (``iter_chunk_feedback``) + before falling back to a direct ``corpus.chunk_feedback`` / + ``corpus.feedback`` walk. Any failure is logged and treated as "no + feedback" so a missing table never breaks the prune run. + +The default is **dry-run** — :func:`prune_dataset` only deletes when +``apply=True`` is passed explicitly, mirroring the safety net the RFC +calls for. When ``apply=True``, the function prefers a dedicated +``delete_chunks_by_ids`` backend hook (if present) and falls back to a +single bulk DELETE statement otherwise. Postgres uses +``WHERE id = ANY(%s)``; SQLite falls through to a chunked ``IN (...)`` +sweep. +""" + +from __future__ import annotations + +import logging +import math +from collections import Counter +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any + +from corpus_forge.curation.selector import ( + _Candidate, + _compute_confidence_deficit, + _compute_freshness, + _compute_missing_metadata, + _iter_curation_candidates, +) + +logger = logging.getLogger(__name__) + + +# ───────────────────────────────────────────────────────────────────────── +# Constants +# ───────────────────────────────────────────────────────────────────────── + + +_PRUNE_WEIGHTS: dict[str, float] = { + "confidence_deficit": 0.20, + "missing_metadata": 0.20, + "freshness_inverted": 0.15, # 1.0 - freshness + "duplicate_density": 0.25, + "feedback_drag": 0.20, +} +"""Weight table for the five prune sub-scores. Sums to 1.0. + +A change here ripples directly into :func:`prune_dataset` (no per-row +weight selection — the rubric is fixed across the pool) and the +``test_score_ordering_invariants`` lock-test in +``tests/unit/test_prune_scorer.py``. +""" + + +# Default pool size pulled from the backend; larger pools cost more memory +# but smooth out the duplicate-density normalisation step. +_DEFAULT_CANDIDATE_POOL: int = 2000 + + +# Default percentile selected when the caller doesn't pass one. +_DEFAULT_PERCENTILE: int = 10 + + +# Valid percentile range — full [0, 100] interval. ``0`` is a useful +# no-op (returns the score breakdown without selecting anything); ``100`` +# selects the entire pool. +_MIN_PERCENTILE: int = 0 +_MAX_PERCENTILE: int = 100 + + +# ───────────────────────────────────────────────────────────────────────── +# Public dataclasses +# ───────────────────────────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class PruneCandidate: + """One chunk surfaced as a prune candidate. + + ``prune_score`` lives in ``[0, 1]`` — higher means "prune more + eagerly." ``sub_scores`` carries the per-rubric breakdown so callers + (CLI, MCP, tests) can introspect *why* a chunk was selected. + """ + + chunk_id: int + document_id: int | None + source_uri: str | None + prune_score: float + sub_scores: dict[str, float] + reason: str + + +@dataclass(frozen=True) +class PruneReport: + """Outcome of one :func:`prune_dataset` invocation. + + The report doubles as a dry-run preview (``applied=False``) and an + apply-mode receipt (``applied=True, deleted=N``). The candidate list + is sorted *worst-first* — i.e. the head of ``selected`` is the + chunk the rubric most wants gone. + + ``duplicate_density_available`` is ``True`` when the MinHash-backed + quality module was importable for this run and every candidate got + a real ``duplicate_density`` sub-score; ``False`` when the module + was unavailable and every candidate carried ``duplicate_density = + 0.0`` (rubric ran in degraded mode). Promoted to the report so the + flag isn't shape-leaked onto one element of ``selected``. + """ + + dataset: str | None + percentile: int + considered: int + selected: list[PruneCandidate] + applied: bool + deleted: int + summary_by_source: dict[str, int] = field(default_factory=dict) + duplicate_density_available: bool = False + + +# ───────────────────────────────────────────────────────────────────────── +# Duplicate-density sub-score +# ───────────────────────────────────────────────────────────────────────── + + +def _minhash_available() -> bool: + """Return ``True`` when the MinHash-backed quality signals module is importable. + + The check is intentionally cheap — we don't want to pay a full + sentence-transformers / datasketch boot just to ask "is the feature + flag on?" — so we probe for a sentinel symbol and discard the + import on success. + """ + + try: + from corpus_forge.quality.minhash import ( # noqa: F401 + jaccard_neighbor_distance, + ) + except ImportError: + return False + return True + + +def _duplicate_density( + candidate: _Candidate, + *, + minhash_module: Any | None, +) -> float: + """Per-chunk duplicate-density score in ``[0, 1]``. + + When ``minhash_module`` is ``None`` (the feature flag from + ``rfc-nlp-data-quality-signals.md`` is unavailable in this build), + the score is ``0.0`` for every candidate. Otherwise we ask the + module for the neighbor distance and translate ``low distance == + high density``. + """ + + if minhash_module is None: + return 0.0 + try: + distance = float( + minhash_module.jaccard_neighbor_distance( + chunk_id=candidate.chunk_id, + text=candidate.text, + ) + ) + except Exception as exc: # pragma: no cover — defensive + logger.debug("minhash.jaccard_neighbor_distance raised %r; treating as 0.0", exc) + return 0.0 + # Distance is in [0, 1]; density is the inversion clamped to [0, 1]. + return max(0.0, min(1.0, 1.0 - distance)) + + +# ───────────────────────────────────────────────────────────────────────── +# Feedback-drag sub-score +# ───────────────────────────────────────────────────────────────────────── + + +def _coerce_to_dict_list(raw: Any) -> list[dict[str, Any]]: + """Materialise an opaque backend result into ``list[dict[str, Any]]``. + + Hidden behind a helper so the call sites stay readable and pyrefly's + ``Any`` propagation narrows correctly. Non-dict rows are filtered out + (defensive — the backends always emit dicts). + """ + + out: list[dict[str, Any]] = [] + for row in raw: + if isinstance(row, dict): + out.append(row) + return out + + +def _load_feedback_by_chunk_id(backend: Any) -> dict[int, list[dict[str, Any]]]: + """Best-effort load of the per-chunk feedback rows. + + Preferred path: a backend-supplied ``iter_chunk_feedback`` hook that + yields dicts with at least ``chunk_id``, ``kind``, and ``rating``. + + Fallback: a single ``_execute("SELECT chunk_id, kind, rating FROM + corpus.chunk_feedback")`` (the schema name from the RFC). The query + is also tried against ``corpus.feedback WHERE entity_type='chunk'`` + for backends that ship the unified ``feedback`` table. Any failure + is swallowed — feedback drag silently degrades to 0.0 for every + chunk, which matches the plan's "missing hook → 0.0" contract. + """ + + by_chunk: dict[int, list[dict[str, Any]]] = {} + + hook: Any = getattr(backend, "iter_chunk_feedback", None) + if callable(hook): + try: + rows: Any = hook() + for row in rows: + cid_raw = row.get("chunk_id") + if cid_raw is None: + continue + cid = int(cid_raw) + by_chunk.setdefault(cid, []).append(dict(row)) + return by_chunk + except Exception as exc: # pragma: no cover — defensive + logger.debug("iter_chunk_feedback hook raised %r; falling back to SQL probe", exc) + by_chunk.clear() + + execute: Any = getattr(backend, "_execute", None) + if not callable(execute): + return by_chunk + + # Try the RFC's table name first. + rows_typed: list[dict[str, Any]] = [] + try: + rows_typed = _coerce_to_dict_list( + execute("SELECT chunk_id, kind, rating FROM corpus.chunk_feedback") + ) + except Exception as exc: + logger.debug("corpus.chunk_feedback probe failed (%r) — trying corpus.feedback", exc) + try: + rows_typed = _coerce_to_dict_list( + execute( + "SELECT entity_id AS chunk_id, kind, rating FROM corpus.feedback " + "WHERE entity_type = 'chunk'" + ) + ) + except Exception as exc2: + logger.debug("corpus.feedback probe also failed (%r); no feedback drag signal", exc2) + return by_chunk + + for row in rows_typed: + cid_raw = row.get("chunk_id") + if cid_raw is None: + continue + cid = int(cid_raw) + by_chunk.setdefault(cid, []).append(dict(row)) + return by_chunk + + +def _feedback_drag( + candidate: _Candidate, + feedback_rows_by_chunk_id: dict[int, list[dict[str, Any]]], +) -> float: + """``1.0`` when any feedback row signals rejection; ``0.0`` otherwise.""" + + rows = feedback_rows_by_chunk_id.get(candidate.chunk_id, []) + for row in rows: + kind = row.get("kind") + if isinstance(kind, str) and kind.lower() == "rejected": + return 1.0 + rating = row.get("rating") + if isinstance(rating, (int, float)) and rating < 0: + return 1.0 + return 0.0 + + +# ───────────────────────────────────────────────────────────────────────── +# Reason picker +# ───────────────────────────────────────────────────────────────────────── + + +def _prune_reason(sub_scores: dict[str, float]) -> str: + """Name the top weighted contributor in a human one-liner.""" + + weighted = { + "confidence_deficit": sub_scores.get("confidence_deficit", 0.0) + * _PRUNE_WEIGHTS["confidence_deficit"], + "missing_metadata": sub_scores.get("missing_metadata", 0.0) + * _PRUNE_WEIGHTS["missing_metadata"], + "freshness_inverted": sub_scores.get("freshness_inverted", 0.0) + * _PRUNE_WEIGHTS["freshness_inverted"], + "duplicate_density": sub_scores.get("duplicate_density", 0.0) + * _PRUNE_WEIGHTS["duplicate_density"], + "feedback_drag": sub_scores.get("feedback_drag", 0.0) * _PRUNE_WEIGHTS["feedback_drag"], + } + top = max(weighted, key=lambda k: weighted[k]) + if weighted[top] <= 0.0: + return "all signals at minimum (tie-break selection)" + return { + "confidence_deficit": "low / missing classifier confidence", + "missing_metadata": "metadata gaps", + "freshness_inverted": "stale content", + "duplicate_density": "near-duplicate of other chunks", + "feedback_drag": "user feedback flagged rejection", + }[top] + + +# ───────────────────────────────────────────────────────────────────────── +# Deletion helpers +# ───────────────────────────────────────────────────────────────────────── + + +_SQLITE_BATCH_SIZE: int = 500 + + +def _is_postgres_like(backend: Any) -> bool: + """Return ``True`` when ``backend`` should be driven via Postgres SQL. + + Capability probe (no isinstance — keeps the prune module decoupled + from the concrete backend classes and friendly to test doubles): + + 1. If the backend exposes ``_paramstyle``, treat ``"pyformat"`` as + Postgres-shaped (psycopg) and anything else (``"qmark"`` for + sqlite3, ``"named"`` for some drivers) as SQLite-shaped. + 2. Otherwise fall back to inspecting ``type(backend).__name__`` — + names containing ``"Postgres"`` (case-insensitive) are Postgres. + + The two checks are independent and either one can promote a backend + to the bulk-``ANY(%s)`` path. This way a real + ``PostgresBackend`` (no ``_paramstyle`` today) and a stubbed test + double exposing only ``_paramstyle = "pyformat"`` both work. + """ + + paramstyle = getattr(backend, "_paramstyle", None) + if isinstance(paramstyle, str) and paramstyle == "pyformat": + return True + return "postgres" in type(backend).__name__.lower() + + +def _delete_chunks(backend: Any, chunk_ids: list[int]) -> int: + """Delete the chunks identified by ``chunk_ids``; return rows actually removed. + + The order of preference is: + + 1. A backend-supplied ``delete_chunks_by_ids`` hook (best-fit — keeps + SQL inside the backend module). + 2. A Postgres-shaped bulk DELETE via ``_execute`` + ``ANY(%s)``. + 3. A SQLite-shaped chunked DELETE via ``_execute`` + ``IN (...)``. + """ + + if not chunk_ids: + return 0 + + hook: Any = getattr(backend, "delete_chunks_by_ids", None) + if callable(hook): + result: Any = hook(chunk_ids) + return int(result) + + execute: Any = getattr(backend, "_execute", None) + if not callable(execute): + raise RuntimeError( + f"backend {type(backend).__name__!r} cannot delete chunks — " + "exposes neither `delete_chunks_by_ids` nor `_execute`" + ) + + if _is_postgres_like(backend): + try: + execute( + "DELETE FROM corpus.chunks WHERE id = ANY(%s)", + (list(chunk_ids),), + ) + except Exception: + logger.exception("postgres bulk delete failed for %d ids", len(chunk_ids)) + raise + return len(chunk_ids) + + # SQLite (and any other backend with `_execute` but no Postgres tag) — + # chunk the IN-list so we don't blow past parameter limits. + # NOTE: the SQLite backend stores tables in the connection-default + # schema, so the table is `chunks` here — NOT `corpus.chunks` like on + # Postgres. The schema-prefix asymmetry is intentional, not a bug. + deleted = 0 + for i in range(0, len(chunk_ids), _SQLITE_BATCH_SIZE): + batch = chunk_ids[i : i + _SQLITE_BATCH_SIZE] + placeholders = ",".join("?" * len(batch)) + try: + execute( + f"DELETE FROM chunks WHERE id IN ({placeholders})", + tuple(batch), + ) + except Exception: + logger.exception("sqlite batch delete failed for %d ids", len(batch)) + raise + deleted += len(batch) + return deleted + + +# ───────────────────────────────────────────────────────────────────────── +# Public entry point +# ───────────────────────────────────────────────────────────────────────── + + +def prune_dataset( + backend: Any, + *, + dataset: str | None, + percentile: int = _DEFAULT_PERCENTILE, + apply: bool = False, + candidate_pool: int = _DEFAULT_CANDIDATE_POOL, + now: datetime | None = None, +) -> PruneReport: + """Score the configured candidate pool and (optionally) prune the worst rows. + + Args: + backend: A backend exposing ``_execute`` (Postgres or SQLite). + MAY also expose ``iter_curation_candidates``, + ``iter_chunk_feedback``, and ``delete_chunks_by_ids`` hooks + for the dedicated fast paths. + dataset: Optional dataset name to scope the candidate walk. + percentile: Bottom percentile of the pool selected for pruning, + in ``[0, 100]``. The plan's RFC default is ``10``. + apply: When ``True``, the selected rows are deleted. The default + (``False``) returns the same report with ``deleted=0`` and + ``applied=False`` — i.e. dry-run. + candidate_pool: Max rows pulled from the backend (default 2000). + Larger pools cost more memory but smooth out the + duplicate-density normalisation. + now: Optional override for "current" timestamp used by the + freshness sub-score (test seam). + + Returns: + A :class:`PruneReport` containing the (worst-first) candidate + list, the actual percentile applied, and the deletion receipt. + + Raises: + ValueError: When ``percentile`` is outside ``[0, 100]``, or when + ``dataset`` is non-None and the backend exposes + ``find_dataset_id_by_name`` but that lookup returns ``None`` + (unknown dataset name). Guarding here is critical under + ``apply=True`` — without the check, a typo'd dataset name + would silently walk *every* dataset's candidates. + """ + + if not _MIN_PERCENTILE <= percentile <= _MAX_PERCENTILE: + raise ValueError( + f"percentile must be in [{_MIN_PERCENTILE}, {_MAX_PERCENTILE}]; got {percentile!r}" + ) + + # Unknown-dataset safety: when the caller named a dataset that the + # backend doesn't know, refuse to fall through to "walk everything" + # (especially under apply=True — that would delete from the wrong + # scope). dataset=None is the explicit "walk all datasets" form and + # is preserved. + if dataset is not None: + resolver: Any = getattr(backend, "find_dataset_id_by_name", None) + if callable(resolver): + try: + resolved = resolver(dataset) + except Exception: # pragma: no cover — defensive + logger.exception("find_dataset_id_by_name(%r) raised", dataset) + resolved = "" + if resolved is None: + raise ValueError(f"dataset {dataset!r} not found") + + candidates: list[_Candidate] = list( + _iter_curation_candidates(backend, dataset=dataset, limit=candidate_pool) + ) + considered = len(candidates) + + if considered == 0: + return PruneReport( + dataset=dataset, + percentile=percentile, + considered=0, + selected=[], + applied=False, + deleted=0, + summary_by_source={}, + duplicate_density_available=False, + ) + + # Resolve optional sub-score data sources ONCE for the whole pool. + minhash_module: Any | None + if _minhash_available(): + from corpus_forge.quality import minhash as _mh + + minhash_module = _mh + else: + minhash_module = None + duplicate_density_available = minhash_module is not None + + feedback_rows = _load_feedback_by_chunk_id(backend) + + # Score every candidate under the fixed rubric. + scored: list[PruneCandidate] = [] + for cand in candidates: + deficit = _compute_confidence_deficit(cand.classifier_confidence) + missing_score, _missing_fields = _compute_missing_metadata(cand) + fresh = _compute_freshness(cand.modified_at, now=now) + fresh_inv = 1.0 - fresh + dup_density = _duplicate_density(cand, minhash_module=minhash_module) + fb_drag = _feedback_drag(cand, feedback_rows) + + score = ( + deficit * _PRUNE_WEIGHTS["confidence_deficit"] + + missing_score * _PRUNE_WEIGHTS["missing_metadata"] + + fresh_inv * _PRUNE_WEIGHTS["freshness_inverted"] + + dup_density * _PRUNE_WEIGHTS["duplicate_density"] + + fb_drag * _PRUNE_WEIGHTS["feedback_drag"] + ) + score = max(0.0, min(1.0, score)) + + sub_scores: dict[str, float] = { + "confidence_deficit": deficit, + "missing_metadata": missing_score, + "freshness_inverted": fresh_inv, + "duplicate_density": dup_density, + "feedback_drag": fb_drag, + } + scored.append( + PruneCandidate( + chunk_id=cand.chunk_id, + document_id=cand.document_id, + source_uri=cand.source_uri, + prune_score=score, + sub_scores=sub_scores, + reason=_prune_reason(sub_scores), + ) + ) + + # Sort worst-first (highest prune_score → most prunable). + scored.sort(key=lambda c: c.prune_score, reverse=True) + + top_n = math.ceil(considered * percentile / 100) + selected = scored[:top_n] + + # Per-source summary grouping by the source URI's filename stem. + summary_counter: Counter[str] = Counter() + for cand in selected: + stem = (Path(cand.source_uri).stem or "") if cand.source_uri else "" + summary_counter[stem] += 1 + summary_by_source: dict[str, int] = dict(summary_counter) + + applied = False + deleted = 0 + if apply and selected: + ids_to_delete = [c.chunk_id for c in selected] + deleted = _delete_chunks(backend, ids_to_delete) + applied = True + + return PruneReport( + dataset=dataset, + percentile=percentile, + considered=considered, + selected=selected, + applied=applied, + deleted=deleted, + summary_by_source=summary_by_source, + duplicate_density_available=duplicate_density_available, + ) + + +__all__ = [ + "PruneCandidate", + "PruneReport", + "prune_dataset", +] diff --git a/corpus_forge/ingest.py b/corpus_forge/ingest.py index 455c646..2b5f155 100644 --- a/corpus_forge/ingest.py +++ b/corpus_forge/ingest.py @@ -2,6 +2,7 @@ import logging import socket +import time from pathlib import Path from typing import Any @@ -305,18 +306,16 @@ def ingest_one( # the wall-clock calibration profile. Per-document timings are # noisy but the EWMA in ``runtime_profile.record`` smooths them # out over the course of a real ingest pass. - import time as _time # noqa: PLC0415 - cal_key = _calibration_key_for(raw) if isinstance(raw, RawDocument): # Process document - _t0 = _time.perf_counter() + _t0 = time.perf_counter() chunk_data = _process_document(raw, effective_chunker) - _chunk_elapsed = _time.perf_counter() - _t0 + _chunk_elapsed = time.perf_counter() - _t0 - _t1 = _time.perf_counter() + _t1 = time.perf_counter() backend.upsert_document(dataset_id, raw, chunk_data, embedder_ids=embedder_ids) - _write_elapsed = _time.perf_counter() - _t1 + _write_elapsed = time.perf_counter() - _t1 n_chunks = len(chunk_data) if n_chunks > 0: @@ -531,18 +530,73 @@ def _source_root(source_config: Any) -> Path | None: return None -def _log_ingest_eta(config: Config) -> None: - """Emit a single startup INFO line with the wall-clock ETA. +def _classify_and_log_ingest_error(raw: Any, exc: BaseException) -> None: + """Log a per-document ingest failure on the right taxonomy logger. + + The previous catch-all "Extractor failed on X" message + mis-attributed every recoverable failure to the extractor, + including Ollama 500s with ``"unsupported value: NaN"`` (the + embedder produced a non-finite vector for some chunk). That made + it look like the file itself was malformed rather than the model. + + Heuristic classification on the exception message: + + - "unsupported value: NaN" / "NaN" → embedder produced a NaN + vector. Log at WARNING with an actionable hint. + - HTTP 5xx from any *embedder* call → embedder API failure. + - Anything else → extractor failure (unchanged taxonomy). + """ + source_uri = getattr(raw, "source_uri", "unknown") + msg = str(exc) + lowered = msg.lower() + + if "unsupported value: nan" in lowered or "json: unsupported value: nan" in lowered: + logger.warning( + "Embedder produced NaN for %s — skipping. Likely a quirk of the active " + "embedding model on this chunk's text (try a different embedder, or " + "filter empty / near-empty chunks). Original error: %s", + source_uri, + msg, + ) + return + if "error code: 5" in lowered and ("embed" in lowered or "embedding" in lowered): + logger.warning( + "Embedder API 5xx on %s — skipping. Original error: %s", + source_uri, + msg, + ) + return + + # Default classification — same wording as before to keep grep + # patterns / dashboards working. + extract_logger.info( + "Extractor failed on %s: %s", + source_uri, + msg, + ) + + +def _plan_ingest(config: Config) -> dict[int, int]: + """Compute the wall-clock ETA AND per-source file counts. + + Walks every filesystem-rooted source once via + :func:`corpus_forge.estimate.estimate_sync`, sums the per-source + :class:`~corpus_forge.time_estimate.TimeEstimate`s, logs one summary + INFO line, and returns a mapping ``id(source_config) -> file_count`` + so :func:`ingest_once` can hand each source's progress bar a real + total (live percentage + ETA via Rich's ``TimeRemainingColumn``) + instead of leaving it in unbounded mode. - Sums per-source ``TimeEstimate``s across every dataset/source that - exposes a recognisable filesystem root. Sources without a root - (API-driven plugins, or a misconfigured root) are excluded — we log - them at DEBUG so a user can ``--verbose`` if they want to know what - was skipped. + This replaces the previous "walk twice" cost (estimate then re-walk + in ``source.scan()``) for filesystem-rooted sources — the count + captured here is the same one the live bar would show, just paid + up front. Best-effort: any exception is swallowed (logged at DEBUG) so a - broken ETA can never block ingest. + broken planner can never block ingest. Returns ``{}`` on failure + and ``ingest_once`` falls back to unbounded progress bars. """ + per_source_totals: dict[int, int] = {} try: from corpus_forge.estimate import estimate_sync # noqa: PLC0415 from corpus_forge.runtime_profile import load as _load_profile # noqa: PLC0415 @@ -578,6 +632,7 @@ def _log_ingest_eta(config: Config) -> None: for phase in te.phases: per_phase[phase.name] = per_phase.get(phase.name, 0.0) + phase.seconds roots_seen += 1 + per_source_totals[id(source_config)] = sync.file_count if te.calibration in ("calibrated", "hybrid"): any_calibrated = True @@ -585,7 +640,7 @@ def _log_ingest_eta(config: Config) -> None: logger.info( "ETA: no filesystem-rooted sources detected — wall-clock prediction skipped" ) - return + return per_source_totals breakdown = " / ".join( f"{name} {format_duration(per_phase.get(name, 0.0))}" @@ -605,12 +660,18 @@ def _log_ingest_eta(config: Config) -> None: ) except Exception as exc: # pragma: no cover — defensive logger.debug("ETA computation failed: %s", exc) + return per_source_totals def ingest_once(config: Config) -> None: """Run one-shot ingestion pass.""" logger.info("Starting one-shot ingestion pass") - _log_ingest_eta(config) + # Walk every filesystem-rooted source up front to compute the ETA + # AND capture per-source file counts that drive the live progress + # bar totals below. The walk cost was already paid by the previous + # ETA-only call; threading the result avoids walking the tree a + # second time inside ``source.scan()``. + per_source_totals = _plan_ingest(config) # Setup backend backend_config = config.backend @@ -635,47 +696,76 @@ def ingest_once(config: Config) -> None: embedders = get_active_embedders(config) logger.info(f"Active embedders: {[e.name for e in embedders]}") - # Process each dataset - for dataset in config.datasets: - logger.info(f"Processing dataset: {dataset.name} ({dataset.kind})") + # Single Rich ``Progress`` instance with TWO live tasks: + # + # 1. A persistent global task — total = sum of every source's + # file count, so users see overall percentage + remaining-time + # ETA across the entire run. + # 2. A transient per-source task — added when each source starts, + # removed when it finishes, so the display always shows the + # currently-running source's slice underneath the global bar. + # + # Two nested ``make_progress`` contexts would fight over the + # console (Rich's ``Live`` is single-active), so the per-source + # progress now lives as a child task of the outer ``Progress``. + global_total = sum(per_source_totals.values()) or None + with make_progress( + "Ingest (all sources)", + total=global_total, + logger=scan_logger, + ) as progress: + global_task = progress.add_task("Total", total=global_total) + + # Process each dataset + for dataset in config.datasets: + logger.info(f"Processing dataset: {dataset.name} ({dataset.kind})") - # Get or create dataset record - dataset_id = _get_or_create_dataset(backend, dataset) + # Get or create dataset record + dataset_id = _get_or_create_dataset(backend, dataset) - # Process each source in dataset - for source_config in dataset.sources: - scan_logger.info( - "Scanning source: plugin=%s dataset=%s", - source_config.plugin, - dataset.name, - ) + # Process each source in dataset + for source_config in dataset.sources: + scan_logger.info( + "Scanning source: plugin=%s dataset=%s", + source_config.plugin, + dataset.name, + ) - # Instantiate source - source = _instantiate_source(source_config, config=config) + # Instantiate source + source = _instantiate_source(source_config, config=config) + + # Register source in the DB (idempotent) so the sources + # table tracks which plugin/identity/host contributed + # to this dataset. + backend.register_source( + dataset_id, + source.name, + source.identity(), + socket.gethostname(), + ) - # Register source in the DB (idempotent) so the sources table - # tracks which plugin/identity/host contributed to this dataset. - backend.register_source( - dataset_id, - source.name, - source.identity(), - socket.gethostname(), - ) + # Get chunker for this source + chunker = get_chunker_for_source(source, config) + + # Per-source task lives only for the duration of this + # source. ``total=None`` (API-only sources we couldn't + # count up front) renders as an indeterminate bar; the + # global task is still useful as long as at least one + # source contributed a count. + raw_items = source.scan() + docs_chunked = 0 + source_total = per_source_totals.get(id(source_config)) + source_task = progress.add_task( + f" {source.name}", + total=source_total, + ) + scan_logger.info( + "Ingest (%s) started: %s items", + source.name, + source_total if source_total is not None else "unbounded", + ) + source_started = time.perf_counter() - # Get chunker for this source - chunker = get_chunker_for_source(source, config) - - # Scan and ingest — Phase L Wave 4 wraps the per-file loop - # in the shared progress factory so the user sees motion - # and the rotating log captures bookends + 10% milestones. - raw_items = source.scan() - docs_chunked = 0 - with make_progress( - f"Ingest ({source.name})", - total=None, - logger=scan_logger, - ) as progress: - task = progress.add_task("Ingest", total=None) for raw in raw_items: try: ingest_one(backend, raw, chunker, embedders, dataset_id) @@ -683,21 +773,44 @@ def ingest_once(config: Config) -> None: if docs_chunked % 100 == 0: chunk_logger.info("Chunked %d documents so far", docs_chunked) except Exception as e: - # Per-file extractor failures are recoverable — log - # INFO on the dedicated extract logger (greppable - # via the documented taxonomy) and keep going. - extract_logger.info( - "Extractor failed on %s: %s", - getattr(raw, "source_uri", "unknown"), - e, - ) - continue - progress.update(task, advance=1) - scan_logger.info( - "Scan complete: %d documents (plugin=%s)", - docs_chunked, - source_config.plugin, - ) + # Per-file failures are recoverable. Categorise + # the message so users can tell extractor + # crashes apart from embedder/API failures — + # the previous "Extractor failed on X" wording + # mis-attributed Ollama 500s (NaN-in-response, + # rate limits) to the extractor, which makes + # the model-selection vs. file-content + # question harder to answer. + _classify_and_log_ingest_error(raw, e) + finally: + # Advance both bars on EVERY iteration, success + # or failure. Planner totals come from + # ``estimate_sync`` which counts every file + # regardless of whether ingest will succeed — + # skipping the advance on failures would leave + # both bars permanently below 100% whenever any + # file fails (e.g. one Ollama-NaN 5xx is enough + # to strand the global bar forever). + progress.update(source_task, advance=1) + progress.update(global_task, advance=1) + + elapsed = time.perf_counter() - source_started + rate = (docs_chunked / elapsed) if elapsed > 0 else 0.0 + scan_logger.info( + "Ingest (%s) complete: %d documents in %.1fs (rate %.0f/s)", + source.name, + docs_chunked, + elapsed, + rate, + ) + scan_logger.info( + "Scan complete: %d documents (plugin=%s)", + docs_chunked, + source_config.plugin, + ) + # Remove the per-source task so the next source's task + # appears underneath the global bar instead of stacking. + progress.remove_task(source_task) def _get_or_create_dataset(backend: StorageBackend, dataset_config) -> int: diff --git a/tests/unit/test_ingest_helpers.py b/tests/unit/test_ingest_helpers.py index e2d8283..8c8788f 100644 --- a/tests/unit/test_ingest_helpers.py +++ b/tests/unit/test_ingest_helpers.py @@ -70,3 +70,173 @@ class MockSourceConfig: source = _instantiate_source(MockSourceConfig()) assert isinstance(source, OpenCodeSource) assert source.root == storage_dir + + +# ───────────────────────────────────────────────────────────────────────── +# Wall-clock planner + error classifier (added with the live-ETA fix) +# ───────────────────────────────────────────────────────────────────────── + + +class TestPlanIngest: + """Tests for ``_plan_ingest`` — the up-front walk that drives both + the ETA log line and the per-source progress-bar totals.""" + + def test_returns_per_source_file_counts(self, temp_dir): + """Every source with a recognisable filesystem root must show + up in the returned mapping; the count must match what the + planner walked.""" + import textwrap + + from corpus_forge.config import Config + from corpus_forge.ingest import _plan_ingest + + vault = temp_dir / "vault" + vault.mkdir() + (vault / "a.md").write_text("x" * 4096, encoding="utf-8") + (vault / "b.md").write_text("y" * 4096, encoding="utf-8") + + cfg_path = temp_dir / "config.toml" + cfg_path.write_text( + textwrap.dedent( + f""" + [backend] + kind = "sqlite" + dsn = "{(temp_dir / "corpus.db").as_posix()}" + + [daemon] + + [[datasets]] + name = "demo" + kind = "text" + sources = [ + {{plugin = "filesystem", root = "{vault.as_posix()}", chunker = "markdown"}} + ] + + [[embedders]] + name = "fake" + provider = "sentence_transformers" + model_id = "fake-1" + dimension = 384 + """ + ), + encoding="utf-8", + ) + config = Config.load(config_path=cfg_path) + totals = _plan_ingest(config) + # One source → one entry; count covers both .md files. + assert len(totals) == 1 + assert list(totals.values()) == [2] + + def test_returns_empty_dict_when_no_filesystem_roots(self): + """A config with only API-driven sources returns ``{}`` so the + ingest loop falls back to unbounded progress bars.""" + + class _StubSourceCfg: + plugin = "api_only" + + class _StubDataset: + sources: list = [_StubSourceCfg()] # noqa: RUF012 + + class _StubConfig: + datasets: list = [_StubDataset()] # noqa: RUF012 + + from corpus_forge.ingest import _plan_ingest + + assert _plan_ingest(_StubConfig()) == {} + + +class TestClassifyIngestError: + """Tests for ``_classify_and_log_ingest_error`` — the per-document + failure logger that distinguishes embedder NaN / 5xx from real + extractor failures.""" + + def test_nan_message_logs_at_warning_with_hint(self, caplog): + """The Ollama 500-with-NaN error must surface at WARNING with + a model-selection hint, NOT as a generic 'Extractor failed'.""" + import logging + + from corpus_forge.ingest import _classify_and_log_ingest_error + + class _Raw: + source_uri = "filesystem://Workspace/M1/HYBRID_PACK.md" + + with caplog.at_level(logging.WARNING, logger="corpus_forge.ingest"): + _classify_and_log_ingest_error( + _Raw(), + RuntimeError( + "Error code: 500 - {'error': {'message': " + "'failed to encode response: json: unsupported value: NaN'}}" + ), + ) + records = [r for r in caplog.records if "NaN" in r.getMessage()] + assert records, "expected a NaN warning to be logged" + assert records[0].levelno == logging.WARNING + assert "try a different embedder" in records[0].getMessage() + + def test_generic_failure_keeps_extractor_taxonomy(self, caplog): + """Non-embedder failures must continue logging as 'Extractor + failed on X' so existing grep / dashboards still match.""" + import logging + + from corpus_forge.ingest import _classify_and_log_ingest_error + + class _Raw: + source_uri = "filesystem://Workspace/notes/broken.pdf" + + with caplog.at_level(logging.INFO, logger="corpus_forge.ingest.extract"): + _classify_and_log_ingest_error(_Raw(), ValueError("PDF parse error")) + records = [r for r in caplog.records if "Extractor failed" in r.getMessage()] + assert records, "expected an extractor-failure line on a generic exception" + + +class TestProgressAdvanceOnFailure: + """Regression test: the per-source and global progress bars must + advance on EVERY iteration of the ingest loop, not just successes. + + The planner's ``_plan_ingest`` totals come from ``estimate_sync`` + which counts every file regardless of whether ingest will succeed. + If we only advanced on success, a single Ollama-NaN 5xx (or any + other recoverable per-file failure) would strand the global bar + permanently below 100% — exactly the misleading state the global + bar was added to eliminate. + """ + + def test_loop_body_uses_finally_for_progress_update(self): + """``ingest_once`` must call ``progress.update`` inside a + ``finally`` block so failed items still count toward the bars. + + Implemented as a source-text inspection rather than a full + end-to-end run because exercising the real ``Progress`` instance + under failure conditions requires significant fake-backend + + fake-source plumbing. The source-text check is precise enough + to lock the contract: the progress-update lines must follow the + ``finally:`` keyword, not the closing of the ``except`` block. + """ + import inspect + import textwrap + + from corpus_forge import ingest + + src = inspect.getsource(ingest.ingest_once) + # The two advance calls must live inside the ``finally`` block. + # Walk every ``finally:`` block and check that both updates + # appear before the next dedent. + dedented = textwrap.dedent(src) + # Quick structural check: both ``progress.update(, + # advance=1)`` calls appear AFTER a ``finally:`` keyword and + # BEFORE the next non-indented sibling statement. + assert "finally:" in dedented, "ingest_once must use try/finally for progress updates" + finally_idx = dedented.index("finally:") + tail = dedented[finally_idx:] + # Both updates must appear inside this finally block (before + # the next ``for`` / function-level boundary). + next_for = tail.find("\n for ") + next_func = tail.find("\ndef ") + end = min(idx for idx in (next_for, next_func, len(tail)) if idx > 0) + finally_block = tail[:end] + assert "progress.update(source_task, advance=1)" in finally_block, ( + "source-task advance must live inside the ingest finally block" + ) + assert "progress.update(global_task, advance=1)" in finally_block, ( + "global-task advance must live inside the ingest finally block" + ) diff --git a/tests/unit/test_prune_scorer.py b/tests/unit/test_prune_scorer.py new file mode 100644 index 0000000..08ee08e --- /dev/null +++ b/tests/unit/test_prune_scorer.py @@ -0,0 +1,612 @@ +"""Unit tests for the prune-admin module (``corpus_forge.admin.prune``). + +These tests run against a fake backend that supplies candidates via the +``iter_curation_candidates`` hook (same pattern as +``tests/unit/test_curation_selector.py``) plus a couple of optional +hooks the prune surface consults (``iter_chunk_feedback``, +``delete_chunks_by_ids``). + +The MinHash branch is exercised via a ``monkeypatch`` over +``corpus_forge.admin.prune._minhash_available`` so the tests never +require the ``rfc-nlp-data-quality-signals`` module to actually exist. +""" + +from __future__ import annotations + +import dataclasses +import math +from collections.abc import Iterable +from datetime import UTC, datetime, timedelta +from typing import Any + +import pytest + +from corpus_forge.admin import PruneCandidate, PruneReport, prune_dataset +from corpus_forge.admin import prune as prune_mod + +# ───────────────────────────────────────────────────────────────────────── +# Fixtures / fake backend +# ───────────────────────────────────────────────────────────────────────── + + +_NOW = datetime(2026, 5, 22, tzinfo=UTC) +_DEFAULT_MODIFIED_AT = _NOW - timedelta(days=2) + + +def _row( + chunk_id: int, + *, + document_id: int | None = 1, + text: str = "lorem ipsum", + heading: str | None = "h", + description: str | None = "d", + metadata: dict[str, Any] | None = None, + document_title: str | None = "title", + source_uri: str | None = "vault://notes/note.md", + modified_at: datetime | None | type[...] = ..., # type: ignore[assignment] + labels: list[tuple[str, str]] | None = None, + classifier_label: str | None = "topic_a", + classifier_confidence: float | None = 0.8, +) -> dict[str, Any]: + """Build a backend row dict in the shape the curation selector expects. + + Defaults represent a "well-filled" chunk; callers override fields to + push specific signals. + """ + + effective_modified_at: datetime | None = ( + _DEFAULT_MODIFIED_AT if modified_at is ... else modified_at # type: ignore[assignment] + ) + return { + "chunk_id": chunk_id, + "document_id": document_id, + "text": text, + "heading": heading, + "description": description, + "metadata": dict(metadata if metadata is not None else {"language": "en"}), + "document_title": document_title, + "source_uri": source_uri, + "modified_at": effective_modified_at, + "labels": list( + labels + if labels is not None + else [ + ("class", classifier_label or ""), + ("topic", "x"), + ] + ), + "classifier_label": classifier_label, + "classifier_confidence": classifier_confidence, + "embedding": None, + } + + +class _FakeBackend: + """Fake backend covering the prune-module surface. + + Records delete calls so tests can assert the apply path was (or was + NOT) reached. Optionally yields feedback rows via + ``iter_chunk_feedback``. + """ + + def __init__( + self, + rows: list[dict[str, Any]], + *, + feedback_rows: list[dict[str, Any]] | None = None, + ) -> None: + self._rows = rows + self._feedback_rows = feedback_rows or [] + self.delete_calls: list[list[int]] = [] + self.iter_calls: list[dict[str, Any]] = [] + + def iter_curation_candidates( + self, *, dataset: str | None, limit: int + ) -> Iterable[dict[str, Any]]: + self.iter_calls.append({"dataset": dataset, "limit": limit}) + yield from self._rows[:limit] + + def iter_chunk_feedback(self) -> Iterable[dict[str, Any]]: + yield from self._feedback_rows + + def delete_chunks_by_ids(self, chunk_ids: list[int]) -> int: + self.delete_calls.append(list(chunk_ids)) + return len(chunk_ids) + + +class _ExecuteOnlyBackend: + """Backend exposing ``_execute`` (no hook). Records executed SQL. + + Used to verify the bulk-DELETE fallback fires correctly when a + backend doesn't ship ``delete_chunks_by_ids``. + """ + + def __init__(self, rows: list[dict[str, Any]], *, name: str = "PostgresBackend") -> None: + self._rows = rows + self.__class__.__name__ = name # for `type(backend).__name__` branching + self.executed: list[tuple[str, tuple[Any, ...]]] = [] + + def iter_curation_candidates( + self, *, dataset: str | None, limit: int + ) -> Iterable[dict[str, Any]]: + yield from self._rows[:limit] + + def _execute(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]: + self.executed.append((sql, params)) + return [] + + +# ───────────────────────────────────────────────────────────────────────── +# Dry-run / empty-pool basics +# ───────────────────────────────────────────────────────────────────────── + + +def test_dry_run_default_does_not_delete() -> None: + backend = _FakeBackend([_row(i, classifier_confidence=0.1) for i in range(1, 11)]) + report = prune_dataset(backend, dataset="x", now=_NOW) + assert report.applied is False + assert report.deleted == 0 + assert backend.delete_calls == [] + # Default percentile is 10 → ceil(10 * 0.1) == 1 selected row. + assert len(report.selected) == 1 + + +def test_empty_pool_returns_empty_report() -> None: + backend = _FakeBackend([]) + report = prune_dataset(backend, dataset="x", now=_NOW) + assert report.considered == 0 + assert report.selected == [] + assert report.applied is False + assert report.deleted == 0 + assert report.summary_by_source == {} + + +# ───────────────────────────────────────────────────────────────────────── +# Score ordering invariants +# ───────────────────────────────────────────────────────────────────────── + + +def test_score_ordering_invariants(monkeypatch: pytest.MonkeyPatch) -> None: + # MinHash off → duplicate_density is 0 for every chunk. Differences + # come from confidence_deficit + missing_metadata + freshness_inverted. + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [ + # 1: pristine → minimal prune score. + _row(1, classifier_confidence=1.0, modified_at=_NOW), + # 2: mid-tier → some missing metadata. + _row( + 2, + classifier_confidence=0.5, + modified_at=_NOW - timedelta(days=30), + description=None, + heading=None, + ), + # 3: worst — no classifier, no metadata, stale. + _row( + 3, + classifier_confidence=None, + modified_at=_NOW - timedelta(days=365), + document_title=None, + heading=None, + description=None, + metadata={}, + source_uri=None, + labels=[], + ), + ] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + # All three selected; first must be the worst (chunk 3). + assert [c.chunk_id for c in report.selected] == [3, 2, 1] + # Scores must be in monotonic-descending order. + scores = [c.prune_score for c in report.selected] + assert scores == sorted(scores, reverse=True) + # Bounds check. + assert all(0.0 <= c.prune_score <= 1.0 for c in report.selected) + + +# ───────────────────────────────────────────────────────────────────────── +# Percentile controls selection count +# ───────────────────────────────────────────────────────────────────────── + + +def test_percentile_controls_selection_count(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(i, classifier_confidence=0.1) for i in range(1, 21)] + backend = _FakeBackend(rows) + + r10 = prune_dataset(backend, dataset="x", percentile=10, now=_NOW) + assert len(r10.selected) == math.ceil(20 * 0.10) == 2 + + r50 = prune_dataset(backend, dataset="x", percentile=50, now=_NOW) + assert len(r50.selected) == math.ceil(20 * 0.50) == 10 + + r100 = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + assert len(r100.selected) == 20 + + r0 = prune_dataset(backend, dataset="x", percentile=0, now=_NOW) + assert r0.selected == [] + + +def test_percentile_out_of_range_raises() -> None: + backend = _FakeBackend([_row(1)]) + with pytest.raises(ValueError, match="percentile"): + prune_dataset(backend, dataset="x", percentile=-1, now=_NOW) + with pytest.raises(ValueError, match="percentile"): + prune_dataset(backend, dataset="x", percentile=101, now=_NOW) + + +# ───────────────────────────────────────────────────────────────────────── +# Feedback-drag signal +# ───────────────────────────────────────────────────────────────────────── + + +def test_feedback_drag_flips_on_rejected(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + # Two identical rows. Chunk 7 has a rejected feedback row; chunk 8 doesn't. + rows = [ + _row(7, classifier_confidence=0.5), + _row(8, classifier_confidence=0.5), + ] + backend = _FakeBackend( + rows, + feedback_rows=[{"chunk_id": 7, "kind": "rejected", "rating": None}], + ) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + # Chunk 7 must sort strictly higher than chunk 8. + by_id = {c.chunk_id: c for c in report.selected} + assert by_id[7].prune_score > by_id[8].prune_score + # And feedback_drag = 1.0 on 7, 0.0 on 8. + assert by_id[7].sub_scores["feedback_drag"] == 1.0 + assert by_id[8].sub_scores["feedback_drag"] == 0.0 + + +def test_feedback_drag_flips_on_negative_rating(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(1, classifier_confidence=0.5)] + backend = _FakeBackend( + rows, + feedback_rows=[{"chunk_id": 1, "kind": "comment", "rating": -1}], + ) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + assert report.selected[0].sub_scores["feedback_drag"] == 1.0 + + +def test_feedback_drag_zero_when_no_hook_and_no_execute( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + class _BareBackend: + """Backend without _execute or iter_chunk_feedback — feedback must degrade to 0.0.""" + + def __init__(self, rows: list[dict[str, Any]]) -> None: + self._rows = rows + + def iter_curation_candidates( + self, *, dataset: str | None, limit: int + ) -> Iterable[dict[str, Any]]: + yield from self._rows[:limit] + + backend = _BareBackend([_row(1, classifier_confidence=0.5)]) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + assert report.selected[0].sub_scores["feedback_drag"] == 0.0 + + +# ───────────────────────────────────────────────────────────────────────── +# Duplicate-density signal +# ───────────────────────────────────────────────────────────────────────── + + +def test_duplicate_density_skipped_when_minhash_unavailable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(i, classifier_confidence=0.5) for i in range(1, 4)] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + # Every selected candidate has duplicate_density = 0.0. + for cand in report.selected: + assert cand.sub_scores["duplicate_density"] == 0.0 + # The report exposes the availability flag — promoted off of + # `selected[0].sub_scores` so candidate shape is uniform. + assert report.duplicate_density_available is False + # No candidate carries the legacy `duplicate_density_available` key. + for cand in report.selected: + assert "duplicate_density_available" not in cand.sub_scores + + +def test_duplicate_density_used_when_minhash_available( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # Stub a minhash module that gives chunk 1 a very small jaccard + # distance (high duplicate density) and chunk 2 a large distance. + class _StubMinhash: + @staticmethod + def jaccard_neighbor_distance(*, chunk_id: int, text: str) -> float: + _ = text + return 0.05 if chunk_id == 1 else 0.95 + + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: True) + # Inject the stub into corpus_forge.quality.minhash. + import sys + import types + + pkg = types.ModuleType("corpus_forge.quality") + pkg.__path__ = [] # type: ignore[attr-defined] + monkeypatch.setitem(sys.modules, "corpus_forge.quality", pkg) + monkeypatch.setitem(sys.modules, "corpus_forge.quality.minhash", _StubMinhash) + + rows = [ + _row(1, classifier_confidence=0.5), + _row(2, classifier_confidence=0.5), + ] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + by_id = {c.chunk_id: c for c in report.selected} + # Chunk 1 — near-duplicate — must outrank chunk 2. + assert by_id[1].sub_scores["duplicate_density"] > by_id[2].sub_scores["duplicate_density"] + assert by_id[1].prune_score > by_id[2].prune_score + # Report-level flag shows the signal ran. Candidate sub_scores remain + # uniform across the selection (no special-case head element). + assert report.duplicate_density_available is True + for cand in report.selected: + assert "duplicate_density_available" not in cand.sub_scores + + +# ───────────────────────────────────────────────────────────────────────── +# Apply path +# ───────────────────────────────────────────────────────────────────────── + + +def test_apply_calls_delete_path(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(i, classifier_confidence=0.1) for i in range(1, 11)] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=30, apply=True, now=_NOW) + expected_count = math.ceil(10 * 0.30) + assert len(report.selected) == expected_count + assert backend.delete_calls, "apply=True must invoke the delete path" + deleted_ids = backend.delete_calls[-1] + assert deleted_ids == [c.chunk_id for c in report.selected] + assert report.applied is True + assert report.deleted == expected_count + + +def test_apply_with_execute_fallback_emits_bulk_delete( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When the backend lacks `delete_chunks_by_ids` but has `_execute` + + a Postgres-shaped class name, the prune surface must emit one bulk + ``DELETE ... WHERE id = ANY(%s)`` statement.""" + + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(i, classifier_confidence=0.1) for i in range(1, 6)] + backend = _ExecuteOnlyBackend(rows, name="PostgresBackend") + report = prune_dataset(backend, dataset="x", percentile=40, apply=True, now=_NOW) + expected = math.ceil(5 * 0.40) + assert len(report.selected) == expected + # The recorded statements include exactly one DELETE. + deletes = [s for s, _p in backend.executed if "DELETE" in s.upper()] + assert len(deletes) == 1 + assert "ANY(%s)" in deletes[0] + assert report.deleted == expected + + +def test_apply_with_empty_selection_does_not_delete(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [_row(i) for i in range(1, 6)] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=0, apply=True, now=_NOW) + assert report.selected == [] + assert backend.delete_calls == [] + # `apply=True` with nothing to delete is a no-op — `applied` stays False. + assert report.applied is False + assert report.deleted == 0 + + +# ───────────────────────────────────────────────────────────────────────── +# Summary / dataclass discipline +# ───────────────────────────────────────────────────────────────────────── + + +def test_summary_by_source_groups_by_stem(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + rows = [ + _row(1, source_uri="vault://docs/a.md", classifier_confidence=0.0), + _row(2, source_uri="vault://docs/a.md", classifier_confidence=0.0), + _row(3, source_uri="vault://docs/b.md", classifier_confidence=0.0), + _row(4, source_uri=None, classifier_confidence=0.0), + ] + backend = _FakeBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=100, now=_NOW) + assert report.summary_by_source == {"a": 2, "b": 1, "": 1} + + +def test_dataclasses_frozen() -> None: + candidate = PruneCandidate( + chunk_id=1, + document_id=2, + source_uri="vault://x.md", + prune_score=0.5, + sub_scores={"confidence_deficit": 0.5}, + reason="test", + ) + with pytest.raises(dataclasses.FrozenInstanceError): + candidate.prune_score = 0.9 # type: ignore[misc] + + report = PruneReport( + dataset="x", + percentile=10, + considered=0, + selected=[], + applied=False, + deleted=0, + summary_by_source={}, + ) + with pytest.raises(dataclasses.FrozenInstanceError): + report.deleted = 1 # type: ignore[misc] + + +def test_dataset_label_propagates_to_backend_call() -> None: + backend = _FakeBackend([_row(1)]) + prune_dataset(backend, dataset="demo", now=_NOW) + assert backend.iter_calls == [{"dataset": "demo", "limit": 2000}] + + +def test_candidate_pool_overrides_default() -> None: + backend = _FakeBackend([_row(i) for i in range(1, 6)]) + prune_dataset(backend, dataset="x", candidate_pool=3, now=_NOW) + assert backend.iter_calls[-1]["limit"] == 3 + + +# ───────────────────────────────────────────────────────────────────────── +# SQLite chunked-DELETE fallback +# ───────────────────────────────────────────────────────────────────────── + + +class _SQLiteShapedBackend: + """Execute-only backend that mimics SQLite: ``_paramstyle = 'qmark'`` + and no Postgres-y substring in the class name. No + ``delete_chunks_by_ids`` hook so the chunked-IN path fires. + """ + + _paramstyle = "qmark" + + def __init__(self, rows: list[dict[str, Any]]) -> None: + self._rows = rows + self.executed: list[tuple[str, tuple[Any, ...]]] = [] + + def iter_curation_candidates( + self, *, dataset: str | None, limit: int + ) -> Iterable[dict[str, Any]]: + yield from self._rows[:limit] + + def _execute(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]: + self.executed.append((sql, params)) + return [] + + +def test_apply_sqlite_chunked_delete_via_execute(monkeypatch: pytest.MonkeyPatch) -> None: + """SQLite-shaped backend falls back to chunked ``DELETE … WHERE id IN (?, …)``.""" + + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + # Shrink the batch size so a modest fixture exercises chunking. + monkeypatch.setattr(prune_mod, "_SQLITE_BATCH_SIZE", 3) + + # 7 candidates with percentile=100 → 7 deletes → ceil(7/3) = 3 batches. + rows = [_row(i, classifier_confidence=0.0) for i in range(1, 8)] + backend = _SQLiteShapedBackend(rows) + report = prune_dataset(backend, dataset="x", percentile=100, apply=True, now=_NOW) + + assert report.applied is True + assert report.deleted == 7 + + deletes = [(sql, params) for sql, params in backend.executed if "DELETE" in sql.upper()] + # Three batches: sizes 3, 3, 1. + assert len(deletes) == 3 + batch_sizes = [len(params) for _sql, params in deletes] + assert batch_sizes == [3, 3, 1] + + for sql, params in deletes: + # Schema-prefix asymmetry vs Postgres: SQLite path uses + # unqualified `chunks` (no `corpus.` prefix). + assert "corpus." not in sql + assert "DELETE FROM chunks" in sql + # ``?`` placeholders, exactly one per parameter. + assert sql.count("?") == len(params) + # Postgres `ANY(%s)` must not appear on the SQLite path. + assert "ANY(" not in sql + + +# ───────────────────────────────────────────────────────────────────────── +# Postgres dispatch via _paramstyle capability probe +# ───────────────────────────────────────────────────────────────────────── + + +def test_paramstyle_pyformat_routes_to_postgres_bulk_delete( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Backend whose class name doesn't say ``Postgres`` but whose + ``_paramstyle`` is ``"pyformat"`` must still take the bulk-ANY(%s) + path. This locks in the capability probe over the brittle class-name + check.""" + + monkeypatch.setattr(prune_mod, "_minhash_available", lambda: False) + + class _OpaqueBackend(_ExecuteOnlyBackend): + _paramstyle = "pyformat" + + rows = [_row(i, classifier_confidence=0.0) for i in range(1, 6)] + # Deliberately pick a class name that contains no Postgres hint. + backend = _OpaqueBackend(rows, name="OpaqueSqlBackend") + report = prune_dataset(backend, dataset="x", percentile=40, apply=True, now=_NOW) + expected = math.ceil(5 * 0.40) + assert report.deleted == expected + deletes = [s for s, _p in backend.executed if "DELETE" in s.upper()] + assert len(deletes) == 1 + assert "ANY(%s)" in deletes[0] + assert "corpus.chunks" in deletes[0] + + +# ───────────────────────────────────────────────────────────────────────── +# Unknown-dataset safety +# ───────────────────────────────────────────────────────────────────────── + + +def test_unknown_dataset_raises() -> None: + """A named dataset that the backend doesn't know must abort the prune. + + The guard is critical under ``apply=True`` — without it the run + would silently fall through to "walk every dataset", which would + delete rows from the wrong scope. + """ + + class _NamedDatasetBackend(_FakeBackend): + def find_dataset_id_by_name(self, name: str) -> int | None: + _ = name + return None # unknown + + backend = _NamedDatasetBackend([_row(1)]) + with pytest.raises(ValueError, match="dataset 'ghost' not found"): + prune_dataset(backend, dataset="ghost", now=_NOW) + # No iteration over candidates happens. + assert backend.iter_calls == [] + assert backend.delete_calls == [] + + +def test_unknown_dataset_resolver_absent_falls_through() -> None: + """Backends without ``find_dataset_id_by_name`` keep the old behaviour: + we trust the caller and let the iter step decide. This is the + bare-minimum API the prune surface accepts.""" + + rows = [_row(1)] + backend = _FakeBackend(rows) + # The default _FakeBackend has no find_dataset_id_by_name attr. + assert not hasattr(backend, "find_dataset_id_by_name") + report = prune_dataset(backend, dataset="anything", now=_NOW) + assert report.considered == 1 + + +def test_dataset_none_skips_unknown_dataset_check() -> None: + """When the caller intentionally passes ``dataset=None`` (walk all), + the unknown-dataset guard must not fire even if the backend would + resolve names — None is the explicit opt-out.""" + + class _NamedDatasetBackend(_FakeBackend): + def find_dataset_id_by_name(self, name: str) -> int | None: + _ = name + return None # would block, but we never call it + + backend = _NamedDatasetBackend([_row(1)]) + report = prune_dataset(backend, dataset=None, now=_NOW) + assert report.considered == 1