diff --git a/.claude/plans/pileup-column-aggregation.md b/.claude/plans/pileup-column-aggregation.md
new file mode 100644
index 00000000..f3bc8ee8
--- /dev/null
+++ b/.claude/plans/pileup-column-aggregation.md
@@ -0,0 +1,881 @@
# Plan: pileup column summaries and custom aggregation

## Why this exists

This is a high-level but implementation-oriented plan for making seqair's pileup engine more directly useful to downstream tools such as perbase.

perbase currently uses pileup columns as an intermediate representation. For every output row it walks the pileup alignments and recomputes command-specific summary state:

- output depth,
- A/C/G/T/N counts,
- insertion count,
- deletion count,
- reference-skip count,
- failed-read count,
- optionally mate-overlap resolution.

seqair already walks the active records for each reference position to construct each `PileupAlignment`. The opportunity is to let seqair either:

1. compute common column summaries while doing that work, or
2. let callers install a custom per-column accumulator and avoid both a downstream second pass and, for simple use cases, the internal per-column alignment `Vec` materialization.

This is the same broad motivation as the older noodles pileup draft: make the pileup engine provide richer per-column information directly, instead of requiring every downstream consumer to re-analyze the stack of reads.

## Current seqair shape

Relevant current code is in `crates/seqair/src/bam/pileup.rs`.

Today, the public API is:

```rust
pub fn pileups(&mut self) -> Option<PileupColumn<'_, U>>
```

`PileupColumn` internally owns a per-column alignment vector and exposes iterators over it:

```rust
pub struct PileupColumn<'store, U = ()> {
    pos: Pos0,
    reference_base: Base,
    alignments: Vec<PileupAlignment>,
    store: &'store RecordStore,
}
```

Important wording: seqair does **not** publicly return a `Vec` directly. It returns a `PileupColumn`.
The `Vec` is an internal implementation detail that matters for performance/API design. + +The hot path currently does roughly: + +```rust +let mut alignments = Vec::with_capacity(self.active.len()); + +for active in &self.active { + let Some(info) = active.cigar.pos_info_at(pos) else { continue }; + let op = match info { ... }; + alignments.push(PileupAlignment { ... }); +} + +if let Some(max) = self.max_depth { + alignments.truncate(max as usize); +} + +return Some((pos, reference_base, alignments)); +``` + +Downstream perbase then iterates `column.raw_alignments()` or `column.alignments()` to build its own row. + +## Perbase requirements to keep in mind + +### Non-mate `base-depth` + +For each alignment in a column, perbase currently applies these semantics: + +1. If the read fails the user read filter, it contributes to `FAIL` and to no other output count. +2. If the read is a refskip, it contributes to `REF_SKIP` and not to output depth. +3. If the read is a deletion, it contributes to `DEL` and does contribute to output depth. +4. If the read has a base, it contributes to output depth and one of A/C/G/T/N. +5. If there is a base-quality cutoff and the observed base quality is below that cutoff or unavailable, the base is counted as N. +6. If the read has a simple insertion anchored at this position, it contributes to `INS`. + +A direct accumulator can compute this from zero instead of starting from raw pileup depth and subtracting fail/refskip counts. + +### Mate-aware `base-depth -m` + +Mate-aware mode is harder. It groups alignments by QNAME within each column and picks a representative based on perbase's mate resolution strategy. That requires comparing multiple alignments from the same read name and sometimes using base qualities and recommended IUPAC bases. + +Do not make mate-aware mode the first target. The first useful milestone is faster non-mate `base-depth`. 
Later, the same custom aggregation hook may support mate-aware aggregation, but it will still need some per-column grouping storage. + +### `only-depth` + +`only-depth` is mostly a separate optimization path. It wants reference intervals/CIGAR spans, not full pileup columns. Do not block this plan on `only-depth`, but keep the same design principle in mind: downstream consumers should be able to request less work when they need less information. + +## Goals + +1. Add cheap built-in column summaries computed while building each pileup column. +2. Add an opt-in custom per-column aggregation/fold API. +3. Let perbase classify records into count/fail/drop using existing seqair extension points first. +4. Avoid breaking or changing `PileupEngine::pileups()` behavior. +5. Validate everything against the current htslib parity tests and perbase output parity. + +## Non-goals for the first prototype + +- Do not remove `PileupColumn::alignments()` or `raw_alignments()`. +- Do not change default read filtering semantics. +- Do not make perbase-specific fields part of the default `PileupColumn` API. +- Do not require all users to adopt the accumulator API. +- Do not solve mate-aware aggregation before the non-mate path proves useful. +- Do not change max-depth behavior except where an internal refactor is provably equivalent. + +--- + +# Phase 0: specs, terminology, and measurement + +## Add Tracey spec rules first + +Before implementation, add spec rules in `docs/spec/4-pileup.md`. + +Potential rules: + +- `r[pileup.summary]`: `PileupColumn` exposes a column summary. +- `r[pileup.summary_matches_alignments]`: `PileupColumn::summary()` reports the same post-max-depth counts that a caller would compute by iterating `raw_alignments()`. +- `r[pileup.summary_base_counts]`: summary base counts include only alignments with an actual query base. +- `r[pileup.summary_indel_counts]`: summary insertion/deletion/refskip counts are defined in terms of `PileupOp` variants. 
+- `r[pileup.custom_aggregation]`: an opt-in custom aggregation API can observe each emitted alignment in a column. +- `r[pileup.custom_aggregation_no_materialization]`: custom aggregation does not need to allocate a per-column alignment vector unless the accumulator chooses to store alignments. +- `r[pileup.custom_aggregation_max_depth]`: custom aggregation observes the same alignments that `pileups()` would expose after max-depth truncation. +- `r[pileup.custom_aggregation_existing_api_unchanged]`: the existing lending `pileups()` API remains unchanged. +- `r[pileup.read_disposition_via_extras]`: downstream read disposition can be represented as `RecordStore` extras without changing default binary filtering. + +## Define terms carefully + +Avoid ambiguous names around insertions and complex indels. + +Current `PileupOp` variants: + +```rust +Match { qpos, base, qual } +Insertion { qpos, base, qual, insert_len } +Deletion { del_len } +ComplexIndel { del_len, insert_len, is_refskip } +RefSkip +``` + +Potential summary terminology: + +- `depth`: number of emitted alignments in the column; same as current `column.depth()`. +- `match_depth`: number of emitted alignments with a query base; `Match` + `Insertion`. +- `base_counts`: counts bases from `Match` + `Insertion` only. +- `op_counts.match_ops`: count of `PileupOp::Match`. +- `op_counts.insertion_ops`: count of `PileupOp::Insertion`. +- `op_counts.deletion_ops`: count of `PileupOp::Deletion`. +- `op_counts.complex_indel_ops`: count of `PileupOp::ComplexIndel`. +- `op_counts.refskip_ops`: count of `PileupOp::RefSkip`. +- `insertion_count`: decide explicitly whether this means only `Insertion` or any op with `insert_len > 0` (`Insertion` + `ComplexIndel`). For a generic seqair summary, `insertion_count` should probably mean any insertion signal. If this is too ambiguous, prefer `insertion_signal_count` or keep only `op_counts` initially. +- `deletion_count`: `Deletion` + `ComplexIndel { is_refskip: false, .. }`. 
+- `ref_skip_count`: `RefSkip` + `ComplexIndel { is_refskip: true, .. }`. + +perbase's current `INS` semantics may not equal a generic `insert_len > 0` count for `ComplexIndel`. Keep perbase-specific counting in the custom accumulator, not in the built-in summary. + +## Baseline before implementation + +Before changing code, record current benchmark/profile numbers in seqair and perbase. + +perbase baseline from the HG00157 10 Mb BAM subset after seqair v0.1.0 wiring: + +- `base-depth`: htslib `4.992 ± 0.207 s`, seqair `5.323 ± 0.127 s`, exact parity. +- `base-depth -m`: htslib `49.455 s`, seqair `32.429 s`, seqair `1.53x` faster, with known sparse default `-F 0` mate-order diffs. +- `only-depth`: htslib `914.5 ± 38.7 ms`, seqair `1.024 ± 0.072 s`, exact parity. +- `only-depth -x`: htslib `757.2 ± 6.5 ms`, seqair `1.012 ± 0.088 s`, exact parity. + +Expected first win: non-mate `base-depth`, because it currently materializes/iterates pileup alignments and then performs perbase's counting pass. + +--- + +# Phase 1: built-in `PileupSummary` + +## API sketch + +Add small summary/count structs to `pileup.rs`. 
```rust
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct BaseCounts {
    pub a: usize,
    pub c: usize,
    pub g: usize,
    pub t: usize,
    pub unknown: usize,
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PileupOpCounts {
    pub match_ops: usize,
    pub insertion_ops: usize,
    pub deletion_ops: usize,
    pub complex_indel_ops: usize,
    pub refskip_ops: usize,
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PileupSummary {
    pub depth: usize,
    pub match_depth: usize,
    pub insertion_count: usize,
    pub deletion_count: usize,
    pub ref_skip_count: usize,
    pub op_counts: PileupOpCounts,
    pub base_counts: BaseCounts,
}
```

Expose it from `PileupColumn`:

```rust
pub struct PileupColumn<'store, U = ()> {
    pos: Pos0,
    reference_base: Base,
    alignments: Vec<PileupAlignment>,
    summary: PileupSummary,
    store: &'store RecordStore,
}

impl<'store, U> PileupColumn<'store, U> {
    pub fn summary(&self) -> &PileupSummary;

    // Optional convenience accessors backed by summary:
    pub fn match_depth(&self) -> usize;
    pub fn insertion_count(&self) -> usize;
    pub fn deletion_count(&self) -> usize;
    pub fn ref_skip_count(&self) -> usize;
    pub fn base_counts(&self) -> BaseCounts;
}
```

`match_depth()` currently iterates alignments. After this change it can return `self.summary.match_depth`.

## Summary semantics

`summary()` should describe the emitted column exactly. That means:

- post-max-depth truncation,
- same alignments as `raw_alignments()` exposes,
- same `depth()` as current `self.alignments.len()`,
- no hidden perbase read filtering,
- no base-quality threshold.

If pre-truncation information becomes useful later, add separate fields such as:

```rust
pub raw_depth_before_max_depth: usize,
pub truncated_by_max_depth: bool,
```

Do not mix pre- and post-truncation counts in the first API.
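The invariant behind `r[pileup.summary_matches_alignments]` can be illustrated with a toy model. All types below are simplified stand-ins, not seqair's API: the point is only that a summary folded during emission must equal an independent recount over the same emitted operations.

```rust
// Toy stand-ins for PileupOp/PileupSummary; field names are illustrative.
#[derive(Clone, Copy)]
enum Op {
    Match(char),
    Insertion(char),
    Deletion,
    RefSkip,
}

#[derive(Default, Debug, PartialEq)]
struct Summary {
    depth: usize,
    match_depth: usize,
    deletion_count: usize,
    ref_skip_count: usize,
}

impl Summary {
    // Mirrors the planned PileupSummary::observe: one update per emitted op.
    fn observe(&mut self, op: Op) {
        self.depth += 1;
        match op {
            Op::Match(_) | Op::Insertion(_) => self.match_depth += 1,
            Op::Deletion => self.deletion_count += 1,
            Op::RefSkip => self.ref_skip_count += 1,
        }
    }
}

// Summary computed while emitting a column.
fn summarize(ops: &[Op]) -> Summary {
    let mut s = Summary::default();
    for &op in ops {
        s.observe(op);
    }
    s
}

fn main() {
    let ops = [Op::Match('A'), Op::Insertion('C'), Op::Deletion, Op::RefSkip];
    // Independent recount by iterating the "emitted alignments" a second time,
    // as a caller of raw_alignments() would.
    let recount = Summary {
        depth: ops.len(),
        match_depth: 2,
        deletion_count: 1,
        ref_skip_count: 1,
    };
    assert_eq!(summarize(&ops), recount);
}
```

This is the shape the unit tests above should take: the expected summary is written out by hand in the test body rather than recomputed through `observe`.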
+ +## Implementation approach + +Refactor the current column-building loop so summary is updated only for alignments that will be emitted. + +Instead of building every alignment and then truncating, apply the max-depth limit while pushing: + +```rust +let mut alignments = Vec::with_capacity(self.active.len()); +let mut summary = PileupSummary::default(); +let max = self.max_depth.map(|m| m as usize); + +for active in &self.active { + if max.is_some_and(|max| alignments.len() >= max) { + break; + } + + let Some(info) = active.cigar.pos_info_at(pos) else { continue }; + let alignment = build_alignment(active, info, &self.store); + summary.observe(&alignment); + alignments.push(alignment); +} +``` + +This should be equivalent to the current `truncate(max)` behavior because only the first `max` emitted alignments survive. It also avoids constructing discarded `PileupAlignment`s when max depth is hit. + +Add helper methods: + +```rust +impl BaseCounts { + fn add_base(&mut self, base: Base); +} + +impl PileupSummary { + fn observe(&mut self, alignment: &PileupAlignment); +} +``` + +Keep these helpers private initially unless there is a clear downstream reason to expose them. + +## Tests + +Add unit tests in `pileup.rs` or an existing pileup test module. + +For each test, compute expected summary independently from the emitted alignments in the test body, not by calling the same `PileupSummary::observe` helper. + +Cases: + +1. Only matches: base counts and match depth. +2. Empty `SEQ=*`: `Base::Unknown`, unavailable quality, still counts in `match_depth` and `base_counts.unknown`. +3. Deletion: `depth` includes deletion, `match_depth` does not, `deletion_count` increments. +4. Refskip: `depth` includes refskip, `match_depth` does not, `ref_skip_count` increments. +5. Simple insertion: `match_depth` and base count increment; insertion op/count increments. +6. 
Complex indel with deletion: complex op increments; deletion-like count increments; insertion signal behavior matches documented choice.
7. Complex indel with refskip: complex op increments; refskip-like count increments; insertion signal behavior matches documented choice.
8. Max depth: summary equals only the emitted/truncated alignments.
9. `match_depth()` accessor no longer scans but returns the same value as manual iteration.

## Why this helps perbase

This is not enough to replace perbase's counting path because perbase has read filters, fail counts, and optional base-quality filtering. However, it is still useful because:

- it removes repeated scans for generic counts,
- it gives a correctness oracle for the later custom accumulator,
- it establishes clear summary semantics around complex indels and max depth,
- it is a small, safe first PR before the more experimental fold API.

---

# Phase 2: internal column observer refactor

Before exposing a public custom aggregation API, refactor the hot loop around an internal observer/sink. The goal is to have one piece of code that:

1. advances positions,
2. maintains the active set,
3. builds `PileupOp`/`PileupAlignment` values,
4. applies max-depth semantics,
5. either materializes alignments or feeds them to a custom accumulator.

## Internal helper shape

Possible internal helper:

```rust
struct ColumnStart {
    pos: Pos0,
    reference_base: Base,
}

struct ColumnEnd {
    depth: usize,
    truncated_by_max_depth: bool,
}

trait ColumnSink {
    type Output;

    fn begin_column(&mut self, start: ColumnStart);

    fn observe_alignment(
        &mut self,
        alignment: PileupAlignment,
        store: &RecordStore,
    );

    fn finish_column(&mut self, end: ColumnEnd) -> Option<Self::Output>;
}
```

Use by-value `PileupAlignment` in `observe_alignment`. The engine has already constructed the value. A materializing sink can push it into a `Vec`; a counting sink can inspect and drop it.
This avoids giving the accumulator references that might escape the hot loop.

The existing `pileups()` path can use a private materializing sink that returns:

```rust
struct MaterializedColumn {
    pos: Pos0,
    reference_base: Base,
    alignments: Vec<PileupAlignment>,
    summary: PileupSummary,
}
```

Then `pileups()` attaches `store: &self.store` and returns `PileupColumn<'_, U>` as it does today.

## Important max-depth rule

The observer must see the same alignments that `pileups()` would expose.

Recommended behavior:

- Maintain an emitted alignment count for the current column.
- If `max_depth` is set and the emitted count reaches the limit, stop observing/building additional alignments for that column.
- This matches current `alignments.truncate(max)` output while avoiding work after the limit.

Do not let the custom API observe alignments that `pileups()` would later truncate unless the API explicitly grows a pre-max-depth mode.

## Expected code movement

Likely split current `advance()` into smaller helpers:

```rust
fn advance_active_set_to(&mut self, pos: Pos0) -> Option<()>;
fn next_reference_base(&self, pos: Pos0) -> Base;
fn build_alignment(&self, active: &ActiveRecord, pos: Pos0) -> Option<PileupAlignment>;
fn advance_with_sink<S>(&mut self, sink: &mut S) -> Option<S::Output>
where
    S: ColumnSink;
```

Keep helper visibility private at first.

## Tests

At this phase, existing public behavior should be unchanged. Good tests:

- Run existing pileup parity tests unchanged.
- Add a test-only sink that collects simple `(pos, depth, match_depth)` tuples and compare to `pileups()`.
- Add a max-depth test proving sink-observed output is exactly the materialized output.

---

# Phase 3: public custom aggregation API

After the internal sink refactor is working, expose an opt-in accumulator API.
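The Phase 2 sink split can be illustrated standalone with toy types (none of the names below are seqair's): one driver loop applies max-depth before any sink sees an alignment, so a materializing sink and a counting sink necessarily agree on the emitted column.

```rust
// Toy alignment; the real PileupAlignment carries op/base/qual information.
#[derive(Clone, Debug, PartialEq)]
struct Alignment {
    record_idx: usize,
}

trait ColumnSink {
    type Output;
    fn observe_alignment(&mut self, alignment: Alignment);
    fn finish_column(&mut self) -> Self::Output;
}

// Sink backing the existing pileups() path: stores every alignment.
#[derive(Default)]
struct Materialize {
    alignments: Vec<Alignment>,
}

impl ColumnSink for Materialize {
    type Output = Vec<Alignment>;
    fn observe_alignment(&mut self, alignment: Alignment) {
        self.alignments.push(alignment);
    }
    fn finish_column(&mut self) -> Vec<Alignment> {
        std::mem::take(&mut self.alignments)
    }
}

// Sink for simple consumers: counts without per-column allocation.
#[derive(Default)]
struct CountDepth {
    depth: usize,
}

impl ColumnSink for CountDepth {
    type Output = usize;
    fn observe_alignment(&mut self, _alignment: Alignment) {
        self.depth += 1;
    }
    fn finish_column(&mut self) -> usize {
        std::mem::take(&mut self.depth)
    }
}

// Shared driver: max-depth is enforced before the sink observes anything,
// matching the "same alignments after truncation" rule above.
fn drive<S: ColumnSink>(active: &[Alignment], max_depth: Option<usize>, sink: &mut S) -> S::Output {
    for (emitted, alignment) in active.iter().enumerate() {
        if max_depth.is_some_and(|max| emitted >= max) {
            break;
        }
        sink.observe_alignment(alignment.clone());
    }
    sink.finish_column()
}

fn main() {
    let active: Vec<Alignment> = (0..5).map(|record_idx| Alignment { record_idx }).collect();
    let materialized = drive(&active, Some(3), &mut Materialize::default());
    let depth = drive(&active, Some(3), &mut CountDepth::default());
    assert_eq!(materialized.len(), depth); // both saw the truncated column
    assert_eq!(depth, 3);
}
```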
## API sketch

A minimal infallible API:

```rust
pub struct PileupColumnContext<'store, U> {
    pub pos: Pos0,
    pub reference_base: Base,
    pub store: &'store RecordStore,
    pub depth: usize,
    pub truncated_by_max_depth: bool,
}

pub trait PileupColumnAccumulator<U> {
    type Output;

    fn begin_column(&mut self, pos: Pos0, reference_base: Base);

    fn observe_alignment(
        &mut self,
        alignment: PileupAlignment,
        store: &RecordStore,
    );

    fn finish_column(&mut self, context: PileupColumnContext<'_, U>) -> Option<Self::Output>;
}

impl<U> PileupEngine<U> {
    pub fn pileup_with<A>(&mut self, accumulator: &mut A) -> Option<A::Output>
    where
        A: PileupColumnAccumulator<U>;
}
```

Notes:

- `Output` should be owned. Do not let outputs borrow from the engine/store in v1; that keeps lifetimes tractable and fits perbase rows.
- `observe_alignment` gets `PileupAlignment` by value. Accumulators that need to retain it can store it. Accumulators that only count can avoid per-column allocation.
- Store access is passed to support `RecordStore::extra(record_idx)`, QNAME lookup, aux lookup, and future downstream grouping.
- `finish_column` can return `None` if the accumulator wants to skip output for a column.

If fallibility is needed later, add a second API rather than complicating v1:

```rust
pub trait TryPileupColumnAccumulator<U> {
    type Output;
    type Error;
    ...
}

pub fn try_pileup_with<A>(&mut self, accumulator: &mut A) -> Option<Result<A::Output, A::Error>>;
```

perbase's first use case should not need fallibility.

## Alternative closure API

A closure API may be ergonomic for simple summaries:

```rust
engine.pileups_with(|column, alignment| {
    // update caller state
});
```

However, a trait is probably better for the first serious prototype because it gives explicit lifecycle hooks:

- initialize per-column state,
- observe alignments,
- finish/emit output,
- reuse allocation inside the accumulator across columns.

A closure helper can be layered on later.
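One way such a closure helper could be layered over the trait is a generic adapter holding per-column state plus an `FnMut` called once per alignment. Everything below is a hypothetical sketch with toy types, not seqair's API; `FnAccumulator` and `run_column` are invented names:

```rust
// Toy alignment; the real type carries op/base/qual and a record index.
struct Alignment {
    base: char,
}

// Toy version of the accumulator trait from the sketch above.
trait ColumnAccumulator {
    type Output;
    fn begin_column(&mut self);
    fn observe_alignment(&mut self, alignment: &Alignment);
    fn finish_column(&mut self) -> Self::Output;
}

// Adapter: per-column state of type S plus a closure observing alignments.
struct FnAccumulator<S, F> {
    state: S,
    observe: F,
}

impl<S: Default + Clone, F: FnMut(&mut S, &Alignment)> ColumnAccumulator for FnAccumulator<S, F> {
    type Output = S;
    fn begin_column(&mut self) {
        self.state = S::default(); // fresh state per column
    }
    fn observe_alignment(&mut self, alignment: &Alignment) {
        (self.observe)(&mut self.state, alignment);
    }
    fn finish_column(&mut self) -> S {
        self.state.clone()
    }
}

// Stand-in for the engine driving one column through an accumulator.
fn run_column<A: ColumnAccumulator>(acc: &mut A, column: &[Alignment]) -> A::Output {
    acc.begin_column();
    for alignment in column {
        acc.observe_alignment(alignment);
    }
    acc.finish_column()
}

fn main() {
    // Count 'A' bases with a closure instead of a hand-written accumulator.
    let mut acc = FnAccumulator {
        state: 0usize,
        observe: |count: &mut usize, alignment: &Alignment| {
            if alignment.base == 'A' {
                *count += 1;
            }
        },
    };
    let column = [Alignment { base: 'A' }, Alignment { base: 'C' }, Alignment { base: 'A' }];
    assert_eq!(run_column(&mut acc, &column), 2);
}
```

The adapter keeps the trait as the single engine-facing interface, so the closure form adds ergonomics without a second code path in the hot loop.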
## Example: reproduce `PileupSummary`

A seqair test/example accumulator:

```rust
#[derive(Default)]
struct SummaryAccumulator {
    summary: PileupSummary,
}

impl<U> PileupColumnAccumulator<U> for SummaryAccumulator {
    type Output = PileupSummary;

    fn begin_column(&mut self, _pos: Pos0, _reference_base: Base) {
        self.summary = PileupSummary::default();
    }

    fn observe_alignment(&mut self, alignment: PileupAlignment, _store: &RecordStore) {
        self.summary.observe(&alignment);
    }

    fn finish_column(&mut self, _context: PileupColumnContext<'_, U>) -> Option<Self::Output> {
        Some(self.summary)
    }
}
```

Test it by comparing `engine.pileup_with(&mut SummaryAccumulator)` against `engine.pileups().summary()` over the same synthetic store.

## Example: perbase-like non-mate accumulator

Pseudo-code for the shape perbase wants:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PerbaseDisposition {
    Count,
    FailOnly,
}

struct PerbaseColumnAccumulator {
    min_base_quality: Option<u8>,
    row: PerbaseRow,
}

impl PileupColumnAccumulator<PerbaseDisposition> for PerbaseColumnAccumulator {
    type Output = PerbaseRow;

    fn begin_column(&mut self, pos: Pos0, reference_base: Base) {
        self.row = PerbaseRow::new(pos, reference_base);
    }

    fn observe_alignment(
        &mut self,
        alignment: PileupAlignment,
        store: &RecordStore,
    ) {
        match store.extra(alignment.record_idx()) {
            PerbaseDisposition::FailOnly => {
                self.row.fail += 1;
                return;
            }
            PerbaseDisposition::Count => {}
        }

        match alignment.op() {
            PileupOp::RefSkip => {
                self.row.ref_skip += 1;
            }
            PileupOp::ComplexIndel { is_refskip: true, .. } => {
                self.row.ref_skip += 1;
            }
            PileupOp::Deletion { .. } | PileupOp::ComplexIndel { is_refskip: false, .. } => {
                self.row.depth += 1;
                self.row.del += 1;
            }
            PileupOp::Match { base, qual, .. } => {
                self.row.depth += 1;
                self.row.add_base_with_quality(*base, *qual, self.min_base_quality);
            }
            PileupOp::Insertion { base, qual, .. } => {
                self.row.depth += 1;
                self.row.add_base_with_quality(*base, *qual, self.min_base_quality);
                self.row.ins += 1;
            }
        }
    }

    fn finish_column(&mut self, _context: PileupColumnContext<'_, PerbaseDisposition>) -> Option<Self::Output> {
        Some(self.row.clone())
    }
}
```

This mirrors perbase's current non-mate semantics but avoids:

- constructing a public `PileupColumn`,
- storing an internal per-column `Vec` for simple counting,
- a downstream second pass over `column.raw_alignments()`.

## Tests

1. Summary accumulator equals `PileupColumn::summary()`.
2. A toy accumulator that counts only depth equals `column.depth()`.
3. A perbase-like accumulator equals manual iteration over `raw_alignments()` for:
   - passing reads,
   - fail-only reads,
   - base-quality demotion to N,
   - deletion,
   - refskip,
   - insertion,
   - complex indel.
4. Max-depth gives identical observed alignments/counts to current `pileups()`.

---

# Phase 4: read disposition for pass/fail/drop

## Problem

`CustomizeRecordStore::filter_raw` and `filter` are binary: keep or reject.

That is ideal for use cases where rejected reads truly disappear, but perbase `base-depth` has a `FAIL` output column. Reads that fail user filters should still contribute to `FAIL` if they cover the position.

So perbase needs three conceptual states:

```rust
enum ReadDisposition {
    Count,
    FailOnly,
    Drop,
}
```

Meaning:

- `Count`: read contributes normally.
- `FailOnly`: read remains in the pileup for coverage overlap but contributes only to `FAIL`.
- `Drop`: read is omitted completely.

## Prototype using existing seqair APIs

Do **not** start by changing `CustomizeRecordStore`.
+ +Use existing extras first: + +```rust +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum PerbaseDisposition { + Count, + FailOnly, +} +``` + +Then implement a customizer in perbase: + +```rust +impl CustomizeRecordStore for PerbaseCustomize { + type Extra = PerbaseDisposition; + + fn filter_raw(&mut self, fields: &FilterRawFields<'_>) -> bool { + // Return false only for true Drop cases. + // For base-depth, normal user read-filter failures are NOT dropped, + // because they need to be counted as FAIL per covered position. + true + } + + fn compute( + &mut self, + rec: &SlimRecord, + _store: &RecordStore, + ) -> PerbaseDisposition { + if self.read_filter_passes(rec.flags, rec.mapq) { + PerbaseDisposition::Count + } else { + PerbaseDisposition::FailOnly + } + } +} +``` + +Then the custom pileup accumulator reads: + +```rust +store.extra(alignment.record_idx()) +``` + +and decides whether an observed alignment contributes to normal counts or fail-only counts. + +## Why extras-first is the right first step + +- No public seqair filtering API churn. +- Perbase can test the semantics without waiting on a new record-store trait. +- It uses the existing `RecordStore` design exactly as intended. +- The custom accumulator API must support store extras anyway. + +## Potential future seqair API + +If many users need this pattern, consider a first-class classification trait later: + +```rust +pub enum ReadDisposition { + Count, + KeepAsExtraOnly, + Drop, +} + +pub trait ClassifyRecordStore: Clone { + type Extra; + + fn classify_raw(&mut self, fields: &FilterRawFields<'_>) -> ReadDisposition; + + fn compute( + &mut self, + rec: &SlimRecord, + store: &RecordStore, + ) -> Self::Extra; +} +``` + +But this should be driven by evidence from the perbase prototype. It may turn out that `CustomizeRecordStore::compute` + extras is sufficient. 
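The extras-first classification logic can be sketched standalone. The flag constants below are real SAM FLAG bits; `ReadFilter` and `classify` are illustrative stand-ins for perbase's read filter, not seqair or perbase API:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PerbaseDisposition {
    Count,
    FailOnly,
}

// Standard SAM FLAG bits.
const FLAG_UNMAPPED: u16 = 0x4;
const FLAG_SECONDARY: u16 = 0x100;
const FLAG_QCFAIL: u16 = 0x200;
const FLAG_DUP: u16 = 0x400;

struct ReadFilter {
    exclude_flags: u16,
    min_mapq: u8,
}

impl ReadFilter {
    // Mirrors the role of CustomizeRecordStore::compute in the plan: the
    // read stays in the store either way; the extra only records whether it
    // contributes to normal counts or to FAIL.
    fn classify(&self, flags: u16, mapq: u8) -> PerbaseDisposition {
        if flags & self.exclude_flags != 0 || mapq < self.min_mapq {
            PerbaseDisposition::FailOnly
        } else {
            PerbaseDisposition::Count
        }
    }
}

fn main() {
    let filter = ReadFilter {
        exclude_flags: FLAG_UNMAPPED | FLAG_SECONDARY | FLAG_QCFAIL | FLAG_DUP,
        min_mapq: 1,
    };
    assert_eq!(filter.classify(0, 60), PerbaseDisposition::Count);
    assert_eq!(filter.classify(FLAG_DUP, 60), PerbaseDisposition::FailOnly);
    assert_eq!(filter.classify(0, 0), PerbaseDisposition::FailOnly);
}
```

Because classification needs only `flags` and `mapq`, it fits the `SlimRecord` fields the plan already passes to `compute`; `filter_raw` stays reserved for true `Drop` cases.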
+ +## Optional future decode-profile optimization + +For perbase fail-only reads, the accumulator only needs to know that the read covers a position. It does not need base/quality values because fail-only reads do not contribute to base counts. + +A future, larger optimization could combine disposition with decode profiles: + +- `Count`: decode/store CIGAR + SEQ + QUAL + needed metadata. +- `FailOnly`: decode/store CIGAR + minimal metadata, maybe skip SEQ/QUAL. +- `Drop`: skip entirely. + +That is outside the first pileup aggregation prototype, but it is worth keeping in mind if fail-only reads are a meaningful cost. + +--- + +# Phase 5: perbase integration experiment + +Once seqair has the custom aggregation API on a branch, wire perbase's experimental seqair backend to it. + +## Initial perbase target + +Start with non-mate `base-depth --seqair-pileup` only. + +Suggested perbase flow: + +1. Define `PerbaseDisposition` as a seqair `RecordStore` extra. +2. Fetch records into `RecordStore`. +3. Use `PileupEngine::pileup_with(&mut PerbaseColumnAccumulator)`. +4. Produce `PileupPosition` rows directly from accumulator output. +5. Keep `base-depth -m` on the current materialized `PileupColumn` path at first. + +## Perbase parity checklist + +Compare htslib and seqair custom aggregation for: + +- default filters, +- `-F` exclude flags, +- `-f` include flags if applicable, +- `-q` min MAPQ, +- `-Q` min base quality, +- insertions, +- deletions, +- refskips, +- empty `SEQ=*`, +- max depth, +- zero-position filling. + +Expected row semantics for non-mate custom aggregation: + +- `DEPTH`: count of passing reads with match/insertion/deletion observations; excludes refskips and fail-only reads. +- `A/C/G/T/N`: passing reads with base observations after base-quality demotion. +- `INS`: passing reads with simple insertion observations matching current perbase behavior. +- `DEL`: passing reads with deletion observations. +- `REF_SKIP`: passing reads with refskip observations. 
- `FAIL`: fail-only reads with any pileup observation at the position.

## Benchmark plan

Run three versions:

1. htslib baseline,
2. current seqair materialized pileup path,
3. seqair custom accumulator path.

Commands should match the latest perbase benchmark setup:

- release build,
- `--features seqair-pileup`,
- HG00157 chr1 10 Mb BAM subset,
- same BED/window split,
- TSV output to disk or the same output sink for all runs.

Benchmarks:

- `base-depth` non-mate default filters: primary target.
- `base-depth -Q <n>`: validates the base-quality path and overhead.
- `base-depth -F 2304`: useful exact-parity control for secondary/supplementary behavior.
- `base-depth -m`: control, should remain unchanged until a mate-aware accumulator exists.

Success criteria:

- Byte-for-byte parity with current htslib/reference outputs for non-mate mode.
- No regressions in existing seqair htslib pileup parity tests.
- Custom accumulator path is measurably faster than current seqair non-mate `base-depth`, or profiling clearly identifies a different bottleneck.

---

# Phase 6: mate-aware follow-up, only if worthwhile

If non-mate custom aggregation works, explore mate-aware aggregation.

Challenges:

- Needs group-by-QNAME per column.
- Needs to compare multiple observations from the same QNAME using perbase's mate strategy.
- Needs base quality and possibly recommended IUPAC base output.
- Current `AlignmentView::qname()` borrows from the store; storing borrowed QNAMEs in an accumulator is lifetime-sensitive.

Possible approaches:

1. Accumulator stores owned QNAME bytes or hashes for grouping. Simple, but allocates.
2. Accumulator stores `record_idx` and sorts/groups by QNAME at finish using store lookups. This may resemble current materialized behavior, but could still avoid some wrapper work.
3. seqair provides a specialized qname-grouped column helper later. This is probably too downstream-specific for v1.
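Approach 1 can be sketched with toy types. The grouping cost is one owned key per read name; the highest-quality-wins rule below is a placeholder for perbase's actual mate resolution strategy, and all names are illustrative:

```rust
use std::collections::HashMap;

// Toy per-column observation; a real one would carry the PileupOp.
#[derive(Clone, Debug, PartialEq)]
struct Obs {
    qname: Vec<u8>,
    base: char,
    qual: u8,
}

// Pick one representative observation per QNAME.
// Placeholder rule: keep the higher base quality.
fn resolve_mates(column: &[Obs]) -> Vec<Obs> {
    let mut by_qname: HashMap<Vec<u8>, Obs> = HashMap::new();
    for obs in column {
        by_qname
            .entry(obs.qname.clone()) // owned key: the allocation cost of approach 1
            .and_modify(|best| {
                if obs.qual > best.qual {
                    *best = obs.clone();
                }
            })
            .or_insert_with(|| obs.clone());
    }
    let mut reps: Vec<Obs> = by_qname.into_values().collect();
    reps.sort_by(|a, b| a.qname.cmp(&b.qname)); // deterministic output order
    reps
}

fn main() {
    let column = vec![
        Obs { qname: b"r1".to_vec(), base: 'A', qual: 30 },
        Obs { qname: b"r1".to_vec(), base: 'C', qual: 40 }, // overlapping mate
        Obs { qname: b"r2".to_vec(), base: 'G', qual: 20 },
    ];
    let reps = resolve_mates(&column);
    assert_eq!(reps.len(), 2);
    assert_eq!(reps[0].base, 'C'); // r1 resolved to the higher-quality mate
}
```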
+ +Do not optimize this until the non-mate path shows the API is viable. + +--- + +# Implementation order + +Recommended PR sequence in the fork: + +1. **Spec-only PR or first commit**: add Tracey spec rules for summaries/custom aggregation. +2. **Summary PR**: + - add `BaseCounts`, `PileupOpCounts`, `PileupSummary`, + - add `PileupColumn::summary()`, + - switch `match_depth()` to summary, + - test summary against manual iteration. +3. **Internal sink refactor PR**: + - introduce private `ColumnSink`, + - keep `pileups()` output unchanged, + - prove with tests that behavior matches. +4. **Public accumulator PR**: + - expose `PileupColumnAccumulator`, context type, and `pileup_with`, + - add examples/tests. +5. **perbase branch**: + - use `RecordStore`, + - add non-mate accumulator, + - benchmark and compare outputs. +6. **API cleanup PR**: + - rename types/methods based on real perbase usage, + - decide whether fallible accumulator or first-class disposition is needed. + +--- + +# Open questions + +## Summary semantics + +- Should `insertion_count` include `ComplexIndel`, or should the API avoid a single insertion count and expose only precise `op_counts` initially? +- Should summary expose `truncated_by_max_depth` and pre-truncation depth now, or wait until needed? +- Should `BaseCounts` use `usize`, `u32`, or a small fixed array `[usize; 5]` internally? + +## Accumulator API + +- Should `observe_alignment` receive `PileupAlignment` by value, by reference, or an `AlignmentView`-like wrapper? +- Should output be required to be owned in v1? +- Should the first API be infallible only? +- Should there be both `pileup_with` for one column and `fold_columns` for full-region consumption? +- Should a custom accumulator be able to request pre-max-depth observations? + +## Perbase integration + +- How should perbase represent `PerbaseDisposition` without making seqair depend on perbase concepts? 
+- Can perbase's read filter be evaluated fully from `SlimRecord` fields (`flags`, `mapq`) for all current modes? +- How should complex indel insertion counts be handled to preserve current htslib parity? +- How much does avoiding the internal alignment `Vec` help compared with other costs such as CIGAR mapping, base decode, and output formatting? + +## Future performance work + +- If custom aggregation is still slower than htslib for non-mate `base-depth`, profile whether the bottleneck is: + - CIGAR position lookup, + - base/quality lookup, + - active set maintenance, + - record-store decode, + - output construction/serialization. +- If fail-only reads are common, consider decode profiles to skip SEQ/QUAL for fail-only records. +- For `only-depth`, pursue a separate CIGAR/interval-only API rather than forcing it through pileup. diff --git a/crates/seqair/src/bam.rs b/crates/seqair/src/bam.rs index c304a055..767c1663 100644 --- a/crates/seqair/src/bam.rs +++ b/crates/seqair/src/bam.rs @@ -80,7 +80,10 @@ pub use header::{BamHeader, BamHeaderError, ContigInfo}; pub use index::{AlignmentIndex, BaiError, BamIndex}; pub use nm_md::NmMdError; pub use owned_record::{OwnedBamRecord, OwnedRecordError}; -pub use pileup::{AlignmentView, PileupColumn, PileupEngine, PileupGuard, PileupOp, RefSeq}; +pub use pileup::{ + AlignmentView, BaseCounts, PileupColumn, PileupColumnAccumulator, PileupColumnContext, + PileupEngine, PileupGuard, PileupOp, PileupOpCounts, PileupSummary, RefSeq, +}; pub use reader::{BamError, BamShared, IndexedBamReader}; // `record` is intentionally not re-exported as a type. 
The production decode // path is `RecordStore::push_raw`; the `record` module exposes only shared diff --git a/crates/seqair/src/bam/pileup.rs b/crates/seqair/src/bam/pileup.rs index 49e3748e..02c4706d 100644 --- a/crates/seqair/src/bam/pileup.rs +++ b/crates/seqair/src/bam/pileup.rs @@ -166,6 +166,115 @@ fn base_qual_at( (store.seq_at(active.record_idx, qpos as usize), q) } +// r[impl pileup.summary_base_counts] +/// Counts of canonical and unknown bases observed in a pileup column. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct BaseCounts { + /// Count of `A` bases. + pub a: usize, + /// Count of `C` bases. + pub c: usize, + /// Count of `G` bases. + pub g: usize, + /// Count of `T` bases. + pub t: usize, + /// Count of unknown bases. + pub unknown: usize, +} + +impl BaseCounts { + /// Add one observed base to the counts. + pub fn add_base(&mut self, base: Base) { + match base { + Base::A => self.a = self.a.saturating_add(1), + Base::C => self.c = self.c.saturating_add(1), + Base::G => self.g = self.g.saturating_add(1), + Base::T => self.t = self.t.saturating_add(1), + Base::Unknown => self.unknown = self.unknown.saturating_add(1), + } + } +} + +/// Counts of raw [`PileupOp`] variants observed in a pileup column. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct PileupOpCounts { + /// Count of [`PileupOp::Match`] operations. + pub match_ops: usize, + /// Count of [`PileupOp::Insertion`] operations. + pub insertion_ops: usize, + /// Count of [`PileupOp::Deletion`] operations. + pub deletion_ops: usize, + /// Count of [`PileupOp::ComplexIndel`] operations. + pub complex_indel_ops: usize, + /// Count of [`PileupOp::RefSkip`] operations. + pub refskip_ops: usize, +} + +// r[impl pileup.summary] +// r[impl pileup.summary_indel_counts] +/// Common counts for a single emitted pileup column. +/// +/// Summary counts describe the same post-max-depth alignments exposed by +/// [`PileupColumn::raw_alignments`]. 
They do not apply downstream read filters +/// or base-quality thresholds. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct PileupSummary { + /// Total number of emitted alignments. + pub depth: usize, + /// Number of emitted alignments with a query base. + pub match_depth: usize, + /// Number of emitted alignments with an insertion signal. + pub insertion_count: usize, + /// Number of emitted alignments with a deletion signal. + pub deletion_count: usize, + /// Number of emitted alignments with a reference-skip signal. + pub ref_skip_count: usize, + /// Counts of raw pileup operation variants. + pub op_counts: PileupOpCounts, + /// Counts of bases from operations that carry a query base. + pub base_counts: BaseCounts, +} + +impl PileupSummary { + /// Add one emitted alignment to the summary. + pub fn observe(&mut self, alignment: &PileupAlignment) { + self.depth = self.depth.saturating_add(1); + match alignment.op { + PileupOp::Match { base, .. } => { + self.match_depth = self.match_depth.saturating_add(1); + self.op_counts.match_ops = self.op_counts.match_ops.saturating_add(1); + self.base_counts.add_base(base); + } + PileupOp::Insertion { base, .. } => { + self.match_depth = self.match_depth.saturating_add(1); + self.insertion_count = self.insertion_count.saturating_add(1); + self.op_counts.insertion_ops = self.op_counts.insertion_ops.saturating_add(1); + self.base_counts.add_base(base); + } + PileupOp::Deletion { .. } => { + self.deletion_count = self.deletion_count.saturating_add(1); + self.op_counts.deletion_ops = self.op_counts.deletion_ops.saturating_add(1); + } + PileupOp::ComplexIndel { insert_len, is_refskip, .. 
} => { + self.op_counts.complex_indel_ops = + self.op_counts.complex_indel_ops.saturating_add(1); + if insert_len > 0 { + self.insertion_count = self.insertion_count.saturating_add(1); + } + if is_refskip { + self.ref_skip_count = self.ref_skip_count.saturating_add(1); + } else { + self.deletion_count = self.deletion_count.saturating_add(1); + } + } + PileupOp::RefSkip => { + self.ref_skip_count = self.ref_skip_count.saturating_add(1); + self.op_counts.refskip_ops = self.op_counts.refskip_ops.saturating_add(1); + } + } + } +} + // r[impl pileup.column_contents] // r[impl pileup.htslib_compat] // r[impl pileup.lending_iterator] @@ -179,6 +288,7 @@ pub struct PileupColumn<'store, U = ()> { pos: Pos0, reference_base: Base, alignments: Vec<PileupAlignment>, + summary: PileupSummary, store: &'store RecordStore<U>, } @@ -187,7 +297,7 @@ impl<U> std::fmt::Debug for PileupColumn<'_, U> { f.debug_struct("PileupColumn") .field("pos", &self.pos) .field("reference_base", &self.reference_base) - .field("depth", &self.alignments.len()) + .field("depth", &self.summary.depth) .finish_non_exhaustive() } } @@ -201,7 +311,7 @@ impl<'store, U> PileupColumn<'store, U> { // r[impl pileup_indel.depth_includes_all] #[must_use] pub fn depth(&self) -> usize { - self.alignments.len() + self.summary.depth } /// Iterate alignments with store access for per-record extras, qname, and aux. @@ -228,6 +338,13 @@ impl<'store, U> PileupColumn<'store, U> { self.store } + /// Common counts for this emitted column. + // r[impl pileup.summary] + #[must_use] + pub fn summary(&self) -> &PileupSummary { + &self.summary + } + /// Count of alignments with a query base at this position. /// /// Unlike [`depth`](Self::depth), deletions and ref-skips are not counted. @@ -235,7 +352,37 @@ /// the position with a base (e.g., for allele frequency calculations).
#[must_use] pub fn match_depth(&self) -> usize { - self.alignments.iter().filter(|a| a.qpos().is_some()).count() + self.summary.match_depth + } + + /// Count of alignments with an insertion signal at this position. + #[must_use] + pub fn insertion_count(&self) -> usize { + self.summary.insertion_count + } + + /// Count of alignments with a deletion signal at this position. + #[must_use] + pub fn deletion_count(&self) -> usize { + self.summary.deletion_count + } + + /// Count of alignments with a reference-skip signal at this position. + #[must_use] + pub fn ref_skip_count(&self) -> usize { + self.summary.ref_skip_count + } + + /// Base counts for operations with a query base at this position. + #[must_use] + pub fn base_counts(&self) -> BaseCounts { + self.summary.base_counts + } + + /// Raw pileup operation counts for this position. + #[must_use] + pub fn op_counts(&self) -> PileupOpCounts { + self.summary.op_counts } } @@ -431,6 +578,153 @@ impl PileupAlignment { } } +/// Metadata supplied when a custom pileup accumulator finishes a column. +pub struct PileupColumnContext<'store, U> { + pos: Pos0, + reference_base: Base, + depth: usize, + store: &'store RecordStore<U>, +} + +impl<'store, U> PileupColumnContext<'store, U> { + /// Reference position for the column. + #[must_use] + pub fn pos(&self) -> Pos0 { + self.pos + } + + /// Reference base for the column, or [`Base::Unknown`] when unavailable. + #[must_use] + pub fn reference_base(&self) -> Base { + self.reference_base + } + + /// Number of alignments observed by the accumulator. + #[must_use] + pub fn depth(&self) -> usize { + self.depth + } + + /// Borrow the record store for per-record extras, names, and auxiliary data. + #[must_use] + pub fn store(&self) -> &'store RecordStore<U> { + self.store + } +} + +// r[impl pileup.custom_aggregation] +/// Accumulates caller-defined output while the pileup engine builds one column.
+/// +/// This is an opt-in alternative to [`PileupEngine::pileups`] for callers that +/// want to compute per-column summaries without materializing a public +/// [`PileupColumn`]. The accumulator observes the same post-max-depth +/// alignments that `pileups()` would expose. +pub trait PileupColumnAccumulator<U> { + /// Owned output produced for each emitted column. + type Output; + + /// Start a new emitted pileup column. + fn begin_column(&mut self, pos: Pos0, reference_base: Base); + + /// Optional reservation hint for accumulators that store per-alignment data. + fn reserve_column(&mut self, _active_upper_bound: usize) {} + + /// Observe one emitted alignment. + fn observe_alignment(&mut self, alignment: PileupAlignment, store: &RecordStore<U>); + + /// Finish the current emitted column. + /// + /// Returning `None` skips user-visible output for this column; iteration will + /// continue with the next column. + fn finish_column(&mut self, context: PileupColumnContext<'_, U>) -> Option<Self::Output>; +} + +enum AccumulatorAdvance<T> { + End, + Skip, + Output(T), +} + +struct MaterializedColumn { + pos: Pos0, + reference_base: Base, + alignments: Vec<PileupAlignment>, + summary: PileupSummary, +} + +#[derive(Default)] +struct MaterializeColumnAccumulator { + pos: Option<Pos0>, + reference_base: Base, + alignments: Vec<PileupAlignment>, + summary: PileupSummary, +} + +impl<U> PileupColumnAccumulator<U> for MaterializeColumnAccumulator { + type Output = MaterializedColumn; + + fn begin_column(&mut self, pos: Pos0, reference_base: Base) { + self.pos = Some(pos); + self.reference_base = reference_base; + self.alignments.clear(); + self.summary = PileupSummary::default(); + } + + fn reserve_column(&mut self, active_upper_bound: usize) { + self.alignments.reserve(active_upper_bound); + } + + fn observe_alignment(&mut self, alignment: PileupAlignment, _store: &RecordStore<U>) { + self.summary.observe(&alignment); + self.alignments.push(alignment); + } + + fn finish_column(&mut self, _context: PileupColumnContext<'_, U>) -> Option<Self::Output> { + let
pos = self.pos.take()?; + Some(MaterializedColumn { + pos, + reference_base: self.reference_base, + alignments: std::mem::take(&mut self.alignments), + summary: self.summary, + }) + } +} + +fn build_alignment<U>( + store: &RecordStore<U>, + active: &ActiveRecord, + pos: Pos0, +) -> Option<PileupAlignment> { + let info = active.cigar.pos_info_at(pos)?; + + let op = match info { + CigarPosInfo::Match { qpos } => { + let (base, qual) = base_qual_at(store, active, qpos); + PileupOp::Match { qpos, base, qual } + } + CigarPosInfo::Insertion { qpos, insert_len } => { + let (base, qual) = base_qual_at(store, active, qpos); + PileupOp::Insertion { qpos, base, qual, insert_len } + } + CigarPosInfo::Deletion { del_len } => PileupOp::Deletion { del_len }, + // r[impl pileup_indel.complex_indel] + CigarPosInfo::ComplexIndel { del_len, insert_len, is_refskip } => { + PileupOp::ComplexIndel { del_len, insert_len, is_refskip } + } + CigarPosInfo::RefSkip => PileupOp::RefSkip, + }; + + Some(PileupAlignment { + op, + mapq: active.mapq, + flags: active.flags, + strand: active.strand, + seq_len: active.seq_len, + matching_bases: active.matching_bases, + indel_bases: active.indel_bases, + record_idx: active.record_idx, + }) +} + impl<U> PileupEngine<U> { /// Create a pileup engine that owns the record store. pub fn new(store: RecordStore<U>, region_start: Pos0, region_end: Pos0) -> Self { @@ -611,14 +905,15 @@ impl<U> Drop for PileupGuard<'_, U> { // r[impl pileup.position_iteration] impl<U> PileupEngine<U> { - /// Core iteration logic used by [`pileups`](Self::pileups). - /// - /// Returns the per-column data (pos, reference base, alignments) so the caller - /// can attach a store reference to build a [`PileupColumn`]. - fn advance(&mut self) -> Option<(Pos0, Base, Vec<PileupAlignment>)> { + /// Core iteration logic used by [`pileups`](Self::pileups) and + /// [`pileup_with`](Self::pileup_with).
+ fn advance_with_accumulator<A>(&mut self, accumulator: &mut A) -> AccumulatorAdvance<A::Output> + where + A: PileupColumnAccumulator<U>, + { loop { if self.current_pos > self.region_end { - return None; + return AccumulatorAdvance::End; } let pos = self.current_pos; @@ -629,10 +924,14 @@ impl<U> PileupEngine<U> { reason = "infallible: current_pos <= region_end < u32::MAX - 1" )] { - self.current_pos = self + let Some(next_pos) = self .current_pos .checked_add_offset(Offset::new(1)) - .trace_err("BUG: current_pos + 1 overflowed despite being <= region_end")?; + .trace_err("BUG: current_pos + 1 overflowed despite being <= region_end") + else { + return AccumulatorAdvance::End; + }; + self.current_pos = next_pos; } // Evict expired records. Iterate the compact end_pos vec (4-byte stride) @@ -649,7 +948,12 @@ impl<U> PileupEngine<U> { self.active_end_pos.swap_remove(i); self.active.swap_remove(i); } else { - i = i.checked_add(1).trace_err("active set size exceeded usize::MAX")?; + let Some(next_i) = + i.checked_add(1).trace_err("active set size exceeded usize::MAX") + else { + return AccumulatorAdvance::End; + }; + i = next_i; } } } @@ -666,8 +970,12 @@ impl<U> PileupEngine<U> { if rec.pos > pos { break; } - self.next_entry = - self.next_entry.checked_add(1).trace_err("next_entry exceeded usize::MAX")?; + let Some(next_entry) = + self.next_entry.checked_add(1).trace_err("next_entry exceeded usize::MAX") + else { + return AccumulatorAdvance::End; + }; + self.next_entry = next_entry; if rec.end_pos < pos { continue; @@ -678,8 +986,11 @@ impl<U> PileupEngine<U> { continue; } - let cigar = CigarMapping::new(rec.pos, self.store.cigar(idx)) - .trace_err("failed to generate cigar mapping")?; + let Some(cigar) = CigarMapping::new(rec.pos, self.store.cigar(idx)) + .trace_err("failed to generate cigar mapping") + else { + return AccumulatorAdvance::End; + }; + self.active_end_pos.push(rec.end_pos); self.active.push(ActiveRecord { @@ -698,7 +1009,7 @@ impl<U> PileupEngine<U> { if self.active.is_empty() { // r[impl
pileup.trailing_empty_termination] if self.next_entry >= self.store.len() { - return None; + return AccumulatorAdvance::End; } #[expect( clippy::cast_possible_truncation, @@ -720,61 +1031,70 @@ impl PileupEngine { // r[impl pileup_indel.deletions_included] // r[impl pileup_indel.refskips_included] // r[related record_store.field_access] - // r[impl perf.reuse_alignment_vec+2] - let mut alignments = Vec::with_capacity(self.active.len()); + // r[impl pileup.custom_aggregation_max_depth] + let max_depth = self.max_depth.map(|max| usize::try_from(max).unwrap_or(usize::MAX)); + let mut emitted_depth = 0usize; + let mut started = false; + let reference_base = self.ref_seq.as_ref().map_or(Base::Unknown, |r| r.base_at(pos)); + for active in &self.active { - let Some(info) = active.cigar.pos_info_at(pos) else { continue }; + if max_depth.is_some_and(|max| emitted_depth >= max) { + break; + } - let op = match info { - CigarPosInfo::Match { qpos } => { - let (base, qual) = base_qual_at(&self.store, active, qpos); - PileupOp::Match { qpos, base, qual } - } - CigarPosInfo::Insertion { qpos, insert_len } => { - let (base, qual) = base_qual_at(&self.store, active, qpos); - PileupOp::Insertion { qpos, base, qual, insert_len } - } - CigarPosInfo::Deletion { del_len } => PileupOp::Deletion { del_len }, - // r[impl pileup_indel.complex_indel] - CigarPosInfo::ComplexIndel { del_len, insert_len, is_refskip } => { - PileupOp::ComplexIndel { del_len, insert_len, is_refskip } - } - CigarPosInfo::RefSkip => PileupOp::RefSkip, - }; + let Some(alignment) = build_alignment(&self.store, active, pos) else { continue }; - alignments.push(PileupAlignment { - op, - mapq: active.mapq, - flags: active.flags, - strand: active.strand, - seq_len: active.seq_len, - matching_bases: active.matching_bases, - indel_bases: active.indel_bases, - record_idx: active.record_idx, - }); - } + if !started { + accumulator.begin_column(pos, reference_base); + accumulator.reserve_column(self.active.len()); + started = 
true; + } - // r[impl pileup.max_depth_per_position] - if let Some(max) = self.max_depth { - alignments.truncate(max as usize); + emitted_depth = emitted_depth.saturating_add(1); + accumulator.observe_alignment(alignment, &self.store); } - if !alignments.is_empty() { + if emitted_depth > 0 { self.columns_produced = self.columns_produced.saturating_add(1); - #[expect( - clippy::cast_possible_truncation, - reason = "depth is bounded by max_depth (u32) or typical pileup sizes; saturates at u32::MAX for profiling" - )] - let depth_u32 = alignments.len() as u32; + let depth_u32 = u32::try_from(emitted_depth).unwrap_or(u32::MAX); self.max_active_depth = self.max_active_depth.max(depth_u32); - let reference_base = - self.ref_seq.as_ref().map_or(Base::Unknown, |r| r.base_at(pos)); - return Some((pos, reference_base, alignments)); + let context = PileupColumnContext { + pos, + reference_base, + depth: emitted_depth, + store: &self.store, + }; + return match accumulator.finish_column(context) { + Some(output) => AccumulatorAdvance::Output(output), + None => AccumulatorAdvance::Skip, + }; + } + } + } + + // r[impl pileup.custom_aggregation] + /// Advance to the next accumulator-produced pileup output. + /// + /// The accumulator observes the same post-max-depth alignments that + /// [`pileups`](Self::pileups) would expose, but it can summarize them + /// without materializing a [`PileupColumn`]. If the accumulator returns + /// `None` from [`PileupColumnAccumulator::finish_column`], iteration skips + /// that column and continues to the next one. 
+ pub fn pileup_with<A>(&mut self, accumulator: &mut A) -> Option<A::Output> + where + A: PileupColumnAccumulator<U>, + { + loop { + match self.advance_with_accumulator(accumulator) { + AccumulatorAdvance::End => return None, + AccumulatorAdvance::Skip => continue, + AccumulatorAdvance::Output(output) => return Some(output), } } } // r[impl pileup.lending_iterator] + // r[impl pileup.custom_aggregation_existing_api_unchanged] /// Advance to the next pileup column. /// /// Returns `Some(PileupColumn<'_, U>)` borrowing the record store. Call @@ -786,8 +1106,15 @@ /// subsequent `pileups` calls. Extract primitive data (pos, depth, etc.) /// if you need to retain it. pub fn pileups(&mut self) -> Option<PileupColumn<'_, U>> { - let (pos, reference_base, alignments) = self.advance()?; - Some(PileupColumn { pos, reference_base, alignments, store: &self.store }) + let mut accumulator = MaterializeColumnAccumulator::default(); + let materialized = self.pileup_with(&mut accumulator)?; + Some(PileupColumn { + pos: materialized.pos, + reference_base: materialized.reference_base, + alignments: materialized.alignments, + summary: materialized.summary, + store: &self.store, + }) } /// Remaining positions in the current region — lower-bound estimate for diff --git a/crates/seqair/tests/pileup.rs b/crates/seqair/tests/pileup.rs index 44d1d3a7..0b3b4519 100644 --- a/crates/seqair/tests/pileup.rs +++ b/crates/seqair/tests/pileup.rs @@ -16,7 +16,9 @@ mod helpers; use helpers::{cigar_op, collect_columns, make_record, make_record_with_cigar}; use proptest::prelude::*; -use seqair::bam::pileup::RefSeq; +use seqair::bam::pileup::{ + BaseCounts, PileupColumnAccumulator, PileupColumnContext, PileupOpCounts, PileupSummary, RefSeq, +}; use seqair::bam::{Pos0, RecordStore, pileup::PileupEngine}; use seqair_types::Base; use std::{cell::Cell, rc::Rc}; @@ -225,6 +227,214 @@ proptest!
{ } } +// ---- pileup.summary + pileup.custom_aggregation ---- + +fn summary_test_store() -> RecordStore { + let mut arena = RecordStore::new(); + arena.push_raw(&make_record(0, 0, 99, 60, 1), &mut ()).unwrap(); + arena.push_raw(&make_record(0, 0, 99, 60, 1), &mut ()).unwrap(); + arena + .push_raw( + &make_record_with_cigar(0, 0, 99, 60, &[cigar_op(1, 0), cigar_op(1, 1)], 2), + &mut (), + ) + .unwrap(); + arena.push_raw(&make_record_with_cigar(0, 0, 99, 60, &[cigar_op(1, 2)], 0), &mut ()).unwrap(); + arena.push_raw(&make_record_with_cigar(0, 0, 99, 60, &[cigar_op(1, 3)], 0), &mut ()).unwrap(); + arena +} + +// r[verify pileup.summary] +// r[verify pileup.summary_matches_alignments] +// r[verify pileup.summary_base_counts] +// r[verify pileup.summary_indel_counts] +#[test] +fn pileup_summary_counts_emitted_alignments() { + let mut engine = + PileupEngine::new(summary_test_store(), Pos0::new(0).unwrap(), Pos0::new(0).unwrap()); + let column = engine.pileups().unwrap(); + + let expected = PileupSummary { + depth: 5, + match_depth: 3, + insertion_count: 1, + deletion_count: 1, + ref_skip_count: 1, + op_counts: PileupOpCounts { + match_ops: 2, + insertion_ops: 1, + deletion_ops: 1, + complex_indel_ops: 0, + refskip_ops: 1, + }, + base_counts: BaseCounts { a: 3, c: 0, g: 0, t: 0, unknown: 0 }, + }; + + assert_eq!(*column.summary(), expected); + assert_eq!(column.depth(), expected.depth); + assert_eq!(column.match_depth(), expected.match_depth); + assert_eq!(column.insertion_count(), expected.insertion_count); + assert_eq!(column.deletion_count(), expected.deletion_count); + assert_eq!(column.ref_skip_count(), expected.ref_skip_count); + assert_eq!(column.base_counts(), expected.base_counts); + assert_eq!(column.op_counts(), expected.op_counts); + assert_eq!(column.raw_alignments().count(), expected.depth); +} + +// r[verify pileup.summary_matches_alignments] +// r[verify pileup.custom_aggregation_max_depth] +#[test] +fn summary_and_custom_aggregation_are_post_max_depth() 
{ + let mut arena = RecordStore::new(); + for _ in 0..5 { + arena.push_raw(&make_record(0, 0, 99, 60, 1), &mut ()).unwrap(); + } + + let mut engine = PileupEngine::new(arena, Pos0::new(0).unwrap(), Pos0::new(0).unwrap()); + engine.set_max_depth(3); + let column = engine.pileups().unwrap(); + + assert_eq!(column.depth(), 3); + assert_eq!(column.summary().depth, 3); + assert_eq!(column.summary().match_depth, 3); + assert_eq!(column.summary().base_counts.a, 3); +} + +#[derive(Default)] +struct DepthAccumulator { + pos: Option<Pos0>, + reference_base: Base, + depth: usize, + match_depth: usize, +} + +impl<U> PileupColumnAccumulator<U> for DepthAccumulator { + type Output = (Pos0, Base, usize, usize); + + fn begin_column(&mut self, pos: Pos0, reference_base: Base) { + self.pos = Some(pos); + self.reference_base = reference_base; + self.depth = 0; + self.match_depth = 0; + } + + fn observe_alignment( + &mut self, + alignment: seqair::bam::pileup::PileupAlignment, + _store: &RecordStore<U>, + ) { + self.depth += 1; + if alignment.qpos().is_some() { + self.match_depth += 1; + } + } + + fn finish_column(&mut self, context: PileupColumnContext<'_, U>) -> Option<Self::Output> { + assert_eq!(context.depth(), self.depth); + Some((self.pos.take()?, self.reference_base, self.depth, self.match_depth)) + } +} + +// r[verify pileup.custom_aggregation] +// r[verify pileup.custom_aggregation_existing_api_unchanged] +#[test] +fn custom_accumulator_matches_materialized_column_counts() { + let mut materialized_engine = + PileupEngine::new(summary_test_store(), Pos0::new(0).unwrap(), Pos0::new(0).unwrap()); + let materialized = materialized_engine.pileups().unwrap(); + + let mut accumulator_engine = + PileupEngine::new(summary_test_store(), Pos0::new(0).unwrap(), Pos0::new(0).unwrap()); + let mut accumulator = DepthAccumulator::default(); + let (pos, reference_base, depth, match_depth) = + accumulator_engine.pileup_with(&mut accumulator).unwrap(); + + assert_eq!(pos, materialized.pos()); + assert_eq!(reference_base,
materialized.reference_base()); + assert_eq!(depth, materialized.depth()); + assert_eq!(match_depth, materialized.match_depth()); +} + +// r[verify pileup.read_disposition_via_extras] +#[test] +fn custom_accumulator_can_use_extras_for_count_fail_drop() { + use seqair::bam::record_store::{CustomizeRecordStore, SlimRecord}; + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum Disposition { + Count, + FailOnly, + Drop, + } + + #[derive(Clone)] + struct ClassifyByMapq; + + impl CustomizeRecordStore for ClassifyByMapq { + type Extra = Disposition; + + fn filter(&mut self, rec: &SlimRecord, _store: &RecordStore<Self::Extra>) -> bool { + rec.mapq != 10 + } + + fn compute(&mut self, rec: &SlimRecord, _store: &RecordStore<Self::Extra>) -> Disposition { + match rec.mapq { + 0 => Disposition::FailOnly, + 10 => Disposition::Drop, + _ => Disposition::Count, + } + } + } + + #[derive(Default)] + struct DispositionAccumulator { + count: usize, + fail: usize, + } + + impl PileupColumnAccumulator<Disposition> for DispositionAccumulator { + type Output = (usize, usize, usize); + + fn begin_column(&mut self, _pos: Pos0, _reference_base: Base) { + self.count = 0; + self.fail = 0; + } + + fn observe_alignment( + &mut self, + alignment: seqair::bam::pileup::PileupAlignment, + store: &RecordStore<Disposition>, + ) { + match store.extra(alignment.record_idx()) { + Disposition::Count => self.count += 1, + Disposition::FailOnly => self.fail += 1, + Disposition::Drop => panic!("dropped records must not reach pileup"), + } + } + + fn finish_column( + &mut self, + context: PileupColumnContext<'_, Disposition>, + ) -> Option<Self::Output> { + Some((context.depth(), self.count, self.fail)) + } + } + + let mut arena = RecordStore::new(); + let mut customize = ClassifyByMapq; + arena.push_raw(&make_record(0, 0, 99, 60, 1), &mut customize).unwrap(); + arena.push_raw(&make_record(0, 0, 99, 0, 1), &mut customize).unwrap(); + arena.push_raw(&make_record(0, 0, 99, 10, 1), &mut customize).unwrap(); + + let mut engine = PileupEngine::new(arena, Pos0::new(0).unwrap(),
Pos0::new(0).unwrap()); + let mut accumulator = DispositionAccumulator::default(); + let (raw_depth, count, fail) = engine.pileup_with(&mut accumulator).unwrap(); + + assert_eq!(raw_depth, 2); + assert_eq!(count, 1); + assert_eq!(fail, 1); +} + // ---- pileup.qpos ---- // r[verify pileup.qpos] diff --git a/docs/spec/4-pileup.md b/docs/spec/4-pileup.md index ffcb72ef..5daaec6d 100644 --- a/docs/spec/4-pileup.md +++ b/docs/spec/4-pileup.md @@ -31,6 +31,32 @@ The engine MUST support a maximum depth setting. When more records pass the filt r[pileup.max_depth_per_position] Max depth MUST be enforced per-position, not globally at arena-load time. A read that is rejected at one high-coverage position may still be included at an adjacent lower-coverage position. This matches htslib's behavior where `MAXCNT` is checked against the current column's depth. +## Column summaries and aggregation + +r[pileup.summary] +A `PileupColumn` SHOULD expose a `PileupSummary` containing common counts for the emitted column: total depth, match depth, operation counts, insertion/deletion/ref-skip signal counts, and A/C/G/T/Unknown base counts. + +r[pileup.summary_matches_alignments] +`PileupColumn::summary()` MUST describe exactly the alignments exposed by `PileupColumn::raw_alignments()`. When max depth is set, summary counts are post-truncation and MUST match a caller recomputing the same counts by iterating the emitted alignments. + +r[pileup.summary_base_counts] +Summary base counts MUST include only pileup operations with a query base (`Match` and `Insertion`). `Base::Unknown` MUST be counted separately from A/C/G/T. + +r[pileup.summary_indel_counts] +Summary insertion/deletion/ref-skip signal counts MUST be defined in terms of `PileupOp`: insertion signals include simple insertions and complex indels with an insertion length; deletion signals include deletions and non-refskip complex indels; ref-skip signals include ref-skips and refskip complex indels. 
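To make the indel/ref-skip signal classification concrete, here is a self-contained sketch. The `Op` enum is a hypothetical stand-in that mirrors only the `PileupOp` variants named by the rule above; it is not the seqair type.

```rust
// Hypothetical mirror of the PileupOp variants, for illustration only.
#[derive(Clone, Copy)]
enum Op {
    Match,
    Insertion,
    Deletion,
    ComplexIndel { insert_len: u32, is_refskip: bool },
    RefSkip,
}

/// Returns (insertion, deletion, ref_skip) signal flags for one operation,
/// following r[pileup.summary_indel_counts].
fn signals(op: Op) -> (bool, bool, bool) {
    match op {
        Op::Match => (false, false, false),
        Op::Insertion => (true, false, false),
        Op::Deletion => (false, true, false),
        Op::ComplexIndel { insert_len, is_refskip } => {
            // Insertion signal only when inserted bases are present; the
            // deletion vs. ref-skip side is chosen by the refskip flag.
            (insert_len > 0, !is_refskip, is_refskip)
        }
        Op::RefSkip => (false, false, true),
    }
}

fn main() {
    assert_eq!(signals(Op::Insertion), (true, false, false));
    // A complex indel can raise the insertion signal and the deletion signal at once.
    assert_eq!(signals(Op::ComplexIndel { insert_len: 2, is_refskip: false }), (true, true, false));
    assert_eq!(signals(Op::ComplexIndel { insert_len: 0, is_refskip: true }), (false, false, true));
}
```

Note that a single complex indel can contribute to two signal counters, which is why summary depth is tracked separately from the signal counts.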
+ +r[pileup.custom_aggregation] +The engine MAY expose an opt-in custom aggregation API that lets callers observe emitted alignments for each column and produce caller-defined owned output. + +r[pileup.custom_aggregation_existing_api_unchanged] +Custom aggregation MUST NOT change the behavior of `PileupEngine::pileups()` or the lending lifetime of `PileupColumn`. + +r[pileup.custom_aggregation_max_depth] +Custom aggregation MUST observe the same alignments that `PileupEngine::pileups()` would expose after max-depth truncation. + +r[pileup.read_disposition_via_extras] +Downstream tools MAY represent count/fail/drop-style read disposition using `RecordStore` extras: dropped reads are rejected by `CustomizeRecordStore`, while kept reads carry caller-defined extras that custom aggregators can inspect through the store. + ## Query position The query position (`qpos`) maps a reference position back into a read's coordinate system. For example, if a read starts at reference position 100 and has CIGAR `50M`, then at reference position 125, `qpos` = 25. For complex CIGARs with insertions or deletions, the mapping requires walking the CIGAR operations (see `cigar.md`).
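The qpos walk described above can be sketched as follows. This is a simplified model assuming CIGAR ops as `(len, kind)` pairs; seqair's actual implementation is `CigarMapping` (see `cigar.md`), which handles more cases.

```rust
// Map a reference position into read coordinates by walking CIGAR ops.
// Simplified illustration; not seqair's CigarMapping.
fn qpos(read_start: u32, cigar: &[(u32, char)], ref_pos: u32) -> Option<u32> {
    let mut rpos = read_start; // current reference coordinate
    let mut q = 0u32; // current query (read) coordinate
    for &(len, kind) in cigar {
        match kind {
            // M/=/X consume both reference and query.
            'M' | '=' | 'X' => {
                if ref_pos < rpos + len {
                    return ref_pos.checked_sub(rpos).map(|off| q + off);
                }
                rpos += len;
                q += len;
            }
            // Insertions and soft clips consume query only.
            'I' | 'S' => q += len,
            // Deletions and reference skips consume reference only: the
            // position is covered but has no query base.
            'D' | 'N' => {
                if ref_pos < rpos + len {
                    return None;
                }
                rpos += len;
            }
            _ => {}
        }
    }
    None
}

fn main() {
    // The example from the text: start 100, CIGAR 50M, ref_pos 125 -> qpos 25.
    assert_eq!(qpos(100, &[(50, 'M')], 125), Some(25));
    // A deletion shifts later matches: 10M 5D 10M at ref_pos 116 -> qpos 11.
    assert_eq!(qpos(100, &[(10, 'M'), (5, 'D'), (10, 'M')], 116), Some(11));
    // Inside the deletion there is no query base.
    assert_eq!(qpos(100, &[(10, 'M'), (5, 'D'), (10, 'M')], 112), None);
}
```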