From d4acb66622693db04c3a3da7181ab37076e0bb39 Mon Sep 17 00:00:00 2001 From: "Zink, Zephyr" Date: Mon, 18 May 2026 13:30:28 -0600 Subject: [PATCH 1/3] edits, tests --- src/gdm/distribution/sys_functools.py | 272 +++++++++++++++++++++----- tests/test_timeseries.py | 261 ++++++++++++++++++++++++ 2 files changed, 488 insertions(+), 45 deletions(-) diff --git a/src/gdm/distribution/sys_functools.py b/src/gdm/distribution/sys_functools.py index 2f586247..2153b8f8 100644 --- a/src/gdm/distribution/sys_functools.py +++ b/src/gdm/distribution/sys_functools.py @@ -18,6 +18,7 @@ from gdm.distribution.components.distribution_load import DistributionLoad from gdm.distribution.components.distribution_solar import DistributionSolar from gdm.distribution.components.distribution_battery import DistributionBattery +from gdm.distribution.enums import Phase from gdm.distribution.distribution_system import DistributionSystem, UserAttributes from gdm.exceptions import ( InconsistentTimeSeriesAggregation, @@ -123,6 +124,50 @@ def _get_solar_power( ) +def _get_load_power_per_phase( + load: DistributionLoad, ts_data: TimeSeriesData, metadata: TimeSeriesMetadata +) -> list[tuple[Phase, Quantity]]: + """Internal function to return per-phase load power as a list of (phase, power) tuples.""" + if metadata.features is None: + msg = f"The {metadata.name} data is not a GDM quantity: {metadata.get_time_series_data_type()}" + raise GDMQuantityError(msg) + + user_attr = UserAttributes.model_validate(metadata.features) + denormalized_data = get_time_series_actual_data(ts_data) + + if user_attr.use_actual: + return [(phase, denormalized_data) for phase in load.phases] + + if metadata.name in {"active_power", "reactive_power"}: + return [ + ( + phase, + denormalized_data.magnitude.tolist() + * ( + ph_load.real_power + if metadata.name == "active_power" + else ph_load.reactive_power + ), + ) + for phase, ph_load in zip(load.phases, load.equipment.phase_loads) + ] + else: + msg = f"{metadata.name} is not supported for load power calculation." + raise UnsupportedVariableError(msg) + + +def _get_solar_power_per_phase( + solar: DistributionSolar, ts_data: TimeSeriesData, metadata: TimeSeriesMetadata +) -> list[tuple[Phase, Quantity]]: + """Internal function to return per-phase solar power as a list of (phase, power) tuples. + + Solar has no per-phase power model, so total power is split equally across phases. + """ + total_power = _get_solar_power(solar, ts_data, metadata) + n_phases = len(solar.phases) + return [(phase, total_power / n_phases) for phase in solar.phases] + + def _check_for_time_series_metadata_consistency(ts_metadata: list[TimeSeriesMetadata]): # Extract unique properties from ts_data @@ -352,6 +397,9 @@ def _get_combined_single_time_series_df( power_function: Callable, unit_conversion: dict[str, str], time_series_type: Type[TimeSeriesData] = SingleTimeSeries, + aggregate_phases: bool = True, + per_phase_function: Callable | None = None, + include_features: bool = False, ) -> pd.DataFrame: """ Generalized function for returning combined single time series dataframe for given component type. @@ -370,6 +418,16 @@ def _get_combined_single_time_series_df( Optional dictionary to perform unit conversion on data in pint quantities. time_series_type: Type[TimeSeriesData] Type of time series data. Defaults to: SingleTimeSeries + aggregate_phases: bool + If True (default), phases are summed and no ``phase`` column is added. + If False, one row per phase is emitted and a ``phase`` column is added. + Requires ``per_phase_function`` when False. + per_phase_function: Callable | None + Function with the same signature as ``power_function`` that returns + ``list[tuple[Phase, Quantity]]``. Required when ``aggregate_phases=False``. + include_features: bool + If True, columns for each entry in ``metadata.features`` (excluding + ``use_actual``) are added to the output DataFrame. Defaults to False. Returns ------- pd.DataFrame @@ -410,35 +468,76 @@ def _get_combined_single_time_series_df( owner=component, name=var, time_series_type=time_series_type ) metadata = [meta for meta in ts_metadata if meta.name == var][0] - power_data = power_function(component, ts_data, metadata) + timestamps = [ + ts_data.initial_timestamp + idx * ts_data.resolution + for idx in range(ts_data.length) + ] + features_cols: dict = ( + { + k: [v] * ts_data.length + for k, v in (metadata.features or {}).items() + if k != "use_actual" + } + if include_features + else {} + ) - if var in unit_conversion and not isinstance(power_data, Quantity): - msg = ( - f"Unit conversion specified for {var}, but power data is not a pint Quantity." + if not aggregate_phases and per_phase_function is not None: + phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( + component, ts_data, metadata ) - raise GDMQuantityError(msg) - - dfs.append( - pd.DataFrame( - { - "timestamp": [ - ts_data.initial_timestamp + idx * ts_data.resolution - for idx in range(ts_data.length) - ], - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] if var in unit_conversion else power_data.units - ] - * ts_data.length, - } + for phase, power_data in phase_power_pairs: + if var in unit_conversion and not isinstance(power_data, Quantity): + msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." + raise GDMQuantityError(msg) + dfs.append( + pd.DataFrame( + { + "timestamp": timestamps, + "name": [var] * ts_data.length, + "component_uuid": [component.uuid] * ts_data.length, + "phase": [phase] * ts_data.length, + "value": ( + power_data.to(unit_conversion[var]).magnitude + if var in unit_conversion + else power_data + ), + "units": [ + unit_conversion[var] + if var in unit_conversion + else power_data.units + ] + * ts_data.length, + **features_cols, + } + ) + ) + else: + power_data = power_function(component, ts_data, metadata) + if var in unit_conversion and not isinstance(power_data, Quantity): + msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." + raise GDMQuantityError(msg) + dfs.append( + pd.DataFrame( + { + "timestamp": timestamps, + "name": [var] * ts_data.length, + "component_uuid": [component.uuid] * ts_data.length, + "value": ( + power_data.to(unit_conversion[var]).magnitude + if var in unit_conversion + else power_data + ), + "units": [ + unit_conversion[var] + if var in unit_conversion + else power_data.units + ] + * ts_data.length, + **features_cols, + } + ) ) - ) return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() @@ -450,9 +549,12 @@ def _get_combined_nonsequential_time_series_df( power_function: Callable, unit_conversion: dict[str, str], time_series_type: Type[TimeSeriesData] = NonSequentialTimeSeries, + aggregate_phases: bool = True, + per_phase_function: Callable | None = None, + include_features: bool = False, ) -> pd.DataFrame: """ - Generalized function for returning combined single time series dataframe for given component type. + Generalized function for returning combined nonsequential time series dataframe for given component type. Parameters ---------- @@ -468,6 +570,16 @@ def _get_combined_nonsequential_time_series_df( Optional dictionary to perform unit conversion on data in pint quantities. time_series_type: Type[TimeSeriesData] Type of time series data. Defaults to: NonSequentialTimeSeries + aggregate_phases: bool + If True (default), phases are summed and no ``phase`` column is added. + If False, one row per phase is emitted and a ``phase`` column is added. + Requires ``per_phase_function`` when False. + per_phase_function: Callable | None + Function with the same signature as ``power_function`` that returns + ``list[tuple[Phase, Quantity]]``. Required when ``aggregate_phases=False``. + include_features: bool + If True, columns for each entry in ``metadata.features`` (excluding + ``use_actual``) are added to the output DataFrame. Defaults to False. Returns ------- @@ -509,26 +621,67 @@ def _get_combined_nonsequential_time_series_df( owner=component, name=var, time_series_type=time_series_type ) metadata = [meta for meta in ts_metadata if meta.name == var][0] - power_data = power_function(component, ts_data, metadata) - dfs.append( - pd.DataFrame( - { - "timestamp": ts_data.timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] if var in unit_conversion else power_data.units - ] - * ts_data.length, - } - ) + features_cols: dict = ( + { + k: [v] * ts_data.length + for k, v in (metadata.features or {}).items() + if k != "use_actual" + } + if include_features + else {} ) + if not aggregate_phases and per_phase_function is not None: + phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( + component, ts_data, metadata + ) + for phase, power_data in phase_power_pairs: + dfs.append( + pd.DataFrame( + { + "timestamp": ts_data.timestamps, + "name": [var] * ts_data.length, + "component_uuid": [component.uuid] * ts_data.length, + "phase": [phase] * ts_data.length, + "value": ( + power_data.to(unit_conversion[var]).magnitude + if var in unit_conversion + else power_data + ), + "units": [ + unit_conversion[var] + if var in unit_conversion + else power_data.units + ] + * ts_data.length, + **features_cols, + } + ) + ) + else: + power_data = power_function(component, ts_data, metadata) + dfs.append( + pd.DataFrame( + { + "timestamp": ts_data.timestamps, + "name": [var] * ts_data.length, + "component_uuid": [component.uuid] * ts_data.length, + "value": ( + power_data.to(unit_conversion[var]).magnitude + if var in unit_conversion + else power_data + ), + "units": [ + unit_conversion[var] + if var in unit_conversion + else power_data.units + ] + * ts_data.length, + **features_cols, + } + ) + ) + return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() @@ -537,6 +690,8 @@ def get_combined_load_time_series_df( unit_conversion: dict[str, str], var_of_interest: set[str] = {"active_power", "reactive_power"}, time_series_type: Type[TimeSeriesData] = SingleTimeSeries, + aggregate_phases: bool = True, + include_features: bool = False, ) -> pd.DataFrame: """ Function for returning combined time series dataframe for load components. @@ -551,6 +706,12 @@ def get_combined_load_time_series_df( Set of variable names of interest. Defaults to: {"active_power", "reactive_power"} time_series_type: Type[TimeSeriesData] Type of time series data. Defaults to: SingleTimeSeries + aggregate_phases: bool + If True (default), phase powers are summed into a single row per timestamp. + If False, one row per phase is emitted with a ``phase`` column. + include_features: bool + If True, columns for each entry in ``metadata.features`` (excluding + ``use_actual``) are added to the output DataFrame. Defaults to False. Returns ------- pd.DataFrame @@ -563,6 +724,9 @@ def get_combined_load_time_series_df( power_function=_get_load_power, unit_conversion=unit_conversion, time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_load_power_per_phase, + include_features=include_features, ) elif time_series_type.__name__ == "NonSequentialTimeSeries": return _get_combined_nonsequential_time_series_df( @@ -572,6 +736,9 @@ def get_combined_load_time_series_df( power_function=_get_load_power, unit_conversion=unit_conversion, time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_load_power_per_phase, + include_features=include_features, ) else: msg = f"get_combined_load_time_series_df not implemented for {time_series_type.__name__}" @@ -583,6 +750,8 @@ def get_combined_solar_time_series_df( unit_conversion: dict[str, str], var_of_interest: set[str] = {"irradiance"}, time_series_type: Type[TimeSeriesData] = SingleTimeSeries, + aggregate_phases: bool = True, + include_features: bool = False, ) -> pd.DataFrame: """ Function for returning combined time series dataframe for solar components. @@ -597,6 +766,13 @@ def get_combined_solar_time_series_df( Set of variable names of interest. Defaults to: {"irradiance"} time_series_type: Type[TimeSeriesData] Type of time series data. Defaults to: SingleTimeSeries + aggregate_phases: bool + If True (default), the total solar power is returned as a single row per timestamp. + If False, one row per phase is emitted with a ``phase`` column; total power is + split equally across phases since no per-phase solar model exists. + include_features: bool + If True, columns for each entry in ``metadata.features`` (excluding + ``use_actual``) are added to the output DataFrame. Defaults to False. Returns ------- pd.DataFrame @@ -609,6 +785,9 @@ def get_combined_solar_time_series_df( power_function=_get_solar_power, unit_conversion=unit_conversion, time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_solar_power_per_phase, + include_features=include_features, ) return solar_df.replace("irradiance", "active_power") elif time_series_type.__name__ == "NonSequentialTimeSeries": @@ -619,6 +798,9 @@ def get_combined_solar_time_series_df( power_function=_get_solar_power, unit_conversion=unit_conversion, time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_solar_power_per_phase, + include_features=include_features, ) return solar_df.replace("irradiance", "active_power") else: diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 2a02606d..2fe9c59f 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -8,6 +8,7 @@ from gdm.distribution.distribution_system import DistributionSystem from gdm.distribution.components import DistributionLoad, DistributionSolar +from gdm.distribution.enums import Phase from gdm.distribution.sys_functools import ( get_combined_solar_time_series_df, get_combined_load_time_series_df, @@ -350,3 +351,263 @@ def test_quantity_error(simple_distribution_system): var_of_interest={"active_power"}, time_series_type=SingleTimeSeries, ) + + +# ── aggregate_phases=True default ────────────────────────────────────────── + + +def test_aggregate_phases_default_no_phase_column(distribution_system_with_single_time_series): + """Default behavior (aggregate_phases=True) must not emit a 'phase' column.""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"active_power": "kilowatts"}, + var_of_interest={"active_power"}, + time_series_type=SingleTimeSeries, + ) + assert "phase" not in df.columns + + +# ── aggregate_phases=False (per-phase) ───────────────────────────────────── + + +def test_load_per_phase_single_time_series(distribution_system_with_single_time_series): + """aggregate_phases=False emits a 'phase' column with per-phase power values.""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"reactive_power": "kilovar"}, + var_of_interest={"reactive_power"}, + aggregate_phases=False, + time_series_type=SingleTimeSeries, + ) + + assert "phase" in df.columns + + loads: list[DistributionLoad] = list(gdm_sys.get_components(DistributionLoad)) + # Each load contributes len(phases) rows per timestamp + expected_rows = sum(len(load.phases) for load in loads) * 5 + assert len(df) == expected_rows + + # For a 3-phase load, per-phase reactive value == multiplier × phase_peak + # (NOT multiplier × sum_of_3_phase_peaks as the aggregated version would give) + three_phase_load = next( + load for load in loads if len(load.phases) == 3 and Phase.A in load.phases + ) + phase_a_rows = df[ + (df["component_uuid"] == three_phase_load.uuid) & (df["phase"] == Phase.A) + ].sort_values("timestamp") + phase_peak_kvar = ( + three_phase_load.equipment.phase_loads[0].reactive_power.to("kilovar").magnitude + ) + assert np.allclose(phase_a_rows["value"].values, np.array([1, 2, 3, 4, 5]) * phase_peak_kvar) + + +def test_load_per_phase_nonsequential_time_series( + distribution_system_with_nonsequential_time_series, +): + """aggregate_phases=False works for NonSequentialTimeSeries loads.""" + gdm_sys: DistributionSystem = distribution_system_with_nonsequential_time_series + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"reactive_power": "kilovar"}, + var_of_interest={"reactive_power"}, + aggregate_phases=False, + time_series_type=NonSequentialTimeSeries, + ) + + assert "phase" in df.columns + + loads: list[DistributionLoad] = list(gdm_sys.get_components(DistributionLoad)) + expected_rows = sum(len(load.phases) for load in loads) * 5 + assert len(df) == expected_rows + + three_phase_load = next( + load for load in loads if len(load.phases) == 3 and Phase.A in load.phases + ) + phase_a_rows = df[ + (df["component_uuid"] == three_phase_load.uuid) & (df["phase"] == Phase.A) + ].sort_values("timestamp") + phase_peak_kvar = ( + three_phase_load.equipment.phase_loads[0].reactive_power.to("kilovar").magnitude + ) + assert np.allclose(phase_a_rows["value"].values, np.array([1, 2, 3, 4, 5]) * phase_peak_kvar) + + +def test_solar_per_phase_single_time_series(distribution_system_with_single_time_series): + """aggregate_phases=False splits total solar power equally across phases.""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + + per_phase_df = get_combined_solar_time_series_df( + gdm_sys, + unit_conversion={"irradiance": "kilowatts"}, + aggregate_phases=False, + time_series_type=SingleTimeSeries, + ) + assert "phase" in per_phase_df.columns + + pvs: list[DistributionSolar] = list(gdm_sys.get_components(DistributionSolar)) + expected_rows = sum(len(pv.phases) for pv in pvs) * 5 + assert len(per_phase_df) == expected_rows + + # Per-phase value == total / n_phases for each component + agg_df = get_combined_solar_time_series_df( + gdm_sys, + unit_conversion={"irradiance": "kilowatts"}, + aggregate_phases=True, + time_series_type=SingleTimeSeries, + ) + for pv in pvs: + n_phases = len(pv.phases) + agg_vals = ( + agg_df[agg_df["component_uuid"] == pv.uuid].sort_values("timestamp")["value"].values + ) + for phase in pv.phases: + phase_vals = ( + per_phase_df[ + (per_phase_df["component_uuid"] == pv.uuid) & (per_phase_df["phase"] == phase) + ] + .sort_values("timestamp")["value"] + .values + ) + assert np.allclose(phase_vals, agg_vals / n_phases) + + +def test_solar_per_phase_nonsequential_time_series( + distribution_system_with_nonsequential_time_series, +): + """aggregate_phases=False works for NonSequentialTimeSeries solar.""" + gdm_sys: DistributionSystem = distribution_system_with_nonsequential_time_series + + df = get_combined_solar_time_series_df( + gdm_sys, + unit_conversion={"irradiance": "kilowatts"}, + aggregate_phases=False, + time_series_type=NonSequentialTimeSeries, + ) + assert "phase" in df.columns + + pvs: list[DistributionSolar] = list(gdm_sys.get_components(DistributionSolar)) + expected_rows = sum(len(pv.phases) for pv in pvs) * 5 + assert len(df) == expected_rows + + +# ── include_features=True ────────────────────────────────────────────────── + + +def test_include_features_load_single_time_series(distribution_system_with_single_time_series): + """include_features=True adds metadata.features columns (excluding use_actual).""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"active_power": "kilowatts", "reactive_power": "kilovar"}, + include_features=True, + time_series_type=SingleTimeSeries, + ) + + assert "profile_type" in df.columns + assert "profile_name" in df.columns + assert "use_actual" not in df.columns + + ap_rows = df[df["name"] == "active_power"] + assert (ap_rows["profile_type"] == "PMult").all() + assert (ap_rows["profile_name"] == "load_profile_kw").all() + + rp_rows = df[df["name"] == "reactive_power"] + assert (rp_rows["profile_type"] == "QMult").all() + assert (rp_rows["profile_name"] == "load_profile_kvar").all() + + +def test_include_features_load_nonsequential_time_series( + distribution_system_with_nonsequential_time_series, +): + """include_features=True works for NonSequentialTimeSeries loads.""" + gdm_sys: DistributionSystem = distribution_system_with_nonsequential_time_series + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"active_power": "kilowatts", "reactive_power": "kilovar"}, + include_features=True, + time_series_type=NonSequentialTimeSeries, + ) + + assert "profile_type" in df.columns + assert "profile_name" in df.columns + assert "use_actual" not in df.columns + + ap_rows = df[df["name"] == "active_power"] + assert (ap_rows["profile_type"] == "PMult").all() + assert (ap_rows["profile_name"] == "load_profile_kw").all() + + +def test_include_features_solar_single_time_series(distribution_system_with_single_time_series): + """include_features=True adds features columns for solar.""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + + df = get_combined_solar_time_series_df( + gdm_sys, + unit_conversion={"irradiance": "kilowatts"}, + include_features=True, + time_series_type=SingleTimeSeries, + ) + + assert "profile_type" in df.columns + assert "profile_name" in df.columns + assert "use_actual" not in df.columns + assert (df["profile_type"] == "PMult").all() + assert (df["profile_name"] == "pv_profile").all() + + +def test_load_per_phase_and_features(distribution_system_with_single_time_series): + """aggregate_phases=False and include_features=True work together.""" + gdm_sys: DistributionSystem = distribution_system_with_single_time_series + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"reactive_power": "kilovar"}, + var_of_interest={"reactive_power"}, + aggregate_phases=False, + include_features=True, + time_series_type=SingleTimeSeries, + ) + + assert "phase" in df.columns + assert "profile_type" in df.columns + assert "profile_name" in df.columns + assert "use_actual" not in df.columns + + +def test_include_features_arbitrary_string_feature(simple_distribution_system): + """include_features=True includes arbitrary extra features like scenario='Scenario1'.""" + gdm_sys: DistributionSystem = simple_distribution_system + + load_profile_kw = SingleTimeSeries.from_array( + data=ActivePower([1, 2, 3, 4, 5], "kilowatt"), + name="active_power", + initial_timestamp=datetime(2020, 1, 1), + resolution=timedelta(minutes=30), + ) + loads: list[DistributionLoad] = list(gdm_sys.get_components(DistributionLoad)) + gdm_sys.add_time_series( + load_profile_kw, + *loads, + profile_type="PMult", + profile_name="load_profile_kw", + use_actual=True, + scenario="Scenario1", + ) + + df = get_combined_load_time_series_df( + gdm_sys, + unit_conversion={"active_power": "kilowatts"}, + var_of_interest={"active_power"}, + include_features=True, + time_series_type=SingleTimeSeries, + ) + + assert "scenario" in df.columns + assert (df["scenario"] == "Scenario1").all() + assert "use_actual" not in df.columns From 9d77f4a1c759caea535039fff3589884feb60b14 Mon Sep 17 00:00:00 2001 From: Aadil Latif Date: Tue, 19 May 2026 13:04:15 -0600 Subject: [PATCH 2/3] refactor: unify _get_combined_*_time_series_df, extract helpers, remove legacy aliases --- src/gdm/distribution/sys_functools.py | 361 +++++++------------------- 1 file changed, 97 insertions(+), 264 deletions(-) diff --git a/src/gdm/distribution/sys_functools.py b/src/gdm/distribution/sys_functools.py index 2153b8f8..54453474 100644 --- a/src/gdm/distribution/sys_functools.py +++ b/src/gdm/distribution/sys_functools.py @@ -390,171 +390,70 @@ def get_aggregated_load_time_series( ) -def _get_combined_single_time_series_df( - sys: DistributionSystem, - component_type: type, - var_of_interest: set[str], - power_function: Callable, - unit_conversion: dict[str, str], - time_series_type: Type[TimeSeriesData] = SingleTimeSeries, - aggregate_phases: bool = True, - per_phase_function: Callable | None = None, - include_features: bool = False, -) -> pd.DataFrame: - """ - Generalized function for returning combined single time series dataframe for given component type. - - Parameters - ---------- - sys: DistributionSystem - Instance of DistributionSystem. - component_type: type - The type of components to retrieve (e.g., DistributionLoad, DistributionSolar). - var_of_interest: set[str] - Set of variable names of interest. - power_function: callable - Function to compute power data for the component. - unit_conversion: dict[str, str] - Optional dictionary to perform unit conversion on data in pint quantities. - time_series_type: Type[TimeSeriesData] - Type of time series data. Defaults to: SingleTimeSeries - aggregate_phases: bool - If True (default), phases are summed and no ``phase`` column is added. - If False, one row per phase is emitted and a ``phase`` column is added. - Requires ``per_phase_function`` when False. - per_phase_function: Callable | None - Function with the same signature as ``power_function`` that returns - ``list[tuple[Phase, Quantity]]``. Required when ``aggregate_phases=False``. - include_features: bool - If True, columns for each entry in ``metadata.features`` (excluding - ``use_actual``) are added to the output DataFrame. Defaults to False. - Returns - ------- - pd.DataFrame - - Raises - ------ - NoComponentsFoundError - If no components of the specified type are found. - NoTimeSeriesDataFound - If no time series data is found for a component. - TypeError - If time series data is not of type SingleTimeSeries. - TimeSeriesVariableDoesNotExist - If specified variables do not exist for the given component. - """ - dfs = [] - components: list[Component] = list(sys.get_components(component_type)) - if not components: - raise NoComponentsFoundError( - f"No components of type {component_type.__name__} found in {sys.name}" - ) +def _get_timestamps(ts_data: TimeSeriesData) -> list: + """Extract timestamps from SingleTimeSeries or NonSequentialTimeSeries.""" + if isinstance(ts_data, SingleTimeSeries): + return [ + ts_data.initial_timestamp + idx * ts_data.resolution for idx in range(ts_data.length) + ] + return ts_data.timestamps - for component in components: - ts_metadata = sys.list_time_series_metadata(component, time_series_type=time_series_type) - if not ts_metadata: - msg = f"No time series data found for {component=}." - raise NoTimeSeriesDataFound(msg) - - avail_vars = {md.name for md in ts_metadata} +def _convert_power_value(power_data, var: str, unit_conversion: dict[str, str]): + """Apply unit conversion to power data. Returns (value, units) tuple.""" + if var in unit_conversion: + if not isinstance(power_data, Quantity): + msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." + raise GDMQuantityError(msg) + return power_data.to(unit_conversion[var]).magnitude, unit_conversion[var] + return power_data, power_data.units - if not var_of_interest.issubset(avail_vars): - msg = f"{avail_vars=}. Only {var_of_interest=} is supported for dataframe computation." - raise TimeSeriesVariableDoesNotExist(msg) - for var in var_of_interest & avail_vars: - ts_data: SingleTimeSeries = sys.get_time_series( - owner=component, name=var, time_series_type=time_series_type - ) - metadata = [meta for meta in ts_metadata if meta.name == var][0] - timestamps = [ - ts_data.initial_timestamp + idx * ts_data.resolution - for idx in range(ts_data.length) - ] - features_cols: dict = ( - { - k: [v] * ts_data.length - for k, v in (metadata.features or {}).items() - if k != "use_actual" - } - if include_features - else {} - ) +def _extract_features_cols(metadata: TimeSeriesMetadata, length: int) -> dict: + """Extract feature columns from metadata, excluding use_actual.""" + return {k: [v] * length for k, v in (metadata.features or {}).items() if k != "use_actual"} - if not aggregate_phases and per_phase_function is not None: - phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( - component, ts_data, metadata - ) - for phase, power_data in phase_power_pairs: - if var in unit_conversion and not isinstance(power_data, Quantity): - msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." - raise GDMQuantityError(msg) - dfs.append( - pd.DataFrame( - { - "timestamp": timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "phase": [phase] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } - ) - ) - else: - power_data = power_function(component, ts_data, metadata) - if var in unit_conversion and not isinstance(power_data, Quantity): - msg = f"Unit conversion specified for {var}, but power data is not a pint Quantity." - raise GDMQuantityError(msg) - dfs.append( - pd.DataFrame( - { - "timestamp": timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } - ) - ) - return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() +def _build_power_row_df( + timestamps: list, + var: str, + component_uuid, + length: int, + power_data, + unit_conversion: dict[str, str], + features_cols: dict, + phase: Phase | None = None, +) -> pd.DataFrame: + """Build a DataFrame for one (component, variable, [phase]) time series slice.""" + value, units = _convert_power_value(power_data, var, unit_conversion) + row: dict = { + "timestamp": timestamps, + "name": [var] * length, + "component_uuid": [component_uuid] * length, + } + if phase is not None: + row["phase"] = [phase] * length + row["value"] = value + row["units"] = [units] * length + row.update(features_cols) + return pd.DataFrame(row) -def _get_combined_nonsequential_time_series_df( +def _get_combined_time_series_df( sys: DistributionSystem, component_type: type, var_of_interest: set[str], power_function: Callable, unit_conversion: dict[str, str], - time_series_type: Type[TimeSeriesData] = NonSequentialTimeSeries, + time_series_type: Type[TimeSeriesData] = SingleTimeSeries, aggregate_phases: bool = True, per_phase_function: Callable | None = None, include_features: bool = False, ) -> pd.DataFrame: """ - Generalized function for returning combined nonsequential time series dataframe for given component type. + Generalized function for returning combined time series dataframe for given component type. + + Works with both SingleTimeSeries and NonSequentialTimeSeries. Parameters ---------- @@ -569,7 +468,7 @@ def _get_combined_nonsequential_time_series_df( unit_conversion: dict[str, str] Optional dictionary to perform unit conversion on data in pint quantities. time_series_type: Type[TimeSeriesData] - Type of time series data. Defaults to: NonSequentialTimeSeries + Type of time series data. Defaults to: SingleTimeSeries aggregate_phases: bool If True (default), phases are summed and no ``phase`` column is added. If False, one row per phase is emitted and a ``phase`` column is added. @@ -591,8 +490,6 @@ def _get_combined_nonsequential_time_series_df( If no components of the specified type are found. NoTimeSeriesDataFound If no time series data is found for a component. - TypeError - If time series data is not of type NonSequentialTimeSeries. TimeSeriesVariableDoesNotExist If specified variables do not exist for the given component. """ @@ -617,68 +514,40 @@ def _get_combined_nonsequential_time_series_df( raise TimeSeriesVariableDoesNotExist(msg) for var in var_of_interest & avail_vars: - ts_data: NonSequentialTimeSeries = sys.get_time_series( + ts_data = sys.get_time_series( owner=component, name=var, time_series_type=time_series_type ) metadata = [meta for meta in ts_metadata if meta.name == var][0] - features_cols: dict = ( - { - k: [v] * ts_data.length - for k, v in (metadata.features or {}).items() - if k != "use_actual" - } - if include_features - else {} + timestamps = _get_timestamps(ts_data) + features_cols = ( + _extract_features_cols(metadata, ts_data.length) if include_features else {} ) if not aggregate_phases and per_phase_function is not None: - phase_power_pairs: list[tuple[Phase, Quantity]] = per_phase_function( - component, ts_data, metadata - ) - for phase, power_data in phase_power_pairs: + for phase, power_data in per_phase_function(component, ts_data, metadata): dfs.append( - pd.DataFrame( - { - "timestamp": ts_data.timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "phase": [phase] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } + _build_power_row_df( + timestamps, + var, + component.uuid, + ts_data.length, + power_data, + unit_conversion, + features_cols, + phase=phase, ) ) else: power_data = power_function(component, ts_data, metadata) dfs.append( - pd.DataFrame( - { - "timestamp": ts_data.timestamps, - "name": [var] * ts_data.length, - "component_uuid": [component.uuid] * ts_data.length, - "value": ( - power_data.to(unit_conversion[var]).magnitude - if var in unit_conversion - else power_data - ), - "units": [ - unit_conversion[var] - if var in unit_conversion - else power_data.units - ] - * ts_data.length, - **features_cols, - } + _build_power_row_df( + timestamps, + var, + component.uuid, + ts_data.length, + power_data, + unit_conversion, + features_cols, ) ) @@ -712,37 +581,25 @@ def get_combined_load_time_series_df( include_features: bool If True, columns for each entry in ``metadata.features`` (excluding ``use_actual``) are added to the output DataFrame. Defaults to False. + Returns ------- pd.DataFrame """ - if time_series_type.__name__ == "SingleTimeSeries": - return _get_combined_single_time_series_df( - sys=sys, - component_type=DistributionLoad, - var_of_interest=var_of_interest, - power_function=_get_load_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_load_power_per_phase, - include_features=include_features, - ) - elif time_series_type.__name__ == "NonSequentialTimeSeries": - return _get_combined_nonsequential_time_series_df( - sys=sys, - component_type=DistributionLoad, - var_of_interest=var_of_interest, - power_function=_get_load_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_load_power_per_phase, - include_features=include_features, - ) - else: + if time_series_type.__name__ not in {"SingleTimeSeries", "NonSequentialTimeSeries"}: msg = f"get_combined_load_time_series_df not implemented for {time_series_type.__name__}" raise IncompatibleTimeSeries(msg) + return _get_combined_time_series_df( + sys=sys, + component_type=DistributionLoad, + var_of_interest=var_of_interest, + power_function=_get_load_power, + unit_conversion=unit_conversion, + time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_load_power_per_phase, + include_features=include_features, + ) def get_combined_solar_time_series_df( @@ -773,47 +630,23 @@ def get_combined_solar_time_series_df( include_features: bool If True, columns for each entry in ``metadata.features`` (excluding ``use_actual``) are added to the output DataFrame. Defaults to False. + Returns ------- pd.DataFrame """ - if time_series_type.__name__ == "SingleTimeSeries": - solar_df = _get_combined_single_time_series_df( - sys=sys, - component_type=DistributionSolar, - var_of_interest=var_of_interest, - power_function=_get_solar_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_solar_power_per_phase, - include_features=include_features, - ) - return solar_df.replace("irradiance", "active_power") - elif time_series_type.__name__ == "NonSequentialTimeSeries": - solar_df = _get_combined_nonsequential_time_series_df( - sys=sys, - component_type=DistributionSolar, - var_of_interest=var_of_interest, - power_function=_get_solar_power, - unit_conversion=unit_conversion, - time_series_type=time_series_type, - aggregate_phases=aggregate_phases, - per_phase_function=_get_solar_power_per_phase, - include_features=include_features, - ) - return solar_df.replace("irradiance", "active_power") - else: - msg = f"get_combined_load_time_series_df not implemented for {time_series_type.__name__}" + if time_series_type.__name__ not in {"SingleTimeSeries", "NonSequentialTimeSeries"}: + msg = f"get_combined_solar_time_series_df not implemented for {time_series_type.__name__}" raise IncompatibleTimeSeries(msg) - - -# Backward-compatible aliases for legacy API names. -get_timeseries_actual_data = get_time_series_actual_data -_check_for_timeseries_metadata_consistency = _check_for_time_series_metadata_consistency -_check_for_timeseries_consistency = _check_for_time_series_consistency -get_aggregated_solar_timeseries = get_aggregated_solar_time_series -get_aggregated_battery_timeseries = get_aggregated_battery_time_series -get_aggregated_load_timeseries = get_aggregated_load_time_series -get_combined_load_timeseries_df = get_combined_load_time_series_df -get_combined_solar_timeseries_df = get_combined_solar_time_series_df + solar_df = _get_combined_time_series_df( + sys=sys, + component_type=DistributionSolar, + var_of_interest=var_of_interest, + power_function=_get_solar_power, + unit_conversion=unit_conversion, + time_series_type=time_series_type, + aggregate_phases=aggregate_phases, + per_phase_function=_get_solar_power_per_phase, + include_features=include_features, + ) + return solar_df.replace("irradiance", "active_power") From 6ed71961c55fb118e4d8edcb56e0cdad9a226ef9 Mon Sep 17 00:00:00 2001 From: "Zink, Zephyr" Date: Tue, 19 May 2026 13:55:25 -0600 Subject: [PATCH 3/3] comments --- src/gdm/distribution/sys_functools.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/gdm/distribution/sys_functools.py b/src/gdm/distribution/sys_functools.py index 54453474..e4ee4fe0 100644 --- a/src/gdm/distribution/sys_functools.py +++ b/src/gdm/distribution/sys_functools.py @@ -136,7 +136,8 @@ def _get_load_power_per_phase( denormalized_data = get_time_series_actual_data(ts_data) if user_attr.use_actual: - return [(phase, denormalized_data) for phase in load.phases] + n_phases = len(load.phases) + return [(phase, denormalized_data / n_phases) for phase in load.phases] if metadata.name in {"active_power", "reactive_power"}: return [ @@ -537,6 +538,9 @@ def _get_combined_time_series_df( phase=phase, ) ) + elif not aggregate_phases and per_phase_function is None: + msg = "per_phase_function is required when aggregate_phases is False." + raise ValueError(msg) else: power_data = power_function(component, ts_data, metadata) dfs.append(