Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 65 additions & 110 deletions echopype/convert/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Literal, Optional, Tuple, Union

import dask.array
import fsspec
from xarray import DataTree

Expand All @@ -13,7 +14,7 @@
# fmt: on
from ..echodata.echodata import XARRAY_ENGINE_MAP, EchoData
from ..utils import io
from ..utils.coding import COMPRESSION_SETTINGS
from ..utils.coding import COMPRESSION_SETTINGS, sanitize_dtypes, set_storage_encodings
from ..utils.log import _init_logger
from ..utils.prov import add_processing_level

Expand Down Expand Up @@ -100,120 +101,74 @@ def to_file(


def _save_groups_to_file(echodata, output_path, engine, compress=True, **kwargs):
"""Serialize all groups to file."""
# TODO: in terms of chunking, would using rechunker at the end be faster and more convenient?
# TODO: investigate chunking before we save Dataset to a file

# Top-level group
io.save_file(
echodata["Top-level"],
path=output_path,
mode="w",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Environment group
io.save_file(
echodata["Environment"], # TODO: chunking necessary?
path=output_path,
mode="a",
engine=engine,
group="Environment",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Platform group
io.save_file(
echodata["Platform"], # TODO: chunking necessary? time1 and time2 (EK80) only
path=output_path,
mode="a",
engine=engine,
group="Platform",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)
"""Serialize all groups to file using DataTree-native I/O.

# Platform/NMEA group: some sonar model does not produce NMEA data
if echodata["Platform/NMEA"] is not None:
io.save_file(
echodata["Platform/NMEA"], # TODO: chunking necessary?
path=output_path,
mode="a",
engine=engine,
group="Platform/NMEA",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
Uses ``DataTree.to_netcdf`` or ``DataTree.to_zarr`` to write the entire
tree in a single call, replacing the previous per-group save loop.
"""
tree = echodata._tree
if tree is None:
raise ValueError("EchoData has no DataTree to save.")

compression_settings = COMPRESSION_SETTINGS[engine] if compress else None

# Sanitize dtypes in each node (e.g. convert object → str) and build
# the nested encoding dict keyed by group path (e.g. "/", "/Environment").
# Use inherit=False so each group only produces encodings for its own
# variables, not coordinates inherited from parent nodes.
encoding = {}
for group_path in tree.groups:
node = tree[group_path] if group_path != "/" else tree
if not (node.has_data or node.has_attrs):
continue
ds = node.to_dataset(inherit=False)
if len(ds.variables) == 0:
continue
ds = sanitize_dtypes(ds)
group_encoding = set_storage_encodings(ds, compression_settings, engine)

# For zarr: align dask chunks with encoding chunks to avoid
# "overlapping chunks" errors during parallel writes.
if engine == "zarr":
for var, enc in group_encoding.items():
if var in ds and isinstance(ds[var].data, dask.array.Array):
enc_chunks = enc.get("chunks")
if enc_chunks is not None:
ds[var] = ds[var].chunk(
dict(zip(ds[var].dims, enc_chunks))
)

node.dataset = ds
encoding[group_path] = group_encoding

if engine == "netcdf4":
if isinstance(output_path, fsspec.FSMap):
# DataTree.to_netcdf requires a file path, not an FSMap
file_path = output_path.root
else:
file_path = str(output_path)
tree.to_netcdf(
file_path,
mode="w",
engine="netcdf4",
encoding=encoding,
write_inherited_coords=True,
**kwargs,
)

# Provenance group
io.save_file(
echodata["Provenance"],
path=output_path,
group="Provenance",
mode="a",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Sonar group
io.save_file(
echodata["Sonar"],
path=output_path,
group="Sonar",
mode="a",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# /Sonar/Beam_groupX group
if echodata.sonar_model == "AD2CP":
for i in range(1, len(echodata["Sonar"]["beam_group"]) + 1):
io.save_file(
echodata[f"Sonar/Beam_group{i}"],
path=output_path,
mode="a",
engine=engine,
group=f"Sonar/Beam_group{i}",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)
else:
io.save_file(
echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"],
path=output_path,
mode="a",
engine=engine,
group=f"Sonar/{BEAM_SUBGROUP_DEFAULT}",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
elif engine == "zarr":
if isinstance(output_path, fsspec.FSMap):
store = output_path.root
else:
store = str(output_path)
tree.to_zarr(
store,
mode="w",
encoding=encoding,
write_inherited_coords=True,
**kwargs,
)
if echodata["Sonar/Beam_group2"] is not None:
# some sonar model does not produce Sonar/Beam_group2
io.save_file(
echodata["Sonar/Beam_group2"],
path=output_path,
mode="a",
engine=engine,
group="Sonar/Beam_group2",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Vendor_specific group
io.save_file(
echodata["Vendor_specific"], # TODO: chunking necessary?
path=output_path,
mode="a",
engine=engine,
group="Vendor_specific",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)
else:
raise ValueError(f"{engine} is not a supported save format")


def _set_convert_params(param_dict: Dict[str, str]) -> Dict[str, str]:
Expand Down
Loading
Loading