def _as_text(stream: str | bytes | None) -> str:
    """Normalise captured process output to ``str`` (``None`` becomes ``""``)."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        # TimeoutExpired can carry raw bytes even though text=True was requested.
        return stream.decode(errors="replace")
    return stream


def validate_geozarr(dataset_path: str, verbose: bool = False, *, timeout: float = 300) -> dict:
    """Run ``eopf-geozarr validate`` on a dataset and summarise the outcome.

    Args:
        dataset_path: Dataset location, passed to the CLI unchanged.
        verbose: Append ``--verbose`` to the CLI invocation.
        timeout: Seconds to wait for the CLI before giving up. Defaults to
            300, the value that used to be hard-coded.

    Returns:
        dict that always carries ``valid``, ``exit_code``, ``stdout`` and
        ``stderr``. When the CLI never produced an exit status (timeout,
        executable missing, ...) ``exit_code`` is -1 and an ``error``
        message is added.
    """
    logger.info("Validating: %s", dataset_path)

    cmd = ["eopf-geozarr", "validate", dataset_path]
    if verbose:
        cmd.append("--verbose")

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        logger.error("❌ Validation timeout (>%s seconds)", timeout)
        return {
            "valid": False,
            "exit_code": -1,
            "error": "Validation timeout",
            # Keep whatever the CLI printed before it was killed.
            "stdout": _as_text(exc.stdout),
            "stderr": _as_text(exc.stderr),
        }
    except Exception as exc:
        # Deliberate catch-all: a validator reports failure, it does not crash
        # the caller (e.g. when the eopf-geozarr executable is not installed).
        logger.error("❌ Validation error: %s", exc)
        return {
            "valid": False,
            "exit_code": -1,
            "error": str(exc),
            "stdout": "",
            "stderr": "",
        }

    passed = completed.returncode == 0
    if passed:
        logger.info("✅ Validation passed")
    else:
        logger.error("❌ Validation failed (exit code %s)", completed.returncode)
        if completed.stderr:
            logger.error("Errors:\n%s", completed.stderr)

    return {
        "valid": passed,
        "exit_code": completed.returncode,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }


def validate_stac_item(item_path: str | Path) -> dict:
    """Load a STAC item with pystac and run its schema validation.

    Args:
        item_path: Path to the STAC item JSON file.

    Returns:
        ``{"valid": True, "item_id": ..., "collection": ...}`` on success,
        otherwise ``{"valid": False, "error": <message>}``. Any exception,
        including a failed ``pystac`` import, is reported this way.
    """
    try:
        # Imported lazily so the module can be loaded without pystac installed.
        import pystac

        logger.info("Validating STAC item: %s", item_path)
        item = pystac.Item.from_file(str(item_path))
        item.validate()

        logger.info("✅ STAC item valid")
        return {"valid": True, "item_id": item.id, "collection": item.collection_id}

    except Exception as exc:
        logger.error("❌ STAC validation failed: %s", exc)
        return {"valid": False, "error": str(exc)}


def validate_tile_matrix_set(zarr_path: str) -> dict:
    """Check the dataset's ``tile_matrix_set`` attribute with morecantile.

    Args:
        zarr_path: Path to the GeoZarr dataset; opened read-only with zarr.

    Returns:
        ``{"valid": True, "tms_id": ..., "crs": ..., "num_levels": ...}`` when
        the root attribute ``tile_matrix_set`` builds a ``TileMatrixSet``,
        otherwise ``{"valid": False, "error": <message>}`` (attribute missing,
        model construction failed, or any other exception).
    """
    try:
        # Imported lazily so the module can be loaded without these packages.
        import zarr
        from morecantile import TileMatrixSet

        logger.info("Validating TileMatrixSet...")
        store = zarr.open(zarr_path, mode="r")
        attrs = store.attrs.asdict()

        if "tile_matrix_set" not in attrs:
            logger.warning("⚠️  No tile_matrix_set found in attributes")
            return {"valid": False, "error": "Missing tile_matrix_set attribute"}

        # Constructing the model is the validation step: it raises on bad input.
        tms = TileMatrixSet(**attrs["tile_matrix_set"])

        logger.info("✅ TileMatrixSet valid")
        return {
            "valid": True,
            "tms_id": tms.id,
            "crs": str(tms.crs),
            "num_levels": len(tms.tileMatrices),
        }

    except Exception as exc:
        logger.error("❌ TMS validation failed: %s", exc)
        return {"valid": False, "error": str(exc)}
def main(argv: list[str] | None = None) -> None:
    """Run the selected validators, report the outcome and exit.

    The GeoZarr CLI check always runs. The STAC item check runs only when
    ``--stac-item`` is given; the TileMatrixSet and CF checks run unless
    ``--skip-tms`` / ``--skip-cf`` are passed. The combined result is logged
    as a summary, optionally written to ``--output``, printed to stdout as
    JSON, and the process exits with status 0 when every executed check is
    valid, otherwise 1.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is what the
            former zero-argument signature always did.
    """
    parser = argparse.ArgumentParser(description="Validate GeoZarr compliance")
    parser.add_argument("dataset_path", help="Path to GeoZarr dataset (S3 or local)")
    parser.add_argument("--item-id", help="STAC item ID for tracking")
    parser.add_argument("--stac-item", help="Path to STAC item JSON for validation")
    parser.add_argument("--output", help="Output JSON file path")
    parser.add_argument("--skip-cf", action="store_true", help="Skip CF-conventions check")
    parser.add_argument("--skip-tms", action="store_true", help="Skip TileMatrixSet check")
    parser.add_argument("--verbose", action="store_true", help="Verbose validation output")
    args = parser.parse_args(argv)

    # Insertion order is the order checks appear in the summary and the JSON.
    validations: dict[str, dict] = {}

    # 1. GeoZarr spec compliance (via eopf-geozarr CLI)
    validations["geozarr"] = validate_geozarr(args.dataset_path, args.verbose)

    # 2. STAC item validation (if provided)
    if args.stac_item:
        validations["stac_item"] = validate_stac_item(args.stac_item)

    # 3. TileMatrixSet validation
    if not args.skip_tms:
        validations["tile_matrix_set"] = validate_tile_matrix_set(args.dataset_path)

    # 4. CF-conventions validation
    if not args.skip_cf:
        validations["cf_conventions"] = validate_cf_conventions(args.dataset_path)

    # A check without a "valid" key counts as failed.
    all_valid = all(check.get("valid", False) for check in validations.values())

    result = {
        "timestamp": datetime.now(UTC).isoformat(),
        "dataset_path": args.dataset_path,
        "item_id": args.item_id,
        "valid": all_valid,
        "validations": validations,
    }

    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding: do not depend on the locale of the machine running this.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        logger.info("Results written to: %s", output_path)

    # Human-readable summary goes through logging; stdout is reserved for JSON.
    logger.info("\n%s", "=" * 60)
    logger.info("Dataset: %s", args.dataset_path)
    logger.info("Overall Valid: %s", all_valid)
    for check_name, check_result in validations.items():
        status = "✅" if check_result.get("valid") else "❌"
        logger.info("  %s %s: %s", status, check_name, check_result.get("valid"))
    if args.item_id:
        logger.info("Item ID: %s", args.item_id)
    logger.info("%s\n", "=" * 60)

    # Output JSON for workflow
    print(json.dumps(result, indent=2))

    # Exit with validation status
    sys.exit(0 if all_valid else 1)
def _suppress_async_unraisable(unraisable: SimpleNamespace) -> None:
    """Unraisable-exception hook that drops one noisy s3fs/zarr teardown error.

    An event is swallowed only when the repr of the object it came from
    mentions ``StorePath.get`` and the exception text mentions ``_context``.
    Everything else is forwarded to the interpreter's original hook.
    """
    pending_exc = unraisable.exc_value
    origin = repr(unraisable.object)
    # De Morgan form of the "is this the known noise?" test; the exception is
    # only stringified when the origin already matched.
    if "StorePath.get" not in origin or "_context" not in str(pending_exc):
        sys.__unraisablehook__(unraisable)
class FakeHttpResponse:
    """Stand-in for ``httpx.Response`` that serves one fixed JSON payload."""

    def __init__(self, payload: dict):
        self._body = payload

    def raise_for_status(self) -> "FakeHttpResponse":
        """Never raises; hands back the response itself."""
        return self

    def json(self) -> dict:
        """Return the payload this response was built with."""
        return self._body


class FakeHttpClient:
    """Stand-in for ``httpx.Client`` whose every GET yields the same payload."""

    def __init__(self, payload: dict):
        self._canned = payload

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # False: exceptions raised inside the ``with`` body are not suppressed.
        return False

    def get(self, url: str) -> FakeHttpResponse:
        """Ignore *url* and wrap the canned payload in a fresh response."""
        response = FakeHttpResponse(self._canned)
        return response
def test_get_zarr_url_raises_when_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """get_zarr_url raises RuntimeError when the item's only asset is not Zarr."""
    tif_only_item = {"assets": {"foo": {"href": "https://foo/bar.tif"}}}

    def client_factory(*_args, **_kwargs) -> FakeHttpClient:
        # Replaces httpx.Client; construction arguments are irrelevant here.
        return FakeHttpClient(tif_only_item)

    monkeypatch.setattr(convert_module.httpx, "Client", client_factory)

    with pytest.raises(RuntimeError):
        convert_module.get_zarr_url("https://example/items/test")
def test_run_conversion_skips_stac_lookup_for_direct_zarr(monkeypatch: pytest.MonkeyPatch) -> None:
    """A direct ``.zarr`` source URL skips get_zarr_url and uses the collection config."""
    stac_lookups: list[str] = []

    def unexpected_lookup(url: str) -> str:
        # Recorded rather than raised, so the final assertion reports the misuse.
        stac_lookups.append(url)
        return "should-not-be-used"

    removed: list[tuple[str, bool]] = []

    def record_rm(path: str, recursive: bool = False) -> None:
        removed.append((path, recursive))

    filesystem = SimpleNamespace(rm=record_rm)
    empty_tree = SimpleNamespace(children={})
    create_kwargs: dict = {}

    def capture_create(**kwargs) -> None:
        create_kwargs.update(kwargs)

    monkeypatch.setattr(convert_module, "get_zarr_url", unexpected_lookup)
    monkeypatch.setattr(convert_module.fsspec, "filesystem", lambda *a, **k: filesystem)
    monkeypatch.setattr(convert_module, "get_storage_options", lambda url: {})
    monkeypatch.setattr(convert_module.xr, "open_datatree", lambda *a, **k: empty_tree)
    monkeypatch.setattr(convert_module, "create_geozarr_dataset", capture_create)

    result = convert_module.run_conversion(
        source_url="s3://input/data.zarr",
        collection="sentinel-1-grd",
        s3_output_bucket="bucket",
        s3_output_prefix="prefix",
    )

    expected_output = "s3://bucket/prefix/sentinel-1-grd/data.zarr.zarr"
    sentinel1 = CONFIGS["sentinel-1"]
    assert result == expected_output
    assert stac_lookups == []
    assert removed == [(expected_output, True)]
    assert create_kwargs["groups"] == sentinel1["groups"]
    assert create_kwargs["crs_groups"] == sentinel1["crs_groups"]
def test_convert_main_invokes_run(monkeypatch: pytest.MonkeyPatch) -> None:
    """main() hands every parsed CLI value to run_conversion as a keyword argument."""
    expected_kwargs = {
        "source_url": "src",
        "collection": "sentinel-2",
        "s3_output_bucket": "bucket",
        "s3_output_prefix": "prefix",
        "groups": None,
        "spatial_chunk": None,
        "tile_width": None,
        "enable_sharding": False,
    }
    parsed = SimpleNamespace(**expected_kwargs)

    # Parser double: accepts any add_argument call, parse_args yields `parsed`.
    parser_stub = SimpleNamespace(
        add_argument=lambda *args, **kwargs: None,
        parse_args=lambda: parsed,
    )
    monkeypatch.setattr(convert_module.argparse, "ArgumentParser", lambda *a, **k: parser_stub)

    received: dict = {}

    def capture_run(**kwargs) -> None:
        received.update(kwargs)

    monkeypatch.setattr(convert_module, "run_conversion", capture_run)

    convert_module.main()

    assert received == expected_kwargs
def make_item(collection: str | None = None, item_id: str = "id") -> Item:
    """Build a bare pystac Item: no geometry, no bbox, datetime set to now (UTC)."""
    item_fields = {
        "id": item_id,
        "geometry": None,
        "bbox": None,
        "datetime": datetime.now(UTC),
        "properties": {},
        "collection": collection,
    }
    return Item(**item_fields)


def make_asset(
    href: str, media_type: str = "application/vnd+zarr", roles=None, extra=None
) -> Asset:
    """Build a pystac Asset; ``extra`` (when not None) replaces its extra_fields."""
    asset = Asset(href=href, media_type=media_type, roles=roles or [])
    if extra is None:
        return asset
    asset.extra_fields = extra
    return asset
@pytest.mark.parametrize(
    ("has_vh", "existing_thumb", "expected_thumb"),
    [
        (True, False, True),
        (False, False, True),
        (False, True, True),
    ],
)
def test_add_thumbnail_variants(has_vh, existing_thumb, expected_thumb):
    """add_thumbnail_asset leaves a ``thumbnail`` asset on the item in every variant."""
    if has_vh:
        collection_id = "sentinel-1-grd"
    else:
        collection_id = "sentinel-2-l2a"
    item = make_item(collection=collection_id)

    if has_vh:
        item.add_asset("vh", make_asset("https://ex/base.zarr/measurements/VH"))
    if existing_thumb:
        item.add_asset("thumbnail", make_asset("https://ex/t.png", media_type="image/png"))

    register_module.add_thumbnail_asset(item, "https://titiler", collection_id)

    has_thumbnail = "thumbnail" in item.assets
    assert has_thumbnail == expected_thumb
"stac_version": "1.0.0", + "id": "S2_TEST", + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2025-01-01T00:00:00Z"}, + "collection": "sentinel-2-l2a", + "assets": { + "data": { + "href": "s3://source-bucket/product.zarr/measurements/reflectance/r10m/B04", + "type": "application/vnd+zarr", + } + }, + } + + class C: + def __init__(self): + self.request_url = None + + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def get(self, url): + self.request_url = url + return SimpleNamespace(raise_for_status=lambda: None, json=lambda: payload) + + monkeypatch.setattr(register_module.httpx, "Client", lambda *a, **k: C()) + + seen = [] + monkeypatch.setattr( + register_module, "rewrite_asset_hrefs", lambda item, o, n, e: seen.append((o, n, e)) + ) + monkeypatch.setattr( + register_module, "add_projection_from_zarr", lambda item: seen.append("proj") + ) + monkeypatch.setattr( + register_module, "remove_xarray_integration", lambda item: seen.append("xarray") + ) + monkeypatch.setattr( + register_module, "add_visualization_links", lambda item, b, c: seen.append("viz") + ) + monkeypatch.setattr( + register_module, "add_thumbnail_asset", lambda item, b, c: seen.append("thumb") + ) + monkeypatch.setattr( + register_module, "add_derived_from_link", lambda item, s: seen.append("derived") + ) + + upsert = {} + + def record_upsert(client, coll, item): + upsert["coll"] = coll + upsert["item"] = item + + monkeypatch.setattr(register_module, "upsert_item", record_upsert) + monkeypatch.setattr(register_module, "Client", SimpleNamespace(open=lambda url: MagicMock())) + + register_module.run_registration( + source_url="https://stac.example.com/collections/sentinel-2-l2a/items/S2_TEST", + collection="sentinel-2-l2a", + stac_api_url="https://stac.example.com", + raster_api_url="https://titiler.example.com", + s3_endpoint="https://s3.example.com", + s3_output_bucket="out-bucket", + 
def test_register_main_failure(monkeypatch: pytest.MonkeyPatch) -> None:
    """register.main returns 1 when run_registration raises."""

    def boom(*args, **kwargs):
        # Accept keyword arguments as well: a ``*args``-only stub would fail
        # with TypeError on a keyword call, so the RuntimeError path this test
        # is about would never actually be exercised.
        raise RuntimeError("boom")

    monkeypatch.setattr(register_module, "run_registration", boom)

    exit_code = register_module.main(
        [
            "--source-url",
            "src",
            "--collection",
            "coll",
            "--stac-api-url",
            "https://stac",
            "--raster-api-url",
            "https://titiler",
            "--s3-endpoint",
            "https://s3",
            "--s3-output-bucket",
            "bucket",
            "--s3-output-prefix",
            "prefix",
        ]
    )

    assert exit_code == 1
class TestMainCLI:
    """Test CLI interface."""

    @pytest.fixture(autouse=True)
    def _stub_dataset_validators(self, mocker):
        """Replace the TileMatrixSet and CF validators with passing stubs.

        main() runs both unless --skip-tms/--skip-cf are given, so without
        these stubs the tests would hand the fake ``s3://bucket/...`` path to
        the real validators and the exit code would depend on their failure
        rather than on the mocked GeoZarr result.
        """
        mocker.patch(
            "scripts.validate_geozarr.validate_tile_matrix_set", return_value={"valid": True}
        )
        mocker.patch(
            "scripts.validate_geozarr.validate_cf_conventions", return_value={"valid": True}
        )

    def test_basic_validation(self, mocker):
        """Basic validation without options."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {
            "valid": True,
            "exit_code": 0,
            "stdout": "OK",
            "stderr": "",
        }
        mocker.patch("sys.argv", ["validate_geozarr.py", "s3://bucket/dataset.zarr"])

        with pytest.raises(SystemExit) as exc_info:
            main()

        assert exc_info.value.code == 0
        mock_validate.assert_called_once_with("s3://bucket/dataset.zarr", False)

    def test_with_item_id(self, mocker, capsys):
        """Includes item ID in output."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {"valid": True, "exit_code": 0}
        mocker.patch(
            "sys.argv",
            ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--item-id", "test-item-123"],
        )

        with pytest.raises(SystemExit) as exc_info:
            main()

        assert exc_info.value.code == 0
        # main() prints the JSON report on stdout; the summary goes through logging.
        report = json.loads(capsys.readouterr().out)
        assert report["item_id"] == "test-item-123"

    def test_with_output_file(self, mocker, tmp_path):
        """Writes results to output file."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {"valid": True, "exit_code": 0}

        output_file = tmp_path / "results.json"
        mocker.patch(
            "sys.argv",
            ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--output", str(output_file)],
        )

        with pytest.raises(SystemExit):
            main()

        assert output_file.exists()
        data = json.loads(output_file.read_text())

        assert data["dataset_path"] == "s3://bucket/dataset.zarr"
        assert data["validations"]["geozarr"]["valid"] is True

    def test_verbose_flag(self, mocker):
        """Verbose flag is passed through."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {"valid": True, "exit_code": 0}
        mocker.patch("sys.argv", ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--verbose"])

        with pytest.raises(SystemExit):
            main()

        mock_validate.assert_called_once_with("s3://bucket/dataset.zarr", True)

    def test_failed_validation_exits_1(self, mocker):
        """Failed validation exits with code 1."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {"valid": False, "exit_code": 1}
        mocker.patch("sys.argv", ["validate_geozarr.py", "s3://bucket/invalid.zarr"])

        with pytest.raises(SystemExit) as exc_info:
            main()

        # The other validators are stubbed as passing, so this exit code is
        # caused by the GeoZarr failure alone.
        assert exc_info.value.code == 1

    def test_creates_output_directory(self, mocker, tmp_path):
        """Creates output directory if it doesn't exist."""
        mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr")
        mock_validate.return_value = {"valid": True, "exit_code": 0}

        nested_output = tmp_path / "deep" / "nested" / "results.json"
        mocker.patch(
            "sys.argv",
            ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--output", str(nested_output)],
        )

        with pytest.raises(SystemExit):
            main()

        assert nested_output.exists()
        assert nested_output.parent.exists()
fails.""" + from scripts.validate_geozarr import validate_stac_item + + item_file = tmp_path / "bad_item.json" + item_file.write_text(json.dumps({"invalid": "data"})) + + result = validate_stac_item(item_file) + + assert result["valid"] is False + assert "error" in result + + +class TestValidateTileMatrixSet: + """Test TileMatrixSet validation.""" + + def test_valid_tms(self, mocker): + """Test valid TileMatrixSet.""" + from scripts.validate_geozarr import validate_tile_matrix_set + + mock_store = mocker.Mock() + mock_store.attrs.asdict.return_value = { + "tile_matrix_set": { + "id": "WebMercatorQuad", + "crs": "http://www.opengis.net/def/crs/EPSG/0/3857", + "tileMatrices": [ + { + "id": "0", + "scaleDenominator": 559082264.0287178, + "cellSize": 156543.03392804097, + "pointOfOrigin": [-20037508.342789244, 20037508.342789244], + "tileWidth": 256, + "tileHeight": 256, + "matrixWidth": 1, + "matrixHeight": 1, + } + ], + } + } + + mock_patch = mocker.patch("zarr.open", return_value=mock_store) + result = validate_tile_matrix_set("s3://bucket/dataset.zarr") + mock_patch.stop() + + assert result["valid"] is True + assert result["tms_id"] == "WebMercatorQuad" + assert "3857" in result["crs"] + + def test_missing_tms(self, mocker): + """Test missing TileMatrixSet attribute.""" + from scripts.validate_geozarr import validate_tile_matrix_set + + mock_store = mocker.Mock() + mock_store.attrs.asdict.return_value = {} # No tile_matrix_set + + mock_patch = mocker.patch("zarr.open", return_value=mock_store) + result = validate_tile_matrix_set("s3://bucket/dataset.zarr") + mock_patch.stop() + + assert result["valid"] is False + assert "Missing" in result["error"] + + def test_tms_exception(self, mocker): + """Test TMS validation exception handling.""" + from scripts.validate_geozarr import validate_tile_matrix_set + + mock_patch = mocker.patch("zarr.open", side_effect=Exception("Zarr error")) + result = validate_tile_matrix_set("s3://bucket/dataset.zarr") + mock_patch.stop() + + 
assert result["valid"] is False + assert "error" in result + + +class TestValidateCFConventions: + """Test CF-conventions validation.""" + + def test_valid_cf(self, mocker): + """Test valid CF-conventions.""" + from scripts.validate_geozarr import validate_cf_conventions + + mock_var = mocker.Mock() + mock_var.attrs = {"standard_name": "air_temperature"} + + mock_ds = mocker.Mock() + mock_ds.data_vars = {"temp": mock_var} + mock_ds.__getitem__ = mocker.Mock(return_value=mock_var) # Support ds[var_name] + mock_ds.cf.decode.return_value = mock_ds + + mock_patch = mocker.patch("xarray.open_zarr", return_value=mock_ds) + result = validate_cf_conventions("s3://bucket/dataset.zarr") + mock_patch.stop() + + assert result["valid"] is True + + def test_cf_warnings(self, mocker): + """Test CF-conventions with warnings.""" + from scripts.validate_geozarr import validate_cf_conventions + + mock_var = mocker.Mock() + mock_var.attrs = {} # Missing standard_name/long_name + + mock_ds = mocker.Mock() + mock_ds.data_vars = {"temp": mock_var} + mock_ds.__getitem__ = mocker.Mock(return_value=mock_var) # Support ds[var_name] + mock_ds.cf.decode.return_value = mock_ds + + mock_patch = mocker.patch("xarray.open_zarr", return_value=mock_ds) + result = validate_cf_conventions("s3://bucket/dataset.zarr") + mock_patch.stop() + + assert result["valid"] is True + assert "warnings" in result + assert len(result["warnings"]) > 0 + + def test_cf_exception(self, mocker): + """Test CF validation exception handling.""" + from scripts.validate_geozarr import validate_cf_conventions + + mock_patch = mocker.patch("xarray.open_zarr", side_effect=Exception("xarray error")) + result = validate_cf_conventions("s3://bucket/dataset.zarr") + mock_patch.stop() + + assert result["valid"] is False + assert "error" in result + + +class TestMainWithStacItem: + """Test main() with STAC item validation.""" + + def test_with_stac_item(self, mocker, tmp_path): + """Test validation with STAC item.""" + 
mock_validate_geozarr = mocker.patch("scripts.validate_geozarr.validate_geozarr") + mock_validate_geozarr.return_value = {"valid": True, "exit_code": 0} + + # Mock TMS and CF to return valid so overall validation passes + mocker.patch( + "scripts.validate_geozarr.validate_tile_matrix_set", return_value={"valid": True} + ) + mocker.patch( + "scripts.validate_geozarr.validate_cf_conventions", return_value={"valid": True} + ) + + item_file = tmp_path / "item.json" + item_file.write_text( + json.dumps( + { + "type": "Feature", + "stac_version": "1.0.0", + "id": "test-item", + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "bbox": [0, 0, 0, 0], + "properties": {"datetime": "2025-01-01T00:00:00Z"}, + "links": [], + "assets": {}, + } + ) + ) + + mocker.patch( + "sys.argv", + ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--stac-item", str(item_file)], + ) + + with pytest.raises(SystemExit) as exc_info: + main() + + assert exc_info.value.code == 0 + + def test_skip_tms(self, mocker): + """Test --skip-tms flag.""" + mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr") + mock_validate.return_value = {"valid": True, "exit_code": 0} + mock_tms = mocker.patch("scripts.validate_geozarr.validate_tile_matrix_set") + mock_cf = mocker.patch("scripts.validate_geozarr.validate_cf_conventions") + mock_cf.return_value = {"valid": True} + + mocker.patch("sys.argv", ["validate_geozarr.py", "s3://bucket/dataset.zarr", "--skip-tms"]) + + with pytest.raises(SystemExit): + main() + + mock_tms.assert_not_called() + + def test_skip_cf(self, mocker): + """Test --skip-cf flag.""" + mock_validate = mocker.patch("scripts.validate_geozarr.validate_geozarr") + mock_validate.return_value = {"valid": True, "exit_code": 0} + mock_tms = mocker.patch("scripts.validate_geozarr.validate_tile_matrix_set") + mock_tms.return_value = {"valid": True} + mock_cf = mocker.patch("scripts.validate_geozarr.validate_cf_conventions") + + mocker.patch("sys.argv", 
["validate_geozarr.py", "s3://bucket/dataset.zarr", "--skip-cf"]) + + with pytest.raises(SystemExit): + main() + + mock_cf.assert_not_called()