From 8c82c2eb7c9481500773fd4f941aa4b1210370d3 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Fri, 31 Oct 2025 13:42:57 -0400 Subject: [PATCH 01/11] First commit addressing a few todos in the code. This includes a fairly sizeable refactor of the various transformations in the dataset file and adding a number of tests. --- .../attacks/ensemble/clavaddpm_fine_tuning.py | 13 +- src/midst_toolkit/models/clavaddpm/dataset.py | 845 ++++-------------- .../clavaddpm/dataset_transformations.py | 290 ++++++ .../models/clavaddpm/dataset_utils.py | 171 ++++ src/midst_toolkit/models/clavaddpm/metrics.py | 139 +++ src/midst_toolkit/models/clavaddpm/train.py | 11 +- tests/unit/models/clavaddpm/test_dataset.py | 149 ++- .../clavaddpm/test_dataset_transformations.py | 174 ++++ .../models/clavaddpm/test_dataset_utils.py | 93 ++ 9 files changed, 1113 insertions(+), 772 deletions(-) create mode 100644 src/midst_toolkit/models/clavaddpm/dataset_transformations.py create mode 100644 src/midst_toolkit/models/clavaddpm/dataset_utils.py create mode 100644 src/midst_toolkit/models/clavaddpm/metrics.py create mode 100644 tests/unit/models/clavaddpm/test_dataset_transformations.py create mode 100644 tests/unit/models/clavaddpm/test_dataset_utils.py diff --git a/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py b/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py index 552a65be..a41ca4cd 100644 --- a/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py +++ b/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py @@ -17,10 +17,7 @@ from midst_toolkit.common.logger import KeyValueLogger, log from midst_toolkit.common.variables import DEVICE from midst_toolkit.models.clavaddpm.data_loaders import prepare_fast_dataloader -from midst_toolkit.models.clavaddpm.dataset import ( - Transformations, - make_dataset_from_df, -) +from midst_toolkit.models.clavaddpm.dataset import Dataset, Transformations from midst_toolkit.models.clavaddpm.enumerations import ( CategoricalEncoding, Configs, @@ -84,11 +81,11 @@ def fine_tune_model( - dataset: The dataset. - column_orders: The column orders. """ - dataset, label_encoders, column_orders = make_dataset_from_df( + dataset, label_encoders, column_orders = Dataset.make_dataset_from_df( fine_tuning_data, transformations, is_target_conditioned=model_params.is_target_conditioned, - data_split_ratios=data_split_ratios, + data_split_percentages=data_split_ratios, info=fine_tuning_data_info, noise_scale=0, ) @@ -173,11 +170,11 @@ def fine_tune_classifier( Returns: The fine-tuned classifier model. 
""" - dataset, label_encoders, column_orders = make_dataset_from_df( + dataset, _, _ = Dataset.make_dataset_from_df( fine_tuning_data, transformations, is_target_conditioned=model_params.is_target_conditioned, - data_split_ratios=data_split_ratios, + data_split_percentages=data_split_ratios, info=fine_tuning_data_info, noise_scale=0, ) diff --git a/src/midst_toolkit/models/clavaddpm/dataset.py b/src/midst_toolkit/models/clavaddpm/dataset.py index 301d77e9..9f365720 100644 --- a/src/midst_toolkit/models/clavaddpm/dataset.py +++ b/src/midst_toolkit/models/clavaddpm/dataset.py @@ -1,35 +1,41 @@ """Defines the dataset functions for the ClavaDDPM model.""" +from __future__ import annotations + import hashlib import json -import pickle -from collections import Counter from copy import deepcopy from dataclasses import astuple, dataclass, replace from logging import INFO from pathlib import Path -from typing import Any, Self, cast +from typing import Any, Self import numpy as np import pandas as pd -import torch -from category_encoders import LeaveOneOutEncoder -from scipy.special import expit, softmax -from sklearn.impute import SimpleImputer -from sklearn.metrics import classification_report, mean_squared_error, r2_score, roc_auc_score from sklearn.model_selection import train_test_split -from sklearn.pipeline import make_pipeline from sklearn.preprocessing import ( LabelEncoder, - MinMaxScaler, OneHotEncoder, - OrdinalEncoder, - QuantileTransformer, StandardScaler, ) from midst_toolkit.common.enumerations import DataSplit, PredictionType, TaskType from midst_toolkit.common.logger import log +from midst_toolkit.models.clavaddpm.dataset_transformations import ( + collapse_rare_categories, + drop_rows_according_to_mask, + encode_categorical_features, + normalize, + process_nans_in_categorical_features, + transform_targets, +) +from midst_toolkit.models.clavaddpm.dataset_utils import ( + dump_pickle, + encode_and_merge_features, + get_categorical_and_numerical_column_names, + get_category_sizes, + load_pickle, +) from midst_toolkit.models.clavaddpm.enumerations import ( ArrayDict, CategoricalEncoding, @@ -39,11 +45,7 @@ NumericalNaNPolicy, TargetPolicy, ) - - -# Wildcard value to which all rare categorical variables are mapped -CAT_RARE_VALUE = "_rare_" -CAT_MISSING_VALUE = "_nan_" +from midst_toolkit.models.clavaddpm.metrics import calculate_metrics @dataclass(frozen=True) @@ -57,7 +59,7 @@ class Transformations: target_policy: TargetPolicy | None = TargetPolicy.DEFAULT @classmethod - def default(cls) -> Self: + def default(cls) -> Transformations: """Return the default transformations.""" return cls(seed=0, normalization=Normalization.QUANTILE, target_policy=TargetPolicy.DEFAULT) @@ -112,8 +114,12 @@ def _load_datasets(cls, directory: Path, dataset_name: str) -> ArrayDict: The loaded datasets with all the splits. 
""" splits = [k.value for k in list(DataSplit) if directory.joinpath(f"y_{k.value}.npy").exists()] - # TODO: figure out if there is a way of getting rid of the cast - return {x: cast(np.ndarray, np.load(directory / f"{dataset_name}_{x}.npy", allow_pickle=True)) for x in splits} + datasets: ArrayDict = {} + for split in splits: + dataset = np.load(directory / f"{dataset_name}_{split}.npy", allow_pickle=True) + assert isinstance(dataset, np.ndarray), "Dataset must be of type Numpy Array" + datasets[split] = dataset + return datasets @property def is_binclass(self) -> bool: @@ -251,394 +257,173 @@ def calculate_metrics( part_metrics["score"] = score_sign * part_metrics[score_key] return metrics + @staticmethod + def make_dataset_from_df( + data: pd.DataFrame, + transformations: Transformations, + is_target_conditioned: IsTargetConditioned, + info: dict[str, Any], + data_split_percentages: list[float] | None = None, + noise_scale: float = 0, + data_split_random_state: int = 42, + ) -> tuple[Dataset, dict[int, LabelEncoder], list[str]]: + """ + Generate a dataset from a pandas DataFrame. -# TODO consider moving all the functions below into the Dataset class -def get_category_sizes(features: torch.Tensor | np.ndarray) -> list[int]: - """ - Get the size of the categories in the data by counting the number of - unique values in each column. + NOTE: For now, n_classes (which is part of the info dictionary) has to be set to 0. This is because our + matrix is the concatenation of (x_num, x_cat). In this case, if we have + is_y_cond == IsTargetConditioned.CONCAT, we can guarantee that y is the first column of the matrix. However, + if we have n_classes > 0, then y is not the first column of the matrix. - Args: - features: The data from which to extract category sizes. + Args: + data: The pandas DataFrame from which to generate the dataset. + transformations: The transformations to apply to the dataset AFTER creation. + is_target_conditioned: The condition on the y column. + IsTargetConditioned.CONCAT: y is concatenated to X, the model learns a joint distribution of (y, X) + IsTargetConditioned.EMBEDDING: y is not concatenated to X. During computations, y is embedded + and added to the latent vector of X + IsTargetConditioned.NONE: y column is completely ignored + + How does is_target_conditioned affect the generation of y? + IsTargetConditioned.CONCAT: the model synthesizes (y, X) directly, so y is just the first column + IsTargetConditioned.EMBEDDING: y is first sampled using empirical distribution of y. The model only + synthesizes X. When returning the generated data, we return the generated X and the sampled y. + (y is sampled from empirical distribution, instead of being generated by the model). Note that in + this way, y is still not independent of X, because the model has been adding the embedding of y + to the latent vector of X during computations. + IsTargetConditioned.NONE: y is synthesized using y's empirical distribution. X is generated by the + model. In this case, y is completely independent of X. + + info: A dictionary with metadata about the DataFrame. + data_split_percentages: The percentages of the dataset to go into train, val, and test splits. The sum of + the percentages must amount to 1 (within a tolerance of 0.01). Optional, default is [0.7, 0.2, 0.1]. + noise_scale: The scale of the noise to add to the categorical features. Optional, default is 0. + data_split_random_state: The random state to use for the data split. 
Will be passed down to the + ``train_test_split`` function from sklearn. Optional, default is 42. - Returns: - A list with the category sizes in the data. - """ - x_t = features.T.cpu().tolist() if isinstance(features, torch.Tensor) else features.T.tolist() - return [len(set(xt)) for xt in x_t] + Returns: + A tuple with: + - The dataset object containing the created dataset, + - The label encoders for the categorical columns as a dictionary mapping column INDEX within the + categorical columns to a label encoder for that column. + - The column names, with numerical columns first, then categorical columns. Within these two categories, + column names are in the order they appear in the dataset. + """ + if data_split_percentages is None: + data_split_percentages = [0.7, 0.2, 0.1] + assert len(data_split_percentages) == 3, "The ratios must be a list of 3 values (train, validation, test)." + assert np.isclose(sum(data_split_percentages), 1, atol=0.01), ( + "The sum of the ratios must amount to 1 (with a tolerance of 0.01)." + ) -def calculate_metrics( - y_true: np.ndarray, - y_pred: np.ndarray, - task_type: TaskType, - prediction_type: PredictionType | None, - y_info: dict[str, Any], -) -> dict[str, Any]: - """ - Calculate the metrics of the predictions. + train_percent, validation_percent, test_percent = data_split_percentages + train_val_data, test_data = train_test_split( + data, + test_size=test_percent, + random_state=data_split_random_state, + ) + train_data, val_data = train_test_split( + train_val_data, + test_size=validation_percent / (train_percent + validation_percent), + random_state=data_split_random_state, + ) - Usage: calculate_metrics(y_true, y_pred, TaskType.BINCLASS, PredictionType.LOGITS, {}) + categorical_column_names, numerical_column_names = get_categorical_and_numerical_column_names( + info, + is_target_conditioned, + ) - Args: - y_true: The true labels as a numpy array. - y_pred: The predicted labels as a numpy array. - task_type: The type of the task. - prediction_type: The type of the predictions. - y_info: A dictionary with metadata about the labels. + if len(categorical_column_names) > 0: + categorical_features = { + DataSplit.TRAIN.value: train_data[categorical_column_names].to_numpy(dtype=np.str_), + DataSplit.VALIDATION.value: val_data[categorical_column_names].to_numpy(dtype=np.str_), + DataSplit.TEST.value: test_data[categorical_column_names].to_numpy(dtype=np.str_), + } + else: + categorical_features = None - Returns: - The metrics of the predictions as a dictionary with the following keys: - If the task type is TaskType.REGRESSION: - { - "rmse": The root mean squared error. - "r2": The R^2 score. - } + if len(numerical_column_names) > 0: + numerical_features = { + DataSplit.TRAIN.value: train_data[numerical_column_names].values.astype(np.float32), + DataSplit.VALIDATION.value: val_data[numerical_column_names].values.astype(np.float32), + DataSplit.TEST.value: test_data[numerical_column_names].values.astype(np.float32), + } + else: + numerical_features = None - If the task type is TaskType.MULTICLASS, it will have a key for each label - with the following metrics (result of sklearn.metrics.classification_report): - { - "label-1": { - "precision": The precision of the label. - "recall": The recall of the label. - "f1-score": The F1 score of the label. - "support": The number of occurrences of this label in y_true. - }, - "label-2": {...} - ... 
- } + target = { + DataSplit.TRAIN.value: train_data[info["y_col"]].values.astype(np.float32), + DataSplit.VALIDATION.value: val_data[info["y_col"]].values.astype(np.float32), + DataSplit.TEST.value: test_data[info["y_col"]].values.astype(np.float32), + } - If the task type is TaskType.BINCLASS, it will have a key for each label - with the following metrics ((result of sklearn.metrics.classification_report), - and an additional ROC AUC metric: - { - "label-1": { - "precision": The precision of the label. - "recall": The recall of the label. - "f1-score": The F1 score of the label. - "support": The number of occurrences of this label in y_true. - }, - "label-2": {...} - ... - "roc_auc": The ROC AUC score. - } - """ - if task_type == TaskType.REGRESSION: - assert prediction_type is None - assert "std" in y_info - rmse = calculate_rmse(y_true, y_pred, y_info["std"]) - r2 = r2_score(y_true, y_pred) - result = {"rmse": rmse, "r2": r2} - else: - labels, probs = _get_predicted_labels_and_probs(y_pred, task_type, prediction_type) - # TODO: figure out if there is a way of getting rid of the cast - result = cast(dict[str, Any], classification_report(y_true, labels, output_dict=True)) - if task_type == TaskType.BINCLASS: - result["roc_auc"] = roc_auc_score(y_true, probs) - return result + column_orders = numerical_column_names + categorical_column_names + # Encode the categorical features and merge them with the numerical features + features, label_encoders = encode_and_merge_features( + categorical_features, + numerical_features, + noise_scale, + ) -def calculate_rmse(y_true: np.ndarray, y_pred: np.ndarray, std: float | None) -> float: - """ - Calculate the root mean squared error (RMSE) of the predictions. + assert isinstance(info["n_classes"], int) - Args: - y_true: The true labels as a numpy array. - y_pred: The predicted labels as a numpy array. - std: The standard deviation of the labels. If None, the RMSE is calculated - without the standard deviation. + dataset = Dataset( + features, + None, + target, + y_info={}, + task_type=TaskType(info["task_type"]), + n_classes=info["n_classes"], + ) - Returns: - The RMSE of the predictions. - """ - rmse = mean_squared_error(y_true, y_pred) ** 0.5 - if std is not None: - rmse *= std - return rmse + return transform_dataset(dataset, transformations, None), label_encoders, column_orders -def _get_predicted_labels_and_probs( - y_pred: np.ndarray, task_type: TaskType, prediction_type: PredictionType | None -) -> tuple[np.ndarray, np.ndarray | None]: +def setup_cache_path(transformations: Transformations, cache_dir: Path | None) -> Path | None: """ - Get the labels and probabilities from the predictions. - If prediction_type is None, will return the predicted labels as is - and the probabilities as None. + Setup the cache path for the transformations and transformed dataset. This will be used to check if a cache for + the specified transformations already exists. If they don't already exist, this is where they will be saved. Args: - y_pred: The predicted labels as a numpy array. - task_type: The type of the task. Can be TaskType.BINCLASS or TaskType.MULTICLASS. - Other task types are not supported. - prediction_type: The type of the predictions. If None, will return the predictions as labels - and probabilities as None. + transformations: Set of transformations to be cached. + cache_dir: Directory to look for the cached transformations and datasets pickle. 
This will be used as the + stub and the path will be determined by the specified transformations Returns: - A tuple with the labels and probabilities. The probabilities are None - if the prediction_type is None. + _description_ """ - assert task_type in (TaskType.BINCLASS, TaskType.MULTICLASS), f"Unsupported task type: {task_type.value}" - - if prediction_type is None: - return y_pred, None - - if prediction_type == PredictionType.LOGITS: - probs = expit(y_pred) if task_type == TaskType.BINCLASS else softmax(y_pred, axis=1) - elif prediction_type == PredictionType.PROBS: - probs = y_pred - else: - raise ValueError(f"Unsupported prediction_type: {prediction_type.value}") - - assert probs is not None - labels = np.round(probs) if task_type == TaskType.BINCLASS else probs.argmax(axis=1) - return labels.astype("int64"), probs + if cache_dir is None: + log(INFO, "No cache_dir provided. Will not attempt to load or save transformed dataset from/to cache") + return None + transformations_md5 = hashlib.md5(str(transformations).encode("utf-8")).hexdigest() + transformations_str = "__".join(map(str, astuple(transformations))) + return cache_dir / f"cache__{transformations_str}__{transformations_md5}.pickle" -def make_dataset_from_df( - data: pd.DataFrame, - transformations: Transformations, - is_target_conditioned: IsTargetConditioned, - info: dict[str, Any], - data_split_ratios: list[float] | None = None, - noise_scale: float = 0, - data_split_random_state: int = 42, -) -> tuple[Dataset, dict[int, LabelEncoder], list[str]]: +def get_cached_dataset(cache_path: Path, transformations: Transformations) -> Dataset: """ - Generate a dataset from a pandas DataFrame. - - The order of the generated dataset: (y, x_num, x_cat). - - Note: For now, n_classes has to be set to 0. This is because our matrix is the concatenation - of (x_num, x_cat). In this case, if we have is_y_cond == 'concat', we can guarantee that y - is the first column of the matrix. - However, if we have n_classes > 0, then y is not the first column of the matrix. + Provided a ``cache_path`` that exists, we load the contents of the pickle, which should be a tuple of + Transformations followed by a transformed dataset object. We check if the cached transformations match the + specified transformations. If they don't, then our cache and the transformations requested are misaligned and we + throw an error. Args: - data: The pandas DataFrame to generate the dataset from. - transformations: The transformations to apply to the dataset. - is_target_conditioned: The condition on the y column. - IsTargetConditioned.CONCAT: y is concatenated to X, the model learns a joint distribution of (y, X) - IsTargetConditioned.EMBEDDING: y is not concatenated to X. During computations, y is embedded - and added to the latent vector of X - IsTargetConditioned.NONE: y column is completely ignored - - How does is_target_conditioned affect the generation of y? - is_target_conditioned: - IsTargetConditioned.CONCAT: the model synthesizes (y, X) directly, so y is just the first column - IsTargetConditioned.EMBEDDING: y is first sampled using empirical distribution of y. The model only - synthesizes X. When returning the generated data, we return the generated X - and the sampled y. (y is sampled from empirical distribution, instead of being - generated by the model) - Note that in this way, y is still not independent of X, because the model has been - adding the embedding of y to the latent vector of X during computations. 
- IsTargetConditioned.NONE: - y is synthesized using y's empirical distribution. X is generated by the model. - In this case, y is completely independent of X. - - info: A dictionary with metadata about the DataFrame. - data_split_ratios: The ratios of the dataset to split into train, val, and test. The sum of - the ratios must amount to 1 (with a tolerance of 0.01). Optional, default is [0.7, 0.2, 0.1]. - noise_scale: The scale of the noise to add to the categorical features. Optional, default is 0. - data_split_random_state: The random state to use for the data split. Will be passed down to the - train_test_split function from sklearn. Optional, default is 42. + cache_path: A Path that has already been verified to exist. + transformations: A set of desired transformations to have been applied to the cached dataset. - Returns: - A tuple with the dataset, the label encoders, and the column names in the order they appear in the dataset. - """ - if data_split_ratios is None: - data_split_ratios = [0.7, 0.2, 0.1] - - assert len(data_split_ratios) == 3, "The ratios must be a list of 3 values (train, validation, test)." - assert np.isclose(sum(data_split_ratios), 1, atol=0.01), ( - "The sum of the ratios must amount to 1 (with a tolerance of 0.01)." - ) - - train_val_data, test_data = train_test_split( - data, - test_size=data_split_ratios[2], - random_state=data_split_random_state, - ) - train_data, val_data = train_test_split( - train_val_data, - test_size=data_split_ratios[1] / (data_split_ratios[0] + data_split_ratios[1]), - random_state=data_split_random_state, - ) - - categorical_column_names, numerical_column_names = _get_categorical_and_numerical_column_names( - info, - is_target_conditioned, - ) - - if len(categorical_column_names) > 0: - categorical_features = { - DataSplit.TRAIN.value: train_data[categorical_column_names].to_numpy(dtype=np.str_), - DataSplit.VALIDATION.value: val_data[categorical_column_names].to_numpy(dtype=np.str_), - DataSplit.TEST.value: test_data[categorical_column_names].to_numpy(dtype=np.str_), - } - else: - categorical_features = None - - if len(numerical_column_names) > 0: - numerical_features = { - DataSplit.TRAIN.value: train_data[numerical_column_names].values.astype(np.float32), - DataSplit.VALIDATION.value: val_data[numerical_column_names].values.astype(np.float32), - DataSplit.TEST.value: test_data[numerical_column_names].values.astype(np.float32), - } - else: - numerical_features = None - - target = { - DataSplit.TRAIN.value: train_data[info["y_col"]].values.astype(np.float32), - DataSplit.VALIDATION.value: val_data[info["y_col"]].values.astype(np.float32), - DataSplit.TEST.value: test_data[info["y_col"]].values.astype(np.float32), - } - - # build the column_orders list - # It's a string list with the names numerical columns followed by the names of - # the categorical columns in order they appear in the dataset that will be returned - index_to_column = list(data.columns) - column_to_index = {col: i for i, col in enumerate(index_to_column)} - categorical_column_orders = [column_to_index[col] for col in categorical_column_names] - numerical_column_orders = [column_to_index[col] for col in numerical_column_names] - - column_orders_indices = numerical_column_orders + categorical_column_orders - column_orders = [index_to_column[index] for index in column_orders_indices] - - # Encode the categorical features and merge them with the numerical features - numerical_features, label_encoders = _encode_and_merge_features( - categorical_features, - 
numerical_features, - noise_scale, - ) - - assert isinstance(info["n_classes"], int) - - dataset = Dataset( - numerical_features, - None, - target, - y_info={}, - task_type=TaskType(info["task_type"]), - n_classes=info["n_classes"], - ) - - return transform_dataset(dataset, transformations, None), label_encoders, column_orders - - -def _get_categorical_and_numerical_column_names( - info: dict[str, Any], - is_target_conditioned: IsTargetConditioned, -) -> tuple[list[str], list[str]]: - """ - Get the categorical and numerical column names from the info dictionary. - - Args: - info: The info dictionary. - is_target_conditioned: The condition on the y column. - """ - numerical_columns: list[str] = [] - categorical_columns: list[str] = [] - - if info["n_classes"] > 0: - if info["cat_cols"] is not None: - categorical_columns += info["cat_cols"] - if is_target_conditioned == IsTargetConditioned.CONCAT: - categorical_columns += [info["y_col"]] - - numerical_columns = info["num_cols"] - - else: - if info["num_cols"] is not None: - numerical_columns += info["num_cols"] - if is_target_conditioned == IsTargetConditioned.CONCAT: - numerical_columns += [info["y_col"]] - - categorical_columns = info["cat_cols"] - - return categorical_columns, numerical_columns - - -def _encode_and_merge_features( - categorical_features: ArrayDict | None, - numerical_features: ArrayDict | None, - noise_scale: float, -) -> tuple[ArrayDict, dict[int, LabelEncoder]]: - """ - Merge the categorical with the numerical features for train, validation, and test datasets. - - The categorical features are encoded and then merged with the numerical features. The - label encoders used to do that are also returned. - - If ``noise_scale`` is greater than 0, noise from a normal distribution with a standard - deviation of ``noise_scale`` is added to the categorical features. - - Args: - categorical_features: A dictionary with the categorical features data for train, - validation, and test datasets. - numerical_features: A dictionary with the numerical features data for train, - validation, and test datasets. - noise_scale: The scale of the noise to add to the categorical features. + Raises: + RuntimeError: Throws if the set of transformations desired does not match those in the cache. Returns: - The merged features for train, validation, and test datasets and the label encoders - used to do so. + A cached dataset with the transformations requested already applied. 
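# Illustrative sketch (editorial aside, not part of this patch): how setup_cache_path,
# get_cached_dataset, and dump_pickle are expected to compose, mirroring the flow inside
# transform_dataset below. `raw_dataset`, `cache_dir`, and apply_transformations are
# hypothetical stand-ins.
from midst_toolkit.models.clavaddpm.dataset import Transformations, get_cached_dataset, setup_cache_path
from midst_toolkit.models.clavaddpm.dataset_utils import dump_pickle

transformations = Transformations.default()
cache_path = setup_cache_path(transformations, cache_dir)  # None whenever cache_dir is None
if cache_path is not None and cache_path.exists():
    # Loads the pickled (Transformations, Dataset) tuple and raises RuntimeError on a mismatch.
    dataset = get_cached_dataset(cache_path, transformations)
else:
    dataset = apply_transformations(raw_dataset)  # placeholder for the actual transformation step
    if cache_path is not None:
        # Assumed write format: the tuple that get_cached_dataset expects to unpack.
        dump_pickle((transformations, dataset), cache_path)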
""" - if categorical_features is None: - # if no categorical features, just return the numerical features - assert numerical_features is not None - return numerical_features, {} - - # Otherwise, encode the categorical features - all_categorical_data = np.vstack( - ( - categorical_features[DataSplit.TRAIN.value], - categorical_features[DataSplit.VALIDATION.value], - categorical_features[DataSplit.TEST.value], - ) - ) - - categorical_data_converted = [] - label_encoders = {} - for column in range(all_categorical_data.shape[1]): - label_encoder = LabelEncoder() - encoded_labels = label_encoder.fit_transform(all_categorical_data[:, column]).astype(float) - categorical_data_converted.append(encoded_labels) - if noise_scale > 0: - # add noise - categorical_data_converted[-1] += np.random.normal(0, noise_scale, categorical_data_converted[-1].shape) - label_encoders[column] = label_encoder - - categorical_data_transposed = np.vstack(categorical_data_converted).T - - num_train_samples = categorical_features[DataSplit.TRAIN.value].shape[0] - num_validation_samples = categorical_features[DataSplit.VALIDATION.value].shape[0] - - categorical_features[DataSplit.TRAIN.value] = categorical_data_transposed[:num_train_samples, :] - categorical_features[DataSplit.VALIDATION.value] = categorical_data_transposed[ - num_train_samples : num_train_samples + num_validation_samples, : - ] - categorical_features[DataSplit.TEST.value] = categorical_data_transposed[ - num_train_samples + num_validation_samples :, : - ] - - if numerical_features is None: - # if no numerical features then no need to merge, just return the categorical features - return categorical_features, label_encoders - - # Otherwise, merge the categorical and numerical features - merged_features = { - DataSplit.TRAIN.value: np.concatenate( - (numerical_features[DataSplit.TRAIN.value], categorical_features[DataSplit.TRAIN.value]), axis=1 - ), - DataSplit.VALIDATION.value: np.concatenate( - (numerical_features[DataSplit.VALIDATION.value], categorical_features[DataSplit.VALIDATION.value]), - axis=1, - ), - DataSplit.TEST.value: np.concatenate( - (numerical_features[DataSplit.TEST.value], categorical_features[DataSplit.TEST.value]), axis=1 - ), - } - - return merged_features, label_encoders + cache_transformations, transformed_dataset = load_pickle(cache_path) + if transformations == cache_transformations: + log(INFO, f"Using cached features: {cache_path}") + return transformed_dataset + raise RuntimeError(f"Hash collision for {cache_path}") def transform_dataset( @@ -647,33 +432,24 @@ def transform_dataset( cache_dir: Path | None, ) -> Dataset: """ - Transform the dataset. + Fits and applies the given set of transformations to the contents of the provided dataset and returns the + transformed dataset. If an appropriate cache is specified and exists, this function simply loads an already + transformed dataset from the cache. If a cache does not exist, this function will cache the dataset and + transformations there in addition to returning the transformed dataset. Args: dataset: The dataset to transform. transformations: The transformations to apply to the dataset. - cache_dir: The directory to cache the transformed dataset. - Optional, default is None. If not None, will check if the transformations exist in the cache directory. - If they do, will returned the cached transformed dataset. If not, will transform the dataset and cache it. + cache_dir: The directory to cache the transformed dataset. Optional, default is None. 
If not None, will check + if the transformations and dataset exist in the cache directory. If they do, will returned the cached + transformed dataset. If not, will transform the dataset and cache it. Returns: The transformed dataset. """ - # WARNING: the order of transformations matters. Moreover, the current - # implementation is not ideal in that sense. - cache_path = None - if cache_dir is not None: - # if cache_dir is not None, will save the cache file path into the cache_path variable - # so the transformations can be saved in the cache dir - transformations_md5 = hashlib.md5(str(transformations).encode("utf-8")).hexdigest() - transformations_str = "__".join(map(str, astuple(transformations))) - cache_path = cache_dir / f"cache__{transformations_str}__{transformations_md5}.pickle" - if cache_path.exists(): - cache_transformations, value = load_pickle(cache_path) - if transformations == cache_transformations: - print(f"Using cached features: {cache_dir.name + '/' + cache_path.name}") - return value - raise RuntimeError(f"Hash collision for {cache_path}") + cache_path = setup_cache_path(transformations, cache_dir) + if cache_path is not None and cache_path.exists(): + return get_cached_dataset(cache_path, transformations) if dataset.x_num is not None: dataset = process_nans_in_numerical_features(dataset, transformations.numerical_nan_policy) @@ -690,10 +466,7 @@ def transform_dataset( transformations.seed, ) - if categorical_features is None: - assert transformations.categorical_nan_policy is None - assert transformations.category_minimum_frequency is None - else: + if categorical_features is not None: categorical_features = process_nans_in_categorical_features( categorical_features, transformations.categorical_nan_policy, @@ -720,7 +493,7 @@ def transform_dataset( } categorical_features = None - target, target_info = build_target(dataset.y, transformations.target_policy, dataset.task_type) + target, target_info = transform_targets(dataset.y, transformations.target_policy, dataset.task_type) dataset = replace(dataset, x_num=numerical_features, x_cat=categorical_features, y=target, y_info=target_info) dataset.numerical_transform = numerical_transform @@ -732,68 +505,6 @@ def transform_dataset( return dataset -def load_pickle(path: Path | str, **kwargs: Any) -> Any: - """ - Load a pickle file. - - Args: - path: The path to the pickle file. - **kwargs: Additional arguments to pass to the pickle.loads function. - - Returns: - The loaded pickle file. - """ - return pickle.loads(Path(path).read_bytes(), **kwargs) - - -def dump_pickle(x: Any, path: Path | str, **kwargs: Any) -> None: - """ - Dump an object into a pickle file. - - Args: - x: The object to dump. - path: The path to the pickle file. - **kwargs: Additional arguments to pass to the pickle.dumps function. - """ - Path(path).write_bytes(pickle.dumps(x, **kwargs)) - - -# Inspired by: https://github.com/yandex-research/rtdl/blob/a4c93a32b334ef55d2a0559a4407c8306ffeeaee/lib/data.py#L20 -def normalize( - x: ArrayDict, - normalization: Normalization, - seed: int | None, -) -> tuple[ArrayDict, StandardScaler | MinMaxScaler | QuantileTransformer]: - """ - Normalize the input data. - - Args: - x: The data to normalize. - normalization: The normalization to use. - seed: The seed to use for the random state. Optional, default is None. - - Returns: - The normalized data and the normalizer. 
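# Illustrative sketch (editorial aside, not part of this patch): end-to-end use of the refactored
# Dataset.make_dataset_from_df, which performs the splitting, encoding, and transform_dataset call
# shown above. The DataFrame and the column names in `info` are hypothetical; the info keys
# ("y_col", "n_classes", "task_type", "num_cols", "cat_cols") and the enum string "regression"
# are assumptions based on how the implementation reads them.
import pandas as pd

from midst_toolkit.models.clavaddpm.dataset import Dataset, Transformations
from midst_toolkit.models.clavaddpm.enumerations import IsTargetConditioned

dataframe = pd.DataFrame(
    {
        "age": [25, 32, 47, 51, 62, 38, 29, 44],
        "hours_per_week": [40, 38, 45, 50, 30, 42, 36, 48],
        "occupation": ["sales", "tech", "sales", "admin", "tech", "admin", "sales", "tech"],
        "income": [30.0, 52.0, 61.0, 58.0, 49.0, 55.0, 33.0, 60.0],
    }
)
info = {
    "y_col": "income",
    "n_classes": 0,  # must be 0 for now, per the NOTE in the docstring
    "task_type": "regression",
    "num_cols": ["age", "hours_per_week"],
    "cat_cols": ["occupation"],
}
dataset, label_encoders, column_orders = Dataset.make_dataset_from_df(
    dataframe,
    Transformations.default(),
    is_target_conditioned=IsTargetConditioned.CONCAT,
    info=info,
    data_split_percentages=[0.7, 0.2, 0.1],
    noise_scale=0,
)
# column_orders lists the numerical columns first, then the categorical columns.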
- """ - x_train = x[DataSplit.TRAIN.value] - if normalization == Normalization.STANDARD: - normalizer = StandardScaler() - elif normalization == Normalization.MINMAX: - normalizer = MinMaxScaler() - elif normalization == Normalization.QUANTILE: - normalizer = QuantileTransformer( - output_distribution="normal", - n_quantiles=max(min(x[DataSplit.TRAIN.value].shape[0] // 30, 1000), 10), - subsample=int(1e9), - random_state=seed, - ) - else: - raise ValueError(f"Unsupported normalization: {normalization.value}") - normalizer.fit(x_train) - - return {k: normalizer.transform(v) for k, v in x.items()}, normalizer - - def process_nans_in_numerical_features(dataset: Dataset, policy: NumericalNaNPolicy | None) -> Dataset: """ Process the NaN values in the numerical features of the dataset. Note that the signature here is different from @@ -850,211 +561,3 @@ def process_nans_in_numerical_features(dataset: Dataset, policy: NumericalNaNPol raise ValueError(f"Unsupported policy: {policy.value}") return dataset - - -def drop_rows_according_to_mask(data_split: ArrayDict, valid_masks: dict[str, np.ndarray]) -> ArrayDict: - """ - Provided a dictionary of keys to numpy arrays, this function drops rows in each numpy array in the dictionary - according to the values in `valid_masks`. The keys of `valid_masks` must match the entries in data. - - Args: - data_split: The data to apply the mask to. - valid_masks: Mapping from datasplit key to 1D boolean array with entries corresponding to rows of an array. - An entry of True indicates that the row should be kept. False implies it should be dropped. - - Returns: - The data with the mask applied, dropping rows corresponding to False entries of the mask. - """ - if set(data_split.keys()) != set(valid_masks.keys()): - raise KeyError("Keys of data do not match the provided valid_masks") - # Dropping rows in each array that have a False entry in valid_masks - filtered_data_split: ArrayDict = {} - for split_name, data in data_split.items(): - row_mask = valid_masks[split_name] - if row_mask.ndim != 1 or row_mask.shape[0] != data.shape[0]: - raise ValueError(f"Mask for split '{split_name}' has shape {row_mask.shape}; expected ({data.shape[0]},)") - filtered_data_split[split_name] = data[row_mask] - return filtered_data_split - - -def process_nans_in_categorical_features(data_splits: ArrayDict, policy: CategoricalNaNPolicy | None) -> ArrayDict: - """ - Process the NaN values in the categorical features of the datasets provided. Supports only string or float arrays. - - Args: - data_splits: A dictionary containing data to process, split into different partitions. One of which must - be keys with DataSplit.TRAIN.value. - policy: The policy to use to process the NaN values. If none, will no-op. - - Returns: - The processed data. - """ - if policy is None: - log(INFO, "No NaN processing policy specified.") - return data_splits - - assert len(data_splits) > 0, "data_splits is empty, processing will fail." - - # Determine whether the arrays are float or string typed. We assume all arrays in data_splits have the same type - train_data_split = data_splits[DataSplit.TRAIN.value] - is_float_array = np.issubdtype(train_data_split.dtype, np.floating) - # Value that we're looking for to replace - missing_values = float("nan") if is_float_array else CAT_MISSING_VALUE - - # If there are any NaN values, try to apply a the policy. 
- nan_values = [ - np.isnan(data).any() if is_float_array else (data == CAT_MISSING_VALUE).any() for data in data_splits.values() - ] - if any(nan_values): - if policy == CategoricalNaNPolicy.MOST_FREQUENT: - imputer = SimpleImputer(missing_values=missing_values, strategy=policy.value) - imputer.fit(data_splits[DataSplit.TRAIN.value]) - return {k: imputer.transform(v) for k, v in data_splits.items()} - raise ValueError(f"Unsupported cat_nan_policy: {policy.value}") - - # If no nan values are present. We do nothing. - return data_splits - - -def collapse_rare_categories(data_splits: ArrayDict, min_frequency: float) -> ArrayDict: - """ - Collapses rare categories in each column of the datasets under ``data_splits`` into a single category encoded by - the global variable CAT_RARE_VALUE. Categories considered rare are those not satisfying the ``min_frequency`` - threshold within the training split of ``data_splits``. - - NOTE: Arrays must be of type string - - Args: - data_splits: A dictionary containing data to process, split into different partitions. One of which must - be keys with DataSplit.TRAIN.value. - min_frequency: The minimum frequency threshold of the categories to keep. Has to be between 0 and 1. - - Returns: - The processed data. - """ - assert 0.0 < min_frequency < 1.0, "min_frequency has to be between 0 and 1" - - training_data = data_splits[DataSplit.TRAIN.value] - min_count = max(1, int(np.ceil(len(training_data) * min_frequency))) - # Creating a container to hold each of the edited columns of each data split. During transformation each column - # of the data becomes a list of entries (one for each row). The outer list holds all the columns in order. - new_data_split: dict[str, list[list[str]]] = {key: [] for key in data_splits} - - # Run through each of the columns in the training data - for column_idx in range(training_data.shape[1]): - counter = Counter(training_data[:, column_idx].tolist()) - popular_categories = {k for k, v in counter.items() if v >= min_count} - - for split, data_split in data_splits.items(): - data_split_column: list[str] = data_split[:, column_idx].tolist() - collapsed_categories = [ - (cat if cat in popular_categories else CAT_RARE_VALUE) for cat in data_split_column - ] - new_data_split[split].append(collapsed_categories) - - return {k: np.array(v).T for k, v in new_data_split.items()} - - -def encode_categorical_features( - x: ArrayDict, - encoding: CategoricalEncoding | None, - y_train: np.ndarray | None, - seed: int | None, - return_encoder: bool = False, -) -> tuple[ArrayDict, bool, Any | None]: - """ - Encode the categorical features of the dataset. - - Args: - x: The data to encode. - encoding: The encoding to use. If None, will use CatEncoding.ORDINAL. - y_train: The target values. Optional, default is None. Will only be used for the "counter" encoding. - seed: The seed to use for the random state. Optional, default is None. - return_encoder: Whether to return the encoder. Optional, default is False. - - Returns: - A tuple with the following values: - - The encoded data. - - A boolean value indicating if the data was converted to numerical. - - The encoder, if return_encoder is True. None otherwise. - """ - if encoding != CategoricalEncoding.COUNTER: - y_train = None - - # Step 1. 
Map strings to 0-based ranges - - if encoding is None or encoding == CategoricalEncoding.ORDINAL: - unknown_value = np.iinfo("int64").max - 3 - oe = OrdinalEncoder( - handle_unknown="use_encoded_value", - unknown_value=unknown_value, - dtype="int64", - ).fit(x[DataSplit.TRAIN.value]) - encoder = make_pipeline(oe) - encoder.fit(x[DataSplit.TRAIN.value]) - x = {k: encoder.transform(v) for k, v in x.items()} - max_values = x[DataSplit.TRAIN.value].max(axis=0) - for part in x: - if part == DataSplit.TRAIN.value: - continue - for column_idx in range(x[part].shape[1]): - x[part][x[part][:, column_idx] == unknown_value, column_idx] = max_values[column_idx] + 1 - if return_encoder: - return x, False, encoder - return x, False, None - - # Step 2. Encode. - - if encoding == CategoricalEncoding.ONE_HOT: - ohe = OneHotEncoder( - handle_unknown="ignore", - sparse=False, - dtype=np.float32, - ) - encoder = make_pipeline(ohe) - encoder.fit(x[DataSplit.TRAIN.value]) - x = {k: encoder.transform(v) for k, v in x.items()} - - elif encoding == CategoricalEncoding.COUNTER: - assert y_train is not None - assert seed is not None - loe = LeaveOneOutEncoder(sigma=0.1, random_state=seed, return_df=False) - encoder.steps.append(("loe", loe)) - encoder.fit(x[DataSplit.TRAIN.value], y_train) - x = {k: encoder.transform(v).astype("float32") for k, v in x.items()} - if not isinstance(x[DataSplit.TRAIN.value], pd.DataFrame): - x = {k: v.value if hasattr(v, "value") else v for k, v in x.items()} - else: - raise ValueError(f"Unsupported encoding: {encoding.value}") - - if return_encoder: - return x, True, encoder - return x, True, None - - -def build_target(y: ArrayDict, policy: TargetPolicy | None, task_type: TaskType) -> tuple[ArrayDict, dict[str, Any]]: - """ - Build the target and return the target values metadata. - - Args: - y: The target values. - policy: The policy to use to build the target. Can be YPolicy.DEFAULT. If none, it will no-op. - task_type: The type of the task. - - Returns: - A tuple with the target values and the target values metadata. 
- """ - info: dict[str, Any] = {"policy": policy} - if policy is None: - pass - elif policy == TargetPolicy.DEFAULT: - if task_type == TaskType.REGRESSION: - mean = float(y[DataSplit.TRAIN.value].mean()) - std = float(y[DataSplit.TRAIN.value].std()) - y = {k: (v - mean) / std for k, v in y.items()} - info["mean"] = mean - info["std"] = std - else: - raise ValueError(f"Unsupported policy: {policy.value}") - - return y, info diff --git a/src/midst_toolkit/models/clavaddpm/dataset_transformations.py b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py new file mode 100644 index 00000000..635f9810 --- /dev/null +++ b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py @@ -0,0 +1,290 @@ +from collections import Counter +from logging import INFO +from typing import Any + +import numpy as np +from category_encoders import LeaveOneOutEncoder +from sklearn.impute import SimpleImputer +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import ( + MinMaxScaler, + OneHotEncoder, + OrdinalEncoder, + QuantileTransformer, + StandardScaler, +) + +from midst_toolkit.common.enumerations import DataSplit, TaskType +from midst_toolkit.common.logger import log +from midst_toolkit.models.clavaddpm.enumerations import ( + ArrayDict, + CategoricalEncoding, + CategoricalNaNPolicy, + Normalization, + TargetPolicy, +) + + +# Wildcard value to which all rare categorical variables are mapped +CAT_RARE_VALUE = "_rare_" +CAT_MISSING_VALUE = "_nan_" + + +# Inspired by: https://github.com/yandex-research/rtdl/blob/a4c93a32b334ef55d2a0559a4407c8306ffeeaee/lib/data.py#L20 +def normalize( + datasets: ArrayDict, + normalization: Normalization, + seed: int | None, +) -> tuple[ArrayDict, StandardScaler | MinMaxScaler | QuantileTransformer]: + """ + Normalize the input data according to the specified normalization strategy of ``normalization``. Normalization is + fit on the training split of the datasets passed and then applied to all splits. + + Args: + datasets: The data to normalize. + normalization: The normalization to use. + seed: The seed to use for any random state in the normalization strategy. Currently only applicable to + QuantileTransformer. + + Returns: + The normalized data and the fitted normalizer class. + """ + train_split = datasets[DataSplit.TRAIN.value] + + if normalization == Normalization.STANDARD: + normalizer = StandardScaler() + elif normalization == Normalization.MINMAX: + normalizer = MinMaxScaler() + elif normalization == Normalization.QUANTILE: + normalizer = QuantileTransformer( + output_distribution="normal", + n_quantiles=max(min(train_split.shape[0] // 30, 1000), 10), + subsample=int(1e9), + random_state=seed, + ) + else: + raise ValueError(f"Unsupported normalization: {normalization.value}") + + normalizer.fit(train_split) + return {k: normalizer.transform(v) for k, v in datasets.items()}, normalizer + + +def drop_rows_according_to_mask(data_split: ArrayDict, valid_masks: dict[str, np.ndarray]) -> ArrayDict: + """ + Provided a dictionary of keys to numpy arrays, this function drops rows in each numpy array in the dictionary + according to the values in `valid_masks`. The keys of `valid_masks` must match the entries in data. + + Args: + data_split: The data to apply the mask to. + valid_masks: Mapping from datasplit key to 1D boolean array with entries corresponding to rows of an array. + An entry of True indicates that the row should be kept. False implies it should be dropped. 
+ + Returns: + The data with the mask applied, dropping rows corresponding to False entries of the mask. + """ + if set(data_split.keys()) != set(valid_masks.keys()): + raise KeyError("Keys of data do not match the provided valid_masks") + # Dropping rows in each array that have a False entry in valid_masks + filtered_data_split: ArrayDict = {} + for split_name, data in data_split.items(): + row_mask = valid_masks[split_name] + if row_mask.ndim != 1 or row_mask.shape[0] != data.shape[0]: + raise ValueError(f"Mask for split '{split_name}' has shape {row_mask.shape}; expected ({data.shape[0]},)") + filtered_data_split[split_name] = data[row_mask] + return filtered_data_split + + +def process_nans_in_categorical_features(data_splits: ArrayDict, policy: CategoricalNaNPolicy | None) -> ArrayDict: + """ + Process the NaN values in the categorical features of the datasets provided. Supports only string or float arrays. + + Args: + data_splits: A dictionary containing data to process, split into different partitions. One of which must + be keys with DataSplit.TRAIN.value. + policy: The policy to use to process the NaN values. If none, will no-op. + + Returns: + The processed data. + """ + if policy is None: + log(INFO, "No NaN processing policy specified.") + return data_splits + + assert len(data_splits) > 0, "data_splits is empty, processing will fail." + + # Determine whether the arrays are float or string typed. We assume all arrays in data_splits have the same type + train_data_split = data_splits[DataSplit.TRAIN.value] + is_float_array = np.issubdtype(train_data_split.dtype, np.floating) + # Value that we're looking for to replace + missing_values = float("nan") if is_float_array else CAT_MISSING_VALUE + + # If there are any NaN values, try to apply a the policy. + nan_values = [ + np.isnan(data).any() if is_float_array else (data == CAT_MISSING_VALUE).any() for data in data_splits.values() + ] + if any(nan_values): + if policy == CategoricalNaNPolicy.MOST_FREQUENT: + imputer = SimpleImputer(missing_values=missing_values, strategy=policy.value) + imputer.fit(data_splits[DataSplit.TRAIN.value]) + return {k: imputer.transform(v) for k, v in data_splits.items()} + raise ValueError(f"Unsupported cat_nan_policy: {policy.value}") + + # If no nan values are present. We do nothing. + return data_splits + + +def collapse_rare_categories(data_splits: ArrayDict, min_frequency: float) -> ArrayDict: + """ + Collapses rare categories in each column of the datasets under ``data_splits`` into a single category encoded by + the global variable CAT_RARE_VALUE. Categories considered rare are those not satisfying the ``min_frequency`` + threshold within the training split of ``data_splits``. + + NOTE: Arrays must be of type string + + Args: + data_splits: A dictionary containing data to process, split into different partitions. One of which must + be keys with DataSplit.TRAIN.value. + min_frequency: The minimum frequency threshold of the categories to keep. Has to be between 0 and 1. + + Returns: + The processed data. + """ + assert 0.0 < min_frequency < 1.0, "min_frequency has to be between 0 and 1" + + training_data = data_splits[DataSplit.TRAIN.value] + min_count = max(1, int(np.ceil(len(training_data) * min_frequency))) + # Creating a container to hold each of the edited columns of each data split. During transformation each column + # of the data becomes a list of entries (one for each row). The outer list holds all the columns in order. 
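+    # For example, with two columns and a three-row "train" split, new_data_split["train"] ends up as
+    # [[col0_row0, col0_row1, col0_row2], [col1_row0, col1_row1, col1_row2]]; the np.array(...).T at the
+    # end of the function transposes this column-major layout back into the original rows-by-columns shape.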
+ new_data_split: dict[str, list[list[str]]] = {key: [] for key in data_splits} + + # Run through each of the columns in the training data + for column_idx in range(training_data.shape[1]): + counter = Counter(training_data[:, column_idx].tolist()) + popular_categories = {k for k, v in counter.items() if v >= min_count} + + for split, data_split in data_splits.items(): + data_split_column: list[str] = data_split[:, column_idx].tolist() + collapsed_categories = [ + (cat if cat in popular_categories else CAT_RARE_VALUE) for cat in data_split_column + ] + new_data_split[split].append(collapsed_categories) + + return {k: np.array(v).T for k, v in new_data_split.items()} + + +def encode_categorical_features( + datasets: ArrayDict, + encoding: CategoricalEncoding | None, + y_train: np.ndarray | None, + seed: int | None, + return_encoder: bool = False, +) -> tuple[ArrayDict, bool, Any | None]: + """ + Encode the categorical features of the dataset splits using the encoding strategy specified in the encoding + argument. + + Args: + datasets: The data to encode. + encoding: The kind of encoding to use. If None, will use CatEncoding.ORDINAL. + y_train: The target values. Will only be used for the "counter" encoding. Optional + seed: The seed to use for the random state. Only applied when using ``CategoricalEncoding.COUNTER``. Optional + return_encoder: Whether to return the encoder. Optional, default is False. + + Returns: + A tuple with the following values: + - The encoded data. + - A boolean value indicating if the data was converted to numerical. + - The encoder, if ``return_encoder`` is True. None otherwise. + """ + encoding = CategoricalEncoding.ORDINAL if encoding is None else encoding + y_train = None if encoding != CategoricalEncoding.COUNTER else y_train + + train_split = datasets[DataSplit.TRAIN.value] + + if encoding is None or encoding == CategoricalEncoding.ORDINAL: + unknown_value = np.iinfo("int64").max - 3 + ordinal_encoder = OrdinalEncoder( + handle_unknown="use_encoded_value", + unknown_value=unknown_value, + dtype="int64", + ) + encoder = make_pipeline(ordinal_encoder) + encoder.fit(train_split) + datasets = {k: encoder.transform(v) for k, v in datasets.items()} + + max_values = datasets[DataSplit.TRAIN.value].max(axis=0) + for split_name, data_split in datasets.items(): + # No corrections for train split + if split_name == DataSplit.TRAIN.value: + continue + for column_idx in range(data_split.shape[1]): + # Rows that match the unknown value for the column + unknown_value_rows = data_split[:, column_idx] == unknown_value + # Make unknown values in split one larger than max value in train + data_split[unknown_value_rows, column_idx] = max_values[column_idx] + 1 + + if return_encoder: + return datasets, False, encoder + return datasets, False, None + + if encoding == CategoricalEncoding.ONE_HOT: + one_hot_encoder = OneHotEncoder( + handle_unknown="ignore", + sparse_output=False, + dtype=np.float32, + ) + encoder = make_pipeline(one_hot_encoder) + encoder.fit(train_split) + datasets = {k: encoder.transform(v) for k, v in datasets.items()} + + elif encoding == CategoricalEncoding.COUNTER: + assert y_train is not None + leave_one_out = LeaveOneOutEncoder(sigma=0.1, random_state=seed, return_df=False) + encoder = make_pipeline(leave_one_out) + encoder.fit(train_split, y_train) + datasets = {k: encoder.transform(v).astype("float32") for k, v in datasets.items()} + else: + raise ValueError(f"Unsupported encoding: {encoding.value}") + + if return_encoder: + return datasets, True, encoder 
+ return datasets, True, None + + +def transform_targets( + target_datasets: ArrayDict, policy: TargetPolicy | None, task_type: TaskType +) -> tuple[ArrayDict, dict[str, Any]]: + """ + Applies a transformation to the provided target values across data splits based on the policy specified in + ``policy``. If no policy is provided or the task type is not Regression, nothing is done. If the policy is + default and the task_type is regression the targets are centered and normalized using the mean and standard + deviation of the train targets. + + The info dictionary is meant to store the parameters used in the transformations so that they may be inverted + later. + + Args: + target_datasets: The target values across the dataset splits. + policy: The policy to use to build the target. Can be TargetPolicy.DEFAULT. If none, it will no-op. + task_type: The type of the task. + + Returns: + A tuple with the transformed target values across datasets and the metadata that stores information about + how the transformation was performed. + """ + info: dict[str, Any] = {"policy": policy} + if policy is None: + return target_datasets, info + + if policy == TargetPolicy.DEFAULT: + if task_type == TaskType.REGRESSION: + train_split = target_datasets[DataSplit.TRAIN.value] + mean = float(train_split.mean()) + std = float(train_split.std()) + target_datasets = {split: (target_data - mean) / std for split, target_data in target_datasets.items()} + info["mean"] = mean + info["std"] = std + else: + raise ValueError(f"Unsupported policy: {policy.value}") + + return target_datasets, info diff --git a/src/midst_toolkit/models/clavaddpm/dataset_utils.py b/src/midst_toolkit/models/clavaddpm/dataset_utils.py new file mode 100644 index 00000000..444328e7 --- /dev/null +++ b/src/midst_toolkit/models/clavaddpm/dataset_utils.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import pickle +from pathlib import Path +from typing import Any + +import numpy as np +import torch +from sklearn.preprocessing import LabelEncoder + +from midst_toolkit.common.enumerations import DataSplit +from midst_toolkit.models.clavaddpm.enumerations import ArrayDict, IsTargetConditioned + + +def load_pickle(path: Path | str, **kwargs: Any) -> Any: + """ + Load a pickle file. + + Args: + path: The path to the pickle file. + **kwargs: Additional arguments to pass to the pickle.loads function. + + Returns: + The loaded pickle file. + """ + return pickle.loads(Path(path).read_bytes(), **kwargs) + + +def dump_pickle(x: Any, path: Path | str, **kwargs: Any) -> None: + """ + Dump an object into a pickle file. + + Args: + x: The object to dump. + path: The path to the pickle file. + **kwargs: Additional arguments to pass to the pickle.dumps function. + """ + Path(path).write_bytes(pickle.dumps(x, **kwargs)) + + +def get_category_sizes(features: torch.Tensor | np.ndarray) -> list[int]: + """ + Get the size of the categories in the features tensor or array provided by counting the number of + unique values in each column. + + Args: + features: The data from which to extract category sizes. + + Returns: + A list with the category sizes in the data. + """ + columns_list = features.T.cpu().tolist() if isinstance(features, torch.Tensor) else features.T.tolist() + return [len(set(column)) for column in columns_list] + + +def get_categorical_and_numerical_column_names( + info: dict[str, Any], + is_target_conditioned: IsTargetConditioned, +) -> tuple[list[str], list[str]]: + """ + Get the categorical and numerical column names from the info dictionary. 
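# Illustrative sketch (editorial aside, not part of this patch): standardizing regression targets
# with transform_targets and mapping predictions back to the original scale. The target arrays are
# hypothetical.
import numpy as np

from midst_toolkit.common.enumerations import DataSplit, TaskType
from midst_toolkit.models.clavaddpm.dataset_transformations import transform_targets
from midst_toolkit.models.clavaddpm.enumerations import TargetPolicy

targets = {
    DataSplit.TRAIN.value: np.array([10.0, 20.0, 30.0]),
    DataSplit.VALIDATION.value: np.array([15.0]),
    DataSplit.TEST.value: np.array([25.0]),
}
scaled_targets, target_info = transform_targets(targets, TargetPolicy.DEFAULT, TaskType.REGRESSION)
# target_info carries {"policy": ..., "mean": ..., "std": ...}, so a prediction made in the scaled
# space can be inverted later:
recovered = scaled_targets[DataSplit.TEST.value] * target_info["std"] + target_info["mean"]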
It will also consider whether the target + variable should be considered a categorical column or not. If ``is_target_conditioned`` is + ``IsTargetConditioned.CONCAT``, then the label column is considered part of the categorical or numerical columns. + If info["n_classes"] > 0, it is deemed a categorical column. If not, then it is deemed a numerical column. + + Args: + info: The info dictionary containing metadata for a dataset, including the names of the categorical and + numerical columns. + is_target_conditioned: The condition on the y column. + + Returns: + A tuple of lists with the categorical column names, followed by the numerical column names + """ + numerical_columns = info["num_cols"] if info["num_cols"] is not None else [] + categorical_columns = info["cat_cols"] if info["cat_cols"] is not None else [] + + if is_target_conditioned == IsTargetConditioned.CONCAT: + if info["n_classes"] > 0: + categorical_columns += [info["y_col"]] + else: + numerical_columns += [info["y_col"]] + + return categorical_columns, numerical_columns + + +def encode_and_merge_features( + categorical_features: ArrayDict | None, + numerical_features: ArrayDict | None, + noise_scale: float, +) -> tuple[ArrayDict, dict[int, LabelEncoder]]: + """ + Merge the categorical with the numerical features for train, validation, and test datasets. Numerical features + are first, followed by categorical features. + + The categorical features are encoded and then merged with the numerical features. The label encoders used to do + that are also returned. + + If ``noise_scale`` is greater than 0, noise from a normal distribution with a standard deviation of + ``noise_scale`` is added to the categorical features. + + Args: + categorical_features: A dictionary with the categorical features data for train, validation, and test datasets. + keys are "train", "val", "test" from the DataSplit enumeration + numerical_features: A dictionary with the numerical features data for train, validation, and test datasets. + keys are "train", "val", "test" from the DataSplit enumeration + noise_scale: The scale of the noise to add to the categorical features. Noise is drawn from a normal + distribution with standard deviation of ``noise_scale``. + + Returns: + The merged features for train, validation, and test datasets and the label encoders used to do so. The label + encoders are returned as a dictionary mapping column INDEX within the categorical columns to a label encoder + for that column. + """ + # if no categorical features, just return the numerical features + if categorical_features is None: + assert numerical_features is not None, "Both categorical and numerical features is empty." + return numerical_features, {} + + # Otherwise, encode the categorical features + all_categorical_data = np.vstack( + ( + categorical_features[DataSplit.TRAIN.value], + categorical_features[DataSplit.VALIDATION.value], + categorical_features[DataSplit.TEST.value], + ) + ) + + categorical_data_encoded = [] + label_encoders = {} + for column in range(all_categorical_data.shape[1]): + label_encoder = LabelEncoder() + encoded_labels = label_encoder.fit_transform(all_categorical_data[:, column]).astype(float) + if noise_scale > 0: + # add noise + encoded_labels += np.random.normal(0, noise_scale, encoded_labels.shape) + categorical_data_encoded.append(encoded_labels) + label_encoders[column] = label_encoder + + categorical_data_transposed = np.vstack(categorical_data_encoded).T + + # Split the encoded data back into the train, validation, and test splits. 
+ num_train_samples = categorical_features[DataSplit.TRAIN.value].shape[0] + num_validation_samples = categorical_features[DataSplit.VALIDATION.value].shape[0] + + categorical_features[DataSplit.TRAIN.value] = categorical_data_transposed[:num_train_samples, :] + categorical_features[DataSplit.VALIDATION.value] = categorical_data_transposed[ + num_train_samples : num_train_samples + num_validation_samples, : + ] + categorical_features[DataSplit.TEST.value] = categorical_data_transposed[ + num_train_samples + num_validation_samples :, : + ] + + # if no numerical features then no need to merge, just return the categorical features + if numerical_features is None: + return categorical_features, label_encoders + + # Otherwise, merge the categorical and numerical features + merged_features = { + DataSplit.TRAIN.value: np.concatenate( + (numerical_features[DataSplit.TRAIN.value], categorical_features[DataSplit.TRAIN.value]), axis=1 + ), + DataSplit.VALIDATION.value: np.concatenate( + (numerical_features[DataSplit.VALIDATION.value], categorical_features[DataSplit.VALIDATION.value]), + axis=1, + ), + DataSplit.TEST.value: np.concatenate( + (numerical_features[DataSplit.TEST.value], categorical_features[DataSplit.TEST.value]), axis=1 + ), + } + + return merged_features, label_encoders diff --git a/src/midst_toolkit/models/clavaddpm/metrics.py b/src/midst_toolkit/models/clavaddpm/metrics.py new file mode 100644 index 00000000..fcb188ca --- /dev/null +++ b/src/midst_toolkit/models/clavaddpm/metrics.py @@ -0,0 +1,139 @@ +from typing import Any + +import numpy as np +from scipy.special import expit, softmax +from sklearn.metrics import classification_report, r2_score, roc_auc_score, root_mean_squared_error + +from midst_toolkit.common.enumerations import PredictionType, TaskType + + +def calculate_rmse(y_true: np.ndarray, y_pred: np.ndarray, standard_deviation: float | None) -> float: + """ + Calculate the root mean squared error (RMSE) of the predictions. + + Args: + y_true: The true labels as a numpy array. + y_pred: The predicted labels as a numpy array. + standard_deviation: The standard deviation of the labels. If provided, the RMSE is scaled by the standard + deviation. This is typically done if the original targets were scaled down by the standard deviation during + fitting/prediction. If None, the RMSE is calculated without this scaling. + + Returns: + The RMSE of the predictions. + """ + rmse = root_mean_squared_error(y_true, y_pred) + if standard_deviation is not None: + return rmse * standard_deviation + return rmse + + +def get_predicted_labels_and_probs( + y_pred: np.ndarray, task_type: TaskType, prediction_type: PredictionType | None +) -> tuple[np.ndarray, np.ndarray | None]: + """ + Get the labels and probabilities from the predictions. If ``prediction_type`` is None, will return the predicted + labels as is and the probabilities as None. + + Args: + y_pred: The predicted labels as a numpy array. + task_type: The type of the task. Can be ``TaskType.BINCLASS`` or ``TaskType.MULTICLASS`` or None. + Other task types are not supported. + prediction_type: The type of the predictions. Currently supported types are ``PredictionType.LOGITS`` and + ``PredictionType.PROBS``. If ``PredictionType.LOGITS`` then either they will be converted to binary + probabilities with a sigmoid for ``TaskType.BINCLASS`` or multi-class probabilities with a softmax for + ``TaskType.MULTICLASS``. ``PredictionType.PROBS`` implies the predictions are already in probability form. 
+ If None, will return the predictions as labels and probabilities as None. + + Returns: + A tuple with the labels and probabilities. The probabilities are None if the ``prediction_type`` is None. + """ + assert task_type in {TaskType.BINCLASS, TaskType.MULTICLASS}, f"Unsupported task type: {task_type.value}" + + if prediction_type is None: + return y_pred, None + + if prediction_type == PredictionType.LOGITS: + # expit applies a sigmoid + prediction_probabilities = expit(y_pred) if task_type == TaskType.BINCLASS else softmax(y_pred, axis=1) + elif prediction_type == PredictionType.PROBS: + prediction_probabilities = y_pred + else: + raise ValueError(f"Unsupported prediction_type: {prediction_type.value}") + + assert prediction_probabilities is not None + predicted_labels = ( + np.round(prediction_probabilities) + if task_type == TaskType.BINCLASS + else prediction_probabilities.argmax(axis=1) + ) + return predicted_labels.astype("int64"), prediction_probabilities + + +def calculate_metrics( + y_true: np.ndarray, + y_pred: np.ndarray, + task_type: TaskType, + prediction_type: PredictionType | None, + y_info: dict[str, Any], +) -> dict[str, Any]: + """ + Calculate the metrics of the predictions. + + Example Usage: calculate_metrics(y_true, y_pred, TaskType.BINCLASS, PredictionType.LOGITS, {}) + + Args: + y_true: The true labels as a numpy array. + y_pred: The predicted labels as a numpy array. + task_type: The type of the task. + prediction_type: The type of the predictions. + y_info: A dictionary with metadata about the labels. + + Returns: + The metrics of the predictions as a dictionary with the following keys: + If the task type is TaskType.REGRESSION: + { + "rmse": The root mean squared error. + "r2": The R^2 score. + } + + If the task type is TaskType.MULTICLASS, it will have a key for each label + with the following metrics (result of sklearn.metrics.classification_report): + { + "label-1": { + "precision": The precision of the label. + "recall": The recall of the label. + "f1-score": The F1 score of the label. + "support": The number of occurrences of this label in y_true. + }, + "label-2": {...} + ... + } + + If the task type is TaskType.BINCLASS, it will have a key for each label + with the following metrics ((result of sklearn.metrics.classification_report), + and an additional ROC AUC metric: + { + "label-1": { + "precision": The precision of the label. + "recall": The recall of the label. + "f1-score": The F1 score of the label. + "support": The number of occurrences of this label in y_true. + }, + "label-2": {...} + ... + "roc_auc": The ROC AUC score. 
+ } + """ + if task_type == TaskType.REGRESSION: + assert prediction_type is None + assert "std" in y_info + rmse = calculate_rmse(y_true, y_pred, y_info["std"]) + r2 = r2_score(y_true, y_pred) + return {"rmse": rmse, "r2": r2} + + labels, probs = get_predicted_labels_and_probs(y_pred, task_type, prediction_type) + result = classification_report(y_true, labels, output_dict=True) + assert isinstance(result, dict) + if task_type == TaskType.BINCLASS: + result["roc_auc"] = roc_auc_score(y_true, probs) + return result diff --git a/src/midst_toolkit/models/clavaddpm/train.py b/src/midst_toolkit/models/clavaddpm/train.py index fef421e2..5da742d5 100644 --- a/src/midst_toolkit/models/clavaddpm/train.py +++ b/src/midst_toolkit/models/clavaddpm/train.py @@ -15,7 +15,7 @@ from midst_toolkit.common.enumerations import DataSplit from midst_toolkit.common.logger import KeyValueLogger, log from midst_toolkit.models.clavaddpm.data_loaders import prepare_fast_dataloader -from midst_toolkit.models.clavaddpm.dataset import Dataset, Transformations, make_dataset_from_df +from midst_toolkit.models.clavaddpm.dataset import Dataset, Transformations from midst_toolkit.models.clavaddpm.enumerations import ( CategoricalEncoding, Configs, @@ -299,11 +299,11 @@ def train_model( - dataset: The dataset. - column_orders: The column orders. """ - dataset, label_encoders, column_orders = make_dataset_from_df( + dataset, label_encoders, column_orders = Dataset.make_dataset_from_df( data_frame, transformations, is_target_conditioned=model_params.is_target_conditioned, - data_split_ratios=data_split_ratios, + data_split_percentages=data_split_ratios, info=data_frame_info, noise_scale=0, ) @@ -415,12 +415,11 @@ def train_classifier( Returns: The trained classifier model. """ - # ruff: noqa: N806 - dataset, label_encoders, column_orders = make_dataset_from_df( + dataset, _, _ = Dataset.make_dataset_from_df( data_frame, transformations, is_target_conditioned=model_params.is_target_conditioned, - data_split_ratios=data_split_ratios, + data_split_percentages=data_split_ratios, info=data_frame_info, noise_scale=0, ) diff --git a/tests/unit/models/clavaddpm/test_dataset.py b/tests/unit/models/clavaddpm/test_dataset.py index 4b3b7073..028ca66d 100644 --- a/tests/unit/models/clavaddpm/test_dataset.py +++ b/tests/unit/models/clavaddpm/test_dataset.py @@ -1,20 +1,45 @@ from copy import deepcopy +from pathlib import Path import numpy as np import pytest +from midst_toolkit.common.enumerations import TaskType from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds from midst_toolkit.models.clavaddpm.dataset import ( - CAT_MISSING_VALUE, - CAT_RARE_VALUE, Dataset, - NumericalNaNPolicy, - TaskType, - collapse_rare_categories, - process_nans_in_categorical_features, + Transformations, + get_cached_dataset, process_nans_in_numerical_features, + setup_cache_path, +) +from midst_toolkit.models.clavaddpm.dataset_utils import dump_pickle +from midst_toolkit.models.clavaddpm.enumerations import ( + CategoricalEncoding, + Normalization, + NumericalNaNPolicy, ) -from midst_toolkit.models.clavaddpm.enumerations import CategoricalNaNPolicy + + +def test_load_dataset(tmp_path: Path) -> None: + train_array = np.random.randn(3, 3) + val_array = np.random.randn(3, 3) + test_array = np.random.randn(3, 3) + + np.save(tmp_path / "test_dataset_train.npy", train_array) + np.save(tmp_path / "test_dataset_val.npy", val_array) + np.save(tmp_path / "test_dataset_test.npy", test_array) + + # Need to also save label arrays, as thats 
how the load works... + np.save(tmp_path / "y_train.npy", np.random.randn(3, 1)) + np.save(tmp_path / "y_val.npy", np.random.randn(3, 1)) + np.save(tmp_path / "y_test.npy", np.random.randn(3, 1)) + + datasets = Dataset._load_datasets(tmp_path, "test_dataset") + + assert np.allclose(datasets["train"], train_array, atol=1e-8) + assert np.allclose(datasets["val"], val_array, atol=1e-8) + assert np.allclose(datasets["test"], test_array, atol=1e-8) def _compare_data_splits(test_splits: dict[str, np.ndarray], reference_splits: dict[str, np.ndarray]) -> bool: @@ -55,86 +80,6 @@ def _get_test_dataset() -> Dataset: return dataset -def test_process_nans_in_categorical_features() -> None: - set_all_random_seeds(42) - data_splits = { - "train": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), - "val": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), - "test": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), - } - # Test when there are no NaNs - processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) - assert np.all(data_splits["train"] == processed_data_splits["train"]) - assert np.all(data_splits["val"] == processed_data_splits["val"]) - assert np.all(data_splits["test"] == processed_data_splits["test"]) - - # Make one of the train data set NaN but no others - data_splits["train"][0, 1] = float("nan") - processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) - assert processed_data_splits["train"][0, 1] == 0 - assert np.all(data_splits["val"] == processed_data_splits["val"]) - assert np.all(data_splits["test"] == processed_data_splits["test"]) - - # Try when test has a NaN - data_splits["test"][1, 1] = float("nan") - processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) - assert processed_data_splits["train"][0, 1] == 0 - assert processed_data_splits["test"][1, 1] == 0 - assert np.all(data_splits["val"] == processed_data_splits["val"]) - - # Try when no policy is provided - processed_data_splits = process_nans_in_categorical_features(data_splits, policy=None) - # NaNs should be left alone - assert np.isnan(data_splits["train"][0, 1]) - assert np.isnan(data_splits["test"][1, 1]) - - # Try with string values rather than numbers - data_splits = {k: v.astype(str).astype(object) for k, v in data_splits.items()} - data_splits["train"][0, 1] = CAT_MISSING_VALUE - data_splits["test"][1, 1] = CAT_MISSING_VALUE - processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) - assert processed_data_splits["train"][0, 1] == "0.0" - assert processed_data_splits["test"][1, 1] == "0.0" - - unset_all_random_seeds() - - -def test_collapse_rare_values() -> None: - set_all_random_seeds(42) - data_splits = { - "train": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), - "val": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), - "test": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), - } - # Based on these settings, column index 6 in the train split ends up with 0 being rare (1 entry of 10) - # So it should be replaced with CAT_RARE_VALUE in all datasets. 
Otherwise, everywhere else should be equal - processed_data_splits = collapse_rare_categories(data_splits, 0.2) - assert processed_data_splits["train"][0, 6] == CAT_RARE_VALUE - assert processed_data_splits["val"][1, 6] == CAT_RARE_VALUE - assert processed_data_splits["val"][3, 6] == CAT_RARE_VALUE - assert processed_data_splits["val"][4, 6] == CAT_RARE_VALUE - assert processed_data_splits["val"][5, 6] == CAT_RARE_VALUE - assert processed_data_splits["val"][7, 6] == CAT_RARE_VALUE - assert processed_data_splits["test"][2, 6] == CAT_RARE_VALUE - assert processed_data_splits["test"][4, 6] == CAT_RARE_VALUE - assert processed_data_splits["test"][5, 6] == CAT_RARE_VALUE - assert processed_data_splits["test"][7, 6] == CAT_RARE_VALUE - assert processed_data_splits["test"][8, 6] == CAT_RARE_VALUE - # Make sure there are no other rares - assert np.sum(processed_data_splits["train"] != data_splits["train"]) == 1 - assert np.sum(processed_data_splits["val"] != data_splits["val"]) == 5 - assert np.sum(processed_data_splits["test"] != data_splits["test"]) == 5 - - # Now we create rare ones in both the train and validation sets - data_splits["train"][0, 1] = "5" - data_splits["val"][2, 1] = "5" - processed_data_splits = collapse_rare_categories(data_splits, 0.2) - assert processed_data_splits["train"][0, 1] == CAT_RARE_VALUE - assert processed_data_splits["val"][2, 1] == CAT_RARE_VALUE - - unset_all_random_seeds() - - def test_process_nans_in_numerical_features_drop() -> None: dataset = _get_test_dataset() numerical_data_splits = deepcopy(dataset.x_num) @@ -207,3 +152,33 @@ def test_process_nans_in_numerical_features_mean() -> None: dataset = process_nans_in_numerical_features(dataset=dataset, policy=NumericalNaNPolicy.MEAN) unset_all_random_seeds() + + +def test_setup_cache_path(tmp_path: Path) -> None: + transformations_1 = Transformations(seed=2, normalization=Normalization.QUANTILE) + transformations_2 = Transformations(seed=2, normalization=Normalization.MINMAX) + transformations_3 = Transformations(seed=2, normalization=Normalization.QUANTILE) + transformations_4 = Transformations(seed=2, categorical_encoding=CategoricalEncoding.ONE_HOT) + + path_1 = setup_cache_path(transformations_1, tmp_path) + path_2 = setup_cache_path(transformations_2, tmp_path) + path_3 = setup_cache_path(transformations_3, tmp_path) + path_4 = setup_cache_path(transformations_4, tmp_path) + assert path_1 == path_3 + assert path_1 != path_2 + assert path_1 != path_4 + + no_path = setup_cache_path(transformations_1, None) + assert no_path is None + + +def test_get_cached_dataset(tmp_path: Path) -> None: + transformations_1 = Transformations(seed=2, normalization=Normalization.QUANTILE) + dataset = _get_test_dataset() + + cache_path = setup_cache_path(transformations_1, tmp_path) + dump_pickle((transformations_1, dataset), cache_path) + + dataset_cache = get_cached_dataset(cache_path, transformations_1) + + assert np.allclose(dataset_cache.x_num["train"], dataset.x_num["train"], atol=1e-8) diff --git a/tests/unit/models/clavaddpm/test_dataset_transformations.py b/tests/unit/models/clavaddpm/test_dataset_transformations.py new file mode 100644 index 00000000..fc84dbbb --- /dev/null +++ b/tests/unit/models/clavaddpm/test_dataset_transformations.py @@ -0,0 +1,174 @@ +import numpy as np +import pytest + +from midst_toolkit.common.enumerations import TaskType +from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds +from midst_toolkit.models.clavaddpm.dataset_transformations import ( + 
CAT_MISSING_VALUE, + CAT_RARE_VALUE, + collapse_rare_categories, + encode_categorical_features, + process_nans_in_categorical_features, + transform_targets, +) +from midst_toolkit.models.clavaddpm.enumerations import CategoricalEncoding, CategoricalNaNPolicy, TargetPolicy + + +def test_process_nans_in_categorical_features() -> None: + set_all_random_seeds(42) + data_splits = { + "train": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), + "val": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), + "test": np.random.randint(low=0, high=2, size=(3, 3)).astype(float), + } + # Test when there are no NaNs + processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) + assert np.all(data_splits["train"] == processed_data_splits["train"]) + assert np.all(data_splits["val"] == processed_data_splits["val"]) + assert np.all(data_splits["test"] == processed_data_splits["test"]) + + # Make one of the train data set NaN but no others + data_splits["train"][0, 1] = float("nan") + processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) + assert processed_data_splits["train"][0, 1] == 0 + assert np.all(data_splits["val"] == processed_data_splits["val"]) + assert np.all(data_splits["test"] == processed_data_splits["test"]) + + # Try when test has a NaN + data_splits["test"][1, 1] = float("nan") + processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) + assert processed_data_splits["train"][0, 1] == 0 + assert processed_data_splits["test"][1, 1] == 0 + assert np.all(data_splits["val"] == processed_data_splits["val"]) + + # Try when no policy is provided + processed_data_splits = process_nans_in_categorical_features(data_splits, policy=None) + # NaNs should be left alone + assert np.isnan(data_splits["train"][0, 1]) + assert np.isnan(data_splits["test"][1, 1]) + + # Try with string values rather than numbers + data_splits = {k: v.astype(str).astype(object) for k, v in data_splits.items()} + data_splits["train"][0, 1] = CAT_MISSING_VALUE + data_splits["test"][1, 1] = CAT_MISSING_VALUE + processed_data_splits = process_nans_in_categorical_features(data_splits, CategoricalNaNPolicy.MOST_FREQUENT) + assert processed_data_splits["train"][0, 1] == "0.0" + assert processed_data_splits["test"][1, 1] == "0.0" + + unset_all_random_seeds() + + +def test_collapse_rare_values() -> None: + set_all_random_seeds(42) + data_splits = { + "train": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), + "val": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), + "test": np.random.randint(low=0, high=2, size=(10, 10)).astype(str), + } + # Based on these settings, column index 6 in the train split ends up with 0 being rare (1 entry of 10) + # So it should be replaced with CAT_RARE_VALUE in all datasets. 
Otherwise, everywhere else should be equal + processed_data_splits = collapse_rare_categories(data_splits, 0.2) + assert processed_data_splits["train"][0, 6] == CAT_RARE_VALUE + assert processed_data_splits["val"][1, 6] == CAT_RARE_VALUE + assert processed_data_splits["val"][3, 6] == CAT_RARE_VALUE + assert processed_data_splits["val"][4, 6] == CAT_RARE_VALUE + assert processed_data_splits["val"][5, 6] == CAT_RARE_VALUE + assert processed_data_splits["val"][7, 6] == CAT_RARE_VALUE + assert processed_data_splits["test"][2, 6] == CAT_RARE_VALUE + assert processed_data_splits["test"][4, 6] == CAT_RARE_VALUE + assert processed_data_splits["test"][5, 6] == CAT_RARE_VALUE + assert processed_data_splits["test"][7, 6] == CAT_RARE_VALUE + assert processed_data_splits["test"][8, 6] == CAT_RARE_VALUE + # Make sure there are no other rares + assert np.sum(processed_data_splits["train"] != data_splits["train"]) == 1 + assert np.sum(processed_data_splits["val"] != data_splits["val"]) == 5 + assert np.sum(processed_data_splits["test"] != data_splits["test"]) == 5 + + # Now we create rare ones in both the train and validation sets + data_splits["train"][0, 1] = "5" + data_splits["val"][2, 1] = "5" + processed_data_splits = collapse_rare_categories(data_splits, 0.2) + assert processed_data_splits["train"][0, 1] == CAT_RARE_VALUE + assert processed_data_splits["val"][2, 1] == CAT_RARE_VALUE + + unset_all_random_seeds() + + +def test_encode_categorical_features() -> None: + set_all_random_seeds(42) + categorical_features = { + "train": np.array([["cat", "dog"], ["lion", "wolf"], ["panther", "wolf"]]), + "val": np.array([["lion", "wolf"], ["lion", "wolf"], ["panther", "wolf"]]), + "test": np.array([["panther", "dog"], ["panther", "dingo"], ["panther", "coyote"]]), + } + + encoded_features, is_numeric, encoder = encode_categorical_features( + categorical_features, CategoricalEncoding.ORDINAL, None, None, True + ) + assert not is_numeric + assert encoder is not None + assert encoder["ordinalencoder"].categories_[0].tolist() == ["cat", "lion", "panther"] + assert encoder["ordinalencoder"].categories_[1].tolist() == ["dog", "wolf"] + + assert np.all(encoded_features["train"] == np.array([[0, 0], [1, 1], [2, 1]])) + assert np.all(encoded_features["val"] == np.array([[1, 1], [1, 1], [2, 1]])) + # Because dingo and coyote are unknown, they should be 1 "higher" than the largest encoded value in train (1) + assert np.all(encoded_features["test"] == np.array([[2, 0], [2, 2], [2, 2]])) + + encoded_features, is_numeric, encoder = encode_categorical_features(categorical_features, None, None, None, False) + assert not is_numeric + assert encoder is None + # Values should be the same as above, since ordinal is the default + assert np.all(encoded_features["train"] == np.array([[0, 0], [1, 1], [2, 1]])) + assert np.all(encoded_features["val"] == np.array([[1, 1], [1, 1], [2, 1]])) + # Because dingo and coyote are unknown, they should be 1 "higher" than the largest encoded value in train (1) + assert np.all(encoded_features["test"] == np.array([[2, 0], [2, 2], [2, 2]])) + + encoded_features, is_numeric, encoder = encode_categorical_features( + categorical_features, CategoricalEncoding.ONE_HOT, None, None, False + ) + assert is_numeric + assert encoder is None + assert np.all( + encoded_features["train"] + == np.array([[1.0, 0.0, 0.0, 1.0, 0.0], [0.0, 1.0, 0.0, 0.0, 1.0], [0.0, 0.0, 1.0, 0.0, 1.0]]).astype("float") + ) + assert np.all( + encoded_features["val"] + == np.array([[0.0, 1.0, 0.0, 0.0, 1.0], [0.0, 1.0, 0.0, 0.0, 
1.0], [0.0, 0.0, 1.0, 0.0, 1.0]]).astype("float") + ) + # Note that the two examples with dingo and coyote or full-zero one-hot vectors because they are unknowns from + # the perspective of train and onehotencoder has handle_unknown as ignore + assert np.all( + encoded_features["test"] + == np.array([[0.0, 0.0, 1.0, 1.0, 0.0], [0.0, 0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.0, 0.0]]).astype("float") + ) + unset_all_random_seeds() + + +def test_transform_targets() -> None: + set_all_random_seeds(42) + target_splits = { + "train": np.random.randn(3, 1).astype(float), + "val": np.random.randn(3, 1).astype(float), + "test": np.random.randn(3, 1).astype(float), + } + # Nothing should happen in these settings + new_targets, _ = transform_targets(target_splits, TargetPolicy.DEFAULT, task_type=TaskType.BINCLASS) + assert np.allclose(target_splits["train"], new_targets["train"], atol=1e-9) + assert np.allclose(target_splits["val"], new_targets["val"], atol=1e-9) + assert np.allclose(target_splits["test"], new_targets["test"], atol=1e-9) + new_targets, _ = transform_targets(target_splits, None, task_type=TaskType.BINCLASS) + assert np.allclose(target_splits["train"], new_targets["train"], atol=1e-9) + assert np.allclose(target_splits["val"], new_targets["val"], atol=1e-9) + assert np.allclose(target_splits["test"], new_targets["test"], atol=1e-9) + + new_targets, info = transform_targets(target_splits, TargetPolicy.DEFAULT, task_type=TaskType.REGRESSION) + assert pytest.approx(info["mean"], abs=1e-5) == 0.335379 + assert pytest.approx(info["std"], abs=1e-5) == 0.340540 + + assert np.allclose(new_targets["train"], np.array([[0.473763, -1.39086, 0.917101]]).T, atol=1e-4) + assert np.allclose(new_targets["val"], np.array([[3.48755, -1.67244, -1.67239]]).T, atol=1e-4) + assert np.allclose(new_targets["test"], np.array([[3.65253, 1.26874, -2.36346]]).T, atol=1e-4) + + unset_all_random_seeds() diff --git a/tests/unit/models/clavaddpm/test_dataset_utils.py b/tests/unit/models/clavaddpm/test_dataset_utils.py new file mode 100644 index 00000000..c9344a3c --- /dev/null +++ b/tests/unit/models/clavaddpm/test_dataset_utils.py @@ -0,0 +1,93 @@ +import numpy as np +import torch + +from midst_toolkit.common.enumerations import DataSplit +from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds +from midst_toolkit.models.clavaddpm.dataset_utils import ( + encode_and_merge_features, + get_categorical_and_numerical_column_names, + get_category_sizes, +) +from midst_toolkit.models.clavaddpm.enumerations import IsTargetConditioned + + +def test_get_category_sizes() -> None: + data = [[1, 0, 0], [1, 0, 0], [2, 1, 0], [3, 0, 0]] + tensor_to_process = torch.Tensor(data) + array_to_process = np.array(data) + + assert get_category_sizes(tensor_to_process) == [3, 2, 1] + assert get_category_sizes(array_to_process) == [3, 2, 1] + + +def test_get_categorical_and_numerical_column_names() -> None: + info_1 = {"num_cols": ["col_1", "col_3"], "cat_cols": ["col_2"], "y_col": "target", "n_classes": 0} + info_2 = {"num_cols": ["col_1", "col_2"], "cat_cols": ["col_3"], "y_col": "target", "n_classes": 2} + + categorical_columns, numerical_columns = get_categorical_and_numerical_column_names( + info_1, is_target_conditioned=IsTargetConditioned.NONE + ) + assert categorical_columns == ["col_2"] + assert numerical_columns == ["col_1", "col_3"] + categorical_columns, numerical_columns = get_categorical_and_numerical_column_names( + info_2, is_target_conditioned=IsTargetConditioned.NONE + ) + assert 
categorical_columns == ["col_3"] + assert numerical_columns == ["col_1", "col_2"] + + categorical_columns, numerical_columns = get_categorical_and_numerical_column_names( + info_1, is_target_conditioned=IsTargetConditioned.CONCAT + ) + assert categorical_columns == ["col_2"] + assert numerical_columns == ["col_1", "col_3", "target"] + categorical_columns, numerical_columns = get_categorical_and_numerical_column_names( + info_2, is_target_conditioned=IsTargetConditioned.CONCAT + ) + assert categorical_columns == ["col_3", "target"] + assert numerical_columns == ["col_1", "col_2"] + + +def test_encode_and_merge_features() -> None: + set_all_random_seeds(42) + + numerical_features = { + DataSplit.TRAIN.value: np.random.randn(3, 3), + DataSplit.VALIDATION.value: np.random.randn(3, 3), + DataSplit.TEST.value: np.random.randn(3, 3), + } + + categorical_features = { + DataSplit.TRAIN.value: np.array([["cat", "dog"], ["lion", "wolf"], ["panther", "wolf"]]), + DataSplit.VALIDATION.value: np.array([["lion", "wolf"], ["lion", "wolf"], ["panther", "wolf"]]), + DataSplit.TEST.value: np.array([["panther", "dog"], ["panther", "dingo"], ["panther", "coyote"]]), + } + + merged_data, encoders = encode_and_merge_features( + categorical_features=categorical_features, numerical_features=numerical_features, noise_scale=0.1 + ) + + assert np.all(merged_data[DataSplit.TRAIN.value][:, 0:3] == numerical_features[DataSplit.TRAIN.value]) + assert np.all(merged_data[DataSplit.VALIDATION.value][:, 0:3] == numerical_features[DataSplit.VALIDATION.value]) + assert np.all(merged_data[DataSplit.TEST.value][:, 0:3] == numerical_features[DataSplit.TEST.value]) + + assert np.allclose( + merged_data[DataSplit.TRAIN.value][:, 3:5], + np.array([[0.0375698, 2.02088636], [0.93993613, 2.80403299], [1.97083063, 2.8671814]]), + atol=1e-5, + ) + assert np.allclose( + merged_data[DataSplit.VALIDATION.value][:, 3:5], + np.array([[0.93982934, 3.01968612], [1.18522782, 3.07384666], [1.99865028, 3.01713683]]), + atol=1e-5, + ) + assert np.allclose( + merged_data[DataSplit.TEST.value][:, 3:5], + np.array([[1.89422891, 1.98843517], [2.08225449, 0.96988963], [1.87791564, -0.1478522]]), + atol=1e-5, + ) + + assert len(encoders) == 2 + assert encoders[0].classes_.tolist() == ["cat", "lion", "panther"] + assert encoders[1].classes_.tolist() == ["coyote", "dingo", "dog", "wolf"] + + unset_all_random_seeds() From 17944bbe98c56ad73f23d6d80179217245583840 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Fri, 31 Oct 2025 14:51:30 -0400 Subject: [PATCH 02/11] Small cleanup --- src/midst_toolkit/models/clavaddpm/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/dataset.py b/src/midst_toolkit/models/clavaddpm/dataset.py index 9f365720..6a03749c 100644 --- a/src/midst_toolkit/models/clavaddpm/dataset.py +++ b/src/midst_toolkit/models/clavaddpm/dataset.py @@ -8,7 +8,7 @@ from dataclasses import astuple, dataclass, replace from logging import INFO from pathlib import Path -from typing import Any, Self +from typing import Any import numpy as np import pandas as pd @@ -76,7 +76,7 @@ class Dataset: numerical_transform: StandardScaler | None = None @classmethod - def from_dir(cls, directory: Path) -> Self: + def from_dir(cls, directory: Path) -> Dataset: """ Load a dataset from a directory. 
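As a quick illustration of the transform_targets helper added above: under the default policy and a regression task, the returned info dictionary carries the train-split mean and standard deviation, so the standardization can be undone later. The sketch below is illustrative only — the toy target values are made up rather than taken from the repository — and assumes the function keeps the signature shown in this patch.

import numpy as np

from midst_toolkit.common.enumerations import TaskType
from midst_toolkit.models.clavaddpm.dataset_transformations import transform_targets
from midst_toolkit.models.clavaddpm.enumerations import TargetPolicy

# Toy target values for the three splits (illustrative only).
targets = {
    "train": np.array([1.0, 2.0, 3.0]),
    "val": np.array([2.0]),
    "test": np.array([4.0]),
}

standardized, info = transform_targets(targets, TargetPolicy.DEFAULT, TaskType.REGRESSION)

# info stores the train-split mean and std, so values (or model predictions) in the
# standardized space can be mapped back to the original target scale.
recovered_val = standardized["val"] * info["std"] + info["mean"]
assert np.allclose(recovered_val, targets["val"])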
From 5f024b07d080f031483ba22064e6ed028cae7b95 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Fri, 31 Oct 2025 15:44:47 -0400 Subject: [PATCH 03/11] Addressing some coderabbit comments --- .../models/clavaddpm/dataset_transformations.py | 5 ++++- src/midst_toolkit/models/clavaddpm/metrics.py | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/midst_toolkit/models/clavaddpm/dataset_transformations.py b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py index 635f9810..ab623882 100644 --- a/src/midst_toolkit/models/clavaddpm/dataset_transformations.py +++ b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py @@ -56,9 +56,12 @@ def normalize( elif normalization == Normalization.MINMAX: normalizer = MinMaxScaler() elif normalization == Normalization.QUANTILE: + n_samples = train_split.shape[0] + n_quantiles = max(min(n_samples // 30, 1000), 10) + n_quantiles = min(n_quantiles, n_samples) normalizer = QuantileTransformer( output_distribution="normal", - n_quantiles=max(min(train_split.shape[0] // 30, 1000), 10), + n_quantiles=n_quantiles, subsample=int(1e9), random_state=seed, ) diff --git a/src/midst_toolkit/models/clavaddpm/metrics.py b/src/midst_toolkit/models/clavaddpm/metrics.py index fcb188ca..02d20d05 100644 --- a/src/midst_toolkit/models/clavaddpm/metrics.py +++ b/src/midst_toolkit/models/clavaddpm/metrics.py @@ -61,6 +61,7 @@ def get_predicted_labels_and_probs( raise ValueError(f"Unsupported prediction_type: {prediction_type.value}") assert prediction_probabilities is not None + assert prediction_probabilities.ndim == 1 or prediction_probabilities.shape[1] == 1 predicted_labels = ( np.round(prediction_probabilities) if task_type == TaskType.BINCLASS @@ -135,5 +136,6 @@ def calculate_metrics( result = classification_report(y_true, labels, output_dict=True) assert isinstance(result, dict) if task_type == TaskType.BINCLASS: + assert probs is not None, "Probabilities need to be defined to compute roc_acu" result["roc_auc"] = roc_auc_score(y_true, probs) return result From 72d5a3f02ddc11e21659e93a100d2a2210419cfa Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Sat, 1 Nov 2025 11:16:02 -0400 Subject: [PATCH 04/11] Addressing some comments from Behnoosh --- .../attacks/ensemble/clavaddpm_fine_tuning.py | 1 + src/midst_toolkit/models/clavaddpm/dataset.py | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py b/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py index a41ca4cd..8ce13192 100644 --- a/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py +++ b/src/midst_toolkit/attacks/ensemble/clavaddpm_fine_tuning.py @@ -85,6 +85,7 @@ def fine_tune_model( fine_tuning_data, transformations, is_target_conditioned=model_params.is_target_conditioned, + # TODO change data_split_ratios to percentage in other parts of the code. data_split_percentages=data_split_ratios, info=fine_tuning_data_info, noise_scale=0, diff --git a/src/midst_toolkit/models/clavaddpm/dataset.py b/src/midst_toolkit/models/clavaddpm/dataset.py index 6a03749c..eaa98faf 100644 --- a/src/midst_toolkit/models/clavaddpm/dataset.py +++ b/src/midst_toolkit/models/clavaddpm/dataset.py @@ -101,10 +101,8 @@ def from_dir(cls, directory: Path) -> Dataset: @classmethod def _load_datasets(cls, directory: Path, dataset_name: str) -> ArrayDict: """ - Load all the dataset splits from a directory. 
- - Will check which of the splits exist in the directory for the - given dataset_name and load all of them. + Load all the dataset splits from a directory. Will check which of the splits exist in the directory for the + given ``dataset_name`` and load all of them. Args: directory: The directory to load the dataset from. @@ -114,6 +112,8 @@ def _load_datasets(cls, directory: Path, dataset_name: str) -> ArrayDict: The loaded datasets with all the splits. """ splits = [k.value for k in list(DataSplit) if directory.joinpath(f"y_{k.value}.npy").exists()] + if not len(splits) > 0: + raise ValueError("Splits to be loaded is empty!") datasets: ArrayDict = {} for split in splits: dataset = np.load(directory / f"{dataset_name}_{split}.npy", allow_pickle=True) @@ -370,9 +370,9 @@ def make_dataset_from_df( assert isinstance(info["n_classes"], int) dataset = Dataset( - features, - None, - target, + x_num=features, + x_cat=None, + y=target, y_info={}, task_type=TaskType(info["task_type"]), n_classes=info["n_classes"], @@ -388,11 +388,12 @@ def setup_cache_path(transformations: Transformations, cache_dir: Path | None) - Args: transformations: Set of transformations to be cached. - cache_dir: Directory to look for the cached transformations and datasets pickle. This will be used as the - stub and the path will be determined by the specified transformations + cache_dir: Directory to look for the tuple of cached transformations and dataset pickle. This will be used as + the stub and the path will be determined by the specified transformations Returns: - _description_ + A path to the cache file based on the hash of the transformations and their names. It may exist already + (will be loaded from there if so) or represent the name of the cache to be saved. """ if cache_dir is None: log(INFO, "No cache_dir provided. Will not attempt to load or save transformed dataset from/to cache") From 1cebf9a12d9c7605b21b4b34c6a01af617d92177 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Mon, 3 Nov 2025 07:33:22 -0500 Subject: [PATCH 05/11] A few more PR comments --- .../models/clavaddpm/dataset_transformations.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/dataset_transformations.py b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py index ab623882..7e72bea9 100644 --- a/src/midst_toolkit/models/clavaddpm/dataset_transformations.py +++ b/src/midst_toolkit/models/clavaddpm/dataset_transformations.py @@ -102,8 +102,8 @@ def process_nans_in_categorical_features(data_splits: ArrayDict, policy: Categor Process the NaN values in the categorical features of the datasets provided. Supports only string or float arrays. Args: - data_splits: A dictionary containing data to process, split into different partitions. One of which must - be keys with DataSplit.TRAIN.value. + data_splits: A dictionary containing data to process, split into different partitions. One of the keys must + be DataSplit.TRAIN.value. policy: The policy to use to process the NaN values. If none, will no-op. Returns: @@ -121,7 +121,7 @@ def process_nans_in_categorical_features(data_splits: ArrayDict, policy: Categor # Value that we're looking for to replace missing_values = float("nan") if is_float_array else CAT_MISSING_VALUE - # If there are any NaN values, try to apply a the policy. + # If there are any NaN values, try to apply the policy. 
nan_values = [ np.isnan(data).any() if is_float_array else (data == CAT_MISSING_VALUE).any() for data in data_splits.values() ] @@ -145,8 +145,8 @@ def collapse_rare_categories(data_splits: ArrayDict, min_frequency: float) -> Ar NOTE: Arrays must be of type string Args: - data_splits: A dictionary containing data to process, split into different partitions. One of which must - be keys with DataSplit.TRAIN.value. + data_splits: A dictionary containing data to process, split into different partitions. One of the keys must be + DataSplit.TRAIN.value.. min_frequency: The minimum frequency threshold of the categories to keep. Has to be between 0 and 1. Returns: From 5e0edaccdec6ee49a1d1de0d66bf542b7d6d1358 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Mon, 3 Nov 2025 14:00:05 -0500 Subject: [PATCH 06/11] Addressing normalization todo comment --- .../models/clavaddpm/clustering.py | 126 +++++++++--------- .../models/clavaddpm/enumerations.py | 6 +- tests/{ => unit}/common/test_random.py | 0 .../unit/models/clavaddpm/test_clustering.py | 83 ++++++++++++ 4 files changed, 146 insertions(+), 69 deletions(-) rename tests/{ => unit}/common/test_random.py (100%) create mode 100644 tests/unit/models/clavaddpm/test_clustering.py diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py index b43e7396..a98b659c 100644 --- a/src/midst_toolkit/models/clavaddpm/clustering.py +++ b/src/midst_toolkit/models/clavaddpm/clustering.py @@ -18,8 +18,8 @@ from midst_toolkit.models.clavaddpm.enumerations import ( ClusteringMethod, Configs, + DataAndKeyNormalizationType, GroupLengthsProbDicts, - KeyScalingType, RelationOrder, Tables, ) @@ -166,6 +166,7 @@ def _pair_clustering( num_clusters: int, parent_scale: float, key_scale: float, + data_and_key_normalization: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX, clustering_method: ClusteringMethod = ClusteringMethod.KMEANS, ) -> tuple[pd.DataFrame, pd.DataFrame, dict[int, dict[int, float]]]: """ @@ -178,11 +179,12 @@ def _pair_clustering( parent_name: Name of the parent table. child_name: Name of the child table. num_clusters: Number of clusters. - parent_scale: Scaling factor applied to the parent table, provided by the config. - It will be applied to the features to weight their importance during clustering. - key_scale: Scaling factor applied to the foreign key values that link - the child table to the parent table. This will weight how much influence - the parent-child relationship has in the clustering algorithm. + parent_scale: Scaling factor applied to the parent table, provided by the config. It will be applied to the + features to weight their importance during clustering. + key_scale: Scaling factor applied to the foreign key values that link the child table to the parent table. + This will weight how much influence the parent-child relationship has in the clustering algorithm. + data_and_key_normalization: Type of normalization for the child and parent data and keys. Default is + ``DataAndKeyNormalizationType.MINMAX.`` clustering_method: Method of clustering. Default is ClusteringMethod.KMEANS. 
Returns: @@ -229,6 +231,7 @@ def _pair_clustering( parent_primary_key, parent_scale, key_scale, + data_and_key_normalization, ) cluster_labels = _get_cluster_labels(cluster_data, clustering_method, num_clusters) @@ -335,35 +338,42 @@ def _merge_parent_data_with_child_data( return merged_parent_data -def _get_min_max_and_quantile_for_numerical_columns( +def get_normalized_numerical_columns( child_numerical_data: np.ndarray, parent_numerical_data: np.ndarray, parent_scale: float, -) -> tuple[np.ndarray, np.ndarray]: + normalization_method: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX, +) -> np.ndarray: """ - Get the min-max and quantile values for the numerical columns in both the - child and parent data. + The child and parent table numerical data are merged and then normalized together according to the normalization + scheme specified by ``normalization_method``. After normalization, data in the parent numerical data is scaled + by the ``parent_scale`` float. Args: - child_numerical_data: Numpy array of the child numerical data. - parent_numerical_data: Numpy array of the parent numerical data. - parent_scale: Scaling factor applied to the parent data. + child_numerical_data: Numpy array of the child table numerical data. + parent_numerical_data: Numpy array of the parent table numerical data. + parent_scale: Scaling factor applied to the parent data AFTER normalization. + normalization_method: The approach to be used to normalized the combined data. Defaults to + DataAndKeyNormalizationType.MINMAX. Returns: - A tuple with two numpy arrays, one with the min-max values and one with the quantile - values for the numerical columns. + A numpy array containing the merged child and parent table (in that order) numerical data, normalized using + the specified strategy and while child data scaled by the provided ``parent_scale`` """ joint_matrix = np.concatenate([child_numerical_data, parent_numerical_data], axis=1) - matrix_p_index = child_numerical_data.shape[1] + parent_start_index = child_numerical_data.shape[1] - # Perform quantile normalization using QuantileTransformer - numerical_quantile = _quantile_normalize_sklearn(joint_matrix) - numerical_min_max = _min_max_normalize_sklearn(joint_matrix) + if normalization_method == DataAndKeyNormalizationType.MINMAX: + normalized_data = _min_max_normalize_sklearn(joint_matrix) + elif normalization_method == DataAndKeyNormalizationType.QUANTILE: + normalized_data = _quantile_normalize_sklearn(joint_matrix) + else: + raise ValueError(f"Unrecognized Normalization Method: {normalization_method}") - numerical_quantile[:, matrix_p_index:] = parent_scale * numerical_quantile[:, matrix_p_index:] - numerical_min_max[:, matrix_p_index:] = parent_scale * numerical_min_max[:, matrix_p_index:] + # Scale the parent data using the parent scale value + normalized_data[:, parent_start_index:] = parent_scale * normalized_data[:, parent_start_index:] - return numerical_min_max, numerical_quantile + return normalized_data def _one_hot_encode_categorical_columns( @@ -424,28 +434,28 @@ def _prepare_cluster_data( parent_primary_key: str, parent_scale: float, key_scale: float, - key_scaling_type: KeyScalingType = KeyScalingType.MINMAX, + data_and_key_normalization: DataAndKeyNormalizationType = DataAndKeyNormalizationType.MINMAX, ) -> np.ndarray: """ - Prepare the data for the clustering algorithm, which comprises of merging the parent - and child data, splitting the data into categorical and numerical columns, and - normalizing the data. 
+ Prepare the data for the clustering algorithm, which comprises of merging the parent and child data, splitting + the data into categorical and numerical columns, and normalizing the data. Args: child_data: Numpy array of the child data. parent_data: Numpy array of the parent data. - child_domain: Dictionary of the domain of the child table. The domain dictionary - holds metadata about the columns of each one of the tables. - parent_domain: Dictionary of the domain of the parent table. The domain dictionary - holds metadata about the columns of each one of the tables. + child_domain: Dictionary of the domain of the child table. The domain dictionary holds metadata about the + columns of each one of the tables. + parent_domain: Dictionary of the domain of the parent table. The domain dictionary holds metadata about the + columns of each one of the tables. all_child_columns: List of all child columns. all_parent_columns: List of all parent columns. parent_primary_key: Name of the parent primary key. - parent_scale: Scaling factor applied to the parent table, provided by the config. - It will be applied to the features to weight their importance during clustering. - key_scale: Scaling factor applied to the tables' keys. This will weight how much influence - the parent-child relationship has in the clustering algorithm. - key_scaling_type: Type of scaling for the tables' keys. Default is KeyScalingType.MINMAX. + parent_scale: Scaling factor applied to the parent table, provided by the config. It will be applied to the + features to weight their importance during clustering. + key_scale: Scaling factor applied to the tables' keys. This will weight how much influence the parent-child + relationship has in the clustering algorithm. + data_and_key_normalization: Type of normalization for the child and parent data and keys. Default is + ``DataAndKeyNormalizationType.MINMAX.`` Returns: Numpy array of the data prepared for the clustering algorithm. @@ -477,21 +487,21 @@ def _prepare_cluster_data( parent_numerical_data = merged_data[:, parent_numerical_columns] parent_categorical_data = merged_data[:, parent_categorical_columns] - numerical_min_max, numerical_quantile = _get_min_max_and_quantile_for_numerical_columns( + numerical_normalized = get_normalized_numerical_columns( child_numerical_data, parent_numerical_data, parent_scale, + data_and_key_normalization, ) - reshaped_parent_data = merged_data[:, parent_primary_key_index].reshape(-1, 1) - if key_scaling_type == KeyScalingType.MINMAX: - key_normalized = _min_max_normalize_sklearn(reshaped_parent_data) - numerical_normalized = numerical_min_max - elif key_scaling_type == KeyScalingType.QUANTILE: - key_normalized = _quantile_normalize_sklearn(reshaped_parent_data) - numerical_normalized = numerical_quantile + # Normalizing the parent table primary key data. 
+ reshaped_parent_primary_key_data = merged_data[:, parent_primary_key_index].reshape(-1, 1) + if data_and_key_normalization == DataAndKeyNormalizationType.MINMAX: + key_normalized = _min_max_normalize_sklearn(reshaped_parent_primary_key_data) + elif data_and_key_normalization == DataAndKeyNormalizationType.QUANTILE: + key_normalized = _quantile_normalize_sklearn(reshaped_parent_primary_key_data) else: - raise ValueError(f"Unsupported foreign key scaling type: {key_scaling_type}") + raise ValueError(f"Unsupported data and key scaling type: {data_and_key_normalization}") key_scaled = key_scale * key_normalized @@ -727,13 +737,12 @@ def _get_group_data( return np.array(group_data_list, dtype=object) -# TODO: Refactor the functions below to be a single one with a "method" parameter. - - def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: """ Quantile normalize the input matrix using Sklearn's QuantileTransformer. + NOTE: Each column in normalized INDIVIDUALLY + Args: matrix: Numpy array of the matrix data. @@ -745,21 +754,15 @@ def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: random_state=42, # TODO: do we really need to hardcode the random state? ) # Change output_distribution as needed - normalized_data = np.empty((matrix.shape[0], 0)) - - # Apply QuantileTransformer to each column and concatenate the results - for col in range(matrix.shape[1]): - column = matrix[:, col].reshape(-1, 1) - transformed_column = transformer.fit_transform(column) - normalized_data = np.concatenate((normalized_data, transformed_column), axis=1) - - return normalized_data + return transformer.fit_transform(matrix) def _min_max_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: """ Min-max normalize the input matrix using Sklearn's MinMaxScaler. + NOTE: Each column in normalized INDIVIDUALLY + Args: matrix: Numpy array of the matrix data. @@ -767,16 +770,7 @@ def _min_max_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: Numpy array of the normalized data. 
""" scaler = MinMaxScaler(feature_range=(-1, 1)) - - normalized_data = np.empty((matrix.shape[0], 0)) - - # Apply MinMaxScaler to each column and concatenate the results - for col in range(matrix.shape[1]): - column = matrix[:, col].reshape(-1, 1) - transformed_column = scaler.fit_transform(column) - normalized_data = np.concatenate((normalized_data, transformed_column), axis=1) - - return normalized_data + return scaler.fit_transform(matrix) def _aggregate_and_sample( diff --git a/src/midst_toolkit/models/clavaddpm/enumerations.py b/src/midst_toolkit/models/clavaddpm/enumerations.py index 645d4fc7..200002e2 100644 --- a/src/midst_toolkit/models/clavaddpm/enumerations.py +++ b/src/midst_toolkit/models/clavaddpm/enumerations.py @@ -15,7 +15,7 @@ class ClusteringMethod(Enum): - """Possioble clustering methods for multi-table training.""" + """Possible clustering methods for multi-table training.""" KMEANS = "kmeans" GMM = "gmm" @@ -101,8 +101,8 @@ class TargetType(Enum): LONG = "long" -class KeyScalingType(Enum): - """Possible types of scaling for the foreign key.""" +class DataAndKeyNormalizationType(Enum): + """Possible types of normalization for data and primary keys when clustering.""" MINMAX = "minmax" QUANTILE = "quantile" diff --git a/tests/common/test_random.py b/tests/unit/common/test_random.py similarity index 100% rename from tests/common/test_random.py rename to tests/unit/common/test_random.py diff --git a/tests/unit/models/clavaddpm/test_clustering.py b/tests/unit/models/clavaddpm/test_clustering.py new file mode 100644 index 00000000..f88555dd --- /dev/null +++ b/tests/unit/models/clavaddpm/test_clustering.py @@ -0,0 +1,83 @@ +import numpy as np + +from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds +from midst_toolkit.models.clavaddpm.clustering import ( + _min_max_normalize_sklearn, + _quantile_normalize_sklearn, + get_normalized_numerical_columns, +) +from midst_toolkit.models.clavaddpm.enumerations import DataAndKeyNormalizationType + + +def test_quantile_normalize_sklearn() -> None: + set_all_random_seeds(42) + data_to_normalize = np.random.randint(0, 3, (5, 5)) + normalized_data = _quantile_normalize_sklearn(data_to_normalize) + assert np.allclose( + normalized_data, + np.array( + [ + [5.19933758, -5.19933758, 5.19933758, 5.19933758, -5.19933758], + [-5.19933758, 5.19933758, 0.0, 5.19933758, 5.19933758], + [5.19933758, 5.19933758, -5.19933758, 5.19933758, 0.31863936], + [-5.19933758, 0.0, 0.0, -5.19933758, 0.31863936], + [-5.19933758, -5.19933758, 0.0, -5.19933758, -5.19933758], + ] + ), + atol=1e-5, + ) + unset_all_random_seeds() + + +def test_min_max_normalize_sklearn() -> None: + set_all_random_seeds(42) + data_to_normalize = np.random.randint(0, 3, (5, 5)) + normalized_data = _min_max_normalize_sklearn(data_to_normalize) + assert np.allclose( + normalized_data, + np.array( + [ + [1.0, -1.0, 1.0, 1.0, -1.0], + [-1.0, 1.0, 0.0, 1.0, 1.0], + [1.0, 1.0, -1.0, 1.0, 0.0], + [-1.0, 0.0, 0.0, -1.0, 0.0], + [-1.0, -1.0, 0.0, -1.0, -1.0], + ] + ), + atol=1e-8, + ) + unset_all_random_seeds() + + +def test_get_normalized_numerical_columns() -> None: + set_all_random_seeds(42) + child_data = np.random.randint(0, 3, (3, 3)) + parent_data = np.random.randint(0, 3, (3, 3)) + scale = 2.0 + normalization_type = DataAndKeyNormalizationType.MINMAX + + normalized_data = get_normalized_numerical_columns(child_data, parent_data, scale, normalization_type) + assert np.allclose( + normalized_data, + np.array( + [[-1.0, -1.0, 1.0, 2.0, 2.0, 2.0], [-1.0, -1.0, 
-1.0, -2.0, 2.0, -2.0], [-1.0, 1.0, 1.0, -2.0, -2.0, -2.0]] + ), + atol=1e-6, + ) + + normalization_type = DataAndKeyNormalizationType.QUANTILE + normalized_data = get_normalized_numerical_columns(child_data, parent_data, scale, normalization_type) + + assert np.allclose( + normalized_data, + np.array( + [ + [-5.19933758, -5.19933758, 5.19933758, 2 * 5.19933758, 2 * 5.19933758, 2 * 5.19933758], + [-5.19933758, -5.19933758, -5.19933758, 2 * -5.19933758, 2 * 5.19933758, 2 * -5.19933758], + [-5.19933758, 5.19933758, 5.19933758, 2 * -5.19933758, 2 * -5.19933758, 2 * -5.19933758], + ] + ), + atol=1e-5, + ) + + unset_all_random_seeds() From ed705bbad455650458736eb71a09bd0cc38a657a Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Mon, 3 Nov 2025 14:05:47 -0500 Subject: [PATCH 07/11] Some small fixes --- src/midst_toolkit/models/clavaddpm/clustering.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py index a98b659c..c40e651f 100644 --- a/src/midst_toolkit/models/clavaddpm/clustering.py +++ b/src/midst_toolkit/models/clavaddpm/clustering.py @@ -501,7 +501,7 @@ def _prepare_cluster_data( elif data_and_key_normalization == DataAndKeyNormalizationType.QUANTILE: key_normalized = _quantile_normalize_sklearn(reshaped_parent_primary_key_data) else: - raise ValueError(f"Unsupported data and key scaling type: {data_and_key_normalization}") + raise ValueError(f"Unsupported data and key normalization type: {data_and_key_normalization}") key_scaled = key_scale * key_normalized @@ -741,8 +741,6 @@ def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: """ Quantile normalize the input matrix using Sklearn's QuantileTransformer. - NOTE: Each column in normalized INDIVIDUALLY - Args: matrix: Numpy array of the matrix data. @@ -761,8 +759,6 @@ def _min_max_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: """ Min-max normalize the input matrix using Sklearn's MinMaxScaler. - NOTE: Each column in normalized INDIVIDUALLY - Args: matrix: Numpy array of the matrix data. From abe8a17fd515d505b3840cf1acb558f337b0839b Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Mon, 3 Nov 2025 17:51:15 -0500 Subject: [PATCH 08/11] Some refactors to improve the grouping code and remove the todo --- .../models/clavaddpm/clustering.py | 98 +++++--- .../unit/models/clavaddpm/test_clustering.py | 233 ++++++++++++++++++ 2 files changed, 292 insertions(+), 39 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py index c40e651f..71ac1465 100644 --- a/src/midst_toolkit/models/clavaddpm/clustering.py +++ b/src/midst_toolkit/models/clavaddpm/clustering.py @@ -316,7 +316,7 @@ def _merge_parent_data_with_child_data( child_data: Numpy array of the child data. Should be sorted by the foreign key. parent_data: Numpy array of the parent data. Should be sorted by the parent primary key. parent_primary_key_index: Index of the parent primary key. - foreign_key_index: Index of the foreign key to the child data. + foreign_key_index: Index of the foreign key in the child data. Returns: Numpy array of the parent data merged for each group of the child group data. 
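A minimal sketch of the joint normalization introduced earlier in this series (get_normalized_numerical_columns): the child and parent numerical columns are min-max scaled together, and only the parent block is then multiplied by parent_scale. The toy arrays below are illustrative and assume the signature shown in the clustering changes above.

import numpy as np

from midst_toolkit.models.clavaddpm.clustering import get_normalized_numerical_columns
from midst_toolkit.models.clavaddpm.enumerations import DataAndKeyNormalizationType

# One numerical column per table (illustrative values only).
child = np.array([[0.0], [1.0], [2.0]])
parent = np.array([[10.0], [20.0], [30.0]])

normalized = get_normalized_numerical_columns(
    child, parent, parent_scale=2.0, normalization_method=DataAndKeyNormalizationType.MINMAX
)

# Min-max scaling maps each column to [-1, 1]; the parent column is then
# stretched to [-2, 2] by parent_scale, giving it more weight during clustering.
assert np.allclose(normalized, np.array([[-1.0, -2.0], [0.0, 0.0], [1.0, 2.0]]))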
@@ -672,69 +672,89 @@ def _get_categorical_and_numerical_columns( def _get_group_data_dict( - np_data: np.ndarray, - group_id_attrs: list[int] | None = None, + data_to_be_grouped: np.ndarray, + column_indices_to_group_by: list[int] | None = None, ) -> dict[tuple[Any, ...], list[np.ndarray]]: """ Group rows in a numpy array by their values in specified grouping columns into a dictionary. - Returns a dict where keys are tuples of grouping values and values are lists of corresponding rows. + Returns a dict where keys are tuples of grouping values and values are lists of corresponding rows (groups). Args: - np_data: Numpy array of the data. - group_id_attrs: List of attributes to group by. + data_to_be_grouped: Numpy array of the data to be grouped. + column_indices_to_group_by: List of column indices by which to group the data. Returns: - Dictionary of group data. + Dictionary of group data where the keys are tuples of unique entries in the specified columns and the values + are a list of ROWS from the ``data_to_be_grouped`` where the specified columns are shared values """ - if group_id_attrs is None: - group_id_attrs = [0] + # If no columns to group by are given, we use the first column + if column_indices_to_group_by is None: + column_indices_to_group_by = [0] - group_data_dict: dict[tuple[Any, ...], list[np.ndarray]] = {} - data_len = len(np_data) - for i in range(data_len): - row_id = tuple(np_data[i, group_id_attrs]) - if row_id not in group_data_dict: - group_data_dict[row_id] = [] - group_data_dict[row_id].append(np_data[i]) + grouped_data_dict: defaultdict[tuple[str, str], list[np.ndarray]] = defaultdict(list) + num_rows = len(data_to_be_grouped) + for row in range(num_rows): + row_id = tuple(data_to_be_grouped[row, column_indices_to_group_by]) + grouped_data_dict[row_id].append(data_to_be_grouped[row]) - return group_data_dict + return grouped_data_dict def _get_group_data( - np_data: np.ndarray, - group_id_attrs: list[int] | None = None, + data_to_be_grouped: np.ndarray, + column_indices_to_group_by: list[int] | None = None, ) -> np.ndarray: """ - Group consecutive rows in a numpy array based on specified grouping attributes. - Returns an array of arrays where each sub-array contains rows with identical - values in the grouping columns. + Group CONSECUTIVE rows in a numpy array that share entries across the columns specified in + ``column_indices_to_group_by``. Returns an array of arrays where each sub-array contains full rows sharing + identical values in the grouping columns. Args: - np_data: Numpy array of the data. - group_id_attrs: List of attributes to group by. + data_to_be_grouped: Numpy array of the data to be grouped. + column_indices_to_group_by: List of column indices by which to group the data. Returns: Numpy array of the group data. 
""" - if group_id_attrs is None: - group_id_attrs = [0] + # If no columns to group by are given, we use the first column + if column_indices_to_group_by is None: + column_indices_to_group_by = [0] - group_data_list = [] - data_len = len(np_data) - i = 0 - while i < data_len: + grouped_data_list = [] + number_of_rows = len(data_to_be_grouped) + row_index = 0 + while row_index < number_of_rows: group = [] - row_id = np_data[i, group_id_attrs] + row_id = data_to_be_grouped[row_index, column_indices_to_group_by] + + while row_index < number_of_rows and _current_id_matches_reference_id( + data_to_be_grouped, row_index, column_indices_to_group_by, row_id + ): + # If this is a consecutive row with the same ID as defined by column_indices_to_group_by, add the full + # row to the grouping. + group.append(data_to_be_grouped[row_index]) + row_index += 1 + grouped_data_list.append(np.array(group)) + return np.array(grouped_data_list, dtype=object) + + +def _current_id_matches_reference_id( + data: np.ndarray, row_index: int, column_indices_to_group_by: list[int], reference_id: np.ndarray +) -> bool: + """ + Determines whether the ``reference_id`` matches the id constructed by extracting the values from data at the + provided column indices in ``column_indices_to_group_by`` and row index in ``row_index``. - # TODO refactor this condition to be more readable/understandable. - while (np_data[i, group_id_attrs] == row_id).all(): - group.append(np_data[i]) - i += 1 - if i >= data_len: - break - group_data_list.append(np.array(group)) + Args: + data: Data from which to extract ID to be compared to ``reference_id`` + row_index: Row from which to extract the data. + column_indices_to_group_by: collection of column indices from which to extract data + reference_id: reference id to be compared to. 
- return np.array(group_data_list, dtype=object) + Returns: + Boolean as to whether the extracted data matches the reference ID + """ + return (data[row_index, column_indices_to_group_by] == reference_id).all() def _quantile_normalize_sklearn(matrix: np.ndarray) -> np.ndarray: diff --git a/tests/unit/models/clavaddpm/test_clustering.py b/tests/unit/models/clavaddpm/test_clustering.py index f88555dd..150b1566 100644 --- a/tests/unit/models/clavaddpm/test_clustering.py +++ b/tests/unit/models/clavaddpm/test_clustering.py @@ -2,6 +2,8 @@ from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds from midst_toolkit.models.clavaddpm.clustering import ( + _get_group_data, + _get_group_data_dict, _min_max_normalize_sklearn, _quantile_normalize_sklearn, get_normalized_numerical_columns, @@ -81,3 +83,234 @@ def test_get_normalized_numerical_columns() -> None: ) unset_all_random_seeds() + + +def test_get_group_data() -> None: + set_all_random_seeds(42) + data_array_with_one_foreign_keys = np.hstack( + (np.random.randn(10, 3), np.random.randint(0, 3, (10, 1)).astype(float), np.random.randn(10, 1)) + ) + data_array_with_two_foreign_keys = np.hstack( + (np.random.randn(10, 3), np.random.randint(0, 2, (10, 2)).astype(float), np.random.randn(10, 1)) + ) + data_array_with_foreign_key_in_front = np.hstack( + (np.random.randint(0, 2, (10, 1)).astype(float), np.random.randn(10, 3), np.random.randn(10, 1)) + ) + + grouped_data = _get_group_data(data_array_with_one_foreign_keys, [3]) + assert len(grouped_data) == 6 + assert len(grouped_data[0]) == 2 + assert len(grouped_data[1]) == 1 + assert len(grouped_data[2]) == 2 + assert len(grouped_data[5]) == 3 + assert np.allclose( + grouped_data[0], + np.array( + [ + [0.49671415, -0.1382643, 0.64768854, 2.0, 2.77831304], + [1.52302986, -0.23415337, -0.23413696, 2.0, 1.19363972], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[1], + np.array([[1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832]]), + atol=1e-6, + ) + assert np.allclose( + grouped_data[5], + np.array( + [ + [-0.2257763, 0.0675282, -1.42474819, 1.0, -0.53814166], + [-0.54438272, 0.11092259, -1.15099358, 1.0, -1.3466781], + [0.37569802, -0.60063869, -0.29169375, 1.0, -0.88059127], + ] + ), + atol=1e-6, + ) + + grouped_data = _get_group_data(data_array_with_one_foreign_keys, None) + # Entries are unique in the first dimension which is the default for this function. 
+ assert len(grouped_data) == 10 + + grouped_data = _get_group_data(data_array_with_two_foreign_keys, [3, 4]) + assert len(grouped_data) == 6 + assert len(grouped_data[0]) == 3 + assert len(grouped_data[1]) == 2 + assert len(grouped_data[2]) == 1 + assert len(grouped_data[5]) == 1 + assert np.allclose( + grouped_data[0], + np.array( + [ + [-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687], + [0.88774846, 0.89433233, 0.7549978, 0.0, 0.0, 0.3145129], + [-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[1], + np.array( + [ + [1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329], + [1.17971634, -0.89820794, 0.83479542, 1.0, 0.0, -0.30928855], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[2], + np.array([[0.29656138, -1.03782988, -0.07580375, 0.0, 0.0, 0.6731255]]), + atol=1e-6, + ) + + grouped_data = _get_group_data(data_array_with_two_foreign_keys, [3]) + assert len(grouped_data) == 5 + assert len(grouped_data[0]) == 3 + assert len(grouped_data[1]) == 2 + assert len(grouped_data[2]) == 1 + assert len(grouped_data[3]) == 2 + assert np.allclose( + grouped_data[0], + np.array( + [ + [-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687], + [0.88774846, 0.89433233, 0.7549978, 0.0, 0.0, 0.3145129], + [-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[1], + np.array( + [ + [1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329], + [1.17971634, -0.89820794, 0.83479542, 1.0, 0.0, -0.30928855], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[3], + np.array( + [ + [0.97296353, 0.79559546, 1.49543425, 1.0, 1.0, -0.25663018], + [0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572], + ] + ), + atol=1e-6, + ) + + grouped_data = _get_group_data(data_array_with_foreign_key_in_front, None) + # Because the first column is non-unique, we get proper groups. 
+ assert len(grouped_data) == 5 + assert len(grouped_data[0]) == 3 + assert len(grouped_data[1]) == 2 + assert len(grouped_data[2]) == 2 + assert len(grouped_data[3]) == 1 + assert np.allclose( + grouped_data[0], + np.array( + [ + [0.0, -0.34271452, -0.80227727, -0.16128571, -1.06230371], + [0.0, 0.40405086, 1.8861859, 0.17457781, 0.47359243], + [0.0, 0.25755039, -0.07444592, -1.91877122, -0.91942423], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[1], + np.array( + [ + [1.0, -0.02651388, 0.06023021, 2.46324211, 1.54993441], + [1.0, -0.19236096, 0.30154734, -0.03471177, -0.78325329], + ] + ), + atol=1e-6, + ) + assert np.allclose( + grouped_data[3], + np.array( + [ + [1.0, -1.40185106, 0.58685709, 2.19045563, -1.23086432], + ] + ), + atol=1e-6, + ) + unset_all_random_seeds() + + +def test_get_group_data_dict() -> None: + set_all_random_seeds(42) + data_array_with_one_foreign_keys = np.hstack( + (np.random.randn(10, 3), np.random.randint(0, 3, (10, 1)).astype(float), np.random.randn(10, 1)) + ) + data_array_with_two_foreign_keys = np.hstack( + (np.random.randn(10, 3), np.random.randint(0, 2, (10, 2)).astype(float), np.random.randn(10, 1)) + ) + data_array_with_foreign_key_in_front = np.hstack( + (np.random.randint(0, 2, (10, 1)).astype(float), np.random.randn(10, 3), np.random.randn(10, 1)) + ) + + grouped_data = _get_group_data_dict(data_array_with_one_foreign_keys, [3]) + assert len(grouped_data) == 3 + assert len(grouped_data[(2.0,)]) == 4 + assert len(grouped_data[(0.0,)]) == 2 + assert np.allclose( + grouped_data[(0.0,)][0], np.array([1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(0.0,)][1], np.array([-0.90802408, -1.4123037, 1.46564877, 0.0, 0.77370042]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(2.0,)][1], np.array([1.52302986, -0.23415337, -0.23413696, 2.0, 1.19363972]), atol=1e-6 + ) + + grouped_data = _get_group_data_dict(data_array_with_one_foreign_keys, None) + # Entries are unique in the first dimension which is the default for this function. 
+ assert len(grouped_data) == 10 + + grouped_data = _get_group_data_dict(data_array_with_two_foreign_keys, [3, 4]) + assert len(grouped_data) == 4 + assert len(grouped_data[(0.0, 0.0)]) == 5 + assert len(grouped_data[(1.0, 0.0)]) == 2 + assert len(grouped_data[(0.0, 1.0)]) == 1 + assert len(grouped_data[(1.0, 1.0)]) == 2 + assert np.allclose( + grouped_data[(0.0, 1.0)][0], np.array([-0.39863839, -0.06086409, -1.41875046, 0.0, 1.0, 1.27373362]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(1.0, 1.0)][0], np.array([0.97296353, 0.79559546, 1.49543425, 1.0, 1.0, -0.25663018]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(1.0, 1.0)][1], np.array([0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572]), atol=1e-6 + ) + + grouped_data = _get_group_data_dict(data_array_with_two_foreign_keys, [3]) + assert len(grouped_data) == 2 + assert len(grouped_data[(0.0,)]) == 6 + assert len(grouped_data[(1.0,)]) == 4 + assert np.allclose( + grouped_data[(0.0,)][0], np.array([-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(0.0,)][2], np.array([-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(1.0,)][0], np.array([1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329]), atol=1e-6 + ) + assert np.allclose( + grouped_data[(1.0,)][3], np.array([0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572]), atol=1e-6 + ) + + grouped_data = _get_group_data_dict(data_array_with_foreign_key_in_front, None) + # Because the first column is non-unique, we get proper groups. + assert len(grouped_data) == 2 + assert len(grouped_data[(0.0,)]) == 7 + assert len(grouped_data[(1.0,)]) == 3 + unset_all_random_seeds() From 5e6777617d747a9323b0776f02f66485f7f677a4 Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:18:10 -0500 Subject: [PATCH 09/11] Edits to merge in the code that Marcelo refactored --- .../models/clavaddpm/clustering.py | 20 +- .../unit/models/clavaddpm/test_clustering.py | 213 ++++-------------- 2 files changed, 52 insertions(+), 181 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py index 95bbf910..e26195cf 100644 --- a/src/midst_toolkit/models/clavaddpm/clustering.py +++ b/src/midst_toolkit/models/clavaddpm/clustering.py @@ -233,7 +233,7 @@ def _pair_clustering( cluster_labels = _get_cluster_labels(cluster_data, clustering_method, num_clusters) - child_group_data = get_group_data_by_id(sorted_child_data, foreign_key_index) + child_group_data = group_data_by_id(sorted_child_data, foreign_key_index, sort_by_column_value=True) child_group_lengths = np.array([len(group) for group in child_group_data], dtype=int) if clustering_method == ClusteringMethod.VARIATIONAL: @@ -682,8 +682,8 @@ def group_data_by_group_id_as_dict( column_index_to_group_by: List of column indices by which to group the data. Returns: - Dictionary of group data where the keys are are values from the column to group by and the values - are a list of full ROWS from the ``data_to_be_grouped`` where the specified columns are shared values + Dictionary of group data where the keys are values from the column to group by and the values + are a list of full ROWS from the ``data_to_be_grouped`` where the specified column value is shared. 
""" grouped_data_dict: defaultdict[int, list[np.ndarray]] = defaultdict(list) num_rows = len(data_to_be_grouped) @@ -694,7 +694,9 @@ def group_data_by_group_id_as_dict( return grouped_data_dict -def get_group_data_by_id(data_to_be_grouped: np.ndarray, column_index_to_group_by: int) -> np.ndarray: +def group_data_by_id( + data_to_be_grouped: np.ndarray, column_index_to_group_by: int, sort_by_column_value: bool = False +) -> np.ndarray: """ Group rows in a numpy array that share entries in the column specified by ``column_index_to_group_by``. Returns an array of arrays where each sub-array contains full rows sharing identical values in the grouping column. @@ -702,12 +704,18 @@ def get_group_data_by_id(data_to_be_grouped: np.ndarray, column_index_to_group_b Args: data_to_be_grouped: Numpy array of the data to be grouped. column_index_to_group_by: List of column indices by which to group the data. + sort_by_column_value: Whether or not the returned groups are sorted by the entries in the column of + ``column_index_to_group_by``. Defaults to False. Returns: - Numpy array of the data ordered by group id. + Numpy array of the data grouped by entries in column with index ``column_index_to_group_by``. """ grouped_data_by_group_id = group_data_by_group_id_as_dict(data_to_be_grouped, column_index_to_group_by) - grouped_data_list = [np.array(group_data) for group_data in grouped_data_by_group_id.values()] + if sort_by_column_value: + grouped_data = [(key, np.array(group_data)) for key, group_data in grouped_data_by_group_id.items()] + grouped_data_list = [data for _, data in sorted(grouped_data)] + else: + grouped_data_list = [np.array(group_data) for group_data in grouped_data_by_group_id.values()] return np.array(grouped_data_list, dtype=object) diff --git a/tests/unit/models/clavaddpm/test_clustering.py b/tests/unit/models/clavaddpm/test_clustering.py index 150b1566..8d684796 100644 --- a/tests/unit/models/clavaddpm/test_clustering.py +++ b/tests/unit/models/clavaddpm/test_clustering.py @@ -2,11 +2,11 @@ from midst_toolkit.common.random import set_all_random_seeds, unset_all_random_seeds from midst_toolkit.models.clavaddpm.clustering import ( - _get_group_data, - _get_group_data_dict, _min_max_normalize_sklearn, _quantile_normalize_sklearn, get_normalized_numerical_columns, + group_data_by_group_id_as_dict, + group_data_by_id, ) from midst_toolkit.models.clavaddpm.enumerations import DataAndKeyNormalizationType @@ -85,159 +85,66 @@ def test_get_normalized_numerical_columns() -> None: unset_all_random_seeds() -def test_get_group_data() -> None: +def test_group_data_by_id() -> None: set_all_random_seeds(42) data_array_with_one_foreign_keys = np.hstack( (np.random.randn(10, 3), np.random.randint(0, 3, (10, 1)).astype(float), np.random.randn(10, 1)) ) - data_array_with_two_foreign_keys = np.hstack( - (np.random.randn(10, 3), np.random.randint(0, 2, (10, 2)).astype(float), np.random.randn(10, 1)) - ) data_array_with_foreign_key_in_front = np.hstack( (np.random.randint(0, 2, (10, 1)).astype(float), np.random.randn(10, 3), np.random.randn(10, 1)) ) - grouped_data = _get_group_data(data_array_with_one_foreign_keys, [3]) - assert len(grouped_data) == 6 - assert len(grouped_data[0]) == 2 - assert len(grouped_data[1]) == 1 - assert len(grouped_data[2]) == 2 - assert len(grouped_data[5]) == 3 + grouped_data = group_data_by_id(data_array_with_one_foreign_keys, 3) + assert len(grouped_data) == 3 + assert len(grouped_data[0]) == 4 + assert len(grouped_data[1]) == 2 + assert len(grouped_data[2]) == 4 assert 
np.allclose( grouped_data[0], np.array( [ [0.49671415, -0.1382643, 0.64768854, 2.0, 2.77831304], [1.52302986, -0.23415337, -0.23413696, 2.0, 1.19363972], + [0.54256004, -0.46341769, -0.46572975, 2.0, 0.88176104], + [0.24196227, -1.91328024, -1.72491783, 2.0, -1.00908534], ] ), atol=1e-6, ) assert np.allclose( grouped_data[1], - np.array([[1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832]]), - atol=1e-6, - ) - assert np.allclose( - grouped_data[5], np.array( [ - [-0.2257763, 0.0675282, -1.42474819, 1.0, -0.53814166], - [-0.54438272, 0.11092259, -1.15099358, 1.0, -1.3466781], - [0.37569802, -0.60063869, -0.29169375, 1.0, -0.88059127], - ] + [1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832], + [-0.90802408, -1.4123037, 1.46564877, 0.0, 0.77370042], + ], ), atol=1e-6, ) - grouped_data = _get_group_data(data_array_with_one_foreign_keys, None) - # Entries are unique in the first dimension which is the default for this function. - assert len(grouped_data) == 10 - - grouped_data = _get_group_data(data_array_with_two_foreign_keys, [3, 4]) - assert len(grouped_data) == 6 - assert len(grouped_data[0]) == 3 - assert len(grouped_data[1]) == 2 - assert len(grouped_data[2]) == 1 - assert len(grouped_data[5]) == 1 - assert np.allclose( - grouped_data[0], - np.array( - [ - [-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687], - [0.88774846, 0.89433233, 0.7549978, 0.0, 0.0, 0.3145129], - [-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213], - ] - ), - atol=1e-6, - ) - assert np.allclose( - grouped_data[1], - np.array( - [ - [1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329], - [1.17971634, -0.89820794, 0.83479542, 1.0, 0.0, -0.30928855], - ] - ), - atol=1e-6, - ) - assert np.allclose( - grouped_data[2], - np.array([[0.29656138, -1.03782988, -0.07580375, 0.0, 0.0, 0.6731255]]), - atol=1e-6, - ) - - grouped_data = _get_group_data(data_array_with_two_foreign_keys, [3]) - assert len(grouped_data) == 5 - assert len(grouped_data[0]) == 3 - assert len(grouped_data[1]) == 2 - assert len(grouped_data[2]) == 1 - assert len(grouped_data[3]) == 2 - assert np.allclose( - grouped_data[0], - np.array( - [ - [-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687], - [0.88774846, 0.89433233, 0.7549978, 0.0, 0.0, 0.3145129], - [-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213], - ] - ), - atol=1e-6, - ) - assert np.allclose( - grouped_data[1], - np.array( - [ - [1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329], - [1.17971634, -0.89820794, 0.83479542, 1.0, 0.0, -0.30928855], - ] - ), - atol=1e-6, - ) - assert np.allclose( - grouped_data[3], - np.array( - [ - [0.97296353, 0.79559546, 1.49543425, 1.0, 1.0, -0.25663018], - [0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572], - ] - ), - atol=1e-6, - ) - - grouped_data = _get_group_data(data_array_with_foreign_key_in_front, None) + grouped_data = group_data_by_id(data_array_with_foreign_key_in_front, 0, sort_by_column_value=True) # Because the first column is non-unique, we get proper groups. 
- assert len(grouped_data) == 5 - assert len(grouped_data[0]) == 3 - assert len(grouped_data[1]) == 2 - assert len(grouped_data[2]) == 2 - assert len(grouped_data[3]) == 1 - assert np.allclose( - grouped_data[0], - np.array( - [ - [0.0, -0.34271452, -0.80227727, -0.16128571, -1.06230371], - [0.0, 0.40405086, 1.8861859, 0.17457781, 0.47359243], - [0.0, 0.25755039, -0.07444592, -1.91877122, -0.91942423], - ] - ), - atol=1e-6, - ) + assert len(grouped_data) == 2 + assert len(grouped_data[0]) == 9 + assert len(grouped_data[1]) == 1 assert np.allclose( grouped_data[1], - np.array( - [ - [1.0, -0.02651388, 0.06023021, 2.46324211, 1.54993441], - [1.0, -0.19236096, 0.30154734, -0.03471177, -0.78325329], - ] - ), + np.array([[1.0, -0.676922, 0.61167629, 1.03099952, 1.47789404]]), atol=1e-6, ) assert np.allclose( - grouped_data[3], + grouped_data[0], np.array( [ - [1.0, -1.40185106, 0.58685709, 2.19045563, -1.23086432], + [0.0, 0.93128012, -0.83921752, -0.30921238, -0.51827022], + [0.0, 0.33126343, 0.97554513, -0.47917424, -0.8084936], + [0.0, -0.18565898, -1.10633497, -1.19620662, -0.50175704], + [0.0, 0.81252582, 1.35624003, -0.07201012, 0.91540212], + [0.0, 1.0035329, 0.36163603, -0.64511975, 0.32875111], + [0.0, 0.36139561, 1.53803657, -0.03582604, -0.5297602], + [0.0, 1.56464366, -2.6197451, 0.8219025, 0.51326743], + [0.0, 0.08704707, -0.29900735, 0.09176078, 0.09707755], + [0.0, -1.98756891, -0.21967189, 0.35711257, 0.96864499], ] ), atol=1e-6, @@ -245,72 +152,28 @@ def test_get_group_data() -> None: unset_all_random_seeds() -def test_get_group_data_dict() -> None: +def test_group_data_by_group_id_as_dict() -> None: set_all_random_seeds(42) data_array_with_one_foreign_keys = np.hstack( (np.random.randn(10, 3), np.random.randint(0, 3, (10, 1)).astype(float), np.random.randn(10, 1)) ) - data_array_with_two_foreign_keys = np.hstack( - (np.random.randn(10, 3), np.random.randint(0, 2, (10, 2)).astype(float), np.random.randn(10, 1)) - ) data_array_with_foreign_key_in_front = np.hstack( (np.random.randint(0, 2, (10, 1)).astype(float), np.random.randn(10, 3), np.random.randn(10, 1)) ) - grouped_data = _get_group_data_dict(data_array_with_one_foreign_keys, [3]) + grouped_data = group_data_by_group_id_as_dict(data_array_with_one_foreign_keys, 3) assert len(grouped_data) == 3 - assert len(grouped_data[(2.0,)]) == 4 - assert len(grouped_data[(0.0,)]) == 2 - assert np.allclose( - grouped_data[(0.0,)][0], np.array([1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(0.0,)][1], np.array([-0.90802408, -1.4123037, 1.46564877, 0.0, 0.77370042]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(2.0,)][1], np.array([1.52302986, -0.23415337, -0.23413696, 2.0, 1.19363972]), atol=1e-6 - ) - - grouped_data = _get_group_data_dict(data_array_with_one_foreign_keys, None) - # Entries are unique in the first dimension which is the default for this function. 
- assert len(grouped_data) == 10 - - grouped_data = _get_group_data_dict(data_array_with_two_foreign_keys, [3, 4]) - assert len(grouped_data) == 4 - assert len(grouped_data[(0.0, 0.0)]) == 5 - assert len(grouped_data[(1.0, 0.0)]) == 2 - assert len(grouped_data[(0.0, 1.0)]) == 1 - assert len(grouped_data[(1.0, 1.0)]) == 2 - assert np.allclose( - grouped_data[(0.0, 1.0)][0], np.array([-0.39863839, -0.06086409, -1.41875046, 0.0, 1.0, 1.27373362]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(1.0, 1.0)][0], np.array([0.97296353, 0.79559546, 1.49543425, 1.0, 1.0, -0.25663018]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(1.0, 1.0)][1], np.array([0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572]), atol=1e-6 - ) - - grouped_data = _get_group_data_dict(data_array_with_two_foreign_keys, [3]) - assert len(grouped_data) == 2 - assert len(grouped_data[(0.0,)]) == 6 - assert len(grouped_data[(1.0,)]) == 4 - assert np.allclose( - grouped_data[(0.0,)][0], np.array([-1.1305523, 0.13442888, 0.58212279, 0.0, 0.0, 0.25711687]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(0.0,)][2], np.array([-0.20716589, -0.62347739, -1.50815329, 0.0, 0.0, 1.37186213]), atol=1e-6 - ) - assert np.allclose( - grouped_data[(1.0,)][0], np.array([1.09964698, -0.17773212, -0.41038331, 1.0, 0.0, 0.17555329]), atol=1e-6 - ) + assert len(grouped_data[2]) == 4 + assert len(grouped_data[0]) == 2 + assert np.allclose(grouped_data[0][0], np.array([1.57921282, 0.76743473, -0.46947439, 0.0, 0.21863832]), atol=1e-6) + assert np.allclose(grouped_data[0][1], np.array([-0.90802408, -1.4123037, 1.46564877, 0.0, 0.77370042]), atol=1e-6) assert np.allclose( - grouped_data[(1.0,)][3], np.array([0.33818125, 3.37229625, -0.92039081, 1.0, 1.0, -0.36782572]), atol=1e-6 + grouped_data[2][1], np.array([1.52302986, -0.23415337, -0.23413696, 2.0, 1.19363972]), atol=1e-6 ) - grouped_data = _get_group_data_dict(data_array_with_foreign_key_in_front, None) + grouped_data = group_data_by_group_id_as_dict(data_array_with_foreign_key_in_front, 0) # Because the first column is non-unique, we get proper groups. assert len(grouped_data) == 2 - assert len(grouped_data[(0.0,)]) == 7 - assert len(grouped_data[(1.0,)]) == 3 + assert len(grouped_data[0]) == 9 + assert len(grouped_data[1]) == 1 unset_all_random_seeds() From afc3d652417267c1ae144f39b92f6eef37c8461c Mon Sep 17 00:00:00 2001 From: David Emerson <43939939+emersodb@users.noreply.github.com> Date: Tue, 4 Nov 2025 14:10:58 -0500 Subject: [PATCH 10/11] Some edits associated with Marcelo's PR comments --- src/midst_toolkit/models/clavaddpm/clustering.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py index e26195cf..9153eba9 100644 --- a/src/midst_toolkit/models/clavaddpm/clustering.py +++ b/src/midst_toolkit/models/clavaddpm/clustering.py @@ -679,7 +679,7 @@ def group_data_by_group_id_as_dict( Args: data_to_be_grouped: Numpy array of the data to be grouped. - column_index_to_group_by: List of column indices by which to group the data. + column_index_to_group_by: Column index by which the data should be grouped. 
     Returns:
         Dictionary of group data where the keys are values from the column to group by and the values
@@ -698,17 +698,17 @@ def group_data_by_id(
     data_to_be_grouped: np.ndarray, column_index_to_group_by: int, sort_by_column_value: bool = False
 ) -> np.ndarray:
     """
-    Group rows in a numpy array that share entries in the column specified by ``column_index_to_group_by``.
+    Group rows in a numpy array that share values in the column specified by ``column_index_to_group_by``.
     Returns an array of arrays where each sub-array contains full rows sharing identical values in the grouping column.
 
     Args:
         data_to_be_grouped: Numpy array of the data to be grouped.
-        column_index_to_group_by: List of column indices by which to group the data.
-        sort_by_column_value: Whether or not the returned groups are sorted by the entries in the column of
+        column_index_to_group_by: Column index by which the data should be grouped.
+        sort_by_column_value: Whether or not the returned groups are sorted by the values in the column with index
             ``column_index_to_group_by``. Defaults to False.
 
     Returns:
-        Numpy array of the data grouped by entries in column with index ``column_index_to_group_by``.
+        Numpy array of the data grouped by values in the column with index ``column_index_to_group_by``.
     """
     grouped_data_by_group_id = group_data_by_group_id_as_dict(data_to_be_grouped, column_index_to_group_by)
     if sort_by_column_value:

From b921d90d54d298784b7a4fedd941f7b02ce0980c Mon Sep 17 00:00:00 2001
From: David Emerson <43939939+emersodb@users.noreply.github.com>
Date: Thu, 6 Nov 2025 11:27:08 -0500
Subject: [PATCH 11/11] An update associated with a comment via Coderabbit

---
 src/midst_toolkit/models/clavaddpm/clustering.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/midst_toolkit/models/clavaddpm/clustering.py b/src/midst_toolkit/models/clavaddpm/clustering.py
index 9153eba9..09fd4963 100644
--- a/src/midst_toolkit/models/clavaddpm/clustering.py
+++ b/src/midst_toolkit/models/clavaddpm/clustering.py
@@ -708,7 +708,8 @@ def group_data_by_id(
         ``column_index_to_group_by``. Defaults to False.
 
     Returns:
-        Numpy array of the data grouped by values in the column with index ``column_index_to_group_by``.
+        Numpy array of the data grouped by values in the column with index ``column_index_to_group_by``. The returned
+        array has dtype=object since groups may have different lengths.
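
As a quick illustration of the grouping behaviour documented above for group_data_by_group_id_as_dict and
group_data_by_id, the standalone sketch below reimplements the same idea on a toy array. It is only a minimal
sketch of the documented behaviour, not the toolkit's code: the helper name group_rows_by_column and the toy data
are invented for illustration, while the real functions live in midst_toolkit.models.clavaddpm.clustering with the
signatures exercised in the tests.

    from collections import defaultdict

    import numpy as np


    def group_rows_by_column(data: np.ndarray, column_index: int, sort_by_column_value: bool = False) -> np.ndarray:
        # Collect full rows keyed by the value found in the grouping column.
        groups: defaultdict[float, list[np.ndarray]] = defaultdict(list)
        for row in data:
            groups[row[column_index]].append(row)
        keys = sorted(groups) if sort_by_column_value else list(groups)
        # dtype=object because groups may contain different numbers of rows.
        return np.array([np.array(groups[key]) for key in keys], dtype=object)


    # Toy child table: two feature columns plus a foreign-key column at index 2.
    child = np.array(
        [
            [0.1, 1.0, 2.0],
            [0.2, 2.0, 0.0],
            [0.3, 3.0, 2.0],
            [0.4, 4.0, 1.0],
        ]
    )

    grouped = group_rows_by_column(child, column_index=2, sort_by_column_value=True)
    print([len(group) for group in grouped])  # [1, 1, 2] -> groups for keys 0.0, 1.0 and 2.0

Grouping through a dictionary rather than scanning consecutive rows is what removes the need for the input to be
pre-sorted by the grouping column; the sort_by_column_value flag exists so callers such as _pair_clustering can
still receive the groups ordered by the foreign-key value.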