# Importing Libraries
import copy
import math
import torch
from abc import ABC, abstractmethod
from typing import Literal, Union, Callable, Tuple, Optional, Dict, Any
from sklearn.model_selection import train_test_split
from ..attacker_models.ANN import simpleDenseModel
from ..utils.datacreator import dataCreator
from ..utils.losses import ModifiedBCELoss, ModifiedMSELoss
from ..utils import config
# ============================================================================
# BASE CLASS
# ============================================================================
[docs]class BasePredictabilityMetric(ABC):
"""
Base class for predictability metrics.
It provides base functionality for computing leakage with fairness evaluation.
"""
# ============================================================================
# INITIALIAL SETUP
# ============================================================================
[docs] def __init__(
self,
model_params: Dict[str, torch.nn.Module],
train_params: Dict[str, Any],
eval_metric: Union[Callable, str] = config.DEFAULT_EVAL_METRIC,
threshold: bool = True,
normalized: bool = False,
test_size=config.DEFAULT_TEST_SIZE,
):
self.model_params = model_params
self.train_params = train_params
self.threshold = threshold
self.normalized = normalized
self.test_size = test_size
self.loss_functions = {
"mse": torch.nn.MSELoss(),
"cross-entropy": torch.nn.CrossEntropyLoss(),
"bce": torch.nn.BCELoss(),
}
self.eval_functions = {
"accuracy": lambda y_pred, y: (y_pred == y).float().mean(),
"mse": ModifiedMSELoss,
"bce": ModifiedBCELoss,
}
self.initialize_eval_metrics(eval_metric)
self.defineModel()
# ============================================================================
# TRAINING METHODS
# ============================================================================
[docs] def train(self, x: torch.tensor, y: torch.tensor, attacker_mode: str) -> torch.tensor:
"""
Trains the attacker model for a given mode (eg. AtoT, TtoA).
Parameters
----------
x : torch.tensor
Input data.
y : torch.tensor
Target data.
attacker_mode : str
Mode of the attacker model.
Returns
-------
None
Trains the attacker model for a given mode.
"""
attacker_model, optimizer, criterion = self.train_setup(attacker_mode)
print(f"Training Activated for Mode: {attacker_mode}")
# Training loop
for epoch in range(1, self.train_params["epochs"] + 1):
x, y = self.shuffle_data(x, y)
avg_loss = self.train_epochs(attacker_model, optimizer, criterion, x, y)
if epoch % config.EPOCH_LOG_INTERVAL == 0:
print(f"\rCurrent Epoch {epoch}: Loss = {avg_loss}", end="")
print("\nModel training completed!!")
[docs] def train_setup(
self, attacker_mode: str
) -> Tuple[torch.nn.Module, torch.nn.Module, torch.optim.Optimizer]:
"""
Setup model, criterion and optimizer for training.
Parameters
----------
attacker_mode : str
Mode identifier for the attacker model
Returns
-------
Tuple[Attacker Model, loss criterion, and optimizer]
"""
self.defineModel()
attacker_model = getattr(self, "attacker_" + attacker_mode)
optimizer = torch.optim.Adam(
attacker_model.parameters(), lr=self.train_params["learning_rate"]
)
criterion = self.loss_functions[self.train_params["loss_function"]]
return attacker_model, optimizer, criterion
[docs] def train_batch_with_loss(
self,
attacker_model: torch.nn.Module,
optimizer: torch.optim.Optimizer,
criterion: torch.nn.Module,
x_batch: torch.tensor,
y_batch: torch.tensor,
) -> float:
"""
Trains a batch of data with a given loss function.
Parameters
----------
attacker_model : torch.nn.Module
The attacker model to be trained.
optimizer : torch.optim.Optimizer
The optimizer to be used.
criterion : torch.nn.Module
The loss function to be used.
x_batch : torch.tensor
The input data batch.
y_batch : torch.tensor
The target data batch.
Returns
-------
loss : float
The loss value.
"""
optimizer.zero_grad()
# Forward pass
outputs = attacker_model(x_batch)
loss = criterion(outputs, y_batch)
# Backward pass and optimization
loss.backward()
optimizer.step()
return loss.item()
[docs] def train_epochs(
self,
attacker_model: torch.nn.Module,
optimizer: torch.optim.Optimizer,
criterion: torch.nn.Module,
x: torch.tensor,
y: torch.tensor,
) -> float:
"""
Trains the attacker model for an epoch and returns the average loss.
Parameters
----------
attacker_model : torch.nn.Module
The attacker model to be trained.
optimizer : torch.optim.Optimizer
The optimizer to be used.
criterion : torch.nn.Module
The loss function to be used.
x : torch.tensor
The input data.
y : torch.tensor
The target data.
Returns
-------
avg_loss : float
The average loss.
"""
batches = math.ceil(len(x) / self.train_params["batch_size"])
running_loss = 0.0
for batch_num in range(batches):
start = batch_num * self.train_params["batch_size"]
end = start + self.train_params["batch_size"]
x_batch = x[start:end]
y_batch = y[start:end]
loss = self.train_batch_with_loss(
attacker_model, optimizer, criterion, x_batch, y_batch
)
running_loss += loss
return running_loss / batches
# ============================================================================
# DATA MANIPULATION METHODS
# ============================================================================
[docs] def shuffle_data(self, x: torch.tensor, y: torch.tensor) -> Tuple[torch.tensor, torch.tensor]:
"""
Randomly shuffles x and y data.
"""
perm = torch.randperm(x.shape[0])
return x[perm], y[perm]
def _validate_model_acc_value(
self, value: Union[float, int, torch.Tensor], value_name: str = "model_acc"
) -> None:
"""
Validates a single model accuracy value based on its type.
Parameters
----------
value : Union[float, int, torch.Tensor]
The model accuracy value to validate
value_name : str
Name of the value for error messages (default: "model_acc")
Raises
------
ValueError
If the value type is invalid or out of range
"""
if isinstance(value, float):
if not (0.0 <= value <= 1.0):
raise ValueError(
f"{value_name} must be between 0.0 and 1.0 for float type, got {value}"
)
elif isinstance(value, int):
if not (1 <= value <= 100):
raise ValueError(
f"{value_name} must be between 1 and 100 for int type, got {value}"
)
elif isinstance(value, torch.Tensor):
if value.numel() != 1:
raise ValueError(
f"{value_name} tensor must have exactly one element, got shape {value.shape}"
)
value_scalar = value.item()
if not (0.0 <= value_scalar <= 1.0):
raise ValueError(
f"{value_name} tensor value must be between 0.0 and 1.0, got {value_scalar}"
)
else:
raise ValueError(
f"{value_name} must be of type float, int, or torch.Tensor, got {type(value)}"
)
[docs] def permuteData(self, data: torch.tensor, model_acc: float, mode: str = "AtoT") -> torch.tensor:
"""
This function permutes data for quality equalization to maintain the accuracy of the model.
Ground truth data assumed to be binary values in a pytorch tensor but intended to work for any NxM type array.
Parameters
----------
data : torch.tensor
Original ground truth data.
model_acc : float
Accuracy of the model (A' or T') w.r.t (A or T) - used for quality equalization.
mode : str
Mode identifier for the attacker model
Returns
-------
new_data : torch.tensor
Randomly pertubed data matching the specified model accuracy.
"""
if isinstance(model_acc, (float, int, torch.Tensor)):
self._validate_model_acc_value(model_acc, "model_acc")
model_acc = config.normalise(model_acc)
curr_model_acc = (
model_acc.item() if isinstance(model_acc, torch.Tensor) else float(model_acc)
)
elif isinstance(model_acc, dict):
if mode not in model_acc:
raise ValueError(
f"mode '{mode}' not found in model_acc dictionary. Available keys: {list(model_acc.keys())}"
)
if not isinstance(model_acc[mode], (float, int, torch.Tensor)):
raise ValueError(
f"model_acc['{mode}'] must be float, int, or torch.Tensor, got {type(model_acc[mode])}"
)
self._validate_model_acc_value(model_acc[mode], f"model_acc['{mode}']")
model_acc[mode] = config.normalise(model_acc[mode])
curr_model_acc = (
model_acc[mode].item()
if isinstance(model_acc[mode], torch.Tensor)
else float(model_acc[mode])
)
else:
raise ValueError(
"Invalid model accuracy type given. Expected float, int, torch.Tensor, or dict with mode as key and float, int, torch.Tensor as value."
)
num_observations = data.shape[0]
rand_vect = torch.zeros((num_observations, 1))
rand_vect[: int(curr_model_acc * num_observations)] = 1
rand_vect = rand_vect[torch.randperm(num_observations)]
new_data = rand_vect * (data) + (1 - rand_vect) * (1 - data)
return new_data
[docs] def split(
self,
feat: torch.tensor,
data: torch.tensor,
pred: torch.tensor,
test_size: Union[float, None] = None,
) -> Tuple[torch.tensor, torch.tensor, torch.tensor, torch.tensor, torch.tensor, torch.tensor]:
"""
Splits the data into training and testing sets.
"""
if test_size == None:
test_size = self.test_size
feat_train, feat_test, data_train, data_test, pred_train, pred_test = train_test_split(
feat, data, pred, test_size=test_size
)
return feat_train, feat_test, data_train, data_test, pred_train, pred_test
# ============================================================================
# LEAKAGE CALCULATION METHODS
# ============================================================================
[docs] def calcLeak(
self,
feat_train: torch.tensor,
data_train: torch.tensor,
pred_train: torch.tensor,
feat_test: torch.tensor,
data_test: torch.tensor,
pred_test: torch.tensor,
mode: Optional[str] = None,
) -> torch.tensor:
"""
Calculates the leakage of the attacker model for a given mode and given data splits.
Parameters
----------
feat : torch.tensor
Protected Attribute.
data : torch.tensor
Ground truth data.
pred : torch.tensor
Predicted Values.
mode : Literal["AtoT","TtoA"]
Sets Direction of calculation.
Returns
-------
leakage : torch.tensor
Evaluated as if normalized, returns
(λ_M - λ_D) / (λ_M + λ_D), otherwise returns λ_M - λ_D
"""
mode_suffix = "_" + mode if mode else ""
model_acc_train = torch.sum(data_train == pred_train) / data_train.shape[0]
model_acc_test = torch.sum(data_test == pred_test) / data_test.shape[0]
# compute data
pert_data_train = self.permuteData(data_train, model_acc_train, mode)
pert_data_test = self.permuteData(data_test, model_acc_test, mode)
self.train(feat_train, pert_data_train, "D" + mode_suffix)
lambda_d = self.calcLambda(
getattr(self, "attacker_D" + mode_suffix), feat_test, pert_data_test
)
print(f"lambda_d: {lambda_d}")
# compute model leakage
self.train(feat_train, pred_train, "M" + mode_suffix)
lambda_m = self.calcLambda(getattr(self, "attacker_M" + mode_suffix), feat_test, pred_test)
print(f"lambda_m: {lambda_m}")
# return computed leakage
return self.compute_leakage(lambda_d, lambda_m, self.normalized)
[docs] def calcLambda(
self, model: torch.nn.Module, x: torch.tensor, y: torch.tensor, **kwargs
) -> torch.tensor:
"""Calculate the lambda value for a given attacker model, input data and target data."""
y_pred = model(x)
if self.threshold:
y_pred = (y_pred > config.DEFAULT_PREDICTION_THRESHOLD).float()
return self.eval_metric(y_pred, y)
[docs] def compute_leakage(
self, lambda_d: torch.tensor, lambda_m: torch.tensor, normalized: bool = False
) -> torch.tensor:
"""Compute the leakage value for a given predictability metric."""
if normalized:
return (lambda_m - lambda_d) / (lambda_m + lambda_d)
else:
return lambda_m - lambda_d
[docs] def getAmortizedLeakage(
self,
feat_train: torch.tensor,
data_train: torch.tensor,
pred_train: torch.tensor,
mode: Optional[str] = None,
num_trials: int = config.DEFAULT_NUM_TRIALS,
method: str = config.DEFAULT_AGGREGATION_METHOD,
feat_test: torch.tensor = None,
data_test: torch.tensor = None,
pred_test: torch.tensor = None,
) -> Tuple[torch.tensor, torch.tensor]:
"""
This function computes the average amortized leakage for a given protected attribute,
ground truth data and predicted values.
If the given dataset is not split into training and testing sets, the data is split into training and testing sets.
The leakage is calculated for each trial.
The leakage is then averaged based on provided method and the standard deviation is calculated.
Parameters
----------
feat_train: torch.tensor
The protected attribute training data.
data_train: torch.tensor
The ground truth training data.
pred_train: torch.tensor
The predicted training data.
mode: Optional[str]
The mode to be used.
num_trials: int
The number of trials to be used.
method: str
The method to be used.
feat_test: torch.tensor
The protected attribute testing data.
data_test: torch.tensor
The ground truth testing data.
pred_test: torch.tensor
The predicted testing data.
Returns
-------
Tuple[torch.tensor, torch.tensor]
The formatted amortized leakage and the standard deviation of the form "leakage ± standard deviation".
"""
if feat_test == None:
feat_train, feat_test, data_train, data_test, pred_train, pred_test = self.split(
feat_train, data_train, pred_train, test_size=config.DEFAULT_TEST_SIZE
)
vals = torch.zeros(num_trials)
for i in range(num_trials):
print("-" * 50)
print(f"Working on Trial: {i}")
vals[i] = self.calcLeak(
feat_train,
data_train,
pred_train,
feat_test,
data_test,
pred_test,
mode,
)
print(f"Leakage for Trial {i}: {vals[i]}")
if method == "mean":
return self._getFormattedLeakage(torch.mean(vals), torch.std(vals))
elif method == "median":
return self._getFormattedLeakage(torch.median(vals), torch.std(vals))
else:
raise ValueError("Invalid Method given for Amortization.")
def _getFormattedLeakage(self, agg: torch.tensor, std: torch.tensor) -> str:
return f"{agg:.4f} ± {std:.4f}"
# ============================================================================
# EVALUATION METRICS INITIALIZATION METHODS
# ============================================================================
[docs] def initialize_eval_metrics(self, metric: Union[Callable, str]) -> None:
"""
Initializes the evaluation metric for the predictor model.
"""
if callable(metric):
self.eval_metric = metric
elif type(metric) == str:
if metric in self.eval_functions:
self.eval_metric = self.eval_functions[metric]
else:
raise ValueError(
f"Metric '{metric}' Option is unavailable. User must choose from {list(self.eval_functions.keys())}"
)
else:
raise ValueError(f"Invalid Metric '{metric}' Given.")
# ============================================================================
# ABSTRACT METHODS
# ============================================================================
[docs] @abstractmethod
def defineModel(self):
"""Define the attacker_model_D and attacker_model_M models by the subclasses."""
pass
# ============================================================================
# LEAKAGE Predictability Metric
# ============================================================================
[docs]class Leakage(BasePredictabilityMetric):
"""
This class inherits from the BasePredictabilityMetric class and implements the Leakage metric.
"""
[docs] def __init__(
self,
attacker_model: torch.nn.Module,
train_params: Dict[str, Any],
eval_metric: Union[Callable, str] = config.DEFAULT_EVAL_METRIC,
threshold: bool = True,
normalized: bool = False,
) -> None:
"""
Parameters
----------
model_params : dict
{"attacker" : model}
train_params : dict
{
"learning_rate": The learning rate hyperparameter,
"loss_function": The loss function to be used.
Existing options: ["mse", "cross-entropy"],
"epochs": Number of training epochs to be set,
"batch_size: Number of batches per epoch
}
model_acc : float
The accuracy of the model being used for quality equalization.
eval_metric : Union[Callable,str]
This metric is used to evaluate the attacker model accuracy.
Options include ["accuracy", "mse", "bce"].
threshold : bool
Whether to use a threshold on the predictions.
normalized : bool
Default is False to use the raw leakage value.
Returns
-------
None
Initializes the class.
"""
model_params = {"attacker": attacker_model}
super().__init__(model_params, train_params, eval_metric, threshold, normalized)
[docs] def defineModel(self) -> None:
"""
This function defines the attacker models for the Leakage metric.
The attacker model for ground truth data is defined as the attacker_D model.
The attacker model for predicted values is defined as the attacker_M model.
"""
self.attacker_D = self.model_params["attacker"]
self.attacker_M = copy.deepcopy(self.attacker_D)
[docs] def getAmortizedLeakage(
self,
feat_train: torch.tensor,
data_train: torch.tensor,
pred_train: torch.tensor,
num_trials: int = config.DEFAULT_NUM_TRIALS,
method: str = config.DEFAULT_AGGREGATION_METHOD,
feat_test: torch.tensor = None,
data_test: torch.tensor = None,
pred_test: torch.tensor = None,
) -> Tuple[torch.tensor, torch.tensor]:
"""
This function calls the base class method to compute the amortized leakage
for the Leakage metric.
Parameters
----------
feat_train: torch.tensor
The protected attribute training data.
data_train: torch.tensor
The ground truth training data.
pred_train: torch.tensor
The predicted training data.
num_trials: int
The number of trials to be used.
method: str
The method to be used.
feat_test: torch.tensor
The protected attribute testing data.
data_test: torch.tensor
The ground truth testing data.
pred_test: torch.tensor
The predicted testing data.
Returns
-------
Tuple[torch.tensor, torch.tensor]
The formatted amortized leakage and the standard deviation of the form "leakage ± standard deviation"
for the Leakage metric.
"""
return super().getAmortizedLeakage(
feat_train=feat_train,
data_train=data_train,
pred_train=pred_train,
mode=None,
num_trials=num_trials,
method=method,
feat_test=feat_test,
data_test=data_test,
pred_test=pred_test,
)
# ============================================================================
# DPA Predictability Metric
# ============================================================================
[docs]class DPA(BasePredictabilityMetric):
[docs] def __init__(
self,
attacker_AtoT: torch.nn.Module,
attacker_TtoA: torch.nn.Module,
train_params: Dict[str, Any],
eval_metric: Union[Callable, str] = config.DEFAULT_EVAL_METRIC,
threshold: bool = True,
normalized: bool = True,
) -> None:
"""
Parameters
----------
model_params : dict
Dictionary of the following forms-
{"attacker_AtoT" : attacker_AtoT, "attacker_TtoA" : attacker_TtoA}
train_params : dict
{
"learning_rate": The learning rate hyperparameter,
"loss_function": The loss function to be used.
Existing options: ["mse", "cross-entropy"],
"epochs": Number of training epochs to be set,
"batch_size: Number of batches per epoch
}
model_acc : Union[float, dict]
The accuracy of the model being used for quality equalization.
For bidirectional case, send dict of the form {'AtoT': acc_AtoT, 'TtoA': acc_TtoA}
eval_metric : Union[Callable,str]
This metric is used to evaluate the attacker model accuracy.
Options include ["accuracy", "mse", "bce"].
threshold : bool
Whether to use a threshold on the predictions.
normalized : bool
Default is True in order to use normalization for the leakage calculation.
Returns
-------
None
Initializes the class.
"""
model_params = {"attacker_AtoT": attacker_AtoT, "attacker_TtoA": attacker_TtoA}
super().__init__(model_params, train_params, eval_metric, threshold, normalized)
[docs] def defineModel(self) -> None:
"""
This function defines the attacker models for the DPA metric.
The attacker models for ground truth data are defined as the attacker_D_AtoT and attacker_D_TtoA models.
The attacker models for predicted values are defined as the attacker_M_AtoT and attacker_M_TtoA models.
"""
if type(self.model_params.get("attacker_AtoT", None)) == None:
raise Exception("attacker_AtoT Model Missing!")
if type(self.model_params.get("attacker_TtoA", None)) == None:
raise Exception("attacker_TtoA Model Missing!")
self.attacker_D_AtoT = self.model_params["attacker_AtoT"]
self.attacker_M_AtoT = copy.deepcopy(self.attacker_D_AtoT)
self.attacker_D_TtoA = self.model_params["attacker_TtoA"]
self.attacker_M_TtoA = copy.deepcopy(self.attacker_D_TtoA)
[docs] def getAmortizedLeakage(
self,
feat_train: torch.tensor,
data_train: torch.tensor,
pred_train: torch.tensor,
mode: Literal["AtoT", "TtoA"],
num_trials: int = config.DEFAULT_NUM_TRIALS,
method: str = config.DEFAULT_AGGREGATION_METHOD,
feat_test: torch.tensor = None,
data_test: torch.tensor = None,
pred_test: torch.tensor = None,
) -> Tuple[torch.tensor, torch.tensor]:
"""
This function calls the base class method to compute the amortized leakage
for the DPA metric.
The mode is used to determine the direction of the leakage calculation.
If mode is "AtoT", the leakage is calculated from the protected attribute to the ground truth data.
If mode is "TtoA", the leakage is calculated from the ground truth data to the protected attribute.
Parameters
----------
feat_train: torch.tensor
The protected attribute training data.
data_train: torch.tensor
The ground truth training data.
pred_train: torch.tensor
The predicted training data.
mode: Literal["AtoT", "TtoA"]
The mode to be used.
num_trials: int
The number of trials to be used.
method: str
The method to be used.
feat_test: torch.tensor
The protected attribute testing data.
data_test: torch.tensor
The ground truth testing data.
pred_test: torch.tensor
The predicted testing data.
Returns
-------
Tuple[torch.tensor, torch.tensor]
The formatted amortized leakage and the standard deviation of the form "leakage ± standard deviation"
for the DPA metric.
"""
return super().getAmortizedLeakage(
feat_train=feat_train,
data_train=data_train,
pred_train=pred_train,
mode=mode,
num_trials=num_trials,
method=method,
feat_test=feat_test,
data_test=data_test,
pred_test=pred_test,
)
[docs] def calcBidirectional(
self,
A: torch.tensor,
T: torch.tensor,
A_pred: torch.tensor,
T_pred: torch.tensor,
num_trials: int = config.DEFAULT_NUM_TRIALS,
method: str = config.DEFAULT_AGGREGATION_METHOD,
) -> Tuple[Tuple[torch.tensor, torch.tensor], Tuple[torch.tensor, torch.tensor]]:
"""
This function computes the bidirectional leakage for a given protected attribute,
ground truth data and predicted values.
Parameters
----------
A: torch.tensor
The protected attribute.
T: torch.tensor
The ground truth data.
A_pred: torch.tensor
The predicted protected attribute.
T_pred: torch.tensor
The predicted ground truth data.
num_trials: int
The number of trials to be used.
method: str
The method to be used.
Returns
-------
Tuple[Tuple[torch.tensor, torch.tensor], Tuple[torch.tensor, torch.tensor]]
The tuple contains (AtoT_leakage, TtoA_leakage).
AtoT_leakage gives the average amortized leakage for the AtoT direction.
TtoA_leakage gives the average amortized leakage for the TtoA direction.
"""
AtoT_vals = self.getAmortizedLeakage(A, T, T_pred, num_trials, method)
TtoA_vals = self.getAmortizedLeakage(T, A, A_pred, num_trials, method)
return (AtoT_vals, TtoA_vals)
# ============================================================================
# LIC Predictability Metric
# ============================================================================
class LIC(BasePredictabilityMetric):
def __init__(self):
raise NotImplementedError("LIC is not implemented yet.")
if __name__ == "__main__":
# Test case
P, D, D2, M1, M2 = dataCreator(16384, 0.2, False, 0.05)
P = torch.tensor(P, dtype=torch.float).reshape(-1, 1)
D = torch.tensor(D, dtype=torch.float).reshape(-1, 1)
D2 = torch.tensor(D2, dtype=torch.float).reshape(-1, 1)
M1 = torch.tensor(M1, dtype=torch.float).reshape(-1, 1)
M2 = torch.tensor(M2, dtype=torch.float).reshape(-1, 1)
# Calculating Params
model_1_acc = torch.sum(D == M1) / D.shape[0]
model_2_acc = torch.sum(D == M2) / D.shape[0]
# Attacker Model Initialization
attackerModel = simpleDenseModel(
1, 1, 1, numFirst=1, activations=["sigmoid", "sigmoid", "sigmoid"]
)
train_config = {
"learning_rate": 0.05,
"loss_function": "bce",
"epochs": config.DEFAULT_EPOCHS,
"batch_size": config.DEFAULT_BATCH_SIZE,
}
# Leakage Parameter Initialization
leakage_obj = Leakage(
attacker_model=attackerModel,
train_params=train_config,
eval_metric="accuracy",
)
print("=" * 50)
print(f"Getting Amortized Leakage for Leakage Metric")
print("=" * 50)
print("Calculating Leakage for case 1")
leakage_1 = leakage_obj.getAmortizedLeakage(P, D, M1)
print(f"Leakage for case 1: {leakage_1}")
print("=" * 50)
# print("="*50)
# print("Calculating Leakage for case 2")
# leakage_2 = leakage_obj.getAmortizedLeakage(P, D, M2)
# print("="*50)
# print(f"Amortised Leakage for case 2: {leakage_2}")
# print("="*50)
# print("="*50)
# print("Calculating Leakage for case 3")
# leakage_3 = leakage_obj.getAmortizedLeakage(P, D2, M1)
# print(f"Leakage for case 3: {leakage_3}")
# print("="*50)
# print("="*50)
# print("Calculating Leakage for case 4")
# leakage_4 = leakage_obj.getAmortizedLeakage(P, D2, M2)
# print(f"Leakage for case 4: {leakage_4}")
# print("="*50)
# print("="*50)
# # Parameter Initialization
# dpa_obj = DPA(
# attacker_AtoT=attackerModel,
# attacker_TtoA=attackerModel,
# train_params=train_config,
# model_acc=model_1_acc,
# eval_metric="accuracy"
# )
# print("="*50)
# print(f"Getting Amortized Leakage for DPA Metric")
# print("="*50)
# print("Calculating DPA for case 1")
# dpa_1 = dpa_obj.getAmortizedLeakage(P, D, M1, "AtoT")
# print(f"DPA for case 1: {dpa_1}")
# print("="*50)
# print("="*50)
# print("Calculating DPA for case 2")
# dpa_2 = dpa_obj.getAmortizedLeakage(P, D, M2, "AtoT")
# print(f"DPA for case 2: {dpa_2}")
# print("="*50)
# print("="*50)
# print("Calculating DPA for case 3")
# dpa_3 = dpa_obj.getAmortizedLeakage(P, D2, M1, "AtoT")
# print(f"DPA for case 3: {dpa_3}")
# print("="*50)
# print("="*50)
# print("Calculating DPA for case 4")
# dpa_4 = dpa_obj.getAmortizedLeakage(P, D2, M2, "AtoT")
# print(f"DPA for case 4: {dpa_4}")
# print("="*50)