Skip to content

Commit aa9a580

Browse files
emmanuelmathot authored and wietzesuijker committed
feat: align conversion parameters with data-model reference
- All 4 S2 groups (r10m, r20m, r60m, quicklook)
- CRS groups for spatial reference metadata
- Optimized chunk sizes (1024) and tile width (256)
- Enable-sharding parameter (S2: true, S1: false)
- Simplified parameter lookup (concise fallback logic)

Aligns with data-model/.vscode/launch.json config.

Refs: #31
1 parent 719e145 commit aa9a580

File tree

5 files changed

+145
-65
lines changed

5 files changed

+145
-65
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -62,3 +62,5 @@ Thumbs.db
6262
*.zarr
6363
out/
6464
reports/
65+
*.pyc
66+
pipeline_utils.py

scripts/convert.py

Lines changed: 86 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
import logging
88
import os
99
import sys
10+
from typing import Any
1011
from urllib.parse import urlparse
1112

1213
import fsspec
@@ -46,6 +47,10 @@ def run_conversion(
4647
collection: str,
4748
s3_output_bucket: str,
4849
s3_output_prefix: str,
50+
groups: str | None = None,
51+
spatial_chunk: int | None = None,
52+
tile_width: int | None = None,
53+
enable_sharding: bool | None = None,
4954
) -> str:
5055
"""Run GeoZarr conversion workflow.
5156
@@ -54,6 +59,10 @@ def run_conversion(
5459
collection: Collection ID for parameter lookup
5560
s3_output_bucket: S3 bucket for output
5661
s3_output_prefix: S3 prefix for output
62+
groups: Override groups parameter (comma-separated if multiple)
63+
spatial_chunk: Override spatial chunk size
64+
tile_width: Override tile width
65+
enable_sharding: Override sharding enable flag
5766
5867
Returns:
5968
Output Zarr URL (s3://...)
@@ -74,15 +83,17 @@ def run_conversion(
7483
zarr_url = source_url
7584
logger.info(f"Direct Zarr URL: {zarr_url}")
7685

77-
# Get conversion parameters from collection config
78-
logger.debug(f"Getting conversion parameters for {collection}...")
86+
# Get conversion parameters (with optional overrides)
7987
params = get_conversion_params(collection)
80-
logger.debug(f" Groups: {params['groups']}")
81-
logger.debug(f" Chunk: {params['spatial_chunk']}")
82-
logger.debug(f" Tile width: {params['tile_width']}")
83-
logger.debug(f" Extra flags: {params['extra_flags']}")
84-
85-
# Construct output path
88+
overrides = {
89+
"groups": groups.split(",") if groups and "," in groups else groups or None,
90+
"spatial_chunk": spatial_chunk,
91+
"tile_width": tile_width,
92+
"enable_sharding": enable_sharding,
93+
}
94+
params.update({k: v for k, v in overrides.items() if v is not None})
95+
96+
logger.info(f"Conversion params: {params}") # Construct output path
8697
output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr"
8798

8899
# Clean up existing output to avoid base array artifacts
@@ -98,46 +109,60 @@ def run_conversion(
98109
logger.info(f" Source: {zarr_url}")
99110
logger.info(f" Destination: {output_url}")
100111

101-
# Set up Dask cluster for parallel processing
102-
from dask.distributed import Client
112+
# Optional: Set up Dask cluster if enabled via environment variable
113+
# Note: eopf-geozarr handles its own Dask setup when using create_geozarr_dataset
114+
# This is here only for future compatibility if we need external cluster management
115+
use_dask = os.getenv("ENABLE_DASK_CLUSTER", "").lower() in ("true", "1", "yes")
116+
if use_dask:
117+
logger.info("🚀 Dask cluster enabled via ENABLE_DASK_CLUSTER env var")
118+
# Future: Could connect to external cluster here if needed
119+
# from dask.distributed import Client
120+
# dask_address = os.getenv("DASK_SCHEDULER_ADDRESS")
121+
# client = Client(dask_address) if dask_address else Client()
122+
123+
# Load source dataset
124+
logger.info("Loading source dataset...")
125+
storage_options = get_storage_options(zarr_url)
126+
dt = xr.open_datatree(
127+
zarr_url,
128+
engine="zarr",
129+
chunks="auto",
130+
storage_options=storage_options,
131+
)
132+
logger.info(f"Loaded DataTree with {len(dt.children)} groups")
133+
134+
# Convert to GeoZarr
135+
logger.info("Converting to GeoZarr format...")
136+
137+
# Parse extra flags for optional parameters
138+
kwargs: dict[str, Any] = {}
139+
if params["extra_flags"] and "--crs-groups" in params["extra_flags"]:
140+
crs_groups_str = params["extra_flags"].split("--crs-groups")[1].strip().split()[0]
141+
kwargs["crs_groups"] = [crs_groups_str]
142+
143+
# Add sharding if enabled
144+
if params.get("enable_sharding", False):
145+
kwargs["enable_sharding"] = True
146+
147+
# groups parameter must be a list
148+
groups_param = params["groups"]
149+
if isinstance(groups_param, str):
150+
groups_list: list[str] = [groups_param]
151+
else:
152+
# groups_param is list[str] in mission configs
153+
groups_list = list(groups_param) if groups_param else []
103154

104-
with Client() as client:
105-
logger.info(f"🚀 Dask cluster started: {client.dashboard_link}")
155+
create_geozarr_dataset(
156+
dt_input=dt,
157+
groups=groups_list,
158+
output_path=output_url,
159+
spatial_chunk=params["spatial_chunk"],
160+
tile_width=params["tile_width"],
161+
**kwargs,
162+
)
106163

107-
# Load source dataset
108-
logger.info("Loading source dataset...")
109-
storage_options = get_storage_options(zarr_url)
110-
dt = xr.open_datatree(
111-
zarr_url,
112-
engine="zarr",
113-
chunks="auto",
114-
storage_options=storage_options,
115-
)
116-
logger.info(f"Loaded DataTree with {len(dt.children)} groups")
117-
118-
# Convert to GeoZarr
119-
logger.info("Converting to GeoZarr format...")
120-
121-
# Parse extra flags for optional parameters
122-
kwargs = {}
123-
if params["extra_flags"] and "--crs-groups" in params["extra_flags"]:
124-
crs_groups_str = params["extra_flags"].split("--crs-groups")[1].strip().split()[0]
125-
kwargs["crs_groups"] = [crs_groups_str]
126-
127-
# groups parameter must be a list
128-
groups_list = [params["groups"]] if isinstance(params["groups"], str) else params["groups"]
129-
130-
create_geozarr_dataset(
131-
dt_input=dt,
132-
groups=groups_list,
133-
output_path=output_url,
134-
spatial_chunk=params["spatial_chunk"],
135-
tile_width=params["tile_width"],
136-
**kwargs,
137-
)
138-
139-
logger.info("✅ Conversion completed successfully!")
140-
logger.info(f"Output: {output_url}")
164+
logger.info("✅ Conversion completed successfully!")
165+
logger.info(f"Output: {output_url}")
141166

142167
return output_url
143168

@@ -150,18 +175,32 @@ def main(argv: list[str] | None = None) -> int:
150175
parser.add_argument("--s3-output-bucket", required=True, help="S3 output bucket")
151176
parser.add_argument("--s3-output-prefix", required=True, help="S3 output prefix")
152177
parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
178+
# Optional parameter overrides
179+
parser.add_argument("--groups", help="Override groups (comma-separated)")
180+
parser.add_argument("--spatial-chunk", help="Override spatial chunk size")
181+
parser.add_argument("--tile-width", help="Override tile width")
182+
parser.add_argument("--enable-sharding", help="Override sharding (true/false)")
153183

154184
args = parser.parse_args(argv)
155-
156185
if args.verbose:
157186
logging.getLogger().setLevel(logging.DEBUG)
158187

188+
# Parse override args (empty string → None)
189+
groups = args.groups or None
190+
spatial_chunk = int(args.spatial_chunk) if args.spatial_chunk else None
191+
tile_width = int(args.tile_width) if args.tile_width else None
192+
enable_sharding = args.enable_sharding.lower() == "true" if args.enable_sharding else None
193+
159194
try:
160195
output_url = run_conversion(
161196
args.source_url,
162197
args.collection,
163198
args.s3_output_bucket,
164199
args.s3_output_prefix,
200+
groups,
201+
spatial_chunk,
202+
tile_width,
203+
enable_sharding,
165204
)
166205
logger.info(f"Success: {output_url}")
167206
return 0

scripts/get_conversion_params.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,30 @@
2525
"extra_flags": "--gcp-group /conditions/gcp",
2626
"spatial_chunk": 4096,
2727
"tile_width": 512,
28+
"enable_sharding": False,
2829
},
2930
"sentinel-2": {
30-
"groups": "/quality/l2a_quicklook/r10m",
31-
"extra_flags": "--crs-groups /quality/l2a_quicklook/r10m",
32-
"spatial_chunk": 4096,
33-
"tile_width": 512,
31+
"groups": [
32+
"/measurements/reflectance/r10m",
33+
"/measurements/reflectance/r20m",
34+
"/measurements/reflectance/r60m",
35+
"/quality/l2a_quicklook/r10m",
36+
],
37+
"extra_flags": "--crs-groups /conditions/geometry",
38+
"spatial_chunk": 1024,
39+
"tile_width": 256,
40+
"enable_sharding": True,
3441
},
3542
}
3643

3744

3845
def get_conversion_params(collection_id: str) -> dict[str, Any]:
39-
"""Get conversion parameters for collection.
40-
41-
Args:
42-
collection_id: Collection identifier (e.g., sentinel-1-l1-grd, sentinel-2-l2a-dp-test)
43-
44-
Returns:
45-
Dict of conversion parameters (groups, extra_flags, spatial_chunk, tile_width)
46-
"""
47-
# Extract mission prefix (sentinel-1 or sentinel-2)
46+
"""Get conversion parameters for collection. Defaults to Sentinel-2 if unrecognized."""
4847
parts = collection_id.lower().split("-")
4948
if len(parts) >= 2:
50-
prefix = f"{parts[0]}-{parts[1]}" # "sentinel-1" or "sentinel-2"
49+
prefix = f"{parts[0]}-{parts[1]}"
5150
if prefix in CONFIGS:
5251
return CONFIGS[prefix]
53-
54-
# Default to Sentinel-2 if no match
5552
return CONFIGS["sentinel-2"]
5653

5754

@@ -73,7 +70,7 @@ def main(argv: list[str] | None = None) -> int:
7370
)
7471
parser.add_argument(
7572
"--param",
76-
choices=["groups", "extra_flags", "spatial_chunk", "tile_width"],
73+
choices=["groups", "extra_flags", "spatial_chunk", "tile_width", "enable_sharding"],
7774
help="Get single parameter (for shell scripts)",
7875
)
7976

@@ -82,7 +79,12 @@ def main(argv: list[str] | None = None) -> int:
8279

8380
if args.param:
8481
# Output single parameter (for shell variable assignment)
85-
print(params.get(args.param, ""))
82+
value = params.get(args.param, "")
83+
# Convert boolean to shell-friendly format
84+
if isinstance(value, bool):
85+
print("true" if value else "false")
86+
else:
87+
print(value if value is not None else "")
8688
elif args.format == "json":
8789
# Output JSON (for parsing with jq)
8890
print(json.dumps(params, indent=2))
@@ -92,6 +94,7 @@ def main(argv: list[str] | None = None) -> int:
9294
print(f"EXTRA_FLAGS='{params['extra_flags']}'")
9395
print(f"CHUNK={params['spatial_chunk']}")
9496
print(f"TILE_WIDTH={params['tile_width']}")
97+
print(f"ENABLE_SHARDING={'true' if params['enable_sharding'] else 'false'}")
9598

9699
return 0
97100

workflows/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,25 @@ Key parameters (see [../README.md](../README.md) for full reference):
114114
- `s3_output_bucket`: Output bucket
115115
- `pipeline_image_version`: Docker image tag
116116

117+
**Override conversion parameters** (optional, for testing):
118+
119+
```bash
120+
# Example: Test with different chunk size and disable sharding
121+
argo submit workflows/base/workflowtemplate.yaml \
122+
--from workflowtemplate/geozarr-pipeline \
123+
-p source_url="https://api.example.com/stac/.../items/ITEM_ID" \
124+
-p override_spatial_chunk="2048" \
125+
-p override_enable_sharding="false"
126+
```
127+
128+
Available overrides (empty = use collection defaults):
129+
- `override_groups`: Comma-separated Zarr groups, without spaces (e.g., `/measurements/reflectance/r10m,/measurements/reflectance/r20m`)
130+
- `override_spatial_chunk`: Chunk size (e.g., `2048`)
131+
- `override_tile_width`: Tile width (e.g., `512`)
132+
- `override_enable_sharding`: Enable sharding (`true`/`false`)
133+
134+
Defaults (spatial chunk / tile width / sharding): Sentinel-2 uses 1024 / 256 / true; Sentinel-1 uses 4096 / 512 / false. See `scripts/get_conversion_params.py`.
135+
117136
### Resource Tuning
118137

119138
Edit `workflows/base/workflowtemplate.yaml`:

workflows/base/workflowtemplate.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ spec:
3232
value: tests-output
3333
- name: pipeline_image_version
3434
value: slim
35+
# Optional conversion parameter overrides (empty = use collection defaults)
36+
- name: override_groups
37+
value: ""
38+
- name: override_spatial_chunk
39+
value: ""
40+
- name: override_tile_width
41+
value: ""
42+
- name: override_enable_sharding
43+
value: ""
3544
templates:
3645
- name: main
3746
dag:
@@ -59,6 +68,14 @@ spec:
5968
- --s3-output-prefix
6069
- "{{workflow.parameters.s3_output_prefix}}"
6170
- --verbose
71+
- --groups
72+
- "{{workflow.parameters.override_groups}}"
73+
- --spatial-chunk
74+
- "{{workflow.parameters.override_spatial_chunk}}"
75+
- --tile-width
76+
- "{{workflow.parameters.override_tile_width}}"
77+
- --enable-sharding
78+
- "{{workflow.parameters.override_enable_sharding}}"
6279
resources:
6380
requests:
6481
memory: 4Gi

0 commit comments

Comments
 (0)