From a PyTorch model to an explainable deep model#
For a quick introduction to the Xpdeep APIs, this section shows, on the HAR dataset, how to adapt the PyTorch code of a standard deep model in order to design an explainable deep model instead.
We will walk through the key steps of building a deep model, from architecture specification and training to generating explanations with Xpdeep.
For each step in building a deep model, we provide:
- Tabs labeled "SOTA and Xpdeep" for code that is identical for both the SOTA deep model and the Xpdeep explainable model.
- Tabs labeled "Xpdeep" for code specific to the Xpdeep explainable model.
1. Project Setup#
Setup API Key and URL#
Create a Project#
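Both headings above correspond to platform setup code. The snippet below is a minimal sketch of what this step typically looks like; the exact entry points (`init`, `set_project`, `Project`) and the environment variable names are assumptions, so refer to the Xpdeep setup guide for the authoritative calls.
import os

from xpdeep import init, set_project
from xpdeep.project import Project

# Authenticate against the Xpdeep platform (assumed entry points and environment variable names).
init(api_key=os.environ["XPDEEP_API_KEY"], api_url=os.environ["XPDEEP_API_URL"])

# Create or select the project that will hold the HAR experiments (assumed signature).
set_project(Project(id="har_tutorial", name="HAR Tutorial"))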
2. Data preparation#
Read Raw Data#
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# Read train data
features_dict = {}
split_name = "train"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
train_inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
train_targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
# Read test data
features_dict = {}
split_name = "test"
for feature_filepath in sorted(Path(f"{split_name}/Inertial Signals/").rglob("*.txt")):
feature_name = feature_filepath.stem
features_dict[feature_name] = np.squeeze(
pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
test_inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
test_targets = np.squeeze(
pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
)
# Map the targets to their labels
activity_mapping = {
1: "Walking",
2: "Walking upstairs",
3: "Walking downstairs",
4: "Sitting",
5: "Standing",
6: "Laying",
}
targets_mapper = np.vectorize(lambda x: activity_mapping[x])
train_targets = targets_mapper(train_targets) # Map targets to their labels.
test_targets = targets_mapper(test_targets)
test_val_data = pd.DataFrame.from_dict({"human_activity": test_inputs.tolist(), "activity": test_targets})
Split Data#
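The content of this step is sketched below: the training arrays are wrapped in a DataFrame, and the original test split is divided into validation and test sets with the `train_test_split` helper imported earlier. The 50/50 ratio and the fixed seed are assumptions.
train_data = pd.DataFrame.from_dict({"human_activity": train_inputs.tolist(), "activity": train_targets})

# Split the original test data into validation and test sets (assumed 50/50 split).
val_data, test_data = train_test_split(test_val_data, test_size=0.5, random_state=42)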
Conversion to Parquet Format#
import pyarrow as pa
import pyarrow.parquet as pq
# Convert to pyarrow Table format
train_table = pa.Table.from_pandas(train_data.reset_index(names="har_index"), preserve_index=False)
val_table = pa.Table.from_pandas(val_data.reset_index(names="har_index"), preserve_index=False)
test_table = pa.Table.from_pandas(test_data.reset_index(names="har_index"), preserve_index=False)
# Save each split as ".parquet" file
pq.write_table(train_table, "train.parquet")
pq.write_table(val_table, "val.parquet")
pq.write_table(test_table, "test.parquet")
Upload#
import boto3
from botocore.config import Config

client = boto3.client(
service_name="s3",
endpoint_url=S3_DATASET_ENDPOINT_URL,
aws_access_key_id=S3_DATASET_ACCESS_KEY_ID,
aws_secret_access_key=S3_DATASET_SECRET_ACCESS_KEY,
config=Config(signature_version="s3v4"),
)
client.upload_file("train.parquet", S3_DATASET_BUCKET_NAME, "har/train.parquet")
client.upload_file("val.parquet", S3_DATASET_BUCKET_NAME, "har/val.parquet")
client.upload_file("test.parquet", S3_DATASET_BUCKET_NAME, "har/test.parquet")
Preprocess Data#
from sklearn.preprocessing import OneHotEncoder, StandardScaler
input_standard_scaler_for_nn = StandardScaler().fit(np.array(train_data["human_activity"].to_list()).reshape(-1, 1))
target_one_hot_encoder_for_nn = OneHotEncoder(sparse_output=False).fit(train_data[["activity"]].values)
x_train = np.array(train_data["human_activity"].to_list())
x_train_shape_d1, x_train_shape_d2, x_train_shape_d3 = x_train.shape
x_train = input_standard_scaler_for_nn.transform(x_train.reshape(-1,1)).reshape(x_train_shape_d1, x_train_shape_d2, x_train_shape_d3)
y_train = target_one_hot_encoder_for_nn.transform(train_data["activity"].to_numpy().reshape(-1,1))
x_val = np.array(val_data["human_activity"].to_list())
x_val_shape_d1, x_val_shape_d2, x_val_shape_d3 = x_val.shape
x_val = input_standard_scaler_for_nn.transform(x_val.reshape(-1,1)).reshape(x_val_shape_d1, x_val_shape_d2, x_val_shape_d3)
y_val = target_one_hot_encoder_for_nn.transform(val_data["activity"].to_numpy().reshape(-1,1))
x_test = np.array(test_data["human_activity"].to_list())
x_test_shape_d1, x_test_shape_d2, x_test_shape_d3 = x_test.shape
x_test = input_standard_scaler_for_nn.transform(x_test.reshape(-1,1)).reshape(x_test_shape_d1, x_test_shape_d2, x_test_shape_d3)
y_test = target_one_hot_encoder_for_nn.transform(test_data["activity"].to_numpy().reshape(-1,1))
import torch

# `TorchPreprocessor` is provided by the xpdeep package.
class Scaler(TorchPreprocessor):
    """Standard scaler for the HAR multivariate time series."""

    def __init__(self, input_size: tuple[int, ...], mean: torch.Tensor, scale: torch.Tensor):
        super().__init__(input_size=input_size)
        # Saved as buffers for torch.export: restored with `state_dict` but not updated by `optimizer.step()`.
        self.register_buffer("mean", mean)
        self.register_buffer("scale", scale)

    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Standardize the inputs channel-wise."""
        return (inputs - self.mean) / self.scale

    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Apply the inverse transform."""
        return output * self.scale + self.mean
train_tensor = torch.tensor(train_table.column("human_activity").to_pylist())
mean = train_tensor.mean(dim=(0, 1))
std = train_tensor.std(dim=(0, 1))
preprocessor = Scaler(input_size=(128, 9), mean=mean, scale=std)
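# Optional sanity check (illustrative): scaling followed by inverse scaling should round-trip the data.
sample = train_tensor[:2]
restored = preprocessor.inverse_transform(preprocessor.transform(sample))
assert torch.allclose(restored, sample, atol=1e-5)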
# `AnalyzedSchema`, `ExplainableFeature`, `MultivariateTimeSeries`, `CategoricalFeature`,
# `IndexMetadata` and `SklearnPreprocessor` are all provided by the xpdeep package.
analyzed_schema = AnalyzedSchema(
ExplainableFeature(
name="human_activity",
preprocessor=preprocessor,
feature_type=MultivariateTimeSeries(
asynchronous=True,
channel_names=[
"body_acc_x",
"body_acc_y",
"body_acc_z",
"body_gyro_x",
"body_gyro_y",
"body_gyro_z",
"total_acc_x",
"total_acc_y",
"total_acc_z",
],
),
),
ExplainableFeature(
is_target=True,
name="activity",
preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
feature_type=CategoricalFeature(),
),
IndexMetadata(name="har_index"),
)
storage_options = {
"key": S3_DATASET_ACCESS_KEY_ID,
"secret": S3_DATASET_SECRET_ACCESS_KEY,
"client_kwargs": {
"endpoint_url": S3_DATASET_ENDPOINT_URL,
},
"s3_additional_kwargs": {"addressing_style": "path"},
}
# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
name="har_train_set",
path=f"s3://{S3_DATASET_BUCKET_NAME}/har/train.parquet",
storage_options=storage_options,
analyzed_schema=analyzed_schema,
)
print(analyzed_schema)
fit_train_dataset = analyzed_train_dataset.fit()
fit_test_dataset = FittedParquetDataset(
name="har_test_set",
path=f"s3://{S3_DATASET_BUCKET_NAME}/har/test.parquet",
storage_options=storage_options,
fitted_schema=fit_train_dataset.fitted_schema,
)
fit_val_dataset = FittedParquetDataset(
name="har_validation_set",
path=f"s3://{S3_DATASET_BUCKET_NAME}/har/val.parquet",
storage_options=storage_options,
fitted_schema=fit_train_dataset.fitted_schema,
)
input_size = fit_train_dataset.fitted_schema.input_size[1:]
target_size = fit_train_dataset.fitted_schema.target_size[1]
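# For HAR, the fitted schema should yield input_size == (128, 9) (128 time steps, 9 channels)
# and target_size == 6 (one-hot activity classes); these match the Conv1d in_channels and the
# out_features used in the architecture below.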
3. Model Construction#
Architecture Specification#
from torch.nn import Sequential
import torch
class SotaModel(Sequential):
def __init__(self):
layers = [
torch.nn.Conv1d(9, 32, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
torch.nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
torch.nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
torch.nn.Flatten(),
torch.nn.LazyLinear(out_features=6),
]
super().__init__(*layers)
def forward(self, inputs: torch.Tensor) -> torch.Tensor:
x = inputs.transpose(1, 2)
return super().forward(x)
class FeatureExtractor(Sequential):
def __init__(self):
layers = [
torch.nn.Conv1d(9, 32, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
torch.nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
torch.nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1),
torch.nn.ReLU(),
]
super().__init__(*layers)
def forward(self, inputs: torch.Tensor) -> torch.Tensor:
x = inputs.transpose(1, 2)
return super().forward(x)
feature_extractor = FeatureExtractor()
task_learner = Sequential(torch.nn.Flatten(), torch.nn.LazyLinear(out_features=6), torch.nn.Softmax(dim=-1))
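As a quick optional check that the two parts compose, the feature extractor turns a preprocessed HAR batch into a temporal feature map and the task learner turns that map into class probabilities (the batch size of 4 below is arbitrary):
with torch.no_grad():
    dummy = torch.randn(4, 128, 9)          # mimic a preprocessed HAR batch (batch, time steps, channels)
    features = feature_extractor(dummy)     # (4, 128, 128): convolutional features over the 128 time steps
    probabilities = task_learner(features)  # (4, 6): softmax over the six activity classes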
Model Instantiation#
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel

# `FeatureExtractionOutputType` is also required below; it is provided by the xpdeep package.
model_specifications = ModelDecisionGraphParameters(
feature_extraction_output_type=FeatureExtractionOutputType.TEMPORAL_MATRIX,
balancing_weight=0.1,
)
# Xpdeep Model Architecture
xpdeep_model = XpdeepModel.from_torch(
example_dataset=fit_train_dataset,
feature_extraction=feature_extractor,
task_learner=task_learner,
backbone=None,
decision_graph_parameters=model_specifications,
)
4. Training#
Training Specification#
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from functools import partial
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep.trainer.trainer import Trainer
from torchmetrics.classification import MulticlassAccuracy, MulticlassConfusionMatrix
# Metrics to monitor the training.
metrics = DictMetrics(
global_multi_class_accuracy=TorchGlobalMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
leaf_multi_class_accuracy=TorchLeafMetric(
partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
),
leaf_confusion_matrix=TorchLeafMetric(
partial(MulticlassConfusionMatrix, num_classes=target_size, normalize="all"), target_as_indexes=True
),
)
callbacks = [
EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=15),
Scheduler(
pre_scheduler=partial(ReduceLROnPlateau, patience=10, mode="min"),
step_method="epoch",
monitoring_metric="Total loss",
),
]
# The optimizer is passed as a partial object: the model parameters are only bound to it at training time.
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)

# `CrossEntropyLossFromProbabilities` is provided by the xpdeep package
# (the task learner already outputs probabilities through its final Softmax).
trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
optimizer=optimizer,
callbacks=callbacks,
start_epoch=0,
max_epochs=60,
metrics=metrics,
)
Model Training#
from sklearn.metrics import accuracy_score
import torch
import time
torch.manual_seed(0)
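# The SOTA training loop below expects these objects to exist; the values are illustrative
# assumptions (optimizer, learning rate and epoch budget mirror the Xpdeep trainer configuration
# above, the batch size is arbitrary).
device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 128
epochs = 60
sota_model = SotaModel().to(device)
sota_model(torch.zeros(1, 128, 9, device=device))  # dry run to materialize the LazyLinear layer
loss_fn = torch.nn.CrossEntropyLoss()  # accepts one-hot / probability targets (PyTorch >= 1.10)
optimizer = torch.optim.AdamW(sota_model.parameters(), lr=0.001)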
def train(X_train, y_train, model, loss_fn, optimizer):
    size = len(X_train)
    model.train()
    total_loss = 0
    for batch in range(size // batch_size):
        X_batch = torch.tensor(X_train[batch * batch_size:(batch + 1) * batch_size], dtype=torch.float32).to(device)
        y_batch = torch.tensor(y_train[batch * batch_size:(batch + 1) * batch_size], dtype=torch.float32).to(device)
        # Compute the prediction error
        pred = model(X_batch)
        loss = loss_fn(pred, y_batch)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    average_loss = total_loss / (size // batch_size)
    return average_loss
def eval_(X_test, y_test, model, loss_fn):
    model.eval()
    with torch.no_grad():
        X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
        y_test = torch.tensor(y_test, dtype=torch.float32).to(device)
        pred = model(X_test)
        test_loss = loss_fn(pred, y_test).item()
        accuracy = (torch.argmax(pred, 1) == torch.argmax(y_test, 1)).float().mean()
    return torch.nn.Softmax(dim=-1)(pred).round(), test_loss, accuracy
start_time = time.time()
for t in range(epochs):
print(f"\nEpoch {t+1}\n-------------------------------")
training_loss = train(
x_train,
y_train,
sota_model,
loss_fn,
optimizer
)
_, val_loss, _ = eval_(
x_val,
y_val,
sota_model,
loss_fn
)
print(f"Training Loss: {training_loss}\nValidation Loss: {val_loss}")
_, _, accuracy_on_train = eval_(x_train, y_train, sota_model, loss_fn)
_, _, accuracy_on_validation = eval_(x_val, y_val, sota_model, loss_fn)
_, _, accuracy_on_test = eval_(x_test, y_test, sota_model, loss_fn)
print(f"\nTraining time : --- {time.time() - start_time:.2f} seconds --- \n")
print(f"\nAccuracies: "
f"\nAccuracy on train set : {accuracy_on_train}"
f"\nAccuracy on validation set : {accuracy_on_validation}"
f"\nAccuracy on test set : {accuracy_on_test}"
)
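On the Xpdeep side, training is delegated to the Trainer configured above and produces the trained model used for explanation generation in the next step. A minimal sketch, assuming the trainer exposes a `train` method taking the model and the fitted datasets (check the Xpdeep API reference for the exact signature):
# Train the explainable model with the Xpdeep trainer (assumed call signature).
trained_model = trainer.train(
    xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
)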
5. Explanation Generation#
from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat
statistics = DictStats(
distribution_target=DistributionStat(on="target"), distribution_prediction=DistributionStat(on="prediction")
)
quality_metrics = [Sensitivity(), Infidelity()]
explainer = Explainer(
description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)
model_explanations = explainer.global_explain(
trained_model,
train_set=fit_train_dataset,
test_set=fit_test_dataset,
validation_set=fit_val_dataset,
)
print(model_explanations.visualisation_link)