Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions climate_tookit/calculate_hazards/ensemble_hazards.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,9 @@ def calculate_ensemble(crop: str, lat: float, lon: float,
if fixed_season else None)

jobs = [(m, sc) for sc in scenarios for m in models]
# max_workers <= 0 -> auto: one worker per job, capped at 16.
workers = max(1, min(len(jobs), 16) if max_workers <= 0 else min(max_workers, len(jobs)))
# max_workers <= 0 -> auto: one worker per job, capped at 10 to match the
# GEE HTTP connection pool (avoids "connection pool is full" churn).
workers = max(1, min(len(jobs), 10) if max_workers <= 0 else min(max_workers, len(jobs)))

print(f"\nNEX-GDDP ensemble: {crop} at ({lat:.4f}, {lon:.4f}) "
f"{start_year}-{end_year}")
Expand Down
5 changes: 3 additions & 2 deletions climate_tookit/climate_statistics/ensemble_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,9 @@ def _run_model(model: str):

raw_by_model: Dict[str, Dict[str, Any]] = {}
failed: List[Dict[str, str]] = []
# max_workers <= 0 -> auto: one worker per model, capped at 16.
workers = max(1, min(len(active), 16) if max_workers <= 0 else min(max_workers, len(active)))
# max_workers <= 0 -> auto: one worker per model, capped at 10 to match the
# GEE HTTP connection pool (avoids "connection pool is full" churn).
workers = max(1, min(len(active), 10) if max_workers <= 0 else min(max_workers, len(active)))

def _record(model, r, err, idx):
if err is not None or r is None:
Expand Down
5 changes: 3 additions & 2 deletions climate_tookit/climatology/long_term_climatology.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,8 +945,9 @@ def _run_model(model: str) -> Tuple[str, Optional[Dict[str, Any]], Optional[str]

per_model_results: Dict[str, Dict[str, Any]] = {}
failed: List[Dict[str, str]] = []
# max_workers <= 0 -> auto: one worker per model, capped at 16.
workers = max(1, min(len(active), 16) if max_workers <= 0 else min(max_workers, len(active)))
# max_workers <= 0 -> auto: one worker per model, capped at 10 to match the
# GEE HTTP connection pool (avoids "connection pool is full" churn).
workers = max(1, min(len(active), 10) if max_workers <= 0 else min(max_workers, len(active)))

def _record(model: str, r: Optional[Dict[str, Any]], err: Optional[str], idx: int) -> None:
if err is not None or r is None:
Expand Down
5 changes: 3 additions & 2 deletions climate_tookit/compare_periods/ensemble_periods.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,9 @@ def _run_model(model: str):

by_model: Dict[str, Dict[str, Any]] = {}
failed: List[Dict[str, str]] = []
# max_workers <= 0 -> auto: one worker per model, capped at 16.
workers = max(1, min(len(active), 16) if max_workers <= 0 else min(max_workers, len(active)))
# max_workers <= 0 -> auto: one worker per model, capped at 10 to match the
# GEE HTTP connection pool (avoids "connection pool is full" churn).
workers = max(1, min(len(active), 10) if max_workers <= 0 else min(max_workers, len(active)))

def _record(model, r, err, idx):
if err is not None or r is None:
Expand Down
74 changes: 26 additions & 48 deletions climate_tookit/season_analysis/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def aggregate_overall(model_results: List[Dict]):
return ensemble, model_averages

# Top-level orchestrator
def run_ensemble(lat, lon, start_year, end_year, scenarios, models, fixed_arg=None, verbose=True, max_workers=0):
def run_ensemble(lat, lon, start_year, end_year, scenarios, models, fixed_arg=None, verbose=True):
results = {}
mode = 'fixed' if fixed_arg else 'auto'

Expand All @@ -370,61 +370,42 @@ def run_ensemble(lat, lon, start_year, end_year, scenarios, models, fixed_arg=No
f"{start_year}–{end_year} | mode={mode}")
print('=' * 70)

# Each model is an independent, I/O-bound GEE run; fetch them
# concurrently and restore the input order afterwards.
def _run_model(model):
# NOTE: models are processed SERIALLY here on purpose. analyze_one_model
# relies on process-global state — use_nex_gddp() monkeypatches
# seasons.get_climate_data and redirect_stdout() swaps sys.stdout — which
# is not thread-safe: concurrent models would clobber each other's
# model/scenario binding (e.g. a fetch running under the wrong scenario).
# The big speed win for this path comes from the single span-fetch in
# seasons.fetch_and_analyze_years_fixed (one GEE call per model instead
# of one per year), which applies regardless of concurrency.
per_model = []
for i, model in enumerate(models, 1):
if verbose:
print(f" [{i:02d}/{len(models):02d}] {model:<22}", end=' ', flush=True)
try:
s_dict, a_dict, skip_info = analyze_one_model(
lat, lon, start_year, end_year, model, scenario, fixed_arg)
return model, {
per_model.append({
'model': model,
'seasons_dict': s_dict,
'annual_dict': a_dict,
'skip_info': skip_info,
}, None
})
if verbose:
n_seasons = sum(len(v) for v in s_dict.values())
n_years = sum(1 for v in s_dict.values() if v)
extra = ''
if mode == 'auto' and skip_info['perhumid_years']:
extra = f" [perhumid: {len(skip_info['perhumid_years'])}y]"
print(f"✓ {n_seasons} season(s) over {n_years} year(s){extra}")
except Exception as exc:
return model, {
per_model.append({
'model': model, 'seasons_dict': {}, 'annual_dict': {},
'skip_info': {'perhumid_years': [], 'no_season_years': [], 'analyzed_years': []},
'error': f"{type(exc).__name__}: {exc}",
}, f"{type(exc).__name__}: {exc}"

def _log(idx, entry, err):
if not verbose:
return
if err:
print(f" [{idx:02d}/{len(models):02d}] {model_name(entry):<22} ✗ {err}", flush=True)
else:
s_dict = entry['seasons_dict']
n_seasons = sum(len(v) for v in s_dict.values())
n_years = sum(1 for v in s_dict.values() if v)
extra = ''
if mode == 'auto' and entry['skip_info'].get('perhumid_years'):
extra = f" [perhumid: {len(entry['skip_info']['perhumid_years'])}y]"
print(f" [{idx:02d}/{len(models):02d}] {model_name(entry):<22} "
f"✓ {n_seasons} season(s) over {n_years} year(s){extra}", flush=True)

def model_name(entry):
return entry.get('model', '?')

by_model = {}
# max_workers <= 0 -> auto: one worker per model, capped at 16.
workers = max(1, min(len(models), 16) if max_workers <= 0 else min(max_workers, len(models)))
if workers == 1:
for i, model in enumerate(models, 1):
_, entry, err = _run_model(model)
by_model[model] = entry
_log(i, entry, err)
else:
from concurrent.futures import ThreadPoolExecutor, as_completed
order = {m: i for i, m in enumerate(models, 1)}
with ThreadPoolExecutor(max_workers=workers) as ex:
futs = {ex.submit(_run_model, m): m for m in models}
for fut in as_completed(futs):
m, entry, err = fut.result()
by_model[m] = entry
_log(order[m], entry, err)
per_model = [by_model[m] for m in models if m in by_model]
})
if verbose:
print(f"✗ {type(exc).__name__}: {exc}")

ok = sum(1 for r in per_model if not r.get('error'))
diagnostics = _aggregate_skip_info(per_model)
Expand Down Expand Up @@ -667,8 +648,6 @@ def main():
p.add_argument('--list-models', action='store_true', help='Print models and exit')
p.add_argument('--output', help='Save JSON result here')
p.add_argument('--quiet', action='store_true')
p.add_argument('--workers', type=int, default=0,
help='Parallel GEE fetch workers across models (default: auto = one per model, capped at 16; use 1 to disable)')
args = p.parse_args()

if args.source_key:
Expand Down Expand Up @@ -705,7 +684,6 @@ def main():
scenarios = scenarios, models = models,
fixed_arg = args.fixed_season,
verbose = not args.quiet,
max_workers = args.workers,
)

if not args.quiet:
Expand Down
45 changes: 29 additions & 16 deletions climate_tookit/season_analysis/seasons.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,35 +615,48 @@ def fetch_and_analyze_years_fixed(
annual_dict : Dict[int, Dict] = {}
force = None if source == "auto" else source

# Resolve all season windows up front so we know the full span to fetch.
resolved_by_year: Dict[int, List] = {}
max_cess = date(end_year, 12, 31)
for year in range(start_year, end_year + 1):
print(f"\nFixed-season year {year} | source={source}")

# Resolve all season dates for this year
resolved = []
for sd in fixed_seasons:
(o_m, o_d) = sd["onset_md"]
(c_m, c_d) = sd["cessation_md"]
cess_year = year + 1 if (c_m, c_d) < (o_m, o_d) else year
try:
resolved.append((date(year, o_m, o_d), date(cess_year, c_m, c_d)))
cd = date(cess_year, c_m, c_d)
resolved.append((date(year, o_m, o_d), cd))
if cd > max_cess:
max_cess = cd
except ValueError as exc:
print(f" [WARNING] Invalid date: {exc}")
resolved_by_year[year] = resolved

# Fetch the entire period (plus any year-crossing tail) in ONE call, then
# slice per year in memory. The per-year helpers filter internally by year
# / window, so passing the full master frame yields identical results
# while replacing N_years fetches with one.
overall_start = f"{start_year}-01-01"
overall_end = max_cess.strftime("%Y-%m-%d")
print(f" Fetching {overall_start} to {overall_end} (full span, one call) ...")
try:
master = get_climate_data(lat, lon, overall_start, overall_end, force_source=force)
master = add_et0(master, lat)
print(f" Retrieved {len(master)} days")
except Exception as exc:
print(f" ✗ Data fetch failed: {exc} — stats will be n/a")
master = None

for year in range(start_year, end_year + 1):
print(f"\nFixed-season year {year} | source={source}")

resolved = resolved_by_year.get(year, [])
if not resolved:
annual_dict[year] = {}
continue

# Fetch data: full calendar year + any year-crossing tail, in one call
fetch_start = f"{year}-01-01"
fetch_end = max(date(year, 12, 31), max(c for _, c in resolved)).strftime("%Y-%m-%d")
print(f" Fetching {fetch_start} to {fetch_end} ...")

try:
df = get_climate_data(lat, lon, fetch_start, fetch_end, force_source=force)
df = add_et0(df, lat)
print(f" Retrieved {len(df)} days")
except Exception as exc:
print(f" ✗ Data fetch failed: {exc} — stats will be n/a")
df = None
df = master # helpers slice to the year / window internally

# Annual stats (calendar year only)
if df is not None and not df.empty:
Expand Down
Loading