Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions .github/workflows/data-health-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
name: Data Health Check

on:
schedule:
- cron: "30 1 * * *"
workflow_dispatch:

permissions:
contents: read
actions: read

jobs:
data-health-check:
environment: Heroku-DB-Backup
runs-on: ubuntu-latest
env:
HEROKU_APP_NAME: opencreorg
services:
postgres:
image: postgres:16
env:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_HOST_AUTH_METHOD: trust
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y postgresql-client
python -m pip install --upgrade pip psycopg2-binary

- name: Install Heroku CLI
run: curl https://cli-assets.heroku.com/install-ubuntu.sh | sh

- name: Download known-good backup artifact
env:
GH_TOKEN: ${{ github.token }}
run: |
set -euo pipefail
mkdir -p known-good
run_id="$(gh run list --repo OWASP/OpenCRE --workflow backup.yml --branch main --status success --limit 1 --json databaseId --jq '.[0].databaseId')"
if [ -z "${run_id}" ] || [ "${run_id}" = "null" ]; then
echo "Could not find successful backup workflow runs on main."
exit 1
fi

gh run download "${run_id}" --repo OWASP/OpenCRE --name opencreorg_db_backup --dir known-good
known_good_dump="$(find known-good -maxdepth 1 -name '*.dump' | head -n 1)"
if [ -z "${known_good_dump}" ]; then
echo "No .dump file found in opencreorg_db_backup artifact."
exit 1
fi
echo "KNOWN_GOOD_DUMP=${known_good_dump}" >> "${GITHUB_ENV}"

- name: Download current Heroku backup
env:
HEROKU_API_KEY: ${{ secrets.HEROKU_API_KEY }}
run: |
set -euo pipefail
heroku pg:backups:capture -a "${HEROKU_APP_NAME}"
heroku pg:backups:download -a "${HEROKU_APP_NAME}" --output=current.dump
echo "CURRENT_DUMP=${PWD}/current.dump" >> "${GITHUB_ENV}"

- name: Restore dumps to local postgres
run: |
set -euo pipefail
psql -h localhost -U postgres -d postgres -c "DROP DATABASE IF EXISTS opencre_known_good;"
psql -h localhost -U postgres -d postgres -c "DROP DATABASE IF EXISTS opencre_current;"
psql -h localhost -U postgres -d postgres -c "CREATE DATABASE opencre_known_good;"
psql -h localhost -U postgres -d postgres -c "CREATE DATABASE opencre_current;"

pg_restore --clean --if-exists --no-owner --no-privileges -h localhost -U postgres -d opencre_known_good "${KNOWN_GOOD_DUMP}"
pg_restore --clean --if-exists --no-owner --no-privileges -h localhost -U postgres -d opencre_current "${CURRENT_DUMP}"

- name: Compare datasets
run: |
python scripts/check_data_health.py \
--db1-url "postgresql://postgres@localhost:5432/opencre_known_good" \
--db2-url "postgresql://postgres@localhost:5432/opencre_current" \
--db1-label "known-good" \
--db2-label "heroku-current"
90 changes: 90 additions & 0 deletions application/tests/data_health_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import unittest

from application.utils import data_health


class TestDataHealth(unittest.TestCase):
def _dataset(self, cre_id: str, child_id: str, node_id: str):
return {
"cre": [
{
"id": cre_id,
"external_id": "100-100",
"name": "Authentication",
"description": "Base auth requirement",
"tags": "auth,session",
},
{
"id": child_id,
"external_id": "100-101",
"name": "Session timeout",
"description": "Timeout policy",
"tags": "session",
},
],
"node": [
{
"id": node_id,
"name": "ASVS",
"section": "V2",
"subsection": "2.1.1",
"section_id": "ASVS-V2-2.1.1",
"version": "4.0",
"description": "ASVS mapping entry",
"tags": "asvs",
"ntype": "Standard",
"link": "https://example.com",
}
],
"cre_links": [
{
"type": "Contains",
"group": cre_id,
"cre": child_id,
}
],
"cre_node_links": [
{
"type": "Linked To",
"cre": child_id,
"node": node_id,
}
],
}

def test_equivalent_when_only_internal_ids_differ(self):
left_rows = self._dataset("cre-id-1", "cre-id-2", "node-id-1")
right_rows = self._dataset("cre-id-a", "cre-id-b", "node-id-a")

left = data_health.build_canonical_snapshot(left_rows)
right = data_health.build_canonical_snapshot(right_rows)

self.assertEqual(
data_health.snapshot_digest(left), data_health.snapshot_digest(right)
)
self.assertEqual(data_health.snapshot_diff(left, right), {})

def test_detects_data_change(self):
left_rows = self._dataset("cre-id-1", "cre-id-2", "node-id-1")
right_rows = self._dataset("cre-id-a", "cre-id-b", "node-id-a")
right_rows["node"][0]["description"] = "Changed description"

left = data_health.build_canonical_snapshot(left_rows)
right = data_health.build_canonical_snapshot(right_rows)

self.assertNotEqual(
data_health.snapshot_digest(left), data_health.snapshot_digest(right)
)
diff = data_health.snapshot_diff(left, right)
self.assertIn("node", diff)

def test_raises_on_missing_foreign_key_target(self):
rows = self._dataset("cre-id-1", "cre-id-2", "node-id-1")
rows["cre_links"][0]["cre"] = "unknown-cre-id"

with self.assertRaises(ValueError):
data_health.build_canonical_snapshot(rows)


if __name__ == "__main__":
unittest.main()
127 changes: 127 additions & 0 deletions application/utils/data_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import hashlib
import json
from typing import Any, Dict, List, Mapping, Sequence, Tuple


Snapshot = Dict[str, List[Tuple[Any, ...]]]

REQUIRED_TABLES = ("cre", "node", "cre_links", "cre_node_links")


def _normalize(value: Any) -> str:
if value is None:
return ""
return str(value)


def _cre_key(row: Mapping[str, Any]) -> Tuple[str, ...]:
return (
_normalize(row.get("external_id")),
_normalize(row.get("name")),
_normalize(row.get("description")),
_normalize(row.get("tags")),
)


def _node_key(row: Mapping[str, Any]) -> Tuple[str, ...]:
return (
_normalize(row.get("name")),
_normalize(row.get("section")),
_normalize(row.get("subsection")),
_normalize(row.get("section_id")),
_normalize(row.get("version")),
_normalize(row.get("description")),
_normalize(row.get("tags")),
_normalize(row.get("ntype")),
_normalize(row.get("link")),
)


def _validate_table_presence(rows: Mapping[str, Sequence[Mapping[str, Any]]]) -> None:
missing = [table for table in REQUIRED_TABLES if table not in rows]
if missing:
raise ValueError(f"Missing required tables: {missing}")


def build_canonical_snapshot(
rows: Mapping[str, Sequence[Mapping[str, Any]]],
) -> Snapshot:
_validate_table_presence(rows)

cre_id_to_key: Dict[str, Tuple[str, ...]] = {}
node_id_to_key: Dict[str, Tuple[str, ...]] = {}

cre_rows: List[Tuple[str, ...]] = []
node_rows: List[Tuple[str, ...]] = []
cre_links_rows: List[Tuple[Any, ...]] = []
cre_node_links_rows: List[Tuple[Any, ...]] = []

for row in rows["cre"]:
key = _cre_key(row)
row_id = _normalize(row.get("id"))
cre_id_to_key[row_id] = key
cre_rows.append(key)

for row in rows["node"]:
key = _node_key(row)
row_id = _normalize(row.get("id"))
node_id_to_key[row_id] = key
node_rows.append(key)

for row in rows["cre_links"]:
group_id = _normalize(row.get("group"))
cre_id = _normalize(row.get("cre"))
if group_id not in cre_id_to_key or cre_id not in cre_id_to_key:
raise ValueError(
f"cre_links contains unknown IDs: group={group_id}, cre={cre_id}"
)
cre_links_rows.append(
(
_normalize(row.get("type")),
cre_id_to_key[group_id],
cre_id_to_key[cre_id],
)
)

for row in rows["cre_node_links"]:
cre_id = _normalize(row.get("cre"))
node_id = _normalize(row.get("node"))
if cre_id not in cre_id_to_key or node_id not in node_id_to_key:
raise ValueError(
f"cre_node_links contains unknown IDs: cre={cre_id}, node={node_id}"
)
cre_node_links_rows.append(
(
_normalize(row.get("type")),
cre_id_to_key[cre_id],
node_id_to_key[node_id],
)
)

snapshot: Snapshot = {
"cre": sorted(cre_rows),
"node": sorted(node_rows),
"cre_links": sorted(cre_links_rows),
"cre_node_links": sorted(cre_node_links_rows),
}
return snapshot


def snapshot_digest(snapshot: Snapshot) -> str:
payload = json.dumps(snapshot, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def snapshot_diff(expected: Snapshot, actual: Snapshot) -> Dict[str, Dict[str, Any]]:
diff: Dict[str, Dict[str, Any]] = {}
for table in REQUIRED_TABLES:
missing = sorted(set(expected.get(table, [])) - set(actual.get(table, [])))
extra = sorted(set(actual.get(table, [])) - set(expected.get(table, [])))
if missing or extra:
diff[table] = {
"missing_count": len(missing),
"extra_count": len(extra),
"missing_sample": missing[:3],
"extra_sample": extra[:3],
}
return diff
Loading
Loading