
Conversation

@wietzesuijker (Collaborator) commented on Oct 23, 2025

A Kubernetes pipeline that converts Sentinel-1/2 products to cloud-optimized GeoZarr with web visualization.

Flow: AMQP event → Argo workflow → convert → register → STAC + map tiles


Deploy

kubectl apply -k workflows/overlays/staging

Run

kubectl create -n devseed-staging -f - <<'EOF'
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: geozarr-
spec:
  workflowTemplateRef:
    name: geozarr-pipeline
  arguments:
    parameters:
    - name: source_url
      value: "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2A_..."
    - name: register_collection
      value: "sentinel-2-l2a-dp-test"
EOF

Components

Scripts: convert.py, register.py, augment_stac_item.py, get_conversion_params.py

Stack: Argo Workflows, RabbitMQ, STAC API, TiTiler

Image: ghcr.io/eopf-explorer/data-pipeline:slim


Future feature branches

#27 tests, #28 STAC integration, #29 validation, #30 benchmarks, #31 ops

Add complete Argo Workflows infrastructure for geozarr pipeline with automated AMQP event triggering.

Workflow pipeline:
- Convert: Sentinel-2 Zarr → GeoZarr (cloud-optimized)
- Register: Create STAC item with metadata
- Augment: Add visualization links (XYZ tiles, TileJSON)
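A minimal sketch of the augment step, assuming pystac and a TiTiler-style raster API; the endpoint URL, rel names, and tile path are illustrative, not the exact ones emitted by augment_stac_item.py:

import pystac

RASTER_API = "https://titiler.example.com"  # hypothetical endpoint

def add_visualization_links(item: pystac.Item) -> pystac.Item:
    base = f"{RASTER_API}/collections/{item.collection_id}/items/{item.id}"
    # XYZ tile template for web maps
    item.add_link(pystac.Link(
        rel="xyz",
        target=f"{base}/tiles/WebMercatorQuad/{{z}}/{{x}}/{{y}}.png",
        media_type="image/png",
        title="XYZ tiles",
    ))
    # TileJSON document describing the tile set
    item.add_link(pystac.Link(
        rel="tilejson",
        target=f"{base}/WebMercatorQuad/tilejson.json",
        media_type="application/json",
        title="TileJSON",
    ))
    return item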

Event-driven automation:
- AMQP EventSource subscribes to RabbitMQ queue
- Sensor triggers workflows on incoming messages
- RBAC configuration for secure execution

Configuration:
- Python dependencies (pyproject.toml, uv.lock)
- Pre-commit hooks (ruff, mypy, yaml validation)
- TTL cleanup (24h auto-delete completed workflows)

Add STAC registration, augmentation, and workflow submission scripts.

- register_stac.py: Create/update STAC items with S3→HTTPS rewriting
- augment_stac_item.py: Add visualization links (XYZ tiles, TileJSON)
- submit_via_api.py: Submit workflows via Argo API for testing
- Retry with exponential backoff on transient failures
- Configurable timeouts via HTTP_TIMEOUT, RETRY_ATTEMPTS, RETRY_MAX_WAIT
- Workflow step timeouts: 1h convert, 5min register/augment
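A hedged sketch of the S3→HTTPS rewrite and the retry/timeout behaviour listed above, assuming httpx and tenacity; the default values, bucket layout, and URL shapes are illustrative:

import os
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

HTTP_TIMEOUT = float(os.getenv("HTTP_TIMEOUT", "30"))
RETRY_ATTEMPTS = int(os.getenv("RETRY_ATTEMPTS", "3"))
RETRY_MAX_WAIT = float(os.getenv("RETRY_MAX_WAIT", "60"))

def s3_to_https(href: str, endpoint: str = "https://s3.example.com") -> str:
    # Rewrite s3://bucket/key asset hrefs to HTTPS (bucket layout is assumed).
    bucket, _, key = href.removeprefix("s3://").partition("/")
    return f"{endpoint}/{bucket}/{key}"

@retry(stop=stop_after_attempt(RETRY_ATTEMPTS),
       wait=wait_exponential(multiplier=1, max=RETRY_MAX_WAIT))
def upsert_item(stac_api: str, collection: str, item: dict) -> None:
    # PUT the item, retrying transient failures with exponential backoff.
    url = f"{stac_api}/collections/{collection}/items/{item['id']}"
    resp = httpx.put(url, json=item, timeout=HTTP_TIMEOUT)
    resp.raise_for_status()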
Add operator notebooks and environment configuration.

- Submit workflow examples (AMQP and direct API)
- Environment variable template (.env.example)
- .gitignore for Python, IDEs, Kubernetes configs

Add container build configuration and development tooling.

- Dockerfile for data-pipeline image
- Makefile for common tasks (build, push, test)
- GitHub Container Registry integration

Add comprehensive testing and project documentation.

- Unit tests for register_stac and augment_stac_item
- Integration tests for workflow submission
- E2E test configuration
- Project README, CONTRIBUTING, QUICKSTART guides
- CI workflow (GitHub Actions)

Extend pipeline to support Sentinel-1 GRD collections:

- S1 GRD workflow configuration and test payloads
- Collection detection logic (get_crs.py extended for S1; see the sketch below)
- Staging namespace deployment (rbac-staging.yaml)
- S1-specific STAC registration handling
- End-to-end S1 test suite
- v20-v22 image iterations with S1 support

Enables multi-mission pipeline supporting both S2 L2A and S1 GRD products.
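An illustrative sketch of the kind of collection detection involved; the exact rules in get_crs.py may differ:

def detect_mission(collection_id: str) -> str:
    # Return "s1" for Sentinel-1 GRD collections, "s2" for Sentinel-2 L2A.
    cid = collection_id.lower()
    if cid.startswith("sentinel-1"):
        return "s1"
    if cid.startswith("sentinel-2"):
        return "s2"
    raise ValueError(f"Unsupported collection: {collection_id}")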
Add comprehensive S1 GRD pipeline documentation and example code.

docs/s1-guide.md:
- S2 vs S1 feature comparison (groups, flags, chunks, polarizations)
- Collection registry config for sentinel-1-l1-grd
- Preview generation logic (grayscale with polarization detection)
- Test data sources (EODC STAC)
- Workflow parameters for S1 conversion
- Known issues (GCP reprojection, memory, TiTiler rescaling)

examples/s1_quickstart.py:
- End-to-end S1 pipeline: fetch → convert → register → augment
- Demonstrates S1-specific flags: --gcp-group, --spatial-chunk 2048
- Example using EODC S1C_IW_GRDH test item
- Local development workflow

Usage:
  python examples/s1_quickstart.py

Generalize pipeline through collection registry pattern:

- Collection-specific parameter registry (groups, chunks, tile sizes)
- Dynamic parameter lookup script (get_conversion_params.py; sketched below)
- Registry integration across all workflow stages
- Support for S2 L2A and S1 GRD with distinct parameters
- Kustomize-based deployment structure

Enables scalable addition of new missions (S3, S5P, etc.) through
registry configuration without code changes.
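A sketch of the registry pattern behind get_conversion_params.py; group paths, chunk sizes, and tile widths are example values, not the production defaults:

CONFIGS = {
    "sentinel-2-l2a": {
        "groups": ["/measurements/reflectance/r10m"],  # example group path
        "spatial_chunk": 4096,
        "tile_width": 256,
    },
    "sentinel-1-l1-grd": {
        "groups": ["/measurements"],
        "spatial_chunk": 4096,
        "tile_width": 512,
    },
}

def get_conversion_params(collection_id: str) -> dict:
    # Longest-prefix match so variants such as "sentinel-2-l2a-dp-test"
    # reuse the base configuration without a new registry entry.
    for prefix in sorted(CONFIGS, key=len, reverse=True):
        if collection_id.startswith(prefix):
            return CONFIGS[prefix]
    raise KeyError(f"No registry entry for {collection_id}")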
Add comprehensive performance measurement and validation:

- Automated validation workflow task (validate_geozarr.py)
- Performance benchmarking tools (benchmark_comparison.py, benchmark_tile_performance.py)
- Production metrics from 9 operational workflows (8.6min avg, 75% success)
- Ecosystem compatibility validation (zarr-python, xarray, stac-geoparquet)
- User guide for adding new collections (docs/ADDING_COLLECTIONS.md)
- Performance report with operational metrics (docs/PERFORMANCE_REPORT.md)

Production validation shows pipeline ready for deployment with
validated performance and ecosystem compatibility.

Enable parallel chunk processing with Dask distributed:

- Add --dask-cluster flag to conversion workflow
- Update to v26 image with Dask support
- Add validation task between convert and register stages

Initial test shows 1.6× speedup (320s vs 516s baseline).
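A rough sketch of the Dask-parallel chunk processing behind --dask-cluster, assuming dask.distributed and xarray; worker counts, dimension names, chunk sizes, and paths are illustrative:

import xarray as xr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="2GB")
client = Client(cluster)

# Lazy, chunked open: each output chunk becomes a Dask task, so chunks are
# converted and written in parallel across the workers above.
ds = xr.open_zarr("input.zarr")
ds.chunk({"x": 4096, "y": 4096}).to_zarr("output.zarr", mode="w")

client.close()
cluster.close()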
The task was defined but never referenced in the DAG (lines 25-37).

Add workflow parameters:
- stac_api_url, raster_api_url (API endpoints)
- s3_endpoint, s3_output_bucket, s3_output_prefix (S3 config)

Replace all hardcoded values with parameter references for:
- STAC/raster API URLs in register/augment tasks
- S3 endpoint in all tasks
- S3 bucket/prefix in convert/validate/register tasks

Enables easy environment switching (dev/staging/prod) via parameter override.

Three Jupyter notebooks demonstrating GeoZarr data access and pyramid features:

01_quickstart.ipynb (see the sketch after this list)
- Load GeoZarr from S3 with embedded STAC metadata
- Visualize RGB composites
- Inspect geospatial properties

02_pyramid_performance.ipynb
- Benchmark tile serving with/without pyramids
- Measure observed 3-5× speedup at zoom 6-10
- Calculate storage tradeoffs (33% overhead)

03_multi_resolution.ipynb
- Access individual pyramid levels (0-3)
- Compare sizes (4.7MB → 72KB reduction)
- Explore quality vs size tradeoffs

These notebooks help users understand the pipeline outputs and evaluate
pyramid benefits for their use cases. Still evolving as we refine the
conversion process and gather production feedback.
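A minimal sketch of the 01_quickstart access pattern, assuming s3fs and xarray; the endpoint, bucket, group path, and band names are placeholders:

import s3fs
import xarray as xr

fs = s3fs.S3FileSystem(anon=True, client_kwargs={"endpoint_url": "https://s3.example.com"})
store = s3fs.S3Map("example-bucket/sentinel-2-l2a/item.zarr/measurements/reflectance/r10m", s3=fs)

ds = xr.open_zarr(store, consolidated=True)
print(ds)  # dims, coords, and embedded metadata

rgb = ds[["b04", "b03", "b02"]].to_array("band")  # band names are assumptions
rgb.plot.imshow(rgb="band", robust=True)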
Replace inline bash script in workflows/amqp-publish-once.yaml with
scripts/publish_amqp.py. Script is now included in Docker image,
eliminating need for runtime pip installs and curl downloads.

Changes:
- Add scripts/publish_amqp.py with routing key templates and retry (sketched below)
- Update workflows/amqp-publish-once.yaml to use pre-built image
- Add workflows/ directory to docker/Dockerfile
- Add tests/unit/test_publish_amqp.py with pytest-mock
20 tests: pattern matching, S1/S2 configs, CLI output formats
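A hedged sketch of what scripts/publish_amqp.py does, assuming pika; the exchange name, routing-key template, and payload fields are illustrative:

import json
import pika

def publish(amqp_url: str, payload: dict,
            routing_key_template: str = "eopf.item.found.{collection}") -> None:
    routing_key = routing_key_template.format(collection=payload["collection"])
    connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
    try:
        channel = connection.channel()
        channel.basic_publish(
            exchange="geozarr",  # assumed exchange name
            routing_key=routing_key,
            body=json.dumps(payload).encode(),
            properties=pika.BasicProperties(content_type="application/json"),
        )
    finally:
        connection.close()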
Tests asset priority logic (product > zarr > any .zarr) and error handling for missing or malformed STAC items.

Tests subprocess execution, timeout handling, error cases, and CLI options including file output and verbose mode.

Measures load time and dataset metrics for performance comparison.
Outputs JSON results with speedup factor and format recommendations.
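A sketch of the benchmark-comparison idea (timing two load paths and reporting a speedup factor as JSON); store paths are placeholders and real runs would use the benchmarking scripts above:

import json
import time
import xarray as xr

def timed_load(path: str) -> float:
    start = time.perf_counter()
    xr.open_zarr(path).load()  # requires s3fs for s3:// paths
    return time.perf_counter() - start

baseline = timed_load("s3://example-bucket/original.zarr")
converted = timed_load("s3://example-bucket/converted.zarr")
print(json.dumps({
    "baseline_s": round(baseline, 2),
    "geozarr_s": round(converted, 2),
    "speedup": round(baseline / converted, 2),
}, indent=2))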
- Add show-parameters step displaying full workflow config in UI
- Add step headers (1/4, 2/4, etc) to all pipeline stages
- Add progress indicators and section dividers for better readability
- Add workflow metadata labels (collection, item-id) for filtering
- Fix sensor event binding (rabbitmq-geozarr/geozarr-events)
- Add S1 E2E test job (amqp-publish-s1-e2e.yaml)

Argo UI now shows:
  • Full payload/parameters in dedicated initial step
  • Clear step numbers and progress for each stage
  • Final URLs for STAC item and S3 output
  • Better context during long-running conversions

Complete validation report showing:
- Successful S1 GRD to GeoZarr conversion
- 21-minute workflow execution (30k x 15k resolution)
- 6-level multiscale pyramids for VV/VH polarizations
- STAC registration with preview links
- UI enhancements validated in Argo
- Collection registry parameters documented

- Fix sys.path in test_publish_amqp.py from parent.parent to parent.parent.parent
- Update S1 spatial_chunk test expectations from 2048 to 4096
- Aligns with code changes in get_conversion_params.py
- Remove test_real_stac_api_connection (only checked HTTP 200, no logic)
- Remove unused os import
- Test had external dependency, was flaky, redundant with mocked tests
- Format long argparse description lines for readability
- No functional changes, purely formatting

- Set archiveLogs: false for immediate log visibility via kubectl
- Change convert-geozarr from script to container template for stdout logs
- Reduce memory request to 6Gi (limit 10Gi) for better cluster scheduling
- Add Dask parallel processing info in comments
- Simplify show-parameters to basic output

Fixes 30-60s log delay in Argo UI. Logs now visible via kubectl immediately.

- Add run-s1-test.yaml for direct kubectl submission
- Update amqp-publish-s1-e2e.yaml with optimized test parameters
- Use S1A item from Oct 3 for consistent testing
- Add WORKFLOW_SUBMISSION_TESTING.md with complete test results
- Update README.md: reorganize by recommendation priority
- Document all 4 submission methods with pros/cons
- Add troubleshooting for log visibility and resource limits
- Simplify Quick Start to 2 commands (30 seconds)
- Document Dask integration and resource optimization

Covers kubectl, Jupyter, event-driven (AMQP), and Python CLI approaches.

Test validation is demonstrated by 93 passing tests rather than narrative docs.
- Configure pytest pythonpath to enable script imports (unblocks 90 tests)
- Add exception tracebacks to get_conversion_params error handlers
- Add error trap to validate-setup.sh for line-level diagnostics
- Replace timestamp-based Docker cache with commit SHA for precision
- Add pre-commit hooks (ruff, mypy) for code quality enforcement

Test results: 90/90 passing, 32% coverage

- Add integration-tests job in GitHub Actions (runs on PRs only)
- Add explicit resource requests/limits to all workflow templates
  - convert-geozarr: 6Gi/10Gi memory, 2/4 CPU
  - validate: 2Gi/4Gi memory, 1/2 CPU
  - register-stac: 1Gi/2Gi memory, 500m/1 CPU
  - augment-stac: 1Gi/2Gi memory, 500m/1 CPU

Prevents pod eviction and enables predictable scheduling

@wietzesuijker force-pushed the slim branch 2 times, most recently from be2ce09 to 4b70e92 (October 27, 2025, 05:51)

Remove workflow_dispatch and tags triggers from build workflow
Remove pull_request and workflow_dispatch triggers from test workflow
Fix permissions in test workflow (no write access needed for tests)

@wietzesuijker force-pushed the slim branch 6 times, most recently from a413c2d to 80f0da5 (October 27, 2025, 15:56)

… artifacts

- Add S3 cleanup before conversion to remove stale base arrays
- Revert to Python entry points (convert.py, register.py) for maintainability
- Fix groups parameter type (string → list) for API compatibility
- Use clean args approach instead of inline bash scripts
- Fix TiTiler preview path to use overview arrays (/r10m/0:tci)

This addresses PR feedback by consolidating the cleanup fix with proper
Python-based workflow structure. All debugging iterations squashed.

@wietzesuijker force-pushed the slim branch 4 times, most recently from 2c743cd to 4b0f232 (October 28, 2025, 05:39)

@ciaransweet

@wietzesuijker should there be tests here? I still don't see them

@wietzesuijker force-pushed the slim branch 2 times, most recently from 5572ef3 to 719e145 (October 28, 2025, 09:30)

@wietzesuijker (Collaborator, Author)

@wietzesuijker should there be tests here? I still don't see them

@ciaransweet, no; that's deferred to #27 and #28 (they need updating to the new state in this branch; I'll do that soon).

The --crs-groups flag triggers prepare_dataset_with_crs_info() in data-model,
which writes CRS metadata via ds.rio.write_crs() and creates the spatial_ref
coordinate variable required by TiTiler validation (see the sketch below).

Restores working configuration from commit 21ea009.
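For context, a hedged sketch of the write_crs effect described above, assuming rioxarray; the group path and EPSG code are examples:

import rioxarray  # noqa: F401  (registers the .rio accessor on xarray objects)
import xarray as xr

ds = xr.open_zarr("output.zarr", group="measurements/reflectance/r10m")
ds = ds.rio.write_crs("EPSG:32633")    # example EPSG code
assert "spatial_ref" in ds.coords      # the grid-mapping variable TiTiler expects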

fix: update sentinel-2 conversion parameters for accuracy

feat: add enable-sharding parameter support

- Add enable_sharding to mission configs (S2: true, S1: false)
- Pass enable_sharding to create_geozarr_dataset kwargs
- Add shell/JSON output support for new parameter
- Align with data-model launch.json reference config
- Remove Dask cluster parameter (use ENABLE_DASK_CLUSTER env var instead)

Refs: data-model/.vscode/launch.json L2A config

docs: remove duplicate Deploy section and clarify test status

docs: credential setup instructions with OVH Manager links

- workflows/README: explain secret purposes (event ingestion, storage, API auth)
- workflows/README: add direct OVH Manager links for kubeconfig and S3 credentials
- README: delegate setup to workflows/README
- Separate operator usage (root README) from deployment setup (workflows/README)

docs: update README and gitignore

- Merge 8 scripts into 2 (convert.py, register.py)
- Normalize CONFIGS structure (groups always list, direct crs_groups)
- Use pystac native patterns (Item.clone, simplified workflow)
- Simplify registration: 6 HTTP calls → 2 (fetch+upsert; sketched below)
- Add emoji logging for better UX
- Net reduction: -198 lines (-30%)
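A sketch of the fetch+upsert pattern (2 HTTP calls), assuming httpx and a STAC API with the Transactions extension; URLs and timeouts are placeholders:

import httpx

def upsert(stac_api: str, collection: str, item: dict) -> None:
    item_url = f"{stac_api}/collections/{collection}/items/{item['id']}"
    exists = httpx.get(item_url, timeout=30).status_code == 200
    if exists:
        resp = httpx.put(item_url, json=item, timeout=30)   # update in place
    else:
        resp = httpx.post(f"{stac_api}/collections/{collection}/items", json=item, timeout=30)
    resp.raise_for_status()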
Empty string parameters caused argparse errors.
Workflows now use collection-specific defaults from convert.py CONFIGS.

Don't strip /stac from the base URL; client.self_href already points to the correct endpoint.

fix: suppress httpx/httpcore debug logging
@wietzesuijker merged commit 1f7b08c into main on Oct 30, 2025
2 checks passed
