Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ASSETS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Run the command from the root of your bundle project. The CLI will prompt for an
| `sdp-checkpoint-recovery` | Reset checkpoint selection on a Lakeflow Spark Declarative Pipeline after a source table has been dropped and recreated. | Stable | [README](assets/sdp-checkpoint-recovery/README.md) |
| `dbx-ro-query` | Dependency-free Python wrapper around `databricks experimental aitools tools query` that gives LLM agents a guarded read-only SQL window into a Databricks workspace. Ships a `SKILL.md` for agent integration. | Stable | [README](assets/dbx-ro-query/README.md) |
| `monitoring-sql-warehouse` | Small, dedicated serverless SQL warehouse (2X-Small, `auto_stop_mins: 1`) for scheduled Databricks Alerts and monitoring queries. Keeps cost proportional to actual query time instead of idle warm-up. | Stable | [README](assets/monitoring-sql-warehouse/README.md) |
| `sdp-quarantine-pattern` | Lakeflow SDP pipeline demonstrating the inverse-expectations quarantine pattern on `samples.nyctaxi.trips`: critical (drop) expectations route bad rows into a separate quarantine table (silver schema), valid rows flow to silver, warn expectations log to a queryable event log. NULL-safe predicates keep the silver/quarantine split a clean partition. Ships a companion agent skill (`SKILL.md`) that adapts the pattern to your own dataset and self-verifies it. | Stable | [README](assets/sdp-quarantine-pattern/README.md) |

## What an asset is not

Expand Down
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/).

## [Unreleased]

## [1.9.0] - 2026-06-06

### Added
- **Asset `sdp-quarantine-pattern`**: Lakeflow Spark Declarative Pipeline demonstrating the inverse-expectations quarantine pattern on the public `samples.nyctaxi.trips` dataset.
- `raw_trips` ingests the read-only public sample as a streaming table (bronze schema). `clean_trips` applies the critical (drop) expectations; `quarantine_trips` applies their inverse, capturing exactly the rows the clean table drops; both land in the silver schema, following medallion schema separation. The schemas are the medallion layers; the table names describe content. Advisory (warn) expectations log to the pipeline event log without dropping. The public sample contains naturally invalid rows, so quarantine is populated on the first run.
- Drop predicates are written NULL-safe (total) so `expect_all_or_drop(drop)` and `expect_all_or_drop(NOT(drop))` partition the input exactly once. The README documents the three-valued-logic trap that breaks naive inverse expectations. Validated live on serverless SDP: the partition invariant held exactly across all cases (21,847 clean + 85 quarantine = 21,932 raw at baseline) and a NULL drop column routed to quarantine without leaking into the clean table.
- Rules live in a declarative `expectations.json`; a pure helper `expectations.py` (no pyspark import) loads them and derives the clean predicate (`build_silver_expr`) and its inverse (`build_quarantine_expr`). The asset also ships an offline unit-test suite (`<target_dir>/tests/`: a real local-Spark `pytest` that proves the partition invariant and NULL routing on crafted edge rows, modeling `expect_all_or_drop` keep-on-NULL semantics with `IS NOT FALSE`). The pipeline source is a Databricks notebook referenced from `resources/<pipeline_resource_key>.pipeline.yml` via a path relative to the resource file.
- The resource publishes a queryable event log (`event_log` block, table `quarantine_pipeline_event_log`) and ships `event_log_queries.sql` to parse per-expectation passed/failed counts; cost/trace tags identify the pipeline as created by this asset. In-bundle usage doc at `docs/sdp-quarantine-pattern/README.md`. All three pipeline tables are named with fully qualified `catalog.schema.table` identifiers so the source reads consistently and the in-pipeline dependencies resolve unambiguously; catalog, both schemas, and the source table flow to the pipeline through the resource `configuration` block (`quarantine.source` parameterizes the ingest so a test run can point at a curated dataset).
- **Companion agent skill (dual-door asset).** Alongside the reference pipeline, the asset installs an agent skill at `<skill_dir>/skills/sdp-quarantine-pattern/` (`SKILL.md` + `references/adapt-the-pattern.md` + `references/self-verify.md`) that instructs a coding agent to apply the quarantine pattern to the user's own dataset and verify it. The skill enforces the NULL-safe drop-predicate rule, splits the work at a deploy/run trust boundary (Phase A adapt + run the offline unit tests with no workspace; Phase B deploy and live-verify, defaulting to human-in-the-middle and delegating deploy/run mechanics to the runtime), and ships a three-tier verification ladder: offline rule unit tests (primary), live read-only audits (partition invariant + a NULL-in-clean check per dropped column), and an optional integration test via source parameterization. It includes a kickoff prompt template, is runtime-neutral about read-only query access (it uses whatever capability the agent's runtime provides), and states its limits: it applies and verifies a pattern, it does not guarantee correctness on arbitrary data. Seven prompts (`target_dir`, `pipeline_resource_key`, `pipeline_name`, `catalog`, `bronze_schema`, `silver_schema`, `skill_dir`) with safe defaults.

## [1.8.0] - 2026-05-30

### Added
Expand Down Expand Up @@ -185,6 +195,7 @@ Initial public release.
- L2 tests: YAML syntax, environment targets, content validation
- CI/CD tests: pipeline generation, auth patterns, branch references

[1.9.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.9.0
[1.8.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.8.0
[1.7.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.7.0
[1.6.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.6.0
Expand Down
6 changes: 6 additions & 0 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ Modular sub-templates installable via `databricks bundle init <repo-url> --templ
- `sdp-checkpoint-recovery`: Reset checkpoint selection on a Lakeflow Spark Declarative Pipeline after a source table has been dropped and recreated
- `dbx-ro-query`: Dependency-free Python wrapper around `databricks experimental aitools tools query` that gives LLM agents a guarded read-only SQL window into a Databricks workspace
- `monitoring-sql-warehouse`: Dedicated serverless SQL warehouse tuned for bursty workloads (scheduled Alerts, monitoring queries) with `auto_stop_mins: 1`
- `sdp-quarantine-pattern`: Lakeflow SDP pipeline demonstrating the inverse-expectations quarantine pattern (drop expectations route bad rows to a separate quarantine table) on `samples.nyctaxi.trips`, with medallion schema separation, a queryable event log, and NULL-safe predicates. Ships a companion agent skill that adapts the pattern to the user's own dataset and self-verifies it

**Possible future assets** (ideas for contributors, not commitments):
- `etl-pipeline`: Medallion-layered Declarative Pipeline with Bronze/Silver layers and DLT expectations
- `ingest-job`: Data ingestion job with error handling
- `ml-pipeline`: ML training pipeline with experiment tracking
- `dbt-project`: dbt integration with Unity Catalog
- `monitoring`: Observability dashboards and alerting
- `sdp-testing`: Reusable testing harness for Lakeflow SDP pipelines (local-Spark unit-test scaffold, source-parameterized integration tests on a test target, CI wiring). The `sdp-quarantine-pattern` asset ships a focused unit-test example; a dedicated asset would generalize the integration-test layer.

Contributions welcome. Each asset ships as its own PR following the authoring walkthrough in CONTRIBUTING.md.

Expand All @@ -79,6 +81,10 @@ Add metadata to generated resources so teams can track which template version wa

These are larger features that require more design work and community input before committing to implementation.

### Pre-commit Hooks

<fill-in from `https://www.waitingforcode.com/databricks/pre-commit-hook-declarative-automation-bundles-example/read`>

### Advanced Permissions Profiles

**Status**: Proposed
Expand Down
97 changes: 97 additions & 0 deletions assets/sdp-quarantine-pattern/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# sdp-quarantine-pattern

A Lakeflow Spark Declarative Pipeline that demonstrates the inverse-expectations quarantine pattern on the public `samples.nyctaxi.trips` dataset. Critical (drop) expectations route violating rows into a `quarantine_trips` table; valid rows flow to `clean_trips`; advisory (warn) expectations log to the pipeline event log without dropping.

## Install

```bash
databricks bundle init https://github.com/vmariiechko/databricks-bundle-template \
--template-dir assets/sdp-quarantine-pattern
```

You will be prompted for `target_dir`, `pipeline_resource_key`, `pipeline_name`, `catalog`, `bronze_schema`, `silver_schema`, and `skill_dir` (all with safe defaults). The asset installs the pipeline notebook, a declarative `expectations.json`, a pure helper module, an offline unit-test suite, event-log parsing queries, the DABs pipeline resource, and a companion agent skill.

## Two doors

This is a dual-door asset:

- **Door 1, the reference pipeline.** Copy it, point it at your catalog/schemas, deploy, and watch the quarantine pattern work on `samples.nyctaxi.trips`. This README documents it.
- **Door 2, the companion agent skill** at `<skill_dir>/skills/sdp-quarantine-pattern/SKILL.md`. Point a coding agent (Claude Code, Codex, or Genie Code on Databricks) at it and it adapts the pattern to your own dataset: it investigates your data, proposes drop/warn rules for you to confirm, keeps every drop predicate NULL-safe, adapts and runs the offline unit tests, wires the resource into your bundle, and verifies the result (offline unit tests first, then live read-only audits). It applies and verifies a pattern; it is not an auto-rewriter and does not guarantee correctness on arbitrary data. The in-bundle `docs/sdp-quarantine-pattern/README.md` has a ready-to-paste kickoff prompt.

## Architecture

```
samples.nyctaxi.trips
raw_trips (bronze schema)
├─► clean_trips (silver schema) @expect_all(warn) + @expect_all_or_drop(drop)
└─► quarantine_trips (silver schema) @expect_all_or_drop(NOT(drop))
```

The schemas are the medallion layers; the table names describe content. `raw_trips` ingests the read-only public sample as a streaming table and lands in the bronze schema (the pipeline default). `clean_trips` and `quarantine_trips` both read `raw_trips` and land in the silver schema, following medallion schema separation. The clean table applies the drop rules; quarantine applies their inverse. They read the same source once each, which is the tradeoff of this pattern: the data is processed twice.

The public sample already contains some invalid rows, so the quarantine table is populated on the first run with no extra setup.

## Expectations

Rules live in `expectations.json`, split into two lists. Drop rules are critical and route failures to quarantine; warn rules are advisory and only annotate the event log.

| Action | Name | Predicate |
|---|---|---|
| drop | `valid_fare` | `fare_amount IS NOT NULL AND fare_amount > 0` |
| drop | `valid_distance` | `trip_distance IS NOT NULL AND trip_distance > 0` |
| drop | `valid_pickup_ts` | `tpep_pickup_datetime IS NOT NULL` |
| drop | `valid_dropoff_after_pickup` | `tpep_dropoff_datetime IS NOT NULL AND tpep_dropoff_datetime > tpep_pickup_datetime` |
| drop | `valid_pickup_zip` | `pickup_zip IS NOT NULL` |
| warn | `fare_within_expected` | `fare_amount < 500` |
| warn | `distance_within_expected` | `trip_distance < 100` |
| warn | `dropoff_zip_present` | `dropoff_zip IS NOT NULL` |

The quarantine predicate is derived automatically as `NOT((p1) AND (p2) AND ...)` over the drop predicates. You do not maintain it by hand.

## The NULL trap

This is the point worth internalizing before you copy the pattern.

`expect_all_or_drop` drops a row only when its predicate evaluates to `false`; a predicate that evaluates to SQL `NULL` (unknown) is not `false`, so the row is kept (see the [`is false` operator](https://docs.databricks.com/aws/en/sql/language-manual/functions/isfalse): `NULL is false` is `false`). So if you write a drop rule as `fare_amount > 0` and a row has a NULL fare, the predicate is `NULL` and the row is kept in the clean table. The inverse `NOT(fare_amount > 0)` is also `NULL`, which is likewise not `false`, so the same row is kept in the quarantine table too. The one row lands in both tables, double-counted, and the clean/quarantine split is no longer a clean partition.

The fix is to make every drop predicate total (NULL-safe): guard each column test with `IS NOT NULL`, as in `fare_amount IS NOT NULL AND fare_amount > 0`. A total predicate never returns `NULL`, so `expect_all_or_drop(drop_rules)` and `expect_all_or_drop(NOT(drop_rules))` partition the raw input exactly once: every row lands in exactly one of the clean or quarantine table. This asset writes all drop predicates this way.

If you cannot guarantee total predicates (for example, rules loaded from an external system), the robust alternative is to compute an explicit `_quarantined` boolean with NULL-safe SQL once, then branch the clean and quarantine tables on that flag instead of on raw inverse predicates.

## Comparison: the status-column variant

An alternative implementation keeps a single table and adds a validation-status column that marks each row good or bad, partitioning on that column instead of using a separate table. Tradeoff: it avoids reading the source twice, but the per-expectation data-quality metrics do not surface in the pipeline event log the way they do with expectations. This asset builds the separate-quarantine-table approach; the status-column variant is noted here for context.

## Inspecting expectation results

The pipeline publishes its event log to a queryable UC table (`<catalog>.<bronze_schema>.quarantine_pipeline_event_log`). `event_log_queries.sql` parses it into per-expectation passed/failed counts for the latest run, plus a focused query for the warn expectations, which never route to quarantine and so only surface in the event log.

## Tests

The asset ships its own unit tests at `<target_dir>/tests/`: a real local-Spark `pytest` suite that proves the partition invariant (clean + quarantine == raw) and NULL routing on crafted edge rows, with no Databricks workspace. It models `expect_all_or_drop`'s keep-on-NULL semantics with `IS NOT FALSE`, so a drop predicate that is not NULL-safe fails the partition assertion. Run it with `pip install -r <target_dir>/tests/requirements-test.txt && pytest <target_dir>/tests/ -q` (needs Java for PySpark). The companion skill adapts these tests to your dataset as the first verification tier.

Repo-level tests (`tests/assets/test_sdp_quarantine_pattern.py`) verify the asset installs the expected files, the pipeline source parses, `expectations.py` builds the correct clean and inverse predicates, every drop predicate is NULL-safe, the resource YAML is valid (schema split, source parameterization, event log, tags, library path), and the skill and unit-test scaffold install. End-to-end expectation firing on the workspace is verified live (see below).

## Validation results

The pattern was validated live on serverless SDP against `samples.nyctaxi.trips`.

- **Partition invariant holds exactly.** Baseline run: 21,932 raw rows split into 21,847 clean and 85 quarantine (21,847 + 85 = 21,932). The split stayed exact across every test.
- **Quarantine works on real data out of the box.** The 85 baseline quarantine rows are naturally invalid rows in the public sample (74 zero-distance, 9 zero or negative fare, 2 with multiple violations). No injection needed to see the pattern work.
- **The NULL trap is real and the fix works.** A row with a NULL `fare_amount` routed to quarantine only and did not leak into the clean table, because `valid_fare` is NULL-safe. A naive `fare_amount > 0` would have treated the NULL as passing and leaked the row.
- **Multiple violations on one row produce exactly one quarantine row.** No duplication; the event log still attributes each failed expectation separately.
- **Warn rules behave as advisory.** A row that only violated `fare_within_expected` flowed to the clean table and was recorded in the event log; quarantine was unaffected.

Two lived findings worth carrying into your own pipelines:

- `spark.catalog.tableExists()` is not whitelisted inside SDP (it raises `Py4JSecurityException`). Avoid it in pipeline code; this asset does not use it.
- A streaming table's set of input sources is fixed at its first run. Changing it later (for example, adding a union source) is incompatible with the existing checkpoint and requires a full refresh.

## What this asset is

A standalone sub-template in the [databricks-bundle-template](https://github.com/vmariiechko/databricks-bundle-template) asset library. It does not depend on the core template; it can be installed into any Databricks bundle. See [ASSETS.md](../../ASSETS.md) for the full catalog.

It is a demonstration of one specific pattern with its tradeoffs documented, not a general-purpose data-quality framework. For a fuller framework, look at [DQX](https://databrickslabs.github.io/dqx/) (Databricks Labs).
Loading
Loading