
Algorithm Development in MiniMip

This document describes how MiniMip is used to develop Federated Functions in a lightweight, developer-friendly environment.

MiniMip is a component of the ExaFlow system and is designed to enable rapid development of Federated Functions without the overhead of installing, configuring, and running the full ExaFlow infrastructure. It is intended for experimentation, prototyping, and algorithm development.


Installation

To install and run MiniMip, the following prerequisites are required:

  • Python 3.11.9 or newer
  • Poetry for dependency and environment management

Once the prerequisites are installed, the project dependencies can be installed and managed with Poetry (typically via poetry install), and scripts can be run inside the managed environment with poetry run.

Installation steps and commands will be documented here.


Project Structure

The project is organized as follows:

  • mini_mip_system/
    Contains the core system components required for executing and managing federated computations.

  • library/
    Contains statistical and machine learning libraries developed within the scope of the project.

    • utils/
      Provides the core interfaces and utilities used by clients to perform federated aggregations.
      Depending on the input data format, the following aggregator implementations are available:

      • numpy_aggregator for NumPy-based data
      • pandas_aggregator for Pandas DataFrame-based data

    • under_development/
      Contains experimental algorithms or algorithms under active development.
      These implementations are not yet considered stable.

    • metrics/
      Includes evaluation and validation metrics used to verify the correctness and consistency of federated functions and algorithms.

    • core/
      Defines the base classes (templates) used for implementing federated algorithms.
      The main base classes are listed below (a rough sketch of both templates follows this list):

      • statistical_function
        Template for implementing federated statistical functions.
        Each statistical function must implement the compute() method.

      • statistical_model
        Template for implementing federated statistical models.
        Provides fit() for federated training and predict() for inference.

  • grizzly/
    Contains a transpiler that converts Pandas aggregation functions into equivalent SQL queries and executes them as SQL.
    This component is currently under development.

  • tests_and_experiments/
    Used for writing tests and experiments to validate the correctness and behavior of the implemented code.
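
To make the core/ templates concrete, below is a minimal sketch of what the two base classes might look like. The method names (compute(), fit(), predict()) and the client attribute come from the descriptions and examples in this document; the exact signatures and module paths are assumptions.

from abc import ABC, abstractmethod

class StatisticalFunction(ABC):
    """Template for federated statistical functions (sketch)."""

    def __init__(self, client):
        # Handle used by aggregators to reach the aggregation server
        self.client = client

    @abstractmethod
    def compute(self, *args, **kwargs):
        """Return the federated result of the statistical function."""


class StatisticalModel(ABC):
    """Template for federated statistical models (sketch)."""

    def __init__(self, client):
        self.client = client

    @abstractmethod
    def fit(self, X, y=None):
        """Federated training: local phase followed by aggregation."""

    @abstractmethod
    def predict(self, X):
        """Inference using the aggregated parameters."""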


Aggregator API

Aggregators provide a high-level, client-side interface for executing common federated operations by communicating with a central aggregation server.

The NumPy Aggregator (numpy_aggregator) supports the following operations; a short usage sketch follows the list:

  • fed_union(categories)
    Computes the union of categorical values across all clients.

  • fed_sum(array)
    Computes the element-wise federated sum across clients.

  • fed_avg(array)
    Computes the element-wise federated average across clients.

  • fed_weighted_avg(array, weight)
    Computes a weighted federated average using client-provided weights (e.g. sample counts).

  • global_sum(array)
    Performs a local reduction along axis=0 and then computes the federated sum.

  • global_avg(array)
    Computes a federated average using locally reduced sums and global sample counts.

  • global_count(array)
    Returns the total number of samples across all clients.

  • global_min(array)
    Computes the element-wise federated minimum after local reduction.

  • global_max(array)
    Computes the element-wise federated maximum after local reduction.
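
To illustrate how these operations are combined, here is a small, hypothetical StatisticalFunction that summarizes the columns of a local 2D array. The ColumnSummary class itself is not part of the library; only the aggregator calls are.

import numpy as np

class ColumnSummary(StatisticalFunction):
    def compute(self, x: np.ndarray):
        agg = NumpyAggregator(self.client)

        n = agg.global_count(x)          # total number of rows across all clients
        col_sums = agg.global_sum(x)     # per-column sums over the pooled data
        col_means = agg.global_avg(x)    # per-column means over the pooled data
        col_min = agg.global_min(x)      # per-column minimum across clients
        col_max = agg.global_max(x)      # per-column maximum across clients

        return {"count": n, "sum": col_sums, "mean": col_means,
                "min": col_min, "max": col_max}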


Algorithmic Development

Algorithm development in MiniMip relies on high-level Aggregators.
These abstractions remove the complexity of client–server communication and allow developers to focus exclusively on algorithm logic.


Federated Statistical Algorithms

Federated statistical algorithms are implemented by replacing classical aggregation functions with federated equivalents.

Below is an example implementation of federated Variance and Standard Deviation, both relying on the global_avg and global_sum aggregator operations.

Note:
The appropriate Aggregator interface must be used depending on the data type (numpy_aggregator vs pandas_aggregator).
The example below uses NumPy arrays.

import numpy as np

# StatisticalFunction and NumpyAggregator are provided by the library/core and
# library/utils packages respectively (import paths follow the project layout above).

class Variance(StatisticalFunction):
    def compute(self, x: np.ndarray, *, ddof=0):
        if not isinstance(x, np.ndarray):
            raise TypeError("Input must be a numpy array")

        if x.ndim != 1:
            raise ValueError("Input must be a 1D array")

        agg = NumpyAggregator(self.client)

        # Global sample count and global mean across all clients
        n = agg.global_count(x)
        mean = agg.global_avg(x)

        # Global sum of squared deviations from the global mean
        sum_squared_diff = agg.global_sum((x - mean) ** 2)

        return sum_squared_diff / (n - ddof)


class StandardDeviation(StatisticalFunction):
    def compute(self, x: np.ndarray, *, ddof=0):
        variance = Variance(self.client).compute(x, ddof=ddof)
        return np.sqrt(variance)

Federated Models

Federated statistical models follow a two-phase training cycle:

  1. Local Training Phase: Each client trains a local model using its private data.

  2. Aggregation Phase: Model parameters are aggregated across clients using federated aggregation functions.

Below is a simplified example of a Federated Linear Regression model.

# numpy (np), NumpyAggregator, and the StatisticalModel base class are assumed
# to be imported as in the previous examples.
class FederatedLinearRegression(StatisticalModel):

    def __init__(self, client):
        self.client = client
        self.agg = NumpyAggregator(client)
        self.coef_ = None
        self.intercept_ = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Local training phase executed independently on each client.
        """
        # Local closed-form solution
        X_bias = np.c_[np.ones(len(X)), X]
        params = np.linalg.inv(X_bias.T @ X_bias) @ X_bias.T @ y

        self.intercept_ = params[0]
        self.coef_ = params[1:]

        # Number of local samples
        n = len(X)

        # Aggregation phase
        self.intercept_ = self.agg.fed_weighted_avg(self.intercept_, n)
        self.coef_ = self.agg.fed_weighted_avg(self.coef_, n)

        return self

    def predict(self, X: np.ndarray):
        """
        Inference using globally aggregated parameters.
        """
        return X @ self.coef_ + self.intercept_

Development Guidelines

When implementing federated statistical models and algorithms, developers should reuse the existing federated operations provided by the NumpyAggregator whenever possible.
This approach ensures consistency, correctness, and efficiency, while avoiding unnecessary duplication of functionality.

Reuse Existing Federated Functions

  • Prefer using the federated functions already available in NumpyAggregator to perform aggregations.
  • Avoid introducing custom federated aggregation functions unless strictly necessary.
  • Reusing existing abstractions reduces complexity and helps maintain compatibility across algorithms.

Ordering and Ranking of Elements

If an algorithm requires ordering, ranking, or comparisons between elements, direct sorting across clients should be avoided.

Instead, consider histogram-based approaches, which can be safely aggregated in a federated manner.

  • Histograms allow approximate ordering without exposing raw data.
  • They are particularly useful for rank-based statistical tests.

Example:
The Mann–Whitney U test uses histogram-based techniques to compare distributions across clients without requiring direct access to individual data points.
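
As a rough sketch of this idea (assuming all clients agree on a common set of bin edges in advance), a global histogram can be built with fed_sum and then used to approximate quantiles or ranks without sharing any individual values. The ApproximateMedian class below is illustrative and not part of the library.

import numpy as np

class ApproximateMedian(StatisticalFunction):
    def compute(self, x: np.ndarray, bin_edges: np.ndarray):
        agg = NumpyAggregator(self.client)

        # Local histogram over the agreed, shared bin edges
        local_counts, _ = np.histogram(x, bins=bin_edges)

        # Element-wise federated sum of the bin counts yields the global histogram
        global_counts = agg.fed_sum(local_counts)

        # Locate the bin that contains the global median via cumulative counts
        cumulative = np.cumsum(global_counts)
        median_bin = np.searchsorted(cumulative, cumulative[-1] / 2)

        # Return the midpoint of that bin as the approximate median
        return (bin_edges[median_bin] + bin_edges[median_bin + 1]) / 2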

By following these guidelines, federated models remain scalable, privacy-preserving, and aligned with the design principles of MiniMip.

Algorithmic Testing

To verify the correctness of an algorithm, custom tests can be created in the tests_and_experiments folder, which is used to test the algorithms and models developed in the library.

It contains the following subfolders:

  • core/ – Contains base classes like PartitionedPandasTable and FederationTestTemplate, which serve as templates for creating tests.
  • datasets/ – Stores instances of PartitionedPandasTable.
  • library_tests/ – Contains the tests we create for specific algorithms or models.

Core

PartitionedPandasTable

PartitionedPandasTable is an abstract base class for managing partitioned Pandas datasets. It is intended for testing and experimentation purposes only.

  • It initializes a global dataset once via the abstract get_dataset() method. This method should return a Pandas DataFrame corresponding to the actual dataset.
  • The get_local_dataset(partition_id, num_partitions) method returns a data partition for a specific client. If num_partitions is 1, the full dataset is returned. Otherwise, the dataset is evenly split, with the last partition handling any remainder.
  • The get_global_dataset() method returns the full, unpartitioned dataset for use in federated testing scenarios.

FederationTestTemplate

The FederationTestTemplate is an abstract base class for testing federated computations using partitioned Pandas datasets. It initializes one or more simulated federated clients and splits the dataset into local and global versions.

  • The class executes both federated and centralized versions of an algorithm or model and provides an abstract method to compare the outputs.

Key abstract methods to implement in subclasses:

  • federated_computation(local_dataset)
  • centralized_computation(global_dataset)
  • compare(federated_output, global_output)

This template is intended for testing and experimentation purposes only.

Datasets

Contains various datasets created using the PartitionedPandasTable template.
For example, the TitanicDataset loads the Titanic data into a Pandas DataFrame and allows it to be accessed in either federated (partitioned) or global form.

import pandas as pd

class TitanicDataset(PartitionedPandasTable):

    def get_dataset(self) -> pd.DataFrame:
        # Download the Titanic dataset and return it as a DataFrame
        url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
        df = pd.read_csv(url)
        return df
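
A dataset defined this way can then be accessed either per client or globally; the snippet below shows the intended usage (the partition arguments are illustrative):

dataset = TitanicDataset()

# The share of the data seen by client 0 when splitting across 2 clients
local_df = dataset.get_local_dataset(partition_id=0, num_partitions=2)

# The full, unpartitioned dataset, e.g. for a centralized reference computation
global_df = dataset.get_global_dataset()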

Library Tests

To create a Library Test, you need to implement the abstract class FederationTestTemplate and define the following methods:

  • federated_computation(local_dataset)
    This method applies an algorithm or model in federated mode. It receives the local datasets for each client as input and may include preprocessing steps to bring the data into the appropriate format.
  • centralized_computation(global_dataset)
    This method applies an algorithm or model on a single node. It receives the global dataset as input, which is equivalent to the union of all local datasets. Preprocessing steps may also be applied here.
  • compare(federated_output, global_output)
    This method takes the outputs of the previous two methods and compares their results to verify correctness.

For example, the following test computes the Standard Deviation in both federated and centralized mode; its compare method simply prints the two values rather than performing an actual comparison.

class StatisticsTest(FederationTestTemplate):
    def federated_computation(self, local_dataset):
        std = StandardDeviation(self.client).compute(
            local_dataset['sepal length (cm)'].values, ddof=1
        )
        return std

    def centralized_computation(self, centralized_dataset):
        attr1 = centralized_dataset['sepal length (cm)']        
        std = attr1.std(ddof=1)
        return std

    def compare(self, federated_output, global_output):
        print('Federated output:', federated_output)
        print('Centralized output:', global_output)

  • To run a test, first start the server, which expects input from 2 clients:

import asyncio
from mini_mip_system.server.grpc_agg_server import serve

if __name__ == "__main__":
    asyncio.run(serve(available_clients=2))

  • Then start client 0 and client 1, passing the dataset on which the test will run:

from tests_and_experiments.library_tests.statistics.statistics_test import StatisticsTest
from tests_and_experiments.datasets.iris import IrisDataset

# Client 0
StatisticsTest(0, 2, dataset=IrisDataset(), operation_id=10)

from tests_and_experiments.library_tests.statistics.statistics_test import StatisticsTest
from tests_and_experiments.datasets.iris import IrisDataset

# Client 1
StatisticsTest(1, 2, dataset=IrisDataset(), operation_id=10)
