diff --git a/ASSETS.md b/ASSETS.md index f3f1536..ee27710 100644 --- a/ASSETS.md +++ b/ASSETS.md @@ -20,6 +20,7 @@ Run the command from the root of your bundle project. The CLI will prompt for an | `sdp-checkpoint-recovery` | Reset checkpoint selection on a Lakeflow Spark Declarative Pipeline after a source table has been dropped and recreated. | Stable | [README](assets/sdp-checkpoint-recovery/README.md) | | `dbx-ro-query` | Dependency-free Python wrapper around `databricks experimental aitools tools query` that gives LLM agents a guarded read-only SQL window into a Databricks workspace. Ships a `SKILL.md` for agent integration. | Stable | [README](assets/dbx-ro-query/README.md) | | `monitoring-sql-warehouse` | Small, dedicated serverless SQL warehouse (2X-Small, `auto_stop_mins: 1`) for scheduled Databricks Alerts and monitoring queries. Keeps cost proportional to actual query time instead of idle warm-up. | Stable | [README](assets/monitoring-sql-warehouse/README.md) | +| `sdp-quarantine-pattern` | Lakeflow SDP pipeline demonstrating the inverse-expectations quarantine pattern on `samples.nyctaxi.trips`: critical (drop) expectations route bad rows into a separate quarantine table (silver schema), valid rows flow to silver, warn expectations log to a queryable event log. NULL-safe predicates keep the silver/quarantine split a clean partition. Ships a companion agent skill (`SKILL.md`) that adapts the pattern to your own dataset and self-verifies it. | Stable | [README](assets/sdp-quarantine-pattern/README.md) | ## What an asset is not diff --git a/CHANGELOG.md b/CHANGELOG.md index d84dff1..205436b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/). ## [Unreleased] +## [1.9.0] - 2026-06-06 + +### Added +- **Asset `sdp-quarantine-pattern`**: Lakeflow Spark Declarative Pipeline demonstrating the inverse-expectations quarantine pattern on the public `samples.nyctaxi.trips` dataset. 
+ - `raw_trips` ingests the read-only public sample as a streaming table (bronze schema). `clean_trips` applies the critical (drop) expectations; `quarantine_trips` applies their inverse, capturing exactly the rows the clean table drops; both land in the silver schema, following medallion schema separation. The schemas are the medallion layers; the table names describe content. Advisory (warn) expectations log to the pipeline event log without dropping. The public sample contains naturally invalid rows, so quarantine is populated on the first run. + - Drop predicates are written NULL-safe (total) so `expect_all_or_drop(drop)` and `expect_all_or_drop(NOT(drop))` partition the input exactly once. The README documents the three-valued-logic trap that breaks naive inverse expectations. Validated live on serverless SDP: the partition invariant held exactly across all cases (21,847 clean + 85 quarantine = 21,932 raw at baseline) and a NULL drop column routed to quarantine without leaking into the clean table. + - Rules live in a declarative `expectations.json`; a pure helper `expectations.py` (no pyspark import) loads them and derives the clean predicate (`build_silver_expr`) and its inverse (`build_quarantine_expr`). The asset also ships an offline unit-test suite (`/tests/`: a real local-Spark `pytest` that proves the partition invariant and NULL routing on crafted edge rows, modeling `expect_all_or_drop` keep-on-NULL semantics with `IS NOT FALSE`). The pipeline source is a Databricks notebook referenced from `resources/.pipeline.yml` via a path relative to the resource file. + - The resource publishes a queryable event log (`event_log` block, table `quarantine_pipeline_event_log`) and ships `event_log_queries.sql` to parse per-expectation passed/failed counts; cost/trace tags identify the pipeline as created by this asset. In-bundle usage doc at `docs/sdp-quarantine-pattern/README.md`. 
All three pipeline tables are named with fully qualified `catalog.schema.table` identifiers so the source reads consistently and the in-pipeline dependencies resolve unambiguously; catalog, both schemas, and the source table flow to the pipeline through the resource `configuration` block (`quarantine.source` parameterizes the ingest so a test run can point at a curated dataset). + - **Companion agent skill (dual-door asset).** Alongside the reference pipeline, the asset installs an agent skill at `/skills/sdp-quarantine-pattern/` (`SKILL.md` + `references/adapt-the-pattern.md` + `references/self-verify.md`) that instructs a coding agent to apply the quarantine pattern to the user's own dataset and verify it. The skill enforces the NULL-safe drop-predicate rule, splits the work at a deploy/run trust boundary (Phase A adapt + run the offline unit tests with no workspace; Phase B deploy and live-verify, defaulting to human-in-the-middle and delegating deploy/run mechanics to the runtime), and ships a three-tier verification ladder: offline rule unit tests (primary), live read-only audits (partition invariant + a NULL-in-clean check per dropped column), and an optional integration test via source parameterization. It includes a kickoff prompt template, is runtime-neutral about read-only query access (it uses whatever capability the agent's runtime provides), and states its limits: it applies and verifies a pattern, it does not guarantee correctness on arbitrary data. Seven prompts (`target_dir`, `pipeline_resource_key`, `pipeline_name`, `catalog`, `bronze_schema`, `silver_schema`, `skill_dir`) with safe defaults. + ## [1.8.0] - 2026-05-30 ### Added @@ -185,6 +195,7 @@ Initial public release. 
- L2 tests: YAML syntax, environment targets, content validation - CI/CD tests: pipeline generation, auth patterns, branch references +[1.9.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.9.0 [1.8.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.8.0 [1.7.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.7.0 [1.6.0]: https://github.com/vmariiechko/databricks-bundle-template/releases/tag/v1.6.0 diff --git a/ROADMAP.md b/ROADMAP.md index cf00756..7fe6721 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -49,6 +49,7 @@ Modular sub-templates installable via `databricks bundle init --templ - `sdp-checkpoint-recovery`: Reset checkpoint selection on a Lakeflow Spark Declarative Pipeline after a source table has been dropped and recreated - `dbx-ro-query`: Dependency-free Python wrapper around `databricks experimental aitools tools query` that gives LLM agents a guarded read-only SQL window into a Databricks workspace - `monitoring-sql-warehouse`: Dedicated serverless SQL warehouse tuned for bursty workloads (scheduled Alerts, monitoring queries) with `auto_stop_mins: 1` +- `sdp-quarantine-pattern`: Lakeflow SDP pipeline demonstrating the inverse-expectations quarantine pattern (drop expectations route bad rows to a separate quarantine table) on `samples.nyctaxi.trips`, with medallion schema separation, a queryable event log, and NULL-safe predicates. 
Ships a companion agent skill that adapts the pattern to the user's own dataset and self-verifies it **Possible future assets** (ideas for contributors, not commitments): - `etl-pipeline`: Medallion-layered Declarative Pipeline with Bronze/Silver layers and DLT expectations @@ -56,6 +57,7 @@ Modular sub-templates installable via `databricks bundle init --templ - `ml-pipeline`: ML training pipeline with experiment tracking - `dbt-project`: dbt integration with Unity Catalog - `monitoring`: Observability dashboards and alerting +- `sdp-testing`: Reusable testing harness for Lakeflow SDP pipelines (local-Spark unit-test scaffold, source-parameterized integration tests on a test target, CI wiring). The `sdp-quarantine-pattern` asset ships a focused unit-test example; a dedicated asset would generalize the integration-test layer. Contributions welcome. Each asset ships as its own PR following the authoring walkthrough in CONTRIBUTING.md. @@ -79,6 +81,10 @@ Add metadata to generated resources so teams can track which template version wa These are larger features that require more design work and community input before committing to implementation. +### Pre-commit Hooks + + + ### Advanced Permissions Profiles **Status**: Proposed diff --git a/assets/sdp-quarantine-pattern/README.md b/assets/sdp-quarantine-pattern/README.md new file mode 100644 index 0000000..27c9a3a --- /dev/null +++ b/assets/sdp-quarantine-pattern/README.md @@ -0,0 +1,97 @@ +# sdp-quarantine-pattern + +A Lakeflow Spark Declarative Pipeline that demonstrates the inverse-expectations quarantine pattern on the public `samples.nyctaxi.trips` dataset. Critical (drop) expectations route violating rows into a `quarantine_trips` table; valid rows flow to `clean_trips`; advisory (warn) expectations log to the pipeline event log without dropping. 
+ +## Install + +```bash +databricks bundle init https://github.com/vmariiechko/databricks-bundle-template \ + --template-dir assets/sdp-quarantine-pattern +``` + +You will be prompted for `target_dir`, `pipeline_resource_key`, `pipeline_name`, `catalog`, `bronze_schema`, `silver_schema`, and `skill_dir` (all with safe defaults). The asset installs the pipeline notebook, a declarative `expectations.json`, a pure helper module, an offline unit-test suite, event-log parsing queries, the DABs pipeline resource, and a companion agent skill. + +## Two doors + +This is a dual-door asset: + +- **Door 1, the reference pipeline.** Copy it, point it at your catalog/schemas, deploy, and watch the quarantine pattern work on `samples.nyctaxi.trips`. This README documents it. +- **Door 2, the companion agent skill** at `/skills/sdp-quarantine-pattern/SKILL.md`. Point a coding agent (Claude Code, Codex, or Genie Code on Databricks) at it and it adapts the pattern to your own dataset: it investigates your data, proposes drop/warn rules for you to confirm, keeps every drop predicate NULL-safe, adapts and runs the offline unit tests, wires the resource into your bundle, and verifies the result (offline unit tests first, then live read-only audits). It applies and verifies a pattern; it is not an auto-rewriter and does not guarantee correctness on arbitrary data. The in-bundle `docs/sdp-quarantine-pattern/README.md` has a ready-to-paste kickoff prompt. + +## Architecture + +``` +samples.nyctaxi.trips + │ + ▼ + raw_trips (bronze schema) + │ + ├─► clean_trips (silver schema) @expect_all(warn) + @expect_all_or_drop(drop) + └─► quarantine_trips (silver schema) @expect_all_or_drop(NOT(drop)) +``` + +The schemas are the medallion layers; the table names describe content. `raw_trips` ingests the read-only public sample as a streaming table and lands in the bronze schema (the pipeline default). 
`clean_trips` and `quarantine_trips` both read `raw_trips` and land in the silver schema, following medallion schema separation. The clean table applies the drop rules; quarantine applies their inverse. They read the same source once each, which is the tradeoff of this pattern: the data is processed twice. + +The public sample already contains some invalid rows, so the quarantine table is populated on the first run with no extra setup. + +## Expectations + +Rules live in `expectations.json`, split into two lists. Drop rules are critical and route failures to quarantine; warn rules are advisory and only annotate the event log. + +| Action | Name | Predicate | +|---|---|---| +| drop | `valid_fare` | `fare_amount IS NOT NULL AND fare_amount > 0` | +| drop | `valid_distance` | `trip_distance IS NOT NULL AND trip_distance > 0` | +| drop | `valid_pickup_ts` | `tpep_pickup_datetime IS NOT NULL` | +| drop | `valid_dropoff_after_pickup` | `tpep_dropoff_datetime IS NOT NULL AND tpep_dropoff_datetime > tpep_pickup_datetime` | +| drop | `valid_pickup_zip` | `pickup_zip IS NOT NULL` | +| warn | `fare_within_expected` | `fare_amount < 500` | +| warn | `distance_within_expected` | `trip_distance < 100` | +| warn | `dropoff_zip_present` | `dropoff_zip IS NOT NULL` | + +The quarantine predicate is derived automatically as `NOT((p1) AND (p2) AND ...)` over the drop predicates. You do not maintain it by hand. + +## The NULL trap + +This is the point worth internalizing before you copy the pattern. + +`expect_all_or_drop` drops a row only when its predicate evaluates to `false`; a predicate that evaluates to SQL `NULL` (unknown) is not `false`, so the row is kept (see the [`is false` operator](https://docs.databricks.com/aws/en/sql/language-manual/functions/isfalse): `NULL is false` is `false`). So if you write a drop rule as `fare_amount > 0` and a row has a NULL fare, the predicate is `NULL` and the row is kept in the clean table. 
The inverse `NOT(fare_amount > 0)` is also `NULL`, which is likewise not `false`, so the same row is kept in the quarantine table too. The one row lands in both tables, double-counted, and the clean/quarantine split is no longer a clean partition. + +The fix is to make every drop predicate total (NULL-safe): guard each column test with `IS NOT NULL`, as in `fare_amount IS NOT NULL AND fare_amount > 0`. A total predicate never returns `NULL`, so `expect_all_or_drop(drop_rules)` and `expect_all_or_drop(NOT(drop_rules))` partition the raw input exactly once: every row lands in exactly one of the clean or quarantine table. This asset writes all drop predicates this way. + +If you cannot guarantee total predicates (for example, rules loaded from an external system), the robust alternative is to compute an explicit `_quarantined` boolean with NULL-safe SQL once, then branch the clean and quarantine tables on that flag instead of on raw inverse predicates. + +## Comparison: the status-column variant + +An alternative implementation keeps a single table and adds a validation-status column that marks each row good or bad, partitioning on that column instead of using a separate table. Tradeoff: it avoids reading the source twice, but the per-expectation data-quality metrics do not surface in the pipeline event log the way they do with expectations. This asset builds the separate-quarantine-table approach; the status-column variant is noted here for context. + +## Inspecting expectation results + +The pipeline publishes its event log to a queryable UC table (`..quarantine_pipeline_event_log`). `event_log_queries.sql` parses it into per-expectation passed/failed counts for the latest run, plus a focused query for the warn expectations, which never route to quarantine and so only surface in the event log. 
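To make the NULL trap described earlier in this README concrete, here is a minimal sketch in plain Python (no Spark). It assumes the keep-on-NULL semantics stated above: a row is dropped only when the predicate is exactly `false`. `None` stands in for SQL `NULL`, and the helper names (`gt_zero`, `sql_not`, `kept`, `split`) and the four sample fares are hypothetical, introduced only for illustration; they are not part of the asset.

```python
from typing import Callable, List, Optional, Tuple

Fare = Optional[float]  # None stands in for SQL NULL


def gt_zero(value: Fare) -> Optional[bool]:
    """SQL-style `value > 0`: a NULL input yields NULL (None)."""
    return None if value is None else value > 0


def sql_not(value: Optional[bool]) -> Optional[bool]:
    """SQL-style NOT: NOT NULL is still NULL."""
    return None if value is None else not value


def kept(predicate_result: Optional[bool]) -> bool:
    """Keep-on-NULL: the row is dropped only when the result is exactly False."""
    return predicate_result is not False


def naive(fare: Fare) -> Optional[bool]:
    # fare_amount > 0  (returns None for a NULL fare)
    return gt_zero(fare)


def null_safe(fare: Fare) -> Optional[bool]:
    # fare_amount IS NOT NULL AND fare_amount > 0  (never returns None)
    return fare is not None and fare > 0


def split(
    fares: List[Fare], predicate: Callable[[Fare], Optional[bool]]
) -> Tuple[List[Fare], List[Fare]]:
    """Route rows through the predicate (clean) and its inverse (quarantine)."""
    clean = [f for f in fares if kept(predicate(f))]
    quarantine = [f for f in fares if kept(sql_not(predicate(f)))]
    return clean, quarantine


# Hypothetical sample: one valid fare, two invalid fares, one NULL fare.
fares: List[Fare] = [12.5, 0.0, -3.0, None]

naive_clean, naive_quarantine = split(fares, naive)
safe_clean, safe_quarantine = split(fares, null_safe)

print("naive:    ", naive_clean, naive_quarantine)
print("null-safe:", safe_clean, safe_quarantine)
```

With the naive predicate, the NULL fare shows up in both lists, so the two counts add up to more than the input. With the NULL-safe predicate, each of the four rows lands in exactly one list.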
+ +## Tests + +The asset ships its own unit tests at `/tests/`: a real local-Spark `pytest` suite that proves the partition invariant (clean + quarantine == raw) and NULL routing on crafted edge rows, with no Databricks workspace. It models `expect_all_or_drop`'s keep-on-NULL semantics with `IS NOT FALSE`, so a drop predicate that is not NULL-safe fails the partition assertion. Run it with `pip install -r /tests/requirements-test.txt && pytest /tests/ -q` (needs Java for PySpark). The companion skill adapts these tests to your dataset as the first verification tier. + +Repo-level tests (`tests/assets/test_sdp_quarantine_pattern.py`) verify the asset installs the expected files, the pipeline source parses, `expectations.py` builds the correct clean and inverse predicates, every drop predicate is NULL-safe, the resource YAML is valid (schema split, source parameterization, event log, tags, library path), and the skill and unit-test scaffold install. End-to-end expectation firing on the workspace is verified live (see below). + +## Validation results + +The pattern was validated live on serverless SDP against `samples.nyctaxi.trips`. + +- **Partition invariant holds exactly.** Baseline run: 21,932 raw rows split into 21,847 clean and 85 quarantine (21,847 + 85 = 21,932). The split stayed exact across every test. +- **Quarantine works on real data out of the box.** The 85 baseline quarantine rows are naturally invalid rows in the public sample (74 zero-distance, 9 zero or negative fare, 2 with multiple violations). No injection needed to see the pattern work. +- **The NULL trap is real and the fix works.** A row with a NULL `fare_amount` routed to quarantine only and did not leak into the clean table, because `valid_fare` is NULL-safe. A naive `fare_amount > 0` would have treated the NULL as passing and leaked the row. 
+- **Multiple violations on one row produce exactly one quarantine row.** No duplication; the event log still attributes each failed expectation separately. +- **Warn rules behave as advisory.** A row that only violated `fare_within_expected` flowed to the clean table and was recorded in the event log; quarantine was unaffected. + +Two lived findings worth carrying into your own pipelines: + +- `spark.catalog.tableExists()` is not whitelisted inside SDP (it raises `Py4JSecurityException`). Avoid it in pipeline code; this asset does not use it. +- A streaming table's set of input sources is fixed at its first run. Changing it later (for example, adding a union source) is incompatible with the existing checkpoint and requires a full refresh. + +## What this asset is + +A standalone sub-template in the [databricks-bundle-template](https://github.com/vmariiechko/databricks-bundle-template) asset library. It does not depend on the core template; it can be installed into any Databricks bundle. See [ASSETS.md](../../ASSETS.md) for the full catalog. + +It is a demonstration of one specific pattern with its tradeoffs documented, not a general-purpose data-quality framework. For a fuller framework, look at [DQX](https://databrickslabs.github.io/dqx/) (Databricks Labs). diff --git a/assets/sdp-quarantine-pattern/databricks_template_schema.json b/assets/sdp-quarantine-pattern/databricks_template_schema.json new file mode 100644 index 0000000..eaa03b2 --- /dev/null +++ b/assets/sdp-quarantine-pattern/databricks_template_schema.json @@ -0,0 +1,73 @@ +{ + "welcome_message": "\nLakeflow SDP Quarantine Pattern: Asset Installer\n\nInstalls two things that ship together:\n 1. A reference Lakeflow Spark Declarative Pipeline demonstrating the\n inverse-expectations quarantine pattern on the public\n samples.nyctaxi.trips dataset (the worked example).\n 2. 
A companion agent skill that instructs a coding agent to adapt the same\n pattern to YOUR dataset and self-verify the result.\n\nIn the pipeline, critical (drop) expectations route violating rows into a\nseparate quarantine table; the valid rows flow to silver. Advisory (warn)\nexpectations log to the pipeline event log without dropping. Drop predicates\nare written NULL-safe so the silver/quarantine split is a clean partition\n(see the README's NULL trap).\n\nThe public sample already contains some invalid rows, so the quarantine table\nis populated on the first run with no extra setup. Bronze tables land in your\nbronze schema; silver and quarantine land in your silver schema.\n\nLet's pick the install settings...\n", + + "properties": { + "target_dir": { + "type": "string", + "default": "src/sdp_quarantine_pattern", + "description": "\nTarget directory for the pipeline source files (relative to your bundle root).\nThe asset installs the pipeline notebook, expectations.py, expectations.json,\nand event_log_queries.sql here. Pick a path that doesn't already exist.\ntarget_dir", + "order": 1, + "pattern": "^[A-Za-z0-9_][A-Za-z0-9_./-]*$", + "pattern_match_failure_message": "Target directory must start with a letter, number, or underscore and contain only letters, numbers, underscores, slashes, hyphens, or dots." + }, + + "pipeline_resource_key": { + "type": "string", + "default": "sdp_quarantine_pattern", + "description": "\n========================================\n\nDABs resource key for the pipeline.\n\nThis key appears in the bundle as `resources.pipelines.` and is also the\nfilename: `resources/.pipeline.yml`. Other resources reference the\npipeline by `${resources.pipelines..id}`.\n\nDefault `sdp_quarantine_pattern` is safe for most projects. 
Pick another name\nonly if your bundle already defines a pipeline with this key.\nMust be lowercase snake_case (letters, numbers, underscores).\npipeline_resource_key", + "order": 2, + "pattern": "^[a-z][a-z0-9_]*$", + "pattern_match_failure_message": "Resource key must be lowercase, start with a letter, and contain only letters, numbers, and underscores." + }, + + "pipeline_name": { + "type": "string", + "default": "SDP Quarantine Pattern Demo", + "description": "\n========================================\n\nDisplay name shown in the Databricks workspace Pipelines list.\nFree-form; spaces and mixed case are fine.\npipeline_name", + "order": 3, + "pattern": "^.+$", + "pattern_match_failure_message": "Pipeline name cannot be empty." + }, + + "catalog": { + "type": "string", + "default": "main", + "description": "\n========================================\n\nUnity Catalog catalog the pipeline writes its tables into.\n\nYou need CREATE TABLE on this catalog and on both schemas below. The public\nsource (samples.nyctaxi.trips) is read-only and unaffected.\nMust be a valid lowercase identifier.\ncatalog", + "order": 4, + "pattern": "^[a-z][a-z0-9_]*$", + "pattern_match_failure_message": "Catalog must be lowercase, start with a letter, and contain only letters, numbers, and underscores." + }, + + "bronze_schema": { + "type": "string", + "default": "bronze", + "description": "\n========================================\n\nBronze-layer schema. The raw ingest table (raw_trips) and the pipeline\nevent log land here. This is the pipeline's default schema.\nMust be a valid lowercase identifier.\nbronze_schema", + "order": 5, + "pattern": "^[a-z][a-z0-9_]*$", + "pattern_match_failure_message": "Schema must be lowercase, start with a letter, and contain only letters, numbers, and underscores." + }, + + "silver_schema": { + "type": "string", + "default": "silver", + "description": "\n========================================\n\nSilver-layer schema. 
The validated table (clean_trips) and the quarantine\ntable (quarantine_trips) land here, following medallion schema separation.\nMust be a valid lowercase identifier.\nsilver_schema", + "order": 6, + "pattern": "^[a-z][a-z0-9_]*$", + "pattern_match_failure_message": "Schema must be lowercase, start with a letter, and contain only letters, numbers, and underscores." + }, + + "skill_dir": { + "type": "string", + "default": ".agents", + "description": "\n========================================\n\nTarget directory for the companion agent skill (relative to your bundle root).\nThe skill installs at /skills/sdp-quarantine-pattern/ and instructs\na coding agent to adapt the quarantine pattern to your own dataset and\nself-verify it.\n\nDefault `.agents` is vendor-neutral. If you use one agent and want\nauto-discovery, override with its native folder:\n - Claude Code: .claude\n - Codex: .codex\n - Cursor: .cursor\n - Gemini CLI: .gemini\n\nPick a path whose `skills/sdp-quarantine-pattern/` subfolder doesn't already\nexist in your project.\nskill_dir", + "order": 7, + "pattern": "^[A-Za-z0-9_.][A-Za-z0-9_./-]*$", + "pattern_match_failure_message": "Skill directory must start with a letter, number, underscore, or dot and contain only letters, numbers, underscores, slashes, hyphens, or dots." + } + }, + + "success_message": "\n========================================\n\nQuarantine pattern installed (reference pipeline + companion skill):\n - Pipeline source: {{.target_dir}}/quarantine_pipeline.py (Databricks notebook)\n - Expectations: {{.target_dir}}/expectations.json (+ expectations.py helper)\n - Event log SQL: {{.target_dir}}/event_log_queries.sql\n - Resource: resources/{{.pipeline_resource_key}}.pipeline.yml\n - Usage doc: docs/sdp-quarantine-pattern/README.md\n - Agent skill: {{.skill_dir}}/skills/sdp-quarantine-pattern/SKILL.md\n\nRun the reference pipeline as-is:\n\n1. Confirm your `databricks.yml` includes the resource file. 
Most bundles\n already include `resources/*.yml` by default; verify:\n include:\n - resources/*.yml\n\n2. Validate and deploy:\n databricks bundle validate -t \n databricks bundle deploy -t \n\n3. Run the pipeline ('{{.pipeline_name}}'). The public sample already contains\n some invalid rows, so quarantine_trips is populated on the first run.\n\nTables land in {{.catalog}}: raw_trips in {{.bronze_schema}}; clean_trips\nand quarantine_trips in {{.silver_schema}}. Inspect expectation results with the\nqueries in {{.target_dir}}/event_log_queries.sql. Design notes and the NULL-trap\nexplanation live in 'docs/sdp-quarantine-pattern/README.md'.\n\nAdapt the pattern to YOUR dataset with the companion skill:\n\n Point your coding agent at the skill and ask it to apply the quarantine\n pattern to your own table. Wire it the way your agent expects:\n - Claude Code: auto-discovered if {{.skill_dir}} is `.claude`; otherwise\n add to CLAUDE.md: 'Use the skill at\n {{.skill_dir}}/skills/sdp-quarantine-pattern/SKILL.md'.\n - Codex: reference that SKILL.md path from AGENTS.md.\n - Other / on Databricks (Genie): point the agent at the SKILL.md path.\n\n A ready-to-paste kickoff prompt and the offline unit-test commands are in\n 'docs/sdp-quarantine-pattern/README.md'.\n\n The skill delegates deploy/run/query mechanics to your agent's runtime. If\n your agent runs locally, installing the official Databricks ai-dev-kit\n skills (https://github.com/databricks-solutions/ai-dev-kit) gives it CLI\n auth, profile selection, and pipeline management. On Databricks, Genie Code\n already has this. 
Optionally, the `dbx-ro-query` asset from this same repo\n adds a guarded read-only SQL window for the investigation and audit queries.\n", + + "min_databricks_cli_version": "v0.296.0", + "version": 1 +} diff --git a/assets/sdp-quarantine-pattern/template/docs/sdp-quarantine-pattern/README.md b/assets/sdp-quarantine-pattern/template/docs/sdp-quarantine-pattern/README.md new file mode 100644 index 0000000..93c4105 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/docs/sdp-quarantine-pattern/README.md @@ -0,0 +1,103 @@ +# SDP Quarantine Pattern (in-bundle usage) + +This pipeline demonstrates the inverse-expectations quarantine pattern on the public `samples.nyctaxi.trips` dataset. Critical (drop) expectations route violating rows into a separate `quarantine_trips` table; valid rows flow to `clean_trips`; advisory (warn) expectations log to the pipeline event log without dropping. + +The full design rationale and the NULL trap live in the asset README in the [databricks-bundle-template repo](https://github.com/vmariiechko/databricks-bundle-template/tree/main/assets/sdp-quarantine-pattern). This file covers what you do after install. + +## What this asset installed + +| Path | Purpose | +|---|---| +| `/quarantine_pipeline.py` | The SDP pipeline source (Databricks notebook): raw, clean, quarantine. | +| `/expectations.json` | Declarative drop/warn rule lists (source of truth). | +| `/expectations.py` | Pure helper: loads rules, builds the clean and inverse predicates. | +| `/tests/` | Offline unit tests (`pytest` + local Spark) that prove the partition invariant and NULL routing without a workspace. | +| `/event_log_queries.sql` | Queries that parse expectation results from the event log. | +| `resources/.pipeline.yml` | The DABs pipeline resource definition. | +| `/skills/sdp-quarantine-pattern/SKILL.md` | Companion agent skill: adapt the pattern to your own dataset and verify it. | +| `docs/sdp-quarantine-pattern/README.md` | This file. 
| + +## Adapt the pattern to your own dataset (companion skill) + +This asset ships a companion agent skill at `/skills/sdp-quarantine-pattern/SKILL.md`. Point a coding agent (Claude Code, Codex, or Genie Code on Databricks) at it and it will adapt the quarantine pattern to your own table: investigate your data, propose drop/warn rules for you to confirm, keep every drop predicate NULL-safe, adapt and run the offline unit tests, wire the resource into your bundle, and verify the result. + +To start, paste a prompt like this into your agent and fill the placeholders: + +``` +Use the sdp-quarantine-pattern skill to add the quarantine pattern to my pipeline. + +- Source table: +- Catalog / schemas: , , +- Quality rules I care about: +- Deploy mode: + +Investigate the source, propose drop/warn rules for me to confirm, adapt the pattern, +run the offline unit tests, and then guide me through (or do) the deploy and live checks. +``` + +The skill defaults to handing deploy and run steps back to you for review; tell it explicitly if you want it to deploy and run on its own. + +The skill delegates the deploy, run, and query mechanics to your agent's runtime rather than hardcoding CLI commands. On Databricks, Genie Code already knows how to drive pipelines. Locally, installing the official Databricks [ai-dev-kit](https://github.com/databricks-solutions/ai-dev-kit) agent skills gives the agent CLI authentication, profile selection, and pipeline management, which makes the workflow smoother. Optionally, the `dbx-ro-query` asset from this same repo adds a guarded read-only SQL window for the investigation and audit queries. + +## Run the offline unit tests + +The asset ships unit tests at `/tests/` that prove the partition invariant (clean + quarantine == raw) and NULL routing on crafted edge rows, with no Databricks workspace. 
They need Java (for PySpark) and Python 3.10+: + +```bash +pip install -r /tests/requirements-test.txt +pytest /tests/ -q +``` + +A failing partition or NULL-routing test means a drop predicate is not NULL-safe. Fix it in `expectations.json` and rerun. + +## Tables the pipeline creates + +Following medallion schema separation: + +- `..raw_trips`: raw trips ingested from the configured source (default `samples.nyctaxi.trips`). +- `..clean_trips`: rows passing every drop expectation. +- `..quarantine_trips`: rows failing at least one drop expectation (the inverse). +- `..quarantine_pipeline_event_log`: the published pipeline event log. + +The public sample contains some naturally invalid rows (zero distance, zero or negative fare, and similar), so `quarantine_trips` is populated on the first run with no extra setup. + +## Bundle integration + +Most bundles already include `resources/*.yml` from `databricks.yml`: + +```yaml +include: + - resources/*.yml +``` + +If yours does, the pipeline resource is picked up automatically. The resource references the pipeline source with a path relative to the resource file (`..//quarantine_pipeline.py`); DABs translates library paths relative to the resource file's directory, so the default layout works without changes. + +## Deploy and run + +```bash +databricks bundle validate -t +databricks bundle deploy -t +``` + +After deploy, the pipeline appears in the workspace Pipelines list. Run it, then check the three tables and the event log. + +## Inspecting expectation results + +`event_log_queries.sql` parses the published event log into per-expectation passed/failed counts for the latest update, plus a focused query for the warn expectations (which never route to quarantine, so the event log is the only place they surface). Run those queries against your warehouse or in a notebook after a pipeline run. 
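As a rough illustration of what "per-expectation passed/failed counts" means, the sketch below aggregates counts from event-log `details` payloads in plain Python. It assumes that `flow_progress` events carry a `data_quality.expectations` array with `name`, `dataset`, `passed_records`, and `failed_records` fields; treat that layout as an assumption to check against the event-log documentation. The function name `expectation_counts` and the sample payloads are hypothetical, and the numbers are made up rather than taken from a real run. The shipped `event_log_queries.sql` remains the source of truth.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Made-up `details` payloads shaped like flow_progress events. The counts are
# illustrative only and are not results from any real pipeline run.
SAMPLE_DETAILS = [
    json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_fare", "dataset": "clean_trips",
         "passed_records": 98, "failed_records": 2},
        {"name": "fare_within_expected", "dataset": "clean_trips",
         "passed_records": 97, "failed_records": 1},
    ]}}}),
    # An event without a data_quality block is skipped.
    json.dumps({"flow_progress": {"status": "RUNNING"}}),
]


def expectation_counts(
    details_rows: Iterable[str],
) -> Dict[Tuple[str, str], Dict[str, int]]:
    """Sum passed/failed records per (dataset, expectation name)."""
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for raw in details_rows:
        details = json.loads(raw)
        data_quality = details.get("flow_progress", {}).get("data_quality") or {}
        for exp in data_quality.get("expectations", []):
            key = (exp["dataset"], exp["name"])
            totals[key]["passed"] += exp.get("passed_records", 0)
            totals[key]["failed"] += exp.get("failed_records", 0)
    return dict(totals)


counts = expectation_counts(SAMPLE_DETAILS)
for (dataset, name), c in sorted(counts.items()):
    print(f"{dataset}.{name}: passed={c['passed']} failed={c['failed']}")
```

Warn expectations appear in the same array as drop expectations, which is why the event log is the place to look for them.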
+ +## Configuration + +The pipeline reads its catalog, schemas, and source from the DABs resource `configuration` block (`quarantine.catalog`, `quarantine.bronze_schema`, `quarantine.silver_schema`, `quarantine.source`); the bronze schema also matches the pipeline's default `schema`. To change them later, edit the resource YAML and redeploy. Pointing `quarantine.source` at a curated test table (in a separate test target) is how you run a clean end-to-end integration test against known-good and known-bad rows. + +## Editing the expectations + +Add or change rules in `expectations.json` under `drop` (critical, routed to quarantine) or `warn` (advisory, logged). Keep every `drop` predicate NULL-safe (guard each column with `IS NOT NULL`) so the clean/quarantine split stays a clean partition. The quarantine predicate is derived automatically from the drop list; you do not maintain it by hand. After editing rules, rerun the unit tests in `/tests/` to confirm the partition still holds. + +## References + +1. [Lakeflow SDP expectations](https://docs.databricks.com/aws/en/ldp/expectations) +2. [Manage data quality with pipeline expectations](https://docs.databricks.com/aws/en/ldp/expectation-patterns) +3. [Monitor pipelines with the event log](https://docs.databricks.com/aws/en/ldp/monitor-event-logs) +4. [DABs pipelines resource reference](https://docs.databricks.com/aws/en/dev-tools/bundles/resources.html#pipelines) +5. 
[Databricks sample datasets (`samples.nyctaxi.trips`)](https://docs.databricks.com/aws/en/discover/databricks-datasets) diff --git a/assets/sdp-quarantine-pattern/template/resources/{{.pipeline_resource_key}}.pipeline.yml.tmpl b/assets/sdp-quarantine-pattern/template/resources/{{.pipeline_resource_key}}.pipeline.yml.tmpl new file mode 100644 index 0000000..7fdf272 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/resources/{{.pipeline_resource_key}}.pipeline.yml.tmpl @@ -0,0 +1,47 @@ +# Lakeflow SDP pipeline: inverse-expectations quarantine pattern. +# +# raw_trips lands in the bronze schema (the pipeline default); clean_trips +# and quarantine_trips are fully qualified into the silver schema by the +# pipeline source. The pipeline source is referenced relative to THIS file's +# directory (DABs translates library paths relative to the resource file). +# +# Reference this pipeline from other resources via: +# ${resources.pipelines.{{.pipeline_resource_key}}.id} +# +# See docs/sdp-quarantine-pattern/README.md for the design and the NULL trap. + +resources: + pipelines: + {{.pipeline_resource_key}}: + name: {{.pipeline_name}} + catalog: {{.catalog}} + schema: {{.bronze_schema}} + serverless: true + channel: CURRENT + + # Publish the pipeline event log to a queryable UC table. Expectation + # results (passed/failed records per rule) are parsed from here; see + # event_log_queries.sql. The name is coupled to this pipeline so it does + # not collide with other pipelines publishing into the same schema. + event_log: + catalog: {{.catalog}} + schema: {{.bronze_schema}} + name: quarantine_pipeline_event_log + + libraries: + - notebook: + path: ../{{.target_dir}}/quarantine_pipeline.py + + configuration: + quarantine.catalog: {{.catalog}} + quarantine.bronze_schema: {{.bronze_schema}} + quarantine.silver_schema: {{.silver_schema}} + # The source table the raw layer ingests. 
Point this at a curated test + # table to run the pipeline against known-good/known-bad rows; see the + # skill's self-verify reference (Tier 3 integration test). + quarantine.source: samples.nyctaxi.trips + + # Traceability back to the originating asset (forwarded as cluster tags). + tags: + created_by: dabs-asset + asset: sdp-quarantine-pattern diff --git a/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/SKILL.md.tmpl b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/SKILL.md.tmpl new file mode 100644 index 0000000..ddf294f --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/SKILL.md.tmpl @@ -0,0 +1,103 @@ +--- +name: sdp-quarantine-pattern +description: Use this skill to apply the inverse-expectations quarantine pattern to a Lakeflow Spark Declarative Pipeline on the user's own dataset. Critical (drop) checks route violating rows into a separate quarantine table while valid rows flow to a clean table; advisory (warn) checks are logged but not dropped. Use it whenever the user wants quarantine or dead-letter handling, expectation-based data-quality routing, or a "good rows here, bad rows there" split in an SDP pipeline. The skill adapts the shipped samples.nyctaxi.trips reference to the user's table, enforces NULL-safe drop predicates so clean and quarantine form a clean partition, wires the DABs pipeline resource into the bundle, and verifies the result (offline rule unit tests first, then live audits). It applies and verifies a pattern; it is not an auto-rewriter and does not guarantee correctness on arbitrary data. +--- + +# sdp-quarantine-pattern + +Apply the inverse-expectations quarantine pattern to the user's own Lakeflow Spark Declarative Pipeline (SDP), then prove it works. + +The pattern: critical data-quality rules ("drop" rules) decide which rows are valid. Valid rows flow to a clean table. 
The rows that fail at least one drop rule are captured in a separate quarantine table, which applies the **inverse** of the drop rules. Advisory rules ("warn" rules) annotate the pipeline event log but never drop or route anything. + +This asset shipped a validated reference implementation alongside this skill. Read it first; it is the worked example you adapt away from: + +- Pipeline source: `{{.target_dir}}/quarantine_pipeline.py` (a Databricks notebook: raw, clean, quarantine). +- Rules: `{{.target_dir}}/expectations.json` (declarative drop/warn lists) and `{{.target_dir}}/expectations.py` (a pure helper that loads the rules and derives the clean and inverse predicates). +- Unit tests: `{{.target_dir}}/tests/` (`test_expectations.py`, `conftest.py`, `requirements-test.txt`) - the offline proof of the pattern. +- Event-log queries: `{{.target_dir}}/event_log_queries.sql`. +- DABs resource: `resources/.pipeline.yml`. + +Paths in this skill are relative to the bundle root. + +## The one rule you must not break: NULL-safe drop predicates + +`expect_all_or_drop` drops a row only when its predicate evaluates to `false`. A predicate that evaluates to SQL `NULL` (unknown) is not `false`, so the row is **kept**. So a naive drop rule like `amount > 0` lets a NULL `amount` through to the clean table, and the inverse `NOT(amount > 0)` is also `NULL`, so the same row is kept in the quarantine table too. The one row lands in both, double-counted, and the partition breaks. + +Every drop predicate you write or adapt MUST be total (NULL-safe): guard each column test with `IS NOT NULL`. + +``` +# wrong (NULL amount leaks) +amount > 0 +# right (NULL amount is dropped, routes to quarantine) +amount IS NOT NULL AND amount > 0 +``` + +A total predicate never returns `NULL`, so the clean and quarantine tables partition the input exactly once: every raw row lands in exactly one of them. This is the property the Tier 1 unit tests prove. Do not relax it. 
If the user genuinely cannot guarantee total predicates (for example rules pulled from an external system), compute an explicit NULL-safe `_quarantined` boolean once and branch both tables on that flag instead; `references/adapt-the-pattern.md` covers this fallback. + +## Workflow + +The work splits at a trust boundary. **Phase A** is non-mutating: investigate, edit files, and run the offline unit tests. You can complete it and prove the core correctness without touching a workspace. **Phase B** deploys and runs the pipeline, which mutates state, so it is gated (see "Control model" below). Lean on `references/adapt-the-pattern.md` for the editing mechanics and `references/self-verify.md` for the verification tiers. + +### Phase A (autonomous, no workspace changes) + +1. **Understand the target.** Find out what the user wants quarantined and from which source (a table, a file feed, an existing pipeline). Inspect the source schema and sample a few rows so your rules match real column names and types. Use your runtime's read-only query capability (see below). +2. **Decide the checks and classify them.** For each quality concern, decide whether it is a drop rule (a row that fails is invalid and must be quarantined) or a warn rule (advisory, stays in the clean table but is flagged in the event log). Default approach: investigate the data, propose an initial rule set with a one-line justification each, and confirm it with the user before wiring it in. Do not silently invent business rules. +3. **Adapt the files.** Point the raw layer at the user's source, rewrite `expectations.json` for the user's columns (every drop rule NULL-safe), and set the catalog and bronze/silver schemas. Keep `expectations.py` as-is; it derives the predicates from whatever drop rules you supply. +4. 
**Adapt and run the unit tests (Tier 1).** Update `{{.target_dir}}/tests/test_expectations.py` with the user's schema and edge rows (one per violation, a multi-violation row, a warn-only row, and a NULL-in-a-guarded-column row). Install the test dependencies into the project's virtual environment (use an existing `venv`/`.venv` if the project has one, otherwise create one; do not install into the global or system Python), then run `pytest {{.target_dir}}/tests/ -q`. Green means the rules partition correctly and the NULL trap is closed, proven offline. Fix any failure before going further. +5. **Integrate the DABs resource.** Add the pipeline resource to the user's `databricks.yml`. Inspect it first: reuse existing variables where they exist (for example `catalog: ${var.catalog_name}` rather than a hardcoded catalog) and confirm an `include: resources/*.yml` glob (or equivalent) picks the resource up. Do not assume the default layout. + +### Phase B (gated: deploy, run, live-verify) + +6. **Deploy and run.** `databricks bundle validate`, then `deploy`, then run the pipeline. Use your runtime's own capability for this (see "Deploying and running" below). +7. **Live-verify.** Run the Tier 2 read-only audits in `references/self-verify.md` (partition invariant + NULL-in-clean audit + event-log inspection). Offer Tier 3 (an integration test on a curated source via `quarantine.source`) if the user wants full end-to-end proof. +8. **Confirm and hand back.** Tell the user what you changed, show the verification results, and state the tradeoffs (the source is processed twice; warn rules surface only in the event log) and the limits (verification proves behavior on the data tested, not on all future data). + +## Control model: who deploys and runs + +Phase B mutates workspace state, so do not assume you may deploy and run unattended. 
Default to **human-in-the-middle**: finish Phase A, then present the deploy/run commands and the verification queries and let the user run them, coming back to you with results to verify. Run the full closed loop yourself (deploy, run, full-refresh, verify) only when the user has explicitly granted that and your runtime can do it. Ask the user which mode they want rather than guessing. In either mode, Phase A still gives the user real, proven correctness (the offline unit tests) before anything is deployed. + +## Read-only query access (use what your runtime has) + +Investigation (step 1) and the Tier 2 audits need read-only SQL. Use whatever read-only query capability your runtime already provides; the queries are plain SQL and the skill mandates no tool: + +- On Databricks (for example Genie Code), use the platform's native query ability. +- On a local machine, use the Databricks CLI or a SQL warehouse. +- If a guarded read-only SQL skill is already installed in this project (for example `dbx-ro-query`), prefer it. + +Keep all verification SQL read-only. Never run destructive SQL to verify a pipeline. + +## Deploying and running (delegate the mechanics) + +This skill owns the pattern and the verification logic, not the deploy plumbing. For the actual deploy / run / full-refresh mechanics, use your runtime's own capability: on Databricks, Genie Code already knows how to manage pipelines; on a local machine, use the Databricks CLI directly, and if the Databricks ai-dev-kit agent skills (https://github.com/databricks-solutions/ai-dev-kit) are installed, use those for the CLI authentication, profile selection, and pipeline-management steps. Do not hardcode a CLI recipe here. If you are running locally without those skills and the user has not chosen a CLI profile, ask which profile to use rather than guessing. + +## Agent latitude + +These steps are deliberately high-level. 
The user's setup will have specifics you cannot anticipate: existing variables and naming conventions, an existing pipeline you are extending, compute or governance constraints, a source that is a view or a file feed. Investigate the user's repository and workspace, decide how best to fit the pattern, and confirm your plan with the user. Adapt the pattern; do not transplant the nyctaxi specifics. + +## Definition of done + +Do not report success until all of these hold: + +- [ ] Drop and warn rules reflect the user's dataset and the user has confirmed the classification. +- [ ] Every drop predicate is NULL-safe (each column test guarded by `IS NOT NULL`). +- [ ] The adapted Tier 1 unit tests pass (`pytest {{.target_dir}}/tests/ -q`). +- [ ] The pipeline resource is integrated and `databricks bundle validate` is clean. +- [ ] Either the pipeline ran and the Tier 2 audits pass, or (human-in-the-middle) you have handed the user the exact deploy/run commands and audit queries. +- [ ] You have stated the tradeoffs and limits back to the user. + +## Limits + +This skill applies and verifies a pattern. It does not guarantee correctness on arbitrary data, and it is not a general data-quality framework. The verification proves the partition and NULL behavior on the rows actually tested; it does not prove the rules capture every real-world defect, nor that they will hold for data shapes you have not seen. The user owns the business meaning of the rules. For a fuller framework, point them at [DQX](https://databrickslabs.github.io/dqx/) (Databricks Labs). + +## References + +- `references/adapt-the-pattern.md`: the mechanics of adapting the rules, pipeline, and unit tests to a new dataset, the status-column fallback, and config dimensions. +- `references/self-verify.md`: the three-tier verification ladder (offline unit tests, live read-only audits, optional integration test). + +Official Databricks docs: + +1. 
[Lakeflow SDP expectations](https://docs.databricks.com/aws/en/ldp/expectations) +2. [Manage data quality with pipeline expectations](https://docs.databricks.com/aws/en/ldp/expectation-patterns) +3. [Develop pipelines: create test datasets](https://docs.databricks.com/aws/en/ldp/develop) +4. [Pipeline parameters (source parameterization)](https://docs.databricks.com/aws/en/ldp/parameters) +5. [Monitor pipelines with the event log](https://docs.databricks.com/aws/en/ldp/monitor-event-logs) diff --git a/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/adapt-the-pattern.md.tmpl b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/adapt-the-pattern.md.tmpl new file mode 100644 index 0000000..222b018 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/adapt-the-pattern.md.tmpl @@ -0,0 +1,83 @@ +# Adapting the quarantine pattern to a new dataset + +Load this when you are editing the rules, pipeline, and tests for the user's data. It assumes you have read `SKILL.md` and the reference implementation under `{{.target_dir}}/`. + +## Anatomy of the reference + +Three datasets, all named with fully qualified `catalog.schema.table` identifiers so the source reads consistently and the in-pipeline dependencies resolve unambiguously. The schemas are the medallion layers; the table names describe content: + +- `raw_trips` (bronze schema): a streaming ingest of the source. No expectations; it captures everything. +- `clean_trips` (silver schema): reads the raw table, applies `@dp.expect_all(WARN_RULES)` then `@dp.expect_all_or_drop(DROP_RULES)`. The drop decorator removes rows that fail any drop rule. +- `quarantine_trips` (silver schema): reads the same raw table, applies `@dp.expect_all_or_drop({"is_quarantined": QUARANTINE_EXPR})` where `QUARANTINE_EXPR` is `NOT((rule1) AND (rule2) AND ...)`. 
It keeps exactly the rows the clean table dropped. + +`{{.target_dir}}/expectations.json` holds the two rule lists. `{{.target_dir}}/expectations.py` loads them and builds the predicates: `build_silver_expr` (all rules pass, used by the tests) and `build_quarantine_expr` (its inverse, used by the pipeline). You do not maintain the inverse by hand. The pipeline reads `quarantine.catalog`, `quarantine.bronze_schema`, `quarantine.silver_schema`, and `quarantine.source` from the DABs resource `configuration` block, so the source needs no per-deploy editing for those. + +## What to change for the user's dataset + +1. **Source.** In `{{.target_dir}}/quarantine_pipeline.py`, the raw layer reads `SOURCE_TABLE`, which comes from the `quarantine.source` config (default `samples.nyctaxi.trips`). For the user's data, set `quarantine.source` in the resource to their table. The reference does a streaming read of a table (`spark.readStream.table(...)`). If the user's source is a file feed, use the matching read (for example Auto Loader / `cloudFiles`); if it is batch, use `spark.read` and materialized views instead of streaming tables. Keep the three-table shape. +2. **Columns.** Update `TRIP_COLUMNS` (or replace it with the user's projection) so the raw layer selects the columns the rules reference. Drop the `nyctaxi`-specific column list. +3. **Rules.** Rewrite `{{.target_dir}}/expectations.json`. This is the real work; see the next section. +4. **Names.** Rename the table constants (`RAW_*`, `CLEAN_*`, `QUARANTINE_*`) and the `@dp.table` comments to the user's domain. Keep them fully qualified. +5. **Schemas and catalog.** Set these through the resource `configuration` block, reusing the user's bundle variables where they exist (see "Integrate the DABs resource" in `SKILL.md`). +6. **Unit tests.** Update `{{.target_dir}}/tests/test_expectations.py` to match; see "Adapting the unit tests" below. + +## Writing the rules + +Each rule is `{"name": "...", "query": ""}`. 
Split them into `drop` and `warn`. + +- **drop**: a row that fails this is invalid and must be quarantined. Example concerns: a required field is missing, a number is out of its valid domain, a timestamp ordering is violated, a foreign key is absent. +- **warn**: advisory. The row stays in the clean table but the failure count surfaces in the event log. Example concerns: a value is unusually large but not impossible, an optional field is missing. + +Classify with the user. Investigate the data first (sample rows, check null rates and value ranges with read-only SQL, and check the column's actual type rather than trusting how sample values look), propose a starting set of drop and warn rules with a one-line justification each, and let the user ratify or adjust before you wire them in. Do not invent business thresholds silently. + +The source does not need to contain violations today. A clean or synthetic source is fine: the rules guard against bad rows arriving in future micro-batches, and the Tier 1 unit tests prove the routing on crafted edge rows regardless of what the live source currently holds. Do not fabricate violations in the source to "see the pattern work"; that is what the unit tests are for. + +### Every drop predicate must be NULL-safe + +This is the rule from `SKILL.md`, restated because it is where adaptations go wrong. Guard every column reference in a drop rule with `IS NOT NULL`: + +``` +valid_amount: amount IS NOT NULL AND amount > 0 +valid_window: ends_at IS NOT NULL AND starts_at IS NOT NULL AND ends_at > starts_at +required_key: customer_id IS NOT NULL +``` + +A drop rule that references a column without an `IS NOT NULL` guard will let NULLs leak into the clean table and break the partition. The Tier 1 unit tests exist precisely to catch this; if they fail on the partition or NULL-routing assertions, a drop predicate is not total. 
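
To see why the guard matters, the routing can be modeled in a few lines of plain Python. This is a sketch of SQL three-valued logic, not the pipeline's code: `None` stands in for SQL `NULL`, `kept` models the keep-unless-FALSE behavior described in `SKILL.md`, and every function name here is hypothetical.

```python
from typing import Optional

Tri = Optional[bool]  # True, False, or None standing in for SQL NULL


def sql_gt(value: Optional[float], bound: float) -> Tri:
    """SQL `value > bound`: a NULL operand yields NULL."""
    return None if value is None else value > bound


def sql_and(a: Tri, b: Tri) -> Tri:
    """SQL AND: FALSE dominates, otherwise NULL propagates."""
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def sql_not(a: Tri) -> Tri:
    """SQL NOT: NULL stays NULL."""
    return None if a is None else not a


def kept(predicate: Tri) -> bool:
    """Keep-on-NULL: a row is dropped only when the predicate is FALSE."""
    return predicate is not False


def naive(amount: Optional[float]) -> Tri:
    # amount > 0
    return sql_gt(amount, 0)


def guarded(amount: Optional[float]) -> Tri:
    # amount IS NOT NULL AND amount > 0
    return sql_and(amount is not None, sql_gt(amount, 0))


def route(rule, amount: Optional[float]) -> tuple[bool, bool]:
    """Return (in_clean, in_quarantine) for one row under one drop rule."""
    p = rule(amount)
    return kept(p), kept(sql_not(p))
```

With the naive rule, `route(naive, None)` puts the NULL row in both tables; with the guarded rule, `route(guarded, None)` puts it in quarantine only, which is the partition the pattern depends on.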
+ +Warn rules do not need to be NULL-safe, because they do not route anything; a NULL warn predicate simply does not flag the row. + +### The status-column fallback + +If the user cannot guarantee total drop predicates (rules arrive from an external system, or are too dynamic to audit), do not rely on raw inverse predicates. Instead compute one explicit NULL-safe boolean in the raw layer or a shared view, for example: + +``` +_quarantined = NOT ( (amount IS NOT NULL AND amount > 0) AND (customer_id IS NOT NULL) AND ... ) +``` + +then define the clean table as the rows where `_quarantined` is false and quarantine as the rows where it is true. Both tables branch on the same precomputed flag, so they partition cleanly as long as the flag itself is never NULL; when an individual predicate may not be total, wrap the conjunction in `COALESCE(..., FALSE)` before negating it so that an unknown result routes to quarantine. This trades the pure expectations-only form for robustness. + +## Adapting the unit tests + +`{{.target_dir}}/tests/test_expectations.py` is the offline proof. To retarget it: + +1. Replace `_SCHEMA` with the user's columns and types. +2. Replace `_EDGE_ROWS` with rows that exercise the user's rules: a fully valid row, one row per single drop violation, a multi-violation row, a warn-only row (passes every drop rule), and a row with `NULL` in each column that a drop rule guards. Set each row's `expected` value to `"clean"` or `"quarantine"`. +3. Leave the routing model alone: `CLEAN_KEEP_EXPR` and `QUARANTINE_KEEP_EXPR` use `() IS NOT FALSE` to reproduce `expect_all_or_drop`'s keep-on-NULL semantics. This is what makes the test catch a non-NULL-safe rule as a broken partition. +4. Run `pytest {{.target_dir}}/tests/ -q`. The pure-Python config tests and the Spark routing tests should all pass. + +The tests need Java (for PySpark) and Python 3.10+ locally. They do not need a Databricks workspace. + +## Config dimensions to consider + +The reference resource is intentionally minimal (`serverless: true`, `channel: CURRENT`, an `event_log` block, `configuration`, and trace `tags`).
Depending on the user's context, consider whether to extend it, but only with a reason: + +- **Photon / cluster sizing**: for classic compute, if throughput matters. +- **`continuous: true`**: if the user needs always-on streaming rather than triggered runs. +- **`edition`**: only if the user needs a feature tied to an edition; expectations work on serverless without setting it. +- **`channel`**: keep `CURRENT` unless the user needs a preview feature. + +Keep the worked example minimal; add dimensions to the user's own resource as their needs require, not by default. + +## The streaming-source caveat + +A streaming table's set of input sources is fixed at its first run. If you later change the raw layer's source (for example switch `quarantine.source` or add a union), the existing streaming checkpoint can be incompatible and the run fails with a sources-count assertion. Plan the source before the first deploy, or run a full refresh when you change it. This is also why Tier 3 integration testing uses source parameterization on a separate test target rather than splicing a faults table into a running pipeline. diff --git a/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/self-verify.md b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/self-verify.md new file mode 100644 index 0000000..032e74c --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.skill_dir}}/skills/sdp-quarantine-pattern/references/self-verify.md @@ -0,0 +1,84 @@ +# Verifying the adapted quarantine pattern + +Load this when you are ready to prove the adapted pattern works. Verification is a ladder of three tiers. Lead with Tier 1: it is deterministic, repeatable, and needs no Databricks workspace, so it proves the core correctness before anyone deploys anything. Tiers 2 and 3 confirm behavior on the live pipeline. 
+ +Throughout, substitute the user's real identifiers for the placeholders: + +- `` = the raw/bronze table, for example `main.bronze.raw_orders` +- `` = the clean/silver table (valid rows) +- `` = the quarantine table +- `` = the published event log table (the resource's `event_log` block) + +## Tier 1: offline rule unit tests (primary, always do this) + +This is the highest-value check and it runs locally with a real Spark session, no workspace. It proves the one property the whole pattern depends on: every raw row lands in exactly one of clean or quarantine, and a NULL in a guarded column routes to quarantine, not clean. + +The asset ships a worked example at `/tests/` (`conftest.py`, `test_expectations.py`, `requirements-test.txt`). Adapt it to the user's dataset: + +1. Read `/tests/test_expectations.py`. It has two parts: pure-Python config checks (rules load, every drop predicate is NULL-safe, the quarantine predicate is the inverse of the clean predicate) and a real-Spark routing test. +2. Replace the example schema (`_SCHEMA`) and edge rows (`_EDGE_ROWS`) with the user's columns. Craft one row per case: a fully valid row, one row per single drop violation, a multi-violation row, a warn-only row (passes all drop rules), and critically a row with `NULL` in each column that a drop rule guards. Label each row's `expected` bucket (`clean` or `quarantine`). +3. Keep the routing model as-is: the test reproduces `expect_all_or_drop`'s keep-on-NULL semantics with `() IS NOT FALSE`, so a non-NULL-safe rule shows up as a broken partition. Do not weaken it to a plain filter. +4. Run it: + + ```bash + pip install -r /tests/requirements-test.txt + pytest /tests/ -q + ``` + +PASS when every test is green. If `test_partition_invariant` or a NULL-routing test fails, a drop predicate is not NULL-safe: fix it in `expectations.json` (add the `IS NOT NULL` guard) and rerun. You have now proven the rules partition correctly without touching a workspace. 
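
Before adapting the Spark test, the routing model can be tried on a handful of edge rows using only the standard library. The sketch below is an illustration, not the shipped test: it uses SQLite (3.23 or newer, for `IS NOT FALSE`) as a stand-in for Spark on the assumption that both evaluate these operators with the same three-valued logic, it models the clean table as the conjunction of the drop rules, and the rule names, columns, and `route_rows` helper are all hypothetical.

```python
import sqlite3

# Hypothetical drop rules in the {name: predicate} shape the helper loads.
DROP_RULES = {
    "valid_amount": "amount IS NOT NULL AND amount > 0",
    "required_key": "customer_id IS NOT NULL",
}

# (row_id, amount, customer_id, expected bucket)
EDGE_ROWS = [
    (1, 10.0, "c1", "clean"),       # fully valid
    (2, -5.0, "c1", "quarantine"),  # single violation
    (3, 0.0, None, "quarantine"),   # multi-violation
    (4, None, "c1", "quarantine"),  # NULL in a guarded column
]


def route_rows(drop_rules, rows):
    """Return (clean_ids, quarantine_ids) under keep-on-NULL semantics."""
    clean_expr = " AND ".join(f"({p})" for p in drop_rules.values())
    quarantine_expr = f"NOT({clean_expr})"
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE raw (row_id INTEGER, amount REAL, customer_id TEXT)"
    )
    conn.executemany("INSERT INTO raw VALUES (?, ?, ?)", [r[:3] for r in rows])
    # A row survives unless the predicate is strictly FALSE.
    clean = {
        r[0] for r in conn.execute(
            f"SELECT row_id FROM raw WHERE ({clean_expr}) IS NOT FALSE"
        )
    }
    quarantine = {
        r[0] for r in conn.execute(
            f"SELECT row_id FROM raw WHERE ({quarantine_expr}) IS NOT FALSE"
        )
    }
    conn.close()
    return clean, quarantine
```

The partition invariant is then two set checks: the buckets are disjoint, and their union covers every raw row. Swapping `valid_amount` for the unguarded `amount > 0` makes row 4 appear in both sets, which is the failure the real tests are designed to surface.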
+ +This tier needs Java (for PySpark) and Python 3.10+. If the runtime cannot run Spark locally, say so and rely on Tier 2 and Tier 3 for the partition proof instead; do not skip verification silently. + +## Tier 2: read-only audits on the deployed pipeline (reality check) + +After the pipeline runs on the workspace, confirm the same invariants on real data with read-only SQL (use whatever read-only query capability your runtime has). These need no fault injection. + +### A. Partition invariant + +```sql +SELECT + (SELECT COUNT(*) FROM ) AS raw_rows, + (SELECT COUNT(*) FROM ) AS clean_rows, + (SELECT COUNT(*) FROM ) AS quarantine_rows, + (SELECT COUNT(*) FROM ) + - (SELECT COUNT(*) FROM ) + - (SELECT COUNT(*) FROM ) AS gap; +``` + +PASS when `gap = 0`. + +### B. NULL-in-clean audit + +For every column that appears in a drop rule, no clean row may have it NULL: + +```sql +SELECT '' AS dropped_column, COUNT(*) AS nulls_in_clean +FROM +WHERE IS NULL; +``` + +PASS when `nulls_in_clean = 0` for every dropped column. + +### C. Expectation results in the event log + +The per-expectation passed/failed counts (and the warn rules, which never route to quarantine) surface in the event log. The reference ships `/event_log_queries.sql`; point it at `` to confirm each rule fired as expected. + +## Tier 3: integration test via source parameterization (optional E2E) + +When the user wants a full end-to-end test on the workspace with known inputs, do not mutate the live source or splice in a faults table. Use the source parameterization the pipeline already supports. The raw layer reads its source from the `quarantine.source` configuration, so a test run can point it at a curated table. + +This follows the Databricks guidance to create test datasets that include records which break expectations, and it keeps the run clean and repeatable (no streaming-checkpoint surgery): + +1. Create a small curated test table with the same columns the raw layer selects. 
Populate it with known-good rows and known-bad rows, one per scenario (single violation, multi-violation, warn-only, and a NULL in a guarded column), each with a value you can recognize afterward. +2. Point the pipeline at it: set `quarantine.source` to the test table in a test target (or a copy of the resource), so the live (main-target) config is untouched. +3. Deploy to the test target and run the pipeline. +4. Run the Tier 2 audits, plus assert the exact routing: each known-bad row is in `` and not in ``, each known-good row is in ``, the multi-violation row appears in quarantine exactly once, and the NULL row is in quarantine only. +5. Tear down the test target when done. + +PASS when the partition holds and every known row lands where you expect. + +A full, reusable integration-test harness for SDP pipelines (fixtures, CI wiring, golden outputs) is beyond this skill's scope. If the user wants that, treat it as its own effort. + +## Interpreting results honestly + +Tier 1 proves the rules partition correctly on the crafted edge cases, deterministically and offline. Tier 2 confirms it on whatever real data flowed. Tier 3 confirms it end-to-end on curated inputs. None of these prove the rules capture every real defect in the user's domain, nor that they hold for data shapes you have not tested. Report what you verified and what you did not. The user owns whether the rules are the right rules. diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/event_log_queries.sql.tmpl b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/event_log_queries.sql.tmpl new file mode 100644 index 0000000..0d5dd4d --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/event_log_queries.sql.tmpl @@ -0,0 +1,73 @@ +-- Inspect expectation results from the pipeline event log. +-- +-- The pipeline publishes its event log to {{.catalog}}.{{.bronze_schema}}.quarantine_pipeline_event_log +-- (set by the `event_log` block in the pipeline resource). 
Data-quality results +-- live in `flow_progress` events under details:flow_progress.data_quality. +-- The parsing shape below is the one documented in the Databricks event log +-- reference; adjust column paths if a future runtime changes the schema. + +-- --------------------------------------------------------------------------- +-- Per-expectation passed/failed counts for the latest pipeline update. +-- WARN rules show failed_records here while the rows still flow to silver; +-- DROP rules show the rows routed to quarantine. +-- --------------------------------------------------------------------------- +WITH latest_update AS ( + SELECT origin.update_id AS update_id + FROM {{.catalog}}.{{.bronze_schema}}.quarantine_pipeline_event_log + WHERE event_type = 'create_update' + ORDER BY timestamp DESC + LIMIT 1 +), +expectations_parsed AS ( + SELECT + explode( + from_json( + details:flow_progress.data_quality.expectations, + "array>" + ) + ) AS e + FROM {{.catalog}}.{{.bronze_schema}}.quarantine_pipeline_event_log, latest_update + WHERE event_type = 'flow_progress' + AND origin.update_id = latest_update.update_id +) +SELECT + e.dataset, + e.name AS expectation, + SUM(e.passed_records) AS passing_records, + SUM(e.failed_records) AS failing_records +FROM expectations_parsed +GROUP BY e.dataset, e.name +ORDER BY failing_records DESC, e.dataset, e.name; + +-- --------------------------------------------------------------------------- +-- Only the WARN expectations that fired (rows kept in silver but flagged). +-- These are advisory: they never route to quarantine, so the event log is the +-- only place they surface. 
+-- --------------------------------------------------------------------------- +WITH latest_update AS ( + SELECT origin.update_id AS update_id + FROM {{.catalog}}.{{.bronze_schema}}.quarantine_pipeline_event_log + WHERE event_type = 'create_update' + ORDER BY timestamp DESC + LIMIT 1 +), +expectations_parsed AS ( + SELECT + explode( + from_json( + details:flow_progress.data_quality.expectations, + "array>" + ) + ) AS e + FROM {{.catalog}}.{{.bronze_schema}}.quarantine_pipeline_event_log, latest_update + WHERE event_type = 'flow_progress' + AND origin.update_id = latest_update.update_id +) +SELECT + e.name AS warn_expectation, + SUM(e.failed_records) AS flagged_records +FROM expectations_parsed +WHERE e.name IN ('fare_within_expected', 'distance_within_expected', 'dropoff_zip_present') +GROUP BY e.name +HAVING SUM(e.failed_records) > 0 +ORDER BY flagged_records DESC; diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.json b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.json new file mode 100644 index 0000000..8b36959 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.json @@ -0,0 +1,38 @@ +{ + "drop": [ + { + "name": "valid_fare", + "query": "fare_amount IS NOT NULL AND fare_amount > 0" + }, + { + "name": "valid_distance", + "query": "trip_distance IS NOT NULL AND trip_distance > 0" + }, + { + "name": "valid_pickup_ts", + "query": "tpep_pickup_datetime IS NOT NULL" + }, + { + "name": "valid_dropoff_after_pickup", + "query": "tpep_dropoff_datetime IS NOT NULL AND tpep_dropoff_datetime > tpep_pickup_datetime" + }, + { + "name": "valid_pickup_zip", + "query": "pickup_zip IS NOT NULL" + } + ], + "warn": [ + { + "name": "fare_within_expected", + "query": "fare_amount < 500" + }, + { + "name": "distance_within_expected", + "query": "trip_distance < 100" + }, + { + "name": "dropoff_zip_present", + "query": "dropoff_zip IS NOT NULL" + } + ] +} diff --git 
a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.py b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.py new file mode 100644 index 0000000..39a9cb1 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/expectations.py @@ -0,0 +1,72 @@ +"""Data quality rules for the quarantine-pattern pipeline. + +Pure Python (no pyspark import), so this module is unit-testable offline and +safe to import from the pipeline at runtime. Rules live in `expectations.json`; +this module loads them and derives the inverse predicate the quarantine table +uses. + +Every drop predicate is written NULL-safe (each column test guarded by +`IS NOT NULL`) so the silver/quarantine split is a clean partition. The asset +README ("The NULL trap") explains why; do not relax that guard without reading +it. +""" + +import json +import os + +_HERE = os.path.dirname(os.path.abspath(__file__)) +_CONFIG_PATH = os.path.join(_HERE, "expectations.json") + + +def load_rules(action: str, config_path: str = _CONFIG_PATH) -> dict[str, str]: + """Return `{expectation_name: sql_predicate}` for `"drop"` or `"warn"`.""" + with open(config_path, encoding="utf-8") as f: + config = json.load(f) + return {rule["name"]: rule["query"] for rule in config[action]} + + +def build_silver_expr(drop_rules: dict[str, str]) -> str: + """Build the all-drop-rules-pass predicate for the clean (silver) table. + + Given `{name: predicate}`, returns `(p1) AND (p2) AND ...`, which is true + exactly for rows that pass every drop rule. This mirrors what + `expect_all_or_drop(drop_rules)` keeps (when every predicate is NULL-safe), + and is the inverse of `build_quarantine_expr`. + + Args: + drop_rules: Mapping of drop expectation name to SQL predicate. + + Returns: + The conjunction of drop predicates as a string. + + Raises: + ValueError: If `drop_rules` is empty. 
+ """ + predicates = list(drop_rules.values()) + if not predicates: + raise ValueError( + "at least one drop rule is required to build the silver expression" + ) + return " AND ".join(f"({p})" for p in predicates) + + +def build_quarantine_expr(drop_rules: dict[str, str]) -> str: + """Build the inverse-of-all-drop-rules predicate for the quarantine table. + + Given `{name: predicate}`, returns `NOT((p1) AND (p2) AND ...)`, which is + true exactly for rows that fail at least one drop rule. + + Args: + drop_rules: Mapping of drop expectation name to SQL predicate. + + Returns: + The inverse SQL predicate as a string. + + Raises: + ValueError: If `drop_rules` is empty. + """ + return f"NOT({build_silver_expr(drop_rules)})" + + +DROP_RULES = load_rules("drop") +WARN_RULES = load_rules("warn") diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/quarantine_pipeline.py b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/quarantine_pipeline.py new file mode 100644 index 0000000..be7fedd --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/quarantine_pipeline.py @@ -0,0 +1,105 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Quarantine pattern: inverse expectations + a separate quarantine table +# MAGIC +# MAGIC Critical (drop) expectations remove violating rows from `clean_trips`. The +# MAGIC `quarantine_trips` table applies the **inverse** of those expectations, so it +# MAGIC captures exactly the rows the clean table dropped. Advisory (warn) expectations +# MAGIC annotate the pipeline event log without dropping anything. +# MAGIC +# MAGIC Drop predicates are NULL-safe, so the clean and quarantine tables partition the +# MAGIC input exactly once (every raw row lands in exactly one of them). The README's +# MAGIC "The NULL trap" section explains why that matters. 
+# MAGIC +# MAGIC The source defaults to the public `samples.nyctaxi.trips`, which already +# MAGIC contains some invalid rows, so `quarantine_trips` is populated on the first +# MAGIC run. The source is read from the `quarantine.source` configuration, so a test +# MAGIC run can point it at a curated dataset (see the skill's self-verify reference). + +# COMMAND ---------- + +from pyspark import pipelines as dp +from pyspark.sql import SparkSession, functions as F + +from expectations import DROP_RULES, WARN_RULES, build_quarantine_expr + +spark = SparkSession.getActiveSession() + +# Catalog, schemas, and source come from the pipeline configuration (set by the +# DABs resource). The raw table lands in the bronze schema (also the pipeline's +# default schema); the clean and quarantine tables land in the silver schema. +# All three tables are named with fully qualified identifiers so the source +# reads consistently and the in-pipeline dependencies resolve unambiguously. +CATALOG = spark.conf.get("quarantine.catalog", "main") +BRONZE_SCHEMA = spark.conf.get("quarantine.bronze_schema", "bronze") +SILVER_SCHEMA = spark.conf.get("quarantine.silver_schema", "silver") +SOURCE_TABLE = spark.conf.get("quarantine.source", "samples.nyctaxi.trips") + +RAW_TRIPS = f"{CATALOG}.{BRONZE_SCHEMA}.raw_trips" +CLEAN_TRIPS = f"{CATALOG}.{SILVER_SCHEMA}.clean_trips" +QUARANTINE_TRIPS = f"{CATALOG}.{SILVER_SCHEMA}.quarantine_trips" + +# The full samples.nyctaxi.trips schema. +TRIP_COLUMNS = [ + "tpep_pickup_datetime", + "tpep_dropoff_datetime", + "trip_distance", + "fare_amount", + "pickup_zip", + "dropoff_zip", +] + +# Inverse of all drop predicates AND-ed together: true for a row that fails at +# least one critical expectation. 
+QUARANTINE_EXPR = build_quarantine_expr(DROP_RULES) + +# COMMAND ---------- +# MAGIC %md +# MAGIC ## Raw: ingest the source (bronze schema) + +# COMMAND ---------- + + +@dp.table( + name=RAW_TRIPS, + comment="Raw NYC taxi trips ingested from the configured source.", +) +def raw_trips(): + return ( + spark.readStream.table(SOURCE_TABLE) + .select(*TRIP_COLUMNS) + .withColumn("_ingested_at", F.current_timestamp()) + ) + + +# COMMAND ---------- +# MAGIC %md +# MAGIC ## Clean: valid rows (drop expectations enforced; warn rules logged) + +# COMMAND ---------- + + +@dp.table( + name=CLEAN_TRIPS, + comment="Valid trips: pass every critical (drop) expectation.", +) +@dp.expect_all(WARN_RULES) +@dp.expect_all_or_drop(DROP_RULES) +def clean_trips(): + return spark.readStream.table(RAW_TRIPS) + + +# COMMAND ---------- +# MAGIC %md +# MAGIC ## Quarantine: the inverse of clean (same source, opposite predicate) + +# COMMAND ---------- + + +@dp.table( + name=QUARANTINE_TRIPS, + comment="Invalid trips: fail at least one critical (drop) expectation.", +) +@dp.expect_all_or_drop({"is_quarantined": QUARANTINE_EXPR}) +def quarantine_trips(): + return spark.readStream.table(RAW_TRIPS).withColumn("_quarantined_at", F.current_timestamp()) diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/conftest.py b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/conftest.py new file mode 100644 index 0000000..e9ff8a8 --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/conftest.py @@ -0,0 +1,57 @@ +"""Shared pytest fixtures for the quarantine-pattern unit tests. + +These tests run locally with a real Spark session, no Databricks workspace +required. The fixture and the UTC timestamp fix follow the standard PySpark +unit-testing setup for Lakeflow SDP transform code. 
+""" + +import datetime +import os +import sys +from pathlib import Path + +import pytest +from pyspark.sql import SparkSession +from pyspark.sql.types import TimestampType + +# Fix PySpark collect() returning local-timezone timestamps (SPARK-25244). +_original_fromInternal = TimestampType.fromInternal + + +def _utc_fromInternal(self, ts: int) -> datetime.datetime: + if ts is not None: + return datetime.datetime.fromtimestamp(ts // 1000000, tz=datetime.UTC).replace( + microsecond=ts % 1000000, tzinfo=None + ) + + +TimestampType.fromInternal = _utc_fromInternal + +os.environ["PYSPARK_PYTHON"] = sys.executable +os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable + +# The pipeline modules (expectations.py, expectations.json) live one level up, +# in the pipeline source directory. Put that on sys.path so tests import them +# the same way the pipeline does: `from expectations import ...`. +SRC_ROOT = Path(__file__).resolve().parent.parent +if str(SRC_ROOT) not in sys.path: + sys.path.insert(0, str(SRC_ROOT)) + + +@pytest.fixture(scope="session") +def spark(): + """Local Spark session with UTC timestamps for behavior-focused tests.""" + session = ( + SparkSession.builder.master("local[1]") + .appName("sdp-quarantine-pattern-tests") + .config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC") + .config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC") + .config("spark.sql.session.timeZone", "UTC") + .config("spark.ui.enabled", "false") + .config("spark.ui.showConsoleProgress", "false") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.default.parallelism", "1") + .getOrCreate() + ) + session.sparkContext.setLogLevel("ERROR") + return session diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/requirements-test.txt b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/requirements-test.txt new file mode 100644 index 0000000..8559ce5 --- /dev/null +++ 
b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/requirements-test.txt @@ -0,0 +1,6 @@ +# Test-time dependencies for the quarantine-pattern unit tests. +# These are NOT needed to run the pipeline on Databricks; they let you verify +# the expectation rules locally (no workspace) with `pytest tests/`. +# Requires Java (for PySpark) and Python 3.11+. +pytest>=8.0 +pyspark>=3.5 diff --git a/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/test_expectations.py b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/test_expectations.py new file mode 100644 index 0000000..90d679c --- /dev/null +++ b/assets/sdp-quarantine-pattern/template/{{.target_dir}}/tests/test_expectations.py @@ -0,0 +1,160 @@ +"""Unit tests for the quarantine-pattern expectation rules. + +Two layers, both offline (no Databricks workspace): + +1. `TestExpectationsConfig` (pure Python): the rule config is well formed, every + drop predicate is NULL-safe, and the derived quarantine predicate is the + inverse of the clean predicate. + +2. `TestQuarantineRouting` (real local Spark): feed crafted edge rows through the + same drop predicates the pipeline uses and assert the partition invariant + (clean + quarantine == raw, disjoint) and that a NULL in a guarded column + routes to quarantine only. This is the offline proof of the NULL trap fix. + +The pipeline drops rows with `@dp.expect_all_or_drop`, which keeps a row unless a +predicate evaluates to `false` (a `NULL` predicate is treated as passing). The +SDP pipelines runtime is not importable offline, so these tests reproduce that +keep-on-NULL semantics with `<predicate> IS NOT FALSE`. For NULL-safe predicates +this is identical to the pipeline; for a predicate that is NOT NULL-safe it +exposes the leak as a broken partition, which is exactly what we want to catch.
+""" + +import json +from pathlib import Path + +import pytest +from pyspark.sql.types import ( + DoubleType, + IntegerType, + StringType, + StructField, + StructType, + TimestampType, +) + +from expectations import ( + DROP_RULES, + WARN_RULES, + build_quarantine_expr, + build_silver_expr, +) + +CONFIG_PATH = Path(__file__).resolve().parent.parent / "expectations.json" + + +@pytest.fixture(scope="module") +def config(): + return json.loads(CONFIG_PATH.read_text(encoding="utf-8")) + + +class TestExpectationsConfig: + """Pure-Python validation of the rule config and derived predicates.""" + + def test_has_drop_and_warn_lists(self, config): + assert config["drop"], "drop rules must not be empty" + assert "warn" in config + + def test_each_rule_has_name_and_query(self, config): + for action in ("drop", "warn"): + for rule in config.get(action, []): + assert "name" in rule and "query" in rule, f"malformed {action} rule: {rule}" + + def test_every_drop_predicate_is_null_safe(self): + """The partition invariant depends on every drop predicate being total.""" + for name, predicate in DROP_RULES.items(): + assert "IS NOT NULL" in predicate.upper(), ( + f"drop rule {name!r} is not NULL-safe: {predicate!r}" + ) + + def test_quarantine_is_inverse_of_clean(self): + assert build_quarantine_expr(DROP_RULES) == f"NOT({build_silver_expr(DROP_RULES)})" + + def test_helper_rejects_empty_rules(self): + with pytest.raises(ValueError): + build_quarantine_expr({}) + + +# --- Routing semantics (real local Spark) ----------------------------------- + +# A row is kept unless a predicate is false; a NULL predicate passes. This +# mirrors @dp.expect_all_or_drop without the SDP runtime. 
+CLEAN_KEEP_EXPR = f"({build_silver_expr(DROP_RULES)}) IS NOT FALSE" +QUARANTINE_KEEP_EXPR = f"({build_quarantine_expr(DROP_RULES)}) IS NOT FALSE" + +_SCHEMA = StructType( + [ + StructField("tag", StringType(), True), + StructField("expected", StringType(), True), + StructField("tpep_pickup_datetime", TimestampType(), True), + StructField("tpep_dropoff_datetime", TimestampType(), True), + StructField("trip_distance", DoubleType(), True), + StructField("fare_amount", DoubleType(), True), + StructField("pickup_zip", IntegerType(), True), + StructField("dropoff_zip", IntegerType(), True), + ] +) + + +def _ts(text): + from datetime import datetime + + return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + + +# tag, expected, pickup, dropoff, distance, fare, pickup_zip, dropoff_zip +_EDGE_ROWS = [ + ("valid", "clean", _ts("2016-02-14 10:00:00"), _ts("2016-02-14 10:20:00"), 3.5, 14.0, 10001, 10002), + ("warn_only", "clean", _ts("2016-02-14 12:00:00"), _ts("2016-02-14 12:40:00"), 8.0, 999.0, 10001, 10002), + ("neg_fare", "quarantine", _ts("2016-02-14 10:00:00"), _ts("2016-02-14 10:20:00"), 3.5, -14.0, 10001, 10002), + ("zero_distance", "quarantine", _ts("2016-02-14 10:00:00"), _ts("2016-02-14 10:20:00"), 0.0, 14.0, 10001, 10002), + ("dropoff_before_pickup", "quarantine", _ts("2016-02-14 11:00:00"), _ts("2016-02-14 10:30:00"), 3.5, 14.0, 10001, 10002), + ("multi_violation", "quarantine", _ts("2016-02-14 11:00:00"), _ts("2016-02-14 10:30:00"), 0.0, -5.0, 10001, 10002), + ("null_fare", "quarantine", _ts("2016-02-14 13:00:00"), _ts("2016-02-14 13:25:00"), 4.0, None, 10001, 10002), + ("null_pickup_ts", "quarantine", None, _ts("2016-02-14 13:25:00"), 4.0, 14.0, 10001, 10002), + ("null_pickup_zip", "quarantine", _ts("2016-02-14 13:00:00"), _ts("2016-02-14 13:25:00"), 4.0, 14.0, None, 10002), +] + + +class TestQuarantineRouting: + """The clean/quarantine split is a clean partition under SDP keep-on-NULL.""" + + @pytest.fixture(scope="class") + def routed(self, spark): + df = 
spark.createDataFrame(_EDGE_ROWS, _SCHEMA) + clean = df.filter(CLEAN_KEEP_EXPR) + quarantine = df.filter(QUARANTINE_KEEP_EXPR) + clean_tags = {r["tag"] for r in clean.select("tag").collect()} + quarantine_tags = {r["tag"] for r in quarantine.select("tag").collect()} + return df, clean, quarantine, clean_tags, quarantine_tags + + def test_partition_invariant(self, routed): + df, clean, quarantine, _, _ = routed + assert clean.count() + quarantine.count() == df.count() + + def test_clean_and_quarantine_disjoint(self, routed): + _, _, _, clean_tags, quarantine_tags = routed + assert clean_tags.isdisjoint(quarantine_tags) + + def test_each_row_routes_as_expected(self, routed): + df, _, _, clean_tags, quarantine_tags = routed + for row in df.collect(): + bucket = clean_tags if row["expected"] == "clean" else quarantine_tags + assert row["tag"] in bucket, f"{row['tag']} expected in {row['expected']}" + + def test_null_in_guarded_column_routes_to_quarantine_only(self, routed): + """The marquee: NULL-safe predicates keep NULLs out of the clean table.""" + _, _, _, clean_tags, quarantine_tags = routed + for null_tag in ("null_fare", "null_pickup_ts", "null_pickup_zip"): + assert null_tag in quarantine_tags + assert null_tag not in clean_tags + + def test_multi_violation_appears_once(self, routed): + _, _, quarantine, _, _ = routed + rows = [r for r in quarantine.select("tag").collect() if r["tag"] == "multi_violation"] + assert len(rows) == 1 + + def test_warn_only_row_stays_clean(self, routed): + """Warn rules do not route; a row that only trips a warn rule is clean.""" + _, _, _, clean_tags, _ = routed + assert "warn_only" in clean_tags + assert WARN_RULES, "this asset ships warn rules" diff --git a/pyproject.toml b/pyproject.toml index ee371d9..2e31dcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ Changelog = "https://github.com/vmariiechko/databricks-bundle-template/blob/main [tool.ruff] builtins = ["spark", "dbutils", "display"] line-length = 100 
-src = ["src", "tests"] +src = ["src", "tests", "assets/sdp-quarantine-pattern/template/{{.target_dir}}"] target-version = "py311" [tool.ruff.format] @@ -33,6 +33,9 @@ ignore = ["N812"] [tool.ruff.lint.per-file-ignores] # Apply these ignores ONLY to notebook-style scripts "sdp_reset_checkpoint_workspace.py" = ["E402", "E501"] +"*/tests/test_expectations.py" = ["E501"] +# PySpark TimestampType monkey-patch mirrors upstream mixedCase API (SPARK-25244). +"**/conftest.py" = ["N802", "N816"] [tool.ruff.lint.isort] diff --git a/tests/assets/test_sdp_quarantine_pattern.py b/tests/assets/test_sdp_quarantine_pattern.py new file mode 100644 index 0000000..c53df6e --- /dev/null +++ b/tests/assets/test_sdp_quarantine_pattern.py @@ -0,0 +1,334 @@ +"""Asset-specific tests for `assets/sdp-quarantine-pattern`. + +Framework-level smoke checks (no .tmpl leftovers, schema validity, etc.) live +in `tests/assets/test_framework.py`. This file asserts behavior unique to this +asset: + +- the exact files install at the chosen paths; +- the pipeline source is a Databricks notebook and parses as Python; +- the pure helper `expectations.py` builds the correct inverse predicate and + every drop predicate is NULL-safe (the partition invariant); +- the pipeline resource YAML is valid: medallion schema split, event-log block, + cost/trace tags, and the notebook library pointing at the installed source; +- a custom `pipeline_resource_key` / `target_dir` flows into the filename, the + DABs resource key, and the library path; +- the companion agent skill installs alongside the pipeline with well-formed + `SKILL.md` frontmatter and honors a custom `skill_dir`; +- the shipped Tier 1 unit-test files install and are structurally sound. + +The shipped unit tests themselves need PySpark + Java to execute and are run in +the user's bundle, not in this (pyspark-free) suite. Expectation firing, +quarantine routing, and the schema split are runtime properties verified live +on a workspace, not here. 
+""" + +import ast +import importlib.util +import re +from pathlib import Path + +import pytest +import yaml + +ASSET_NAME = "sdp-quarantine-pattern" +DEFAULT_CONFIG = "sdp_quarantine_pattern" +DEFAULT_TARGET_DIR = "src/sdp_quarantine_pattern" +DEFAULT_RESOURCE_KEY = "sdp_quarantine_pattern" +DEFAULT_PIPELINE_NAME = "SDP Quarantine Pattern Demo" +DEFAULT_CATALOG = "main" +DEFAULT_BRONZE_SCHEMA = "bronze" +DEFAULT_SILVER_SCHEMA = "silver" +DEFAULT_SKILL_DIR = ".agents" +EVENT_LOG_TABLE = "quarantine_pipeline_event_log" + +SKILL_BASE = f"{DEFAULT_SKILL_DIR}/skills/sdp-quarantine-pattern" +SKILL_FILES = ( + f"{SKILL_BASE}/SKILL.md", + f"{SKILL_BASE}/references/adapt-the-pattern.md", + f"{SKILL_BASE}/references/self-verify.md", +) + +UNIT_TEST_FILES = ( + f"{DEFAULT_TARGET_DIR}/tests/conftest.py", + f"{DEFAULT_TARGET_DIR}/tests/test_expectations.py", + f"{DEFAULT_TARGET_DIR}/tests/requirements-test.txt", +) + +EXPECTED_FILES = ( + f"{DEFAULT_TARGET_DIR}/quarantine_pipeline.py", + f"{DEFAULT_TARGET_DIR}/expectations.py", + f"{DEFAULT_TARGET_DIR}/expectations.json", + f"{DEFAULT_TARGET_DIR}/event_log_queries.sql", + f"resources/{DEFAULT_RESOURCE_KEY}.pipeline.yml", + "docs/sdp-quarantine-pattern/README.md", + *UNIT_TEST_FILES, + *SKILL_FILES, +) + + +def _resource_path(resource_key: str) -> str: + return f"resources/{resource_key}.pipeline.yml" + + +def _load_expectations_module(installed: Path, target_dir: str): + """Import the installed (pure) expectations.py by file path.""" + script = installed / target_dir / "expectations.py" + spec = importlib.util.spec_from_file_location("quarantine_expectations", script) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def installed(install_asset) -> Path: + """Install the asset once per module using the default config.""" + return install_asset(ASSET_NAME, config=DEFAULT_CONFIG) + + +def test_installs_expected_files(installed: Path): + for rel in 
EXPECTED_FILES: + assert (installed / rel).is_file(), f"missing expected file: {rel}" + + +def test_no_extra_files_installed(installed: Path): + installed_files = sorted( + p.relative_to(installed).as_posix() for p in installed.rglob("*") if p.is_file() + ) + assert installed_files == sorted(EXPECTED_FILES), ( + f"unexpected files: {set(installed_files) - set(EXPECTED_FILES)}" + ) + + +def test_pipeline_source_is_notebook_and_parses(installed: Path): + """The source is a Databricks notebook (header present) and valid Python. + + It can't be imported offline (no pyspark.pipelines), so we parse it. The + notebook markers (# Databricks notebook source, # COMMAND, # MAGIC) are all + comments, so ast.parse succeeds. + """ + src = (installed / DEFAULT_TARGET_DIR / "quarantine_pipeline.py").read_text(encoding="utf-8") + assert src.lstrip().startswith("# Databricks notebook source"), ( + "pipeline source must start with the Databricks notebook header" + ) + ast.parse(src) + + +def test_clean_and_quarantine_qualified_into_silver_schema(installed: Path): + """Clean and quarantine outputs must be fully qualified into the silver + schema (read from config), demonstrating medallion schema separation.""" + src = (installed / DEFAULT_TARGET_DIR / "quarantine_pipeline.py").read_text(encoding="utf-8") + assert 'spark.conf.get("quarantine.silver_schema"' in src + assert "CLEAN_TRIPS = f" in src and "clean_trips" in src + assert "QUARANTINE_TRIPS = f" in src and "quarantine_trips" in src + + +def test_helper_builds_inverse_expression(installed: Path): + mod = _load_expectations_module(installed, DEFAULT_TARGET_DIR) + expr = mod.build_quarantine_expr({"a": "a IS NOT NULL AND a > 0", "b": "b IS NOT NULL"}) + assert expr == "NOT((a IS NOT NULL AND a > 0) AND (b IS NOT NULL))" + + +def test_helper_builds_clean_expression(installed: Path): + mod = _load_expectations_module(installed, DEFAULT_TARGET_DIR) + expr = mod.build_silver_expr({"a": "a IS NOT NULL AND a > 0", "b": "b IS NOT NULL"}) 
+ assert expr == "(a IS NOT NULL AND a > 0) AND (b IS NOT NULL)" + # quarantine is exactly the inverse of clean + assert mod.build_quarantine_expr(mod.DROP_RULES) == f"NOT({mod.build_silver_expr(mod.DROP_RULES)})" + + +def test_helper_rejects_empty_rules(installed: Path): + mod = _load_expectations_module(installed, DEFAULT_TARGET_DIR) + with pytest.raises(ValueError): + mod.build_quarantine_expr({}) + + +def test_drop_and_warn_rules_load(installed: Path): + mod = _load_expectations_module(installed, DEFAULT_TARGET_DIR) + assert set(mod.DROP_RULES) == { + "valid_fare", + "valid_distance", + "valid_pickup_ts", + "valid_dropoff_after_pickup", + "valid_pickup_zip", + } + assert "fare_within_expected" in mod.WARN_RULES + + +def test_every_drop_predicate_is_null_safe(installed: Path): + """The partition invariant: each drop predicate must be total (NULL-safe). + + Every drop predicate must be guarded by an `IS NOT NULL` so it never + evaluates to SQL NULL. + """ + mod = _load_expectations_module(installed, DEFAULT_TARGET_DIR) + for name, predicate in mod.DROP_RULES.items(): + assert re.search(r"IS NOT NULL", predicate, re.IGNORECASE), ( + f"drop rule {name!r} is not NULL-safe: {predicate!r}" + ) + + +def test_resource_yaml_is_valid_pipeline(installed: Path): + yaml_path = installed / _resource_path(DEFAULT_RESOURCE_KEY) + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][DEFAULT_RESOURCE_KEY] + assert pipe["name"] == DEFAULT_PIPELINE_NAME + assert pipe["catalog"] == DEFAULT_CATALOG + assert pipe["schema"] == DEFAULT_BRONZE_SCHEMA # pipeline default = bronze + assert pipe["serverless"] is True + assert pipe["channel"] == "CURRENT" + + +def test_resource_publishes_event_log(installed: Path): + yaml_path = installed / _resource_path(DEFAULT_RESOURCE_KEY) + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][DEFAULT_RESOURCE_KEY] + event_log = pipe["event_log"] + assert 
event_log["catalog"] == DEFAULT_CATALOG + assert event_log["schema"] == DEFAULT_BRONZE_SCHEMA + assert event_log["name"] == EVENT_LOG_TABLE + + +def test_resource_has_trace_tags(installed: Path): + yaml_path = installed / _resource_path(DEFAULT_RESOURCE_KEY) + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][DEFAULT_RESOURCE_KEY] + tags = pipe["tags"] + assert tags["created_by"] == "dabs-asset" + assert tags["asset"] == "sdp-quarantine-pattern" + + +def test_resource_library_points_at_installed_notebook(installed: Path): + yaml_path = installed / _resource_path(DEFAULT_RESOURCE_KEY) + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][DEFAULT_RESOURCE_KEY] + nb_paths = [lib["notebook"]["path"] for lib in pipe["libraries"] if "notebook" in lib] + assert nb_paths == [f"../{DEFAULT_TARGET_DIR}/quarantine_pipeline.py"], ( + f"library path should point at the installed notebook via ../: {nb_paths}" + ) + + +def test_resource_passes_catalog_and_silver_schema_config(installed: Path): + yaml_path = installed / _resource_path(DEFAULT_RESOURCE_KEY) + data = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][DEFAULT_RESOURCE_KEY] + config = pipe["configuration"] + assert config["quarantine.catalog"] == DEFAULT_CATALOG + assert config["quarantine.bronze_schema"] == DEFAULT_BRONZE_SCHEMA + assert config["quarantine.silver_schema"] == DEFAULT_SILVER_SCHEMA + # Source is parameterized so a test run can point it at a curated table. 
+ assert config["quarantine.source"] == "samples.nyctaxi.trips" + + +def test_all_three_tables_fully_qualified(installed: Path): + """Raw, clean, and quarantine are all named with FQN constants, the raw + table is read from config, and the source is parameterized (carry-in: + consistent naming + dependency resolution + Tier 3 testability).""" + src = (installed / DEFAULT_TARGET_DIR / "quarantine_pipeline.py").read_text(encoding="utf-8") + assert 'spark.conf.get("quarantine.bronze_schema"' in src + assert 'spark.conf.get("quarantine.source"' in src + assert "RAW_TRIPS = f" in src and "raw_trips" in src + # The raw table is registered and read by its FQN constant, not unqualified. + assert "name=RAW_TRIPS," in src + assert "spark.readStream.table(RAW_TRIPS)" in src + assert 'spark.readStream.table("raw_trips")' not in src + + +def test_event_log_query_targets_published_table(installed: Path): + """The parsing query must point at the published event log table FQN.""" + sql = (installed / DEFAULT_TARGET_DIR / "event_log_queries.sql").read_text(encoding="utf-8") + assert f"{DEFAULT_CATALOG}.{DEFAULT_BRONZE_SCHEMA}.{EVENT_LOG_TABLE}" in sql + assert "data_quality.expectations" in sql + + +def test_custom_resource_key_and_schemas_flow_through(install_asset): + """A user-supplied resource key, target_dir, and schemas must flow into the + filename, the DABs resource key, the library path, and the config.""" + custom_key = "trips_quarantine_x" + custom_target = "pipelines/quarantine_demo" + out = install_asset( + ASSET_NAME, + overrides={ + "target_dir": custom_target, + "pipeline_resource_key": custom_key, + "pipeline_name": "Trips Quarantine X", + "catalog": "sandbox", + "bronze_schema": "raw", + "silver_schema": "clean", + }, + ) + + yaml_path = out / _resource_path(custom_key) + assert yaml_path.is_file(), f"missing renamed resource YAML: {yaml_path}" + assert (out / custom_target / "quarantine_pipeline.py").is_file() + + data = 
yaml.safe_load(yaml_path.read_text(encoding="utf-8")) + pipe = data["resources"]["pipelines"][custom_key] + assert pipe["name"] == "Trips Quarantine X" + assert pipe["catalog"] == "sandbox" + assert pipe["schema"] == "raw" + assert pipe["event_log"]["schema"] == "raw" + assert pipe["event_log"]["name"] == EVENT_LOG_TABLE + assert pipe["configuration"]["quarantine.bronze_schema"] == "raw" + assert pipe["configuration"]["quarantine.silver_schema"] == "clean" + nb_paths = [lib["notebook"]["path"] for lib in pipe["libraries"] if "notebook" in lib] + assert nb_paths == [f"../{custom_target}/quarantine_pipeline.py"] + + +# --- Companion agent skill -------------------------------------------------- + + +def test_skill_files_install(installed: Path): + for rel in SKILL_FILES: + assert (installed / rel).is_file(), f"missing skill file: {rel}" + + +def test_skill_frontmatter_well_formed(installed: Path): + """SKILL.md must open with YAML frontmatter declaring name and description.""" + skill = installed / SKILL_BASE / "SKILL.md" + text = skill.read_text(encoding="utf-8") + assert text.startswith("---\n"), "SKILL.md missing opening frontmatter delimiter" + end = text.find("\n---", 4) + assert end != -1, "SKILL.md missing closing frontmatter delimiter" + front = yaml.safe_load(text[4:end]) + assert front["name"] == "sdp-quarantine-pattern" + assert isinstance(front.get("description"), str) and front["description"].strip() + + +def test_skill_points_at_reference_pipeline(installed: Path): + """The templated SKILL.md must reference the worked example at the chosen + target_dir so the agent can find it.""" + skill = (installed / SKILL_BASE / "SKILL.md").read_text(encoding="utf-8") + assert f"{DEFAULT_TARGET_DIR}/quarantine_pipeline.py" in skill + # Read-only access stays runtime-neutral: no mandated tool in the skill body. 
+ assert "whatever read-only query capability" in skill + + +def test_custom_skill_dir(install_asset): + """A user-provided skill_dir is honored for the skill install path.""" + out = install_asset(ASSET_NAME, overrides={"skill_dir": ".claude"}) + base = out / ".claude" / "skills" / "sdp-quarantine-pattern" + assert (base / "SKILL.md").is_file() + assert (base / "references" / "adapt-the-pattern.md").is_file() + assert (base / "references" / "self-verify.md").is_file() + + +# --- Tier 1 unit-test scaffold ---------------------------------------------- + + +def test_unit_test_files_install(installed: Path): + for rel in UNIT_TEST_FILES: + assert (installed / rel).is_file(), f"missing unit-test file: {rel}" + + +def test_unit_tests_are_structurally_sound(installed: Path): + """The shipped tests parse, provide a `spark` fixture, and import the rules + from the sibling pipeline module (so they exercise the real predicates).""" + tests_dir = installed / DEFAULT_TARGET_DIR / "tests" + conftest = (tests_dir / "conftest.py").read_text(encoding="utf-8") + test_mod = (tests_dir / "test_expectations.py").read_text(encoding="utf-8") + ast.parse(conftest) + ast.parse(test_mod) + assert "def spark(" in conftest, "conftest must define a `spark` fixture" + assert "from expectations import" in test_mod + # The routing test models expect_all_or_drop keep-on-NULL semantics. + assert "IS NOT FALSE" in test_mod diff --git a/tests/configs/assets/sdp_quarantine_pattern.json b/tests/configs/assets/sdp_quarantine_pattern.json new file mode 100644 index 0000000..10161eb --- /dev/null +++ b/tests/configs/assets/sdp_quarantine_pattern.json @@ -0,0 +1,9 @@ +{ + "target_dir": "src/sdp_quarantine_pattern", + "pipeline_resource_key": "sdp_quarantine_pattern", + "pipeline_name": "SDP Quarantine Pattern Demo", + "catalog": "main", + "bronze_schema": "bronze", + "silver_schema": "silver", + "skill_dir": ".agents" +}
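Review note on the NULL trap that the drop predicates and the routing tests guard against: the following is a minimal pure-Python sketch, not code from this change. It assumes the keep-on-NULL behaviour described in the `test_expectations.py` docstring (a row is dropped only when a predicate is false) and uses Python's `None` as a stand-in for SQL `NULL`. Every helper name here (`sql_and`, `sql_not`, `sql_gt`, `kept`, `route`) is hypothetical and exists only for illustration.

```python
# Hypothetical illustration: None models SQL NULL, True/False model SQL booleans.


def sql_and(a, b):
    """Three-valued AND: FALSE dominates, otherwise NULL propagates."""
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def sql_not(a):
    """Three-valued NOT: NOT(NULL) is still NULL."""
    return None if a is None else (not a)


def sql_gt(value, bound):
    """Comparison against NULL yields NULL."""
    return None if value is None else value > bound


def kept(predicate_result):
    """Assumed keep-on-NULL rule: drop a row only when the predicate is FALSE."""
    return predicate_result is not False


def route(fare, null_safe):
    """Return (in_clean, in_quarantine) for a single fare value."""
    clean_pred = sql_gt(fare, 0)
    if null_safe:
        # Mirrors the shape "fare_amount IS NOT NULL AND fare_amount > 0".
        clean_pred = sql_and(fare is not None, clean_pred)
    quarantine_pred = sql_not(clean_pred)
    return kept(clean_pred), kept(quarantine_pred)


naive_null = route(None, null_safe=False)  # NULL predicate passes on both sides
safe_null = route(None, null_safe=True)    # guard turns NULL into FALSE
safe_valid = route(14.0, null_safe=True)
safe_bad = route(-14.0, null_safe=True)

print(naive_null, safe_null, safe_valid, safe_bad)
```

Under these assumptions the unguarded predicate places a NULL fare in both tables, while the guarded form sends it to quarantine only. That is the partition property `TestQuarantineRouting` checks against real Spark.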