Commit a413c2d

fix: add S3 cleanup and Python workflow scripts to prevent base array artifacts
- Add S3 cleanup before conversion to remove stale base arrays
- Revert to Python entry points (convert.py, register.py) for maintainability
- Fix groups parameter type (string → list) for API compatibility
- Use clean args approach instead of inline bash scripts
- Fix TiTiler preview path to use overview arrays (/r10m/0:tci)

This addresses PR feedback by consolidating the cleanup fix with proper Python-based workflow structure. All debugging iterations squashed.
1 parent 3da5667 commit a413c2d

File tree: 5 files changed, +49 −78 lines changed

scripts/augment_stac_item.py

Lines changed: 4 additions & 2 deletions
@@ -66,8 +66,10 @@ def add_visualization(item: Item, raster_base: str, collection_id: str) -> None:
         _add_tile_links(item, base_url, query, "Sentinel-1 GRD VH")

     elif coll_lower.startswith(("sentinel-2", "sentinel2")):
-        # S2: Quicklook path
-        var_path = "/quality/l2a_quicklook/r10m:tci"
+        # S2: Point to overview level 0 for quicklook TCI
+        # Use /r10m/0:tci instead of /r10m:tci because base array lacks
+        # spatial_ref coordinate (only overviews have it)
+        var_path = "/quality/l2a_quicklook/r10m/0:tci"
         query = (
             f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m"
         )
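For reference, a standalone sketch of what the encoded preview query comes out as with the new overview path (only urllib.parse from the hunk above is involved; the STAC item and TiTiler wiring are omitted):

import urllib.parse

# New overview-level path: /r10m/0:tci instead of the base array /r10m:tci
var_path = "/quality/l2a_quicklook/r10m/0:tci"
query = f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m"
print(query)
# variables=%2Fquality%2Fl2a_quicklook%2Fr10m%2F0%3Atci&bidx=1&bidx=2&bidx=3&assets=TCI_10m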

scripts/convert.py

Lines changed: 19 additions & 1 deletion
@@ -5,9 +5,11 @@

 import argparse
 import logging
+import os
 import sys
 from urllib.parse import urlparse

+import fsspec
 import httpx
 import xarray as xr
 from eopf_geozarr import create_geozarr_dataset
@@ -83,6 +85,15 @@ def run_conversion(
     # Construct output path
     output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr"

+    # Clean up existing output to avoid base array artifacts
+    logger.info(f"🧹 Cleaning up existing output at: {output_url}")
+    try:
+        fs = fsspec.filesystem("s3", client_kwargs={"endpoint_url": os.getenv("AWS_ENDPOINT_URL")})
+        fs.rm(output_url, recursive=True)
+        logger.info("✅ Cleanup completed")
+    except Exception as e:
+        logger.info(f"ℹ️ No existing output to clean (or cleanup failed): {e}")
+
     logger.info("Starting GeoZarr conversion...")
     logger.info(f"  Source: {zarr_url}")
     logger.info(f"  Destination: {output_url}")
@@ -113,9 +124,12 @@
         crs_groups_str = params["extra_flags"].split("--crs-groups")[1].strip().split()[0]
         kwargs["crs_groups"] = [crs_groups_str]

+    # groups parameter must be a list
+    groups_list = [params["groups"]] if isinstance(params["groups"], str) else params["groups"]
+
     create_geozarr_dataset(
         dt_input=dt,
-        groups=params["groups"],
+        groups=groups_list,
         output_path=output_url,
         spatial_chunk=params["spatial_chunk"],
         tile_width=params["tile_width"],
@@ -135,9 +149,13 @@ def main(argv: list[str] | None = None) -> int:
     parser.add_argument("--collection", required=True, help="Collection ID")
     parser.add_argument("--s3-output-bucket", required=True, help="S3 output bucket")
     parser.add_argument("--s3-output-prefix", required=True, help="S3 output prefix")
+    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")

     args = parser.parse_args(argv)

+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
     try:
         output_url = run_conversion(
             args.source_url,
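The cleanup-before-write pattern from the second hunk, isolated as a minimal sketch. The bucket and item name below are illustrative, and AWS_ENDPOINT_URL is assumed to point at the pipeline's S3-compatible endpoint; where the production code catches a broad Exception, this narrows to FileNotFoundError:

import os

import fsspec

def remove_if_exists(url: str) -> None:
    """Best-effort recursive delete so a re-run never inherits stale base arrays."""
    fs = fsspec.filesystem("s3", client_kwargs={"endpoint_url": os.getenv("AWS_ENDPOINT_URL")})
    try:
        fs.rm(url, recursive=True)
    except FileNotFoundError:
        pass  # first run for this item: nothing to clean

remove_if_exists("s3://example-bucket/tests-output/sentinel-2-l2a/EXAMPLE_ITEM.zarr")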

scripts/get_conversion_params.py

Lines changed: 2 additions & 2 deletions
@@ -21,13 +21,13 @@
 # Conversion parameters by mission
 CONFIGS: dict[str, dict[str, Any]] = {
     "sentinel-1": {
-        "groups": ["/measurements"],
+        "groups": "/measurements",
         "extra_flags": "--gcp-group /conditions/gcp",
         "spatial_chunk": 4096,
         "tile_width": 512,
     },
     "sentinel-2": {
-        "groups": ["/quality/l2a_quicklook/r10m"],
+        "groups": "/quality/l2a_quicklook/r10m",
         "extra_flags": "--crs-groups /quality/l2a_quicklook/r10m",
         "spatial_chunk": 4096,
         "tile_width": 512,

submit_test_workflow.py

Lines changed: 21 additions & 72 deletions
@@ -1,78 +1,27 @@
 #!/usr/bin/env python3
-"""Submit workflow to geozarr pipeline via RabbitMQ."""
-
 import json
 import os
-import sys

 import pika

-
-def submit_workflow(payload: dict) -> bool:
-    """Submit workflow via RabbitMQ."""
-    try:
-        username = os.getenv("RABBITMQ_USER", "user")
-        password = os.getenv("RABBITMQ_PASSWORD")
-
-        if not password:
-            print("❌ RABBITMQ_PASSWORD not set")
-            print(
-                "   Get: kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d"
-            )
-            return False
-
-        credentials = pika.PlainCredentials(username, password)
-        connection = pika.BlockingConnection(
-            pika.ConnectionParameters("localhost", 5672, credentials=credentials)
-        )
-        channel = connection.channel()
-
-        exchange_name = "geozarr-staging"
-        routing_key = "eopf.items.test"
-
-        channel.exchange_declare(exchange=exchange_name, exchange_type="topic", durable=True)
-        channel.basic_publish(
-            exchange=exchange_name,
-            routing_key=routing_key,
-            body=json.dumps(payload),
-            properties=pika.BasicProperties(delivery_mode=2, content_type="application/json"),
-        )
-
-        print(f"✅ Published: {payload['source_url'][:80]}...")
-        connection.close()
-        return True
-
-    except Exception as e:
-        print(f"❌ Failed: {e}")
-        import traceback
-
-        traceback.print_exc()
-        return False
-
-
-if __name__ == "__main__":
-    # ✅ Use STAC item URL (pipeline extracts zarr URL from assets)
-    # ❌ NOT direct zarr URL
-    item_id = "S2A_MSIL2A_20251022T094121_N0511_R036_T34TDT_20251022T114817"
-    payload = {
-        "source_url": f"https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/{item_id}",
-        "item_id": item_id,
-        "collection": "sentinel-2-l2a-dp-test",
-    }
-
-    print("🚀 Submitting workflow via RabbitMQ")
-    print(f"   Collection: {payload['collection']}")
-    print(f"   Source: {payload['source_url']}")
-    print()
-    print("Prerequisites:")
-    print("   kubectl port-forward -n devseed-staging svc/rabbitmq 5672:5672 &")
-    print(
-        "   export RABBITMQ_PASSWORD=$(kubectl get secret rabbitmq-password -n core -o jsonpath='{.data.rabbitmq-password}' | base64 -d)"
-    )
-    print()
-
-    if submit_workflow(payload):
-        print("✅ Monitor: kubectl get wf -n devseed-staging --watch")
-        sys.exit(0)
-    else:
-        sys.exit(1)
+# Test item that was failing (same as before)
+payload = {
+    "source_url": "https://stac.core.eopf.eodc.eu/collections/sentinel-2-l2a/items/S2A_MSIL2A_20251023T105131_N0511_R051_T31UET_20251023T122522",
+    "item_id": "S2A_MSIL2A_20251023T105131_N0511_R051_T31UET_20251023T122522",
+    "collection": "sentinel-2-l2a-dp-test",
+}
+
+credentials = pika.PlainCredentials("user", os.getenv("RABBITMQ_PASSWORD"))
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost", 5672, "/", credentials))
+channel = connection.channel()
+
+message = json.dumps(payload)
+channel.basic_publish(
+    exchange="geozarr-events",
+    routing_key="geozarr.convert",
+    body=message,
+    properties=pika.BasicProperties(content_type="application/json"),
+)
+
+print(f"✅ Published workflow for item: {payload['item_id']}")
+connection.close()
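Note the rewritten submitter publishes to the geozarr-events exchange with routing key geozarr.convert and no longer declares the exchange or prints prerequisites; it assumes the broker is reachable the way the deleted version documented (kubectl port-forward -n devseed-staging svc/rabbitmq 5672:5672, with RABBITMQ_PASSWORD taken from the rabbitmq-password secret). A hedged sketch for eyeballing that messages actually land, assuming the exchange already exists on the broker:

#!/usr/bin/env python3
import os

import pika

credentials = pika.PlainCredentials("user", os.getenv("RABBITMQ_PASSWORD"))
connection = pika.BlockingConnection(
    pika.ConnectionParameters("localhost", 5672, "/", credentials)
)
channel = connection.channel()

# Throwaway queue bound to the routing key submit_test_workflow.py publishes to.
queue = channel.queue_declare(queue="", exclusive=True).method.queue
channel.queue_bind(exchange="geozarr-events", queue=queue, routing_key="geozarr.convert")

# Run the submitter after this is bound, then fetch one message.
method, _properties, body = channel.basic_get(queue=queue, auto_ack=True)
print(body.decode() if method else "no message yet")
connection.close()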

workflows/base/workflowtemplate.yaml

Lines changed: 3 additions & 1 deletion
@@ -31,7 +31,7 @@ spec:
     - name: s3_output_prefix
       value: tests-output
     - name: pipeline_image_version
-      value: fix-unit-tests
+      value: slim
   templates:
     - name: main
       dag:
@@ -58,6 +58,7 @@ spec:
            - "{{workflow.parameters.s3_output_bucket}}"
            - --s3-output-prefix
            - "{{workflow.parameters.s3_output_prefix}}"
+            - --verbose
          resources:
            requests:
              memory: 4Gi
@@ -104,6 +105,7 @@ spec:
            - "{{workflow.parameters.s3_output_bucket}}"
            - --s3-output-prefix
            - "{{workflow.parameters.s3_output_prefix}}"
+            - --verbose
          ports:
            - containerPort: 8000
              name: metrics
