Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 74 additions & 3 deletions tests/test_generating_dataarrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,92 @@ def test_convert_snowcover_dates_to_s1_overpasses_missing_date():
# -----------------------------
def test_generate_forest_fraction_dataarray_inside_conus(monkeypatch):
aoi = box(-100, 30, -99, 31)

# Patch download_proba_v to return temporary file
def fake_download(*args, **kwargs):
tmp = tempfile.NamedTemporaryFile(suffix=".tif", delete=False)
da = xr.DataArray(np.array([[50,100],[0,25]]))
da.rio.write_crs("EPSG:4326", inplace=True)
da.rio.to_raster(tmp.name)
return tmp.name

monkeypatch.setattr("spicy_snow.utils.download.download_proba_v", fake_download)

da = generate_forest_fraction_dataarray(aoi)
assert da.max() <= 1
assert da.min() >= 0


def test_generate_forest_fraction_dataarray_outside_conus(monkeypatch, tmp_path):
"""Regression smoke test for the non-CONUS FCF code path. Catches the
'unexercised non-CONUS branch' class of bugs (PRs #79, #80, #85): missing
download_proba_v args, eager-load OOM, and clip_box failure modes.
"""
# Alaska AOI — within_conus(aoi) returns False here so the Proba-V
# branch in generate_forest_fraction_dataarray runs.
aoi = box(-148, 65, -147, 66)

# Build a small synthetic FCF tif that covers the AOI + buffer in
# EPSG:4326 (matches Lievens raster CRS). 0-100 range like the real one.
fcf_path = tmp_path / 'fake_fcf.tif'
n = 30
lats = np.linspace(66.5, 64.5, n) # descending, north-up
lons = np.linspace(-148.5, -146.5, n)
rng = np.random.default_rng(7)
da = xr.DataArray(
rng.uniform(0, 100, (n, n)).astype('float32'),
dims=('y', 'x'),
coords={'y': lats, 'x': lons},
)
da = da.rio.write_crs("EPSG:4326")
da.rio.to_raster(fcf_path)

# Mock download_proba_v wherever it's referenced: by name in
# utils.download AND as imported into generate_dataarrays (the actual
# call site after PR #79). Patching just utils.download wouldn't
# affect the already-imported reference in generate_dataarrays.
def fake_download(out_fp):
# Real download_proba_v writes to out_fp and returns it; mirror that.
import shutil
shutil.copy(str(fcf_path), str(out_fp))
return out_fp

monkeypatch.setattr(
"spicy_snow.processing.generate_dataarrays.download_proba_v",
fake_download,
)

# No ref: exercises the AOI-bounds clip_box path
out_no_ref = generate_forest_fraction_dataarray(aoi)
assert isinstance(out_no_ref, xr.DataArray)
assert out_no_ref.max() <= 1
assert out_no_ref.min() >= 0

# With ref: exercises reproject_match path. Use a small synthetic ref
# grid in the AOI's lat/lon range.
ref = xr.DataArray(
np.zeros((10, 10), dtype='float32'),
dims=('y', 'x'),
coords={
'y': np.linspace(65.9, 65.1, 10), # descending
'x': np.linspace(-147.9, -147.1, 10),
},
).rio.write_crs("EPSG:4326")

out_with_ref = generate_forest_fraction_dataarray(aoi, ref=ref)
assert isinstance(out_with_ref, xr.DataArray)
assert out_with_ref.shape == ref.shape # reproject_match aligned to ref
assert out_with_ref.max() <= 1
assert out_with_ref.min() >= 0


# Note: a unit test for generate_reference_grid's y-orientation normalization
# (PR #85) would require faking a Sentinel-1 burst tif with a specific
# transform/coord inconsistency that arises in production from degenerate
# single-acquisition tracks. That's hard to construct synthetically without
# real S1 data; the normalization is defensive and covered implicitly by
# the post-download pipeline test if its inputs are ever flipped.

# -----------------------------
# Test validate_aoi and within_conus
# -----------------------------
Expand Down
154 changes: 154 additions & 0 deletions tests/test_pipeline_synthetic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Synthetic snapshot-style end-to-end test for the post-download pipeline.

The download path (S1, VIIRS, Proba-V) is mocked at the boundary by
constructing a realistic Dataset that mimics what would come out of
generate_sentinel1_dataarray + generate_snowcover_dataarray +
generate_forest_fraction_dataarray. The rest of the pipeline
(amplitude_to_dB -> s1_orbit_averaging -> s1_clip_outliers ->
calc_delta_* -> calc_snow_index -> calc_snow_index_to_snow_depth ->
wet_snow flags) runs against it.

This catches a wide class of regressions in the post-download chain
without needing network, Earthdata creds, or real downloads — the gap
that let the bugs from this session leak into a release. The dataset
deliberately includes:

* Multiple orbits with disjoint time sets (catches in-place .loc[] writes)
* One single-acquisition orbit (the T021 shape)
* Multi-timestep snowcover that varies per timestep (catches stale-`ts`)
* Realistic dB-range vv/vh values (catches preprocessing assertions)

It is NOT a numerical-correctness regression test; it asserts the
pipeline runs without error and produces outputs in plausible ranges.
"""
import numpy as np
import pandas as pd
import pytest
import xarray as xr

from spicy_snow.processing.s1_preprocessing import (
amplitude_to_dB, s1_orbit_averaging, s1_clip_outliers,
)
from spicy_snow.processing.snow_index import (
calc_delta_vv, calc_delta_cross_ratio, calc_delta_gamma,
clip_delta_gamma_outlier, calc_snow_index, calc_snow_index_to_snow_depth,
)
from spicy_snow.processing.wet_snow import (
id_newly_wet_snow, id_wet_negative_si, id_newly_frozen_snow, flag_wet_snow,
)


@pytest.fixture
def synthetic_post_download_dataset():
"""Mimics the dataset shape that retrieve_snow_depth has after the
download + initial assembly step (right before amplitude_to_dB).

Variables: vv, vh (linear amplitude), snowcover (binary 0/1), fcf
(0-1), watermask (binary), with a multi-orbit time axis including
a single-acquisition orbit.
"""
# 18 timesteps over a winter season, 3 orbits with disjoint time
# patterns + one orphan single-acquisition orbit (track 21).
# Sorted chronologically (production data comes through
# combine_close_images(...sortby('time')) so the snow-index funcs
# assume a sorted time index for slice-based prev-image lookups).
times_unsorted = pd.to_datetime([
# Orbit 10 (6-day)
'2024-10-01', '2024-10-07', '2024-10-13', '2024-10-19',
'2024-10-25', '2024-10-31', '2024-11-06',
# Orbit 65 (6-day, offset by 3 days)
'2024-10-04', '2024-10-10', '2024-10-16', '2024-10-22',
'2024-10-28', '2024-11-03', '2024-11-09',
# Orbit 94 (12-day)
'2024-10-12', '2024-10-24', '2024-11-05',
# Orbit 21 - ORPHAN, single acquisition
'2024-11-15',
])
track_unsorted = np.array(
[10] * 7 + [65] * 7 + [94] * 3 + [21]
)
order = np.argsort(times_unsorted.values)
times = times_unsorted[order]
track = track_unsorted[order]
n_t = len(times)
n_y, n_x = 8, 10
rng = np.random.default_rng(0)

# Sentinel-1 in linear amplitude (will be converted to dB downstream).
# Values around 0.01-0.5 → about -20 to -3 dB.
vv_amp = rng.uniform(0.01, 0.4, size=(n_t, n_y, n_x)).astype('float32')
vh_amp = rng.uniform(0.005, 0.2, size=(n_t, n_y, n_x)).astype('float32')

# Snowcover varies per timestep — early-season has mixed coverage,
# late-season fully snow-covered.
snowcover = np.ones((n_t, n_y, n_x), dtype=int)
for t in range(min(4, n_t)):
# patches of bare ground in first few timesteps
snowcover[t, t % n_y, : (t + 2) % n_x or 1] = 0

fcf = rng.uniform(0.1, 0.8, size=(n_y, n_x)).astype('float32')
watermask = np.zeros((n_y, n_x), dtype=int)

ds = xr.Dataset(
{
'vv': (('time', 'y', 'x'), vv_amp),
'vh': (('time', 'y', 'x'), vh_amp),
'snowcover': (('time', 'y', 'x'), snowcover),
'fcf': (('y', 'x'), fcf),
'watermask': (('y', 'x'), watermask),
},
coords={
'time': times,
'y': np.linspace(65.7, 65.1, n_y),
'x': np.linspace(-148.3, -147.4, n_x),
'track': ('time', track),
},
attrs={'s1_units': 'amp'},
)
return ds


def test_post_download_pipeline_runs_end_to_end(synthetic_post_download_dataset):
"""Run the entire post-download pipeline against a multi-orbit synthetic
dataset (including a single-acquisition orbit) and assert plausible
outputs. This is the offline counterpart to the integration test that
would catch the snow-index, wet-snow, and preprocessing regressions.
"""
ds = synthetic_post_download_dataset

# Preprocessing
ds = amplitude_to_dB(ds)
ds = s1_orbit_averaging(ds)
ds = s1_clip_outliers(ds)

# Snow-index chain
ds = calc_delta_cross_ratio(ds, A=2)
ds = calc_delta_vv(ds)
ds = calc_delta_gamma(ds, B=0.5)
ds = clip_delta_gamma_outlier(ds)
ds = calc_snow_index(ds, ims_masking=True)
ds = calc_snow_index_to_snow_depth(ds, C=0.55)

# Wet-snow flags
ds = id_newly_wet_snow(ds, wet_thresh=-2)
ds = id_wet_negative_si(ds, wet_SI_thresh=0)
ds = id_newly_frozen_snow(ds, freeze_thresh=2)
ds = flag_wet_snow(ds)

# Sanity checks: outputs exist with expected dims
for var in ('deltaCR', 'deltavv', 'deltaGamma', 'snow_index',
'snow_depth', 'wet_snow', 'perma_wet'):
assert var in ds.data_vars, f"missing {var} after pipeline"
assert ds[var].sizes['time'] == ds.sizes['time']

# snow_depth should be non-negative where defined
sd = ds['snow_depth'].values
finite_sd = sd[np.isfinite(sd)]
assert (finite_sd >= 0).all(), "snow_depth must be non-negative"

# wet_snow should be 0 or 1 (or NaN) — not >1 from the stale-`ts` bug
wet = ds['wet_snow'].values
finite_wet = wet[np.isfinite(wet)]
assert ((finite_wet == 0) | (finite_wet == 1)).all(), (
"wet_snow must be binary (0/1) wherever defined"
)
54 changes: 54 additions & 0 deletions tests/test_snow_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,57 @@ def test_delta_cr_errors(test_dataset):
import pytest
with pytest.raises(AssertionError):
calc_delta_cross_ratio(test_dataset)


def test_calc_delta_cross_ratio_disjoint_orbits_with_singleton():
"""Regression test for the read-only-buffer bug (PR #85): orbits with
disjoint time sets, including one single-acquisition orbit (T021-style),
used to break in-place .loc[time=...] writes under numpy 2.x. Verifies
the per-orbit-list + xr.concat path works for these shapes.
"""
times = pd.to_datetime([
'2024-10-01', '2024-10-07', '2024-10-13', # orbit A
'2024-10-04', '2024-10-10', '2024-10-16', # orbit B
'2024-11-15', # orbit C - single acquisition
])
track = np.array([10, 10, 10, 65, 65, 65, 21])
n_t = len(times)
rng = np.random.default_rng(0)
ds = xr.Dataset(
{
'vv': (('time', 'y', 'x'), rng.normal(-15, 2, (n_t, 3, 4))),
'vh': (('time', 'y', 'x'), rng.normal(-22, 2, (n_t, 3, 4))),
},
coords={
'time': times, 'y': [0, 1, 2], 'x': [0, 1, 2, 3],
'track': ('time', track),
},
attrs={'s1_units': 'dB'},
)

out = calc_delta_cross_ratio(ds, A=2)

# Output exists and aligns to dataset time axis
assert 'deltaCR' in out.data_vars
assert out['deltaCR'].sizes['time'] == n_t

# Each orbit's first timestep is NaN (diff drops it). For singleton T021,
# the only timestep should also be NaN since there's no previous.
is_nan = np.isnan(out['deltaCR'].values).all(axis=(1, 2)) # collapse y,x
expected_nan_times = sorted([
'2024-10-01', '2024-10-04', '2024-11-15',
])
actual_nan_times = sorted(
[str(t.date()) for t in pd.to_datetime(out.time.values[is_nan])]
)
assert actual_nan_times == expected_nan_times

# Verify a known per-orbit delta: orbit A second timestep
A = 2
gamma_cr = (A * ds['vh']) - ds['vv']
expected_step = (
gamma_cr.sel(time='2024-10-07') - gamma_cr.sel(time='2024-10-01')
)
assert_allclose(
out['deltaCR'].sel(time='2024-10-07'), expected_step,
)
49 changes: 49 additions & 0 deletions tests/test_wetsnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,55 @@ def test_newly_frozen_assertion(test_ds):
id_newly_frozen_snow(test_ds.drop_vars(['deltaGamma']))


def test_flag_wet_snow_snowcover_mask_applied_to_all_timesteps():
"""Regression test for the stale-`ts` bug (PR #79): the final snowcover
masking line in flag_wet_snow used a loop variable from outside the
loop scope, so it only zeroed wet_snow at the last timestep instead of
every timestep. Construct a multi-timestep dataset where snowcover is
False in different cells per timestep and assert that wet_snow == 0
at every (time, y, x) where snowcover is False.
"""
# 6 timesteps over 2 orbits, big enough to get past flag_wet_snow's
# >=4 melt-season check
times = pd.date_range("2025-02-01", periods=6, freq="6D")
n_t = len(times)
rng = np.random.default_rng(42)
# snowcover varies per timestep: a different cell is "no snow" each step
snowcover = np.ones((n_t, 3, 3), dtype=bool)
for t in range(n_t):
snowcover[t, t % 3, t % 3] = False
ds = xr.Dataset(
{
"deltavv": (("time", "y", "x"), rng.normal(0, 2, (n_t, 3, 3))),
"deltaCR": (("time", "y", "x"), rng.normal(0, 2, (n_t, 3, 3))),
"deltaGamma": (("time", "y", "x"), rng.normal(0, 1, (n_t, 3, 3))),
"snow_index": (("time", "y", "x"), rng.normal(0.2, 0.1, (n_t, 3, 3))),
"snowcover": (("time", "y", "x"), snowcover),
"fcf": (("y", "x"), np.full((3, 3), 0.5)),
"vv": (("time", "y", "x"), rng.normal(-15, 1, (n_t, 3, 3))),
},
coords={
"time": times,
"track": ("time", [10, 65, 10, 65, 10, 65]),
},
attrs={"s1_units": "dB"},
)
ds = id_newly_wet_snow(ds, wet_thresh=-2)
ds = id_wet_negative_si(ds, wet_SI_thresh=0)
ds = id_newly_frozen_snow(ds, freeze_thresh=0.5)
ds = flag_wet_snow(ds)

# Every cell with snowcover=False must end up with wet_snow=0 (or NaN
# from upstream masking, but never >0).
no_snow_mask = ~ds['snowcover'].values
wet_at_no_snow = ds['wet_snow'].values[no_snow_mask]
nonzero_at_no_snow = wet_at_no_snow[~np.isnan(wet_at_no_snow)] != 0
assert not nonzero_at_no_snow.any(), (
"wet_snow should be 0 (or NaN) wherever snowcover is False, "
"across ALL timesteps — not just the last."
)


# def test_negative_snow_index_wet(test_ds):
# ds = test_ds
# ds['snowcover'][0,0,0] = 2
Expand Down
Loading