# Refactor hf-sim to use xarray and Dask for larger-than-memory station processing (#102)
## Conversation
Commit:

- Add HostType(str, Enum) with 'local' and 'slurm' values
- Add dask_cluster context manager for LocalCluster/SLURMCluster
- Add load_hf_dataset: loads stations into chunked xarray Dataset
- Add process_hf_dataset: map_blocks-compatible station processor
- Refactor run_hf CLI to use --host option and Dask workflow
- Replace ThreadPoolExecutor with xr.map_blocks + dask.distributed
- Add dask[distributed] and dask-jobqueue dependencies
- Add tests for HostType, load_hf_dataset, process_hf_dataset

Agent-Logs-Url: https://github.com/ucgmsim/workflow/sessions/66b1316d-545e-44d8-96d2-317087688eed
Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com>
**lispandfound** left a comment:
I've changed my mind here. This is run in a container, so a SLURMCluster is a bad idea because it requires too many hacks. However, I do believe this template is still useful, because Dask allows us to process larger-than-memory datasets, which multiprocessing won't do. Take out the SLURM stuff and refactor this to focus on processing larger-than-memory outputs. In particular, you should not eagerly compute the whole array at any point (no `.values`, no `.result`, no `.compute`); instead let the graph flow all the way to `to_netcdf` before saving, so Dask can handle the memory pressure. These output files can be up to 100 GB.
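A minimal sketch of that lazy shape, with made-up sizes (this is not the hf-sim code; `h5netcdf` is the engine the PR settles on):

```python
import dask.array as da
import xarray as xr

# Lazy, chunked dataset: nothing is allocated or computed at this point.
ds = xr.Dataset(
    {"waveform": (("station", "time"),
                  da.zeros((10_000, 1_024), chunks=(200, -1)))}
)

# Anti-pattern: ds["waveform"].values or ds.compute() would materialise
# the entire array in RAM before a single byte hits disk.

# Pattern: hand the lazy graph straight to the writer. Dask evaluates and
# writes one chunk at a time, so peak memory stays near one chunk's size.
ds.to_netcdf("hf.nc", engine="h5netcdf")
```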
Commit (… output):

- Remove HostType enum, dask_cluster context manager, --host CLI option
- Remove dask-jobqueue dependency
- Use dask.array.empty for template to avoid allocating memory
- Remove all eager .compute()/.result()/.values on large data
- Let Dask graph flow lazily to to_netcdf for chunk-by-chunk writing
- Remove test_host_type_values test

Agent-Logs-Url: https://github.com/ucgmsim/workflow/sessions/dee90e3b-9cd0-4ac3-afcc-58a8a3d10c06
Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com>
**Copilot:** Done in 9953992. Removed all SLURM/HostType/dask-jobqueue code and the …
```python
assert "seed" in ds.data_vars
assert "vref" in ds.data_vars

np.testing.assert_allclose(ds["latitude"].values, [-43.5, -43.6, -43.7])
```
**lispandfound:** You should use xarray testing facilities to simplify this test.
**Copilot:** Refactored in 9b69839. `test_load_hf_dataset` now builds an expected `xr.Dataset` and uses `xr.testing.assert_allclose` for the data variable comparison; `test_process_hf_dataset_structure` uses `xr.testing.assert_equal` for coordinate comparisons instead of `np.testing.assert_array_equal`.
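For reference, a tiny sketch of the adopted style (made-up values, not the actual test data):

```python
import xarray as xr

expected = xr.Dataset(coords={"latitude": ("station", [-43.5, -43.6, -43.7])})
actual = expected.copy(deep=True)

# One call checks dims, coordinates, names, and values, and prints a
# readable dataset diff on failure.
xr.testing.assert_allclose(actual, expected)
```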
Commit:

Replace np.testing.assert_allclose/assert_array_equal with xr.testing.assert_allclose/assert_equal for dataset and coordinate comparisons in test_load_hf_dataset and test_process_hf_dataset_structure.

Agent-Logs-Url: https://github.com/ucgmsim/workflow/sessions/eb2dbecf-d37e-45e1-bc15-020c0d78111f
Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com>
## Summary
Refactors the `hf-sim` script to replace `ThreadPoolExecutor` with `xarray.map_blocks` + `dask.distributed` for parallelization, enabling scaling to 100k+ stations and supporting larger-than-memory output files (up to 100 GB+). The entire Dask graph flows lazily to `to_netcdf`, so the full waveform array never needs to reside in memory.

## Changes
### New Functions

- `load_hf_dataset(...)` — Reads the station CSV, computes seeds and vref, and returns a chunked `xr.Dataset` indexed by `station`. Chunk size is calculated as `max(1, total_stations // 500)` to keep the task graph between ~500 and 1000 tasks (see the sketch after this list).
- `process_hf_dataset(ds, *, hf_sim_path, hf_input_template)` — `map_blocks`-compatible function that iterates over the stations in a chunk and calls `hf_simulate_station`. Returns `waveform` (component × station × time) and `epicentre_distance` (station).
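A hypothetical sketch of that chunking arithmetic; the CSV columns and the helper name `load_stations` are assumptions, not the real module:

```python
import pandas as pd
import xarray as xr

def load_stations(station_csv: str) -> xr.Dataset:
    """Illustrative stand-in for load_hf_dataset's chunking step."""
    stations = pd.read_csv(station_csv)  # assumed columns: name, lon, lat
    ds = xr.Dataset(
        {
            "longitude": ("station", stations["lon"].to_numpy()),
            "latitude": ("station", stations["lat"].to_numpy()),
        },
        coords={"station": stations["name"].to_numpy()},
    )
    # max(1, n // 500) keeps the graph at roughly 500-1000 tasks,
    # e.g. 1500 stations -> chunk_size 3 -> 500 chunks of 3 stations.
    chunk_size = max(1, len(stations) // 500)
    return ds.chunk({"station": chunk_size})
```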
### CLI Refactor

- Pipeline: `load_hf_dataset` → build a template with `dask.array.empty` → `xr.map_blocks(process_hf_dataset)` → assign metadata lazily → `ds.to_netcdf(out_file, engine="h5netcdf")` (a sketch of the template step follows below).
- No `.compute()`, `.result()`, or `.values` on large data — Dask streams chunks to disk one at a time.
- Uses a `LocalCluster` for parallel subprocess execution within the container.
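A sketch of the empty-template pattern under assumed shapes; `simulate_block` stands in for `process_hf_dataset`:

```python
import dask.array as da
import numpy as np
import xarray as xr

n_comp, n_station, n_t, chunk = 3, 1_000, 512, 200  # assumed sizes

stations = xr.Dataset(
    {"seed": ("station", da.arange(n_station, chunks=chunk))},
    coords={"station": np.arange(n_station)},
)

def simulate_block(block: xr.Dataset) -> xr.Dataset:
    """Stand-in for process_hf_dataset: handles one chunk of stations."""
    n = block.sizes["station"]
    return xr.Dataset(
        {
            "waveform": (("component", "station", "time"),
                         np.zeros((n_comp, n, n_t))),
            "epicentre_distance": ("station", np.zeros(n)),
        },
        coords={"station": block["station"]},
    )

# dask.array.empty allocates nothing: the template only describes the
# shape, dtype, and chunking of the eventual output.
template = xr.Dataset(
    {
        "waveform": (("component", "station", "time"),
                     da.empty((n_comp, n_station, n_t),
                              chunks=(n_comp, chunk, n_t))),
        "epicentre_distance": ("station", da.empty(n_station, chunks=chunk)),
    },
    coords={"station": stations["station"]},
)

result = xr.map_blocks(simulate_block, stations, template=template)
result.to_netcdf("hf.nc", engine="h5netcdf")  # streams chunk by chunk
```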
### Dependencies

- Added `dask[distributed]` to `pyproject.toml`.
### Tests

- `test_load_hf_dataset` — Checks dataset structure, coordinates, and attributes.
- `test_load_hf_dataset_chunking` — Verifies the chunking logic with 1500 stations (chunk_size=3, no data loss); a hypothetical version is sketched below.
- `test_process_hf_dataset_structure` — Mocks `hf_simulate_station` and verifies the output dataset's dimensions.
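A hypothetical shape for that chunking test; the import path, `load_hf_dataset`'s signature, and the CSV columns are all assumptions:

```python
import pandas as pd

from hf_sim import load_hf_dataset  # assumed import path

def test_chunking_1500_stations(tmp_path):
    csv = tmp_path / "stations.csv"
    pd.DataFrame({
        "name": [f"st{i}" for i in range(1500)],
        "lon": 172.0,
        "lat": -43.5,
    }).to_csv(csv, index=False)

    ds = load_hf_dataset(csv)  # assumed signature

    # 1500 // 500 == 3, so every chunk holds exactly 3 stations...
    assert all(size == 3 for size in ds.chunks["station"])
    # ...and no stations are dropped.
    assert ds.sizes["station"] == 1500
```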
### Notes

- `hf_simulate_station` and `process_hf_dataset` are standalone (not nested) functions for pickle serialization to Dask workers (see the sketch below).
- Per-chunk memory use is bounded by the `hb_high_binmod` binary's memory footprint (documented in the module docstring).
- The `map_blocks` template uses `dask.array.empty` to avoid allocating memory for the full waveform array upfront.
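A short sketch of the serialization constraint behind the first note (function bodies elided):

```python
# Module level: pickled by reference, so any Dask worker that can import
# the module can run it.
def process_hf_dataset(ds, *, hf_sim_path, hf_input_template):
    ...

def run_hf():
    # Avoided: defining the processor here as a nested function would force
    # by-value serialisation of the closure and everything it captures.
    # Per-run settings travel as explicit kwargs instead, e.g.:
    # xr.map_blocks(process_hf_dataset, ds,
    #               kwargs={"hf_sim_path": ..., "hf_input_template": ...})
    ...
```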