diff --git a/experiments/trimmed-ensemble/ensembles-first-run.py b/experiments/trimmed-ensemble/ensembles-first-run.py
new file mode 100644
index 00000000..45039632
--- /dev/null
+++ b/experiments/trimmed-ensemble/ensembles-first-run.py
@@ -0,0 +1,167 @@
+from functools import partial
+
+import pandas as pd
+
+# Remember to set your project path!
+# Added for standardization
+from datasetsforecast.m4 import M4, Monthly
+from utilsforecast.evaluation import evaluate
+from utilsforecast.losses import mase, smape
+
+from timecopilot.models.ensembles.median import MedianEnsemble
+from timecopilot.models.ensembles.trimmed import TrimmedEnsemble
+from timecopilot.models.foundation.chronos import Chronos
+from timecopilot.models.foundation.flowstate import FlowState
+from timecopilot.models.foundation.timesfm import TimesFM
+from timecopilot.models.foundation.tirex import TiRex
+from timecopilot.models.stats import AutoARIMA, SeasonalNaive, Theta
+
+# A few notes and instructions regarding this script. This experiment was
+# created to compare the implementations of the median and the trimmed
+# ensembles. Both run on 50 time series with 7 models. The idea is to verify
+# that both ensembles execute correctly, and later to set up a full gift-eval
+# run and assess whether there are conditions under which one ensemble turns
+# out to be sharper than the other.
+
+
+# -----------------------------
+# helpers
+# -----------------------------
+def normalize_month_start(df: pd.DataFrame) -> pd.DataFrame:
+    out = df.copy()
+    p = pd.to_datetime(out["ds"]).dt.to_period("M")
+    out["ds"] = p.dt.to_timestamp()  # month start by default
+    return out
+
+
+def debug_df(name, df):
+    print(f"\n[{name}] shape={df.shape}")
+    print(df.head(3))
+    print(f"[{name}] dtypes:\n{df.dtypes}")
+    print(f"[{name}] unique_id n={df['unique_id'].nunique()}")
+    print(f"[{name}] ds min/max: {df['ds'].min()} -> {df['ds'].max()}")
+    print(f"[{name}] y NaNs: {df['y'].isna().sum()}")
+    lens = df.groupby("unique_id").size()
+    print(f"[{name}] per-series length:\n{lens.to_string()}")
+
+
+def debug_forecast_output(name, fcst, alias):
+    print(f"\n[{name}] forecast shape={fcst.shape}")
+    print(fcst.head(3))
+    print(f"[{name}] columns: {list(fcst.columns)}")
+    if alias not in fcst.columns:
+        raise RuntimeError(f"[{name}] missing point column: {alias}")
+    na_point = fcst[alias].isna().sum()
+    print(f"[{name}] point NaNs ({alias}): {na_point}/{len(fcst)}")
+
+
+def hard_align_or_die(test_df, fcst_df, alias, name):
+    merged = test_df.merge(fcst_df, on=["unique_id", "ds"], how="inner")
+    if len(merged) != len(test_df):
+        # show a tiny forensic sample
+        t0 = test_df["ds"].sort_values().head(3).to_list()
+        f0 = fcst_df["ds"].sort_values().head(3).to_list()
+        raise RuntimeError(
+            f"[{name}] Broken alignment: merged={len(merged)} test={len(test_df)}. "
+            f"ds(test) head={t0} ds(fcst) head={f0}"
+        )
+    if merged[alias].isna().any():
+        raise RuntimeError(f"[{name}] Alignment ok but predictions contain NaNs.")
+    return merged
+
+
+# -----------------------------
+# data (M4 Monthly via datasetsforecast)
+# -----------------------------
+DATA_DIR = "data"
+GROUP = "Monthly"
+
+H = int(Monthly.horizon)  # 18
+SEAS = int(Monthly.seasonality)  # 12
+FREQ = str(Monthly.freq)  # 'M' (we still normalize ds ourselves)
+
+y_df, *_ = M4.load(directory=DATA_DIR, group=GROUP)
+y_df = normalize_month_start(y_df)
+
+# Official split: last H points per series are test
+test_all = y_df.groupby("unique_id", sort=False).tail(H).copy()
+train_all = y_df.drop(test_all.index).copy()
+
+# keep only series with >=70 TRAIN points, then pick 50
+len_by_id = train_all.groupby("unique_id").size()
+eligible = len_by_id[len_by_id >= 70].index
+series_ids = eligible[:50].to_numpy()
+
+train_df = train_all[train_all["unique_id"].isin(series_ids)].copy()
+test_df = test_all[test_all["unique_id"].isin(series_ids)].copy()
+
+print(f"[setup] eligible(>=70 train)={len(eligible)}; using={len(series_ids)}")
+print(f"[setup] horizon h={H} seasonality={SEAS} freq={FREQ}")
+
+debug_df("train_df", train_df)
+debug_df("test_df", test_df)
+
+# -----------------------------
+# models
+# -----------------------------
+batch_size = 64
+base_models = [
+    Chronos(repo_id="amazon/chronos-2", batch_size=batch_size),
+    TimesFM(repo_id="google/timesfm-2.5-200m-pytorch", batch_size=batch_size),
+    TiRex(batch_size=batch_size),
+    SeasonalNaive(),
+    AutoARIMA(),
+    Theta(),
+    FlowState(),
+]
+
+median_ens = MedianEnsemble(models=base_models, alias="Median")
+trimmed_ens = TrimmedEnsemble(models=base_models, alias="Trimmed")
+
+
+# -----------------------------
+# run + eval
+# -----------------------------
+def run_and_score(ens, name):
+    print(f"\n=== running {name} ===")
+    fcst = ens.forecast(df=train_df, h=H, freq="M")  # keep your call
+    fcst = normalize_month_start(fcst)
+
+    debug_forecast_output(name, fcst, ens.alias)
+
+    # HARD alignment check (no silent NaN merges)
+    merged = hard_align_or_die(test_df, fcst, ens.alias, name)
+    print(f"[{name}] merge rows={len(merged)} (expected {len(test_df)})")
+
+    # Standardized eval (sMAPE + MASE)
+    monthly_mase = partial(mase, seasonality=SEAS)
+    scores = evaluate(
+        merged,
+        metrics=[smape, monthly_mase],
+        train_df=train_df,  # needed for MASE
+    )
+
+    # Extract sMAPE per series for this model
+    smape_rows = scores[scores["metric"] == "smape"][
+        ["unique_id", ens.alias]
+    ].set_index("unique_id")
+    per_series = smape_rows[ens.alias]
+    overall = float(per_series.mean())
+
+    print(f"[{name}] sMAPE overall: {overall:.2f}")
+
+    # Optional: also print MASE overall
+    mase_rows = scores[scores["metric"] == "mase"][["unique_id", ens.alias]].set_index(
+        "unique_id"
+    )
+    print(f"[{name}] MASE overall: {float(mase_rows[ens.alias].mean()):.4f}")
+
+    return overall, per_series, fcst
+
+
+median_overall, median_per, median_fcst = run_and_score(median_ens, "MedianEnsemble")
+trim_overall, trim_per, trim_fcst = run_and_score(trimmed_ens, "TrimmedEnsemble")
+
+print("\n=== summary (sMAPE ↓ better) ===")
+print(f"MedianEnsemble : {median_overall:.2f}")
+print(f"TrimmedEnsemble: {trim_overall:.2f}")
diff --git a/timecopilot/models/ensembles/trimmed.py b/timecopilot/models/ensembles/trimmed.py
new file mode 100644
index 00000000..31c8f34d
--- /dev/null
+++ b/timecopilot/models/ensembles/trimmed.py
@@ -0,0 +1,239 @@
+import numpy as np
+import pandas as pd
+from sklearn.isotonic import IsotonicRegression
+
+from ... import TimeCopilotForecaster
+from ..utils.forecaster import Forecaster, QuantileConverter
+
+
+class TrimmedEnsemble(Forecaster):
+    """
+    TrimmedEnsemble (alternate ensemble to MedianEnsemble)
+
+    Purpose
+    -------
+    A robust ensemble that aggregates model forecasts using a *trimmed mean*
+    for quantiles (and optionally for point forecasts), with safety rails:
+
+    1) Fixed trimming per row (unique_id, ds): we first compute the *minimum*
+       number of available contributors across all requested quantiles for
+       that row (n_min). Then we decide how much to trim based on n_min, and
+       apply the same trimming intensity to every quantile in that row.
+
+    2) Minimum contributor quota: if n_min < min_quota, we fall back to the
+       *median* aggregation for quantiles for that row (and skip isotonic
+       repair).
+
+    3) Monotone quantiles: when a full quantile vector exists (no NaNs) and
+       the row did not fall back, we run isotonic regression to enforce:
+           q10 <= q50 <= q90 <= ...
+       This is a "repair" step only for monotonicity, not a modeling step.
+
+    Notes
+    -----
+    - This ensemble tolerates missing quantile columns per model
+      (point-only models).
+    - When quantiles include 0.5, the point forecast is set to the ensemble
+      q50 for coherence.
+    """
+
+    def __init__(
+        self,
+        models: list[Forecaster],
+        alias: str = "TrimmedEnsemble",
+        min_quota: int = 2,
+        trim_low_threshold: int = 8,  # n_min >= this -> switch to lower trim percentage
+        trim_10p_threshold: int | None = None,
+    ):
+        self.tcf = TimeCopilotForecaster(models=models, fallback_model=None)
+        self.alias = alias
+        self.min_quota = int(min_quota)
+        # Backward-compatible handling: `trim_10p_threshold` (deprecated) overrides
+        # `trim_low_threshold` if explicitly provided.
+        effective_threshold = (
+            trim_10p_threshold if trim_10p_threshold is not None else trim_low_threshold
+        )
+        self.trim_low_threshold = int(effective_threshold)
+        # Preserve old attribute name for any existing internal uses.
+        self.trim_10p_threshold = self.trim_low_threshold
+
+    # ---------- trimming policy (fixed per row based on n_min) ----------
+
+    def _trim_k_from_nmin(self, n_min: int) -> int:
+        """
+        Decide how many values to trim from each tail (k) given n_min contributors.
+
+        Rule agreed:
+        - n_min 3–4 -> trim 1 each side
+        - n_min 5–7 -> trim 20%
+        - n_min >= trim_low_threshold (default 8) -> trim 10%
+          (simple fixed choice; avoids a "10–20%" ambiguity)
+
+        Returns:
+            k (int): how many to drop from each tail.
+        """
+        if n_min <= 2:
+            return 0
+        if n_min <= 4:
+            return 1
+        if n_min < self.trim_low_threshold:
+            return int(np.floor(0.20 * n_min))
+        return int(np.floor(0.10 * n_min))
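+
+    # Worked example of the rule above (illustrative): with the 7 models used
+    # in the accompanying experiment and no missing quantiles, n_min == 7, so
+    # k = floor(0.20 * 7) = 1 value is dropped from each tail before averaging.
+    # With 10 contributors and the default trim_low_threshold=8,
+    # k = floor(0.10 * 10) = 1.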
+
+    @staticmethod
+    def _trimmed_mean_1d(values: np.ndarray, k: int) -> float:
+        """
+        Trim k from each tail, then mean. Ignores NaNs.
+
+        If trimming would remove everything (2k >= n), we fall back to plain mean.
+        """
+        x = values.astype(float)
+        x = x[~np.isnan(x)]
+        n = x.size
+        if n == 0:
+            return np.nan
+        if k <= 0 or (2 * k) >= n:
+            return float(np.mean(x))
+        x.sort()
+        return float(np.mean(x[k : n - k]))
+
+    @staticmethod
+    def _nanmedian_1d(values: np.ndarray) -> float:
+        """Median ignoring NaNs; returns NaN if all values are NaN."""
+        x = values.astype(float)
+        return float(np.nanmedian(x)) if np.any(~np.isnan(x)) else np.nan
+
+    # ---------- main API ----------
+
+    def forecast(
+        self,
+        df: pd.DataFrame,
+        h: int,
+        freq: str | None = None,
+        level: list[int | float] | None = None,
+        quantiles: list[float] | None = None,
+    ) -> pd.DataFrame:
+        qc = QuantileConverter(level=level, quantiles=quantiles)
+
+        # Call all models; merged output includes each model alias column (point),
+        # and (if provided by the model) alias-q-{pct} columns for each quantile.
+        _fcst_df = self.tcf._call_models(
+            "forecast",
+            merge_on=["unique_id", "ds"],
+            df=df,
+            h=h,
+            freq=freq,
+            level=None,
+            quantiles=qc.quantiles,
+        )
+
+        fcst_df = _fcst_df[["unique_id", "ds"]].copy()
+        model_cols = [m.alias for m in self.tcf.models]
+
+        # Point forecast:
+        # Keep median for robustness (same as MedianEnsemble baseline).
+        # If q50 is requested later, we overwrite with ensemble q50 for coherence.
+        fcst_df[self.alias] = _fcst_df[model_cols].median(axis=1)
+
+        # No probabilistic output requested -> done.
+        if qc.quantiles is None:
+            return fcst_df
+
+        # Quantile setup
+        qs = sorted(qc.quantiles)
+        q_cols = [f"{self.alias}-q-{int(q * 100)}" for q in qs]
+
+        # Map pct -> existing per-model quantile columns (some may be missing)
+        models_q_cols_map: dict[int, list[str]] = {}
+        for q in qs:
+            pct = int(q * 100)
+            expected = [f"{alias}-q-{pct}" for alias in model_cols]
+            models_q_cols_map[pct] = [c for c in expected if c in _fcst_df.columns]
+
+        n_rows = len(_fcst_df)
+        fallback_mask = np.zeros(n_rows, dtype=bool)
+        k_by_row = np.zeros(n_rows, dtype=int)
+
+        # Decide trimming ONCE per row:
+        # - Compute n_min = min contributors across requested quantiles
+        #   (after NaN filtering)
+        # - If n_min < min_quota -> fall back to median for ALL quantiles in that row
+        # - Else compute k = trim_k_from_nmin(n_min)
+        for i in range(n_rows):
+            counts = []
+            row_idx = _fcst_df.index[i]
+
+            for q in qs:
+                pct = int(q * 100)
+                cols_here = models_q_cols_map[pct]
+                if not cols_here:
+                    counts.append(0)
+                    continue
+                vals = _fcst_df.loc[row_idx, cols_here].to_numpy(dtype=float)
+                counts.append(int(np.sum(~np.isnan(vals))))
+
+            n_min = int(min(counts)) if counts else 0
+
+            if n_min < self.min_quota:
+                fallback_mask[i] = True
+                k_by_row[i] = 0
+            else:
+                k_by_row[i] = self._trim_k_from_nmin(n_min)
+
+        # Aggregate quantiles (trimmed mean if not fallback; otherwise median)
+        for q in qs:
+            pct = int(q * 100)
+            cols_here = models_q_cols_map[pct]
+            out_col = f"{self.alias}-q-{pct}"
+
+            if not cols_here:
+                # Nobody produced this quantile column.
+                fcst_df[out_col] = np.nan
+                continue
+
+            vals_mat = _fcst_df[cols_here].to_numpy(dtype=float)
+            out = np.empty(n_rows, dtype=float)
+
+            for i in range(n_rows):
+                if fallback_mask[i]:
+                    out[i] = self._nanmedian_1d(vals_mat[i])
+                else:
+                    out[i] = self._trimmed_mean_1d(vals_mat[i], k=int(k_by_row[i]))
+
+            fcst_df[out_col] = out
+
+        # Isotonic monotonicity repair:
+        # Only valid when:
+        #   - the row did NOT fall back, AND
+        #   - all requested quantiles exist for that row
+        #     (no NaNs in the ensemble quantiles)
+        # Otherwise: skip (do not "repair" partial/broken vectors).
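+        # Illustrative example: a raw row [q10, q50, q90] = [10.3, 9.9, 12.1]
+        # violates monotonicity; isotonic regression pools the first two values
+        # into their mean, giving [10.1, 10.1, 12.1], while already-monotone
+        # rows pass through unchanged.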
+        ir = IsotonicRegression(increasing=True)
+        q_vals = fcst_df[q_cols].to_numpy(dtype=float)
+        repaired = q_vals.copy()
+
+        for i in range(n_rows):
+            if fallback_mask[i]:
+                continue
+            if np.any(np.isnan(repaired[i])):
+                continue
+            repaired[i] = ir.fit_transform(qs, repaired[i])
+
+        fcst_df[q_cols] = repaired
+
+        # If q50 requested, make point forecast equal to median quantile output.
+        if 0.5 in qc.quantiles:
+            fcst_df[self.alias] = fcst_df[f"{self.alias}-q-50"].values
+
+        # One-line disclosure if any fallback occurred.
+        n_fallback = int(fallback_mask.sum())
+        if n_fallback > 0:
+            print(
+                f"{self.alias}: quantiles fallback->median "
+                f"for {n_fallback}/{n_rows} rows "
+                f"(min_quota={self.min_quota}); "
+                f"isotonic skipped on fallback/NaN rows."
+            )
+
+        # Convert quantiles to levels if user requested `level=...`
+        fcst_df = qc.maybe_convert_quantiles_to_level(fcst_df, models=[self.alias])
+        return fcst_df
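+
+
+# Usage sketch (illustrative; mirrors the experiment script added in this PR,
+# with placeholder data and models):
+#
+#   from timecopilot.models.stats import AutoARIMA, SeasonalNaive, Theta
+#
+#   ens = TrimmedEnsemble(models=[SeasonalNaive(), AutoARIMA(), Theta()])
+#   fcst = ens.forecast(df=train_df, h=18, freq="M", quantiles=[0.1, 0.5, 0.9])
+#
+# `train_df` is a long dataframe with columns ["unique_id", "ds", "y"]; the
+# returned frame has the point column "TrimmedEnsemble" plus
+# "TrimmedEnsemble-q-10/-q-50/-q-90" quantile columns.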