Skip to content

From a pytorch model to a deep explainable model#

As a quick introduction to the Xpdeep APIs, this section uses the HAR (Human Activity Recognition) dataset to show how to adapt the PyTorch code of a standard deep model into the code of an explainable deep model.

We will go through the key steps of designing a deep model: architecture specification, training, and (for Xpdeep only) explanation generation.

For each step in building a deep model, we provide:

  • Tabs labeled "SOTA and Xpdeep" for code that is identical for both the SOTA deep model and the Xpdeep explainable model.
  • Tabs labeled "Xpdeep" for code specific to the Xpdeep explainable model.

1. Project Setup#

Set Up API Key and URL#

from xpdeep import init

# Configure the Xpdeep client with an API key and an API URL.
# NOTE(review): "MY_API_KEY" / "MY_API_URL" look like placeholders — replace them with real values.
init(api_key="MY_API_KEY", api_url="MY_API_URL")

Create a Project#

from xpdeep import set_project
from xpdeep.project import Project

# Make "Har Tutorial" the current project.
# NOTE(review): judging by its name, `create_or_get` presumably reuses an existing project
# with that name instead of creating a duplicate — confirm against the Xpdeep API docs.
set_project(Project.create_or_get(name="Har Tutorial"))

2. Data preparation#

Read Raw Data#

from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

def _read_har_split(split_name):
    """Load the inertial signals and the targets of one split.

    Every ``*.txt`` file found under ``<split_name>/Inertial Signals/`` is read as one
    whitespace-separated signal channel; channels are ordered by sorted file path.
    Targets are read from ``<split_name>/y_<split_name>.txt``.

    Args:
        split_name: Name of the split directory, ``"train"`` or ``"test"`` in this tutorial.

    Returns:
        A ``(inputs, targets)`` tuple of float32 arrays. ``inputs`` stacks the channels
        on the last axis, i.e. ``(rows, columns, channels)`` of the signal files.

    Raises:
        FileNotFoundError: If no signal file is found for the split.
    """
    signals_dir = Path(f"{split_name}/Inertial Signals/")

    features_dict = {}
    for feature_filepath in sorted(signals_dir.rglob("*.txt")):
        features_dict[feature_filepath.stem] = np.squeeze(
            pd.read_csv(feature_filepath, sep=r"\s+", header=None).to_numpy(dtype=np.float32)
        )

    # Fail with an explicit message instead of an obscure `np.stack` error.
    if not features_dict:
        raise FileNotFoundError(f"No '*.txt' signal file found under '{signals_dir}'.")

    # Stack channels on axis 1, then move them to the last axis.
    inputs = np.transpose(np.stack(list(features_dict.values()), axis=1), (0, 2, 1))
    targets = np.squeeze(
        pd.read_csv(f"{split_name}/y_{split_name}.txt", sep=r"\s+", header=None).to_numpy(dtype=np.float32)
    )
    return inputs, targets


# Read train and test data with the same loader.
train_inputs, train_targets = _read_har_split("train")
test_inputs, test_targets = _read_har_split("test")

# Map the target to their labels
activity_mapping = {
    1: "Walking",
    2: "Walking upstairs",
    3: "Walking downstairs",
    4: "Sitting",
    5: "Standing",
    6: "Laying",
}

targets_mapper = np.vectorize(lambda x: activity_mapping[x])
train_targets = targets_mapper(train_targets)  # Map targets to their labels.
test_targets = targets_mapper(test_targets)

# The test frame is built here; it is later split into test and validation parts.
test_val_data = pd.DataFrame.from_dict({"human_activity": test_inputs.tolist(), "activity": test_targets})

Split Data#

from sklearn.model_selection import train_test_split
import numpy as np

# Training frame: one row per sample, holding the signal window (as nested lists) and its label.
train_data = pd.DataFrame(
    {
        "human_activity": train_inputs.tolist(),
        "activity": train_targets,
    }
)

# Cut the held-out frame in two equal parts (seeded so the split is reproducible).
test_data, val_data = train_test_split(
    test_val_data,
    random_state=42,
    test_size=0.5,
)

Conversion to Parquet Format#

import pyarrow as pa
import pyarrow.parquet as pq

def _indexed_table(frame):
    """Turn a pandas frame into a pyarrow Table, exposing its index as the "har_index" column."""
    with_index_column = frame.reset_index(names="har_index")
    return pa.Table.from_pandas(with_index_column, preserve_index=False)


# One pyarrow Table per split.
train_table = _indexed_table(train_data)
val_table = _indexed_table(val_data)
test_table = _indexed_table(test_data)

# Write every split to its own ".parquet" file (train, then val, then test).
for split_table, parquet_name in (
    (train_table, "train.parquet"),
    (val_table, "val.parquet"),
    (test_table, "test.parquet"),
):
    pq.write_table(split_table, parquet_name)

Upload#

import boto3
from botocore.config import Config  # `Config` is used below but was never imported.

# NOTE(review): the S3_DATASET_* names are not defined in the visible tutorial code —
# presumably the reader must supply their own endpoint, credentials and bucket name.
client = boto3.client(
    service_name="s3",
    endpoint_url=S3_DATASET_ENDPOINT_URL,
    aws_access_key_id=S3_DATASET_ACCESS_KEY_ID,
    aws_secret_access_key=S3_DATASET_SECRET_ACCESS_KEY,
    config=Config(signature_version="s3v4"),
)

# Upload each local parquet file under the "har/" prefix of the bucket.
client.upload_file("train.parquet", S3_DATASET_BUCKET_NAME, "har/train.parquet")
client.upload_file("val.parquet", S3_DATASET_BUCKET_NAME, "har/val.parquet")
client.upload_file("test.parquet", S3_DATASET_BUCKET_NAME, "har/test.parquet")

Preprocess Data#

from sklearn.preprocessing import OneHotEncoder, StandardScaler

# A single scaler is fitted on every training value flattened into one column,
# so one mean and one scale are shared by all channels and time steps.
input_standard_scaler_for_nn = StandardScaler().fit(
    np.array(train_data["human_activity"].to_list()).reshape(-1, 1)
)
# One-hot encoder for the activity labels, fitted on the training labels only.
target_one_hot_encoder_for_nn = OneHotEncoder(sparse_output=False).fit(train_data[["activity"]].values)


def _scaled_inputs(frame):
    """Return the "human_activity" windows of `frame`, standardised and in their original shape."""
    windows = np.array(frame["human_activity"].to_list())
    flat_scaled = input_standard_scaler_for_nn.transform(windows.reshape(-1, 1))
    return flat_scaled.reshape(windows.shape)


def _encoded_targets(frame):
    """Return the one-hot encoded "activity" labels of `frame`."""
    label_column = frame["activity"].to_numpy().reshape(-1, 1)
    return target_one_hot_encoder_for_nn.transform(label_column)


x_train, y_train = _scaled_inputs(train_data), _encoded_targets(train_data)
x_val, y_val = _scaled_inputs(val_data), _encoded_targets(val_data)
x_test, y_test = _scaled_inputs(test_data), _encoded_targets(test_data)
class Scaler(TorchPreprocessor):
    """Standardise inputs with a fixed mean and scale, and revert the operation on request."""

    def __init__(self, input_size: tuple[int, ...], mean: torch.Tensor, scale: torch.Tensor):
        super().__init__(input_size=input_size)
        # Registered as buffers (kept for torch.export): they are saved and loaded with
        # `state_dict` but are not optimized by `optimizer.step()`.
        for buffer_name, buffer_value in (("mean", mean), ("scale", scale)):
            self.register_buffer(buffer_name, buffer_value)

    def transform(self, inputs: torch.Tensor) -> torch.Tensor:
        """Subtract the stored mean from `inputs`, then divide by the stored scale."""
        centered = inputs - self.mean
        return centered / self.scale

    def inverse_transform(self, output: torch.Tensor) -> torch.Tensor:
        """Undo `transform`: multiply `output` by the stored scale, then add the stored mean."""
        rescaled = output * self.scale
        return rescaled + self.mean

# Rebuild the training inputs as a tensor from the pyarrow table column.
train_tensor = torch.tensor(train_table.column("human_activity").to_pylist())

# Reduce over the first two axes so that one statistic remains per position of the last axis
# (presumably one per signal channel, given `input_size=(128, 9)` below — confirm).
reduced_axes = (0, 1)
mean = torch.mean(train_tensor, dim=reduced_axes)
std = torch.std(train_tensor, dim=reduced_axes)

preprocessor = Scaler(scale=std, mean=mean, input_size=(128, 9))

# Names of the nine signal channels, in the order handed to the schema.
har_channel_names = [
    f"{signal}_{axis}"
    for signal in ("body_acc", "body_gyro", "total_acc")
    for axis in ("x", "y", "z")
]

# Input feature: the multivariate time series, scaled by the torch preprocessor.
signal_feature = ExplainableFeature(
    name="human_activity",
    preprocessor=preprocessor,
    feature_type=MultivariateTimeSeries(asynchronous=True, channel_names=har_channel_names),
)

# Target feature: the activity label, one-hot encoded through scikit-learn.
label_feature = ExplainableFeature(
    is_target=True,
    name="activity",
    preprocessor=SklearnPreprocessor(preprocess_function=OneHotEncoder(sparse_output=False)),
    feature_type=CategoricalFeature(),
)

# The schema lists the input, the target, and the index column written to the parquet files.
analyzed_schema = AnalyzedSchema(
    signal_feature,
    label_feature,
    IndexMetadata(name="har_index"),
)

# Connection settings forwarded to every dataset that reads from the S3 bucket.
storage_options = {
    "key": S3_DATASET_ACCESS_KEY_ID,
    "secret": S3_DATASET_SECRET_ACCESS_KEY,
    "client_kwargs": {"endpoint_url": S3_DATASET_ENDPOINT_URL},
    "s3_additional_kwargs": {"addressing_style": "path"},
}

# Create a dataset from the analyzed schema.
analyzed_train_dataset = AnalyzedParquetDataset(
    name="har_train_set",
    path=f"s3://{S3_DATASET_BUCKET_NAME}/har/train.parquet",
    storage_options=storage_options,
    analyzed_schema=analyzed_schema,
)
print(analyzed_schema)

# Fit the train dataset; the resulting fitted schema is reused for the other splits.
fit_train_dataset = analyzed_train_dataset.fit()


def _fitted_dataset(dataset_name, dataset_path):
    """Build a dataset at `dataset_path` that reuses the schema fitted on the train set."""
    return FittedParquetDataset(
        name=dataset_name,
        path=dataset_path,
        storage_options=storage_options,
        fitted_schema=fit_train_dataset.fitted_schema,
    )


fit_test_dataset = _fitted_dataset("har_test_set", f"s3://{S3_DATASET_BUCKET_NAME}/har/test.parquet")
fit_val_dataset = _fitted_dataset("har_validation_set", f"s3://{S3_DATASET_BUCKET_NAME}/har/val.parquet")

# Drop the leading entry of the input size; keep the second entry of the target size.
# NOTE(review): presumably the leading entry is the batch dimension — confirm.
train_fitted_schema = fit_train_dataset.fitted_schema
input_size = train_fitted_schema.input_size[1:]
target_size = train_fitted_schema.target_size[1]

3. Model Construction#

Architecture Specification#

from torch.nn import Sequential
import torch

class SotaModel(Sequential):
    """Baseline classifier: three Conv1d + ReLU stages, then Flatten and a lazy linear head with 6 outputs."""

    def __init__(self):
        # Channel widths of the successive convolutions: 9 -> 32 -> 64 -> 128.
        widths = (9, 32, 64, 128)

        stack = []
        for n_in, n_out in zip(widths, widths[1:]):
            stack.append(torch.nn.Conv1d(n_in, n_out, kernel_size=3, stride=1, padding=1))
            stack.append(torch.nn.ReLU())
        stack.append(torch.nn.Flatten())
        # The input width of the head is inferred on the first forward pass.
        stack.append(torch.nn.LazyLinear(out_features=6))

        super().__init__(*stack)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Swap axes 1 and 2 of `inputs`, then run the sequential stack."""
        # Conv1d expects channels on axis 1, so axes 1 and 2 are swapped first.
        channels_first = inputs.transpose(1, 2)
        return super().forward(channels_first)
class FeatureExtractor(Sequential):
    """Convolutional trunk: three Conv1d + ReLU stages taking 9 channels to 128."""

    def __init__(self):
        # Channel widths of the successive convolutions: 9 -> 32 -> 64 -> 128.
        widths = (9, 32, 64, 128)

        stack = []
        for n_in, n_out in zip(widths, widths[1:]):
            stack.append(torch.nn.Conv1d(n_in, n_out, kernel_size=3, stride=1, padding=1))
            stack.append(torch.nn.ReLU())

        super().__init__(*stack)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Swap axes 1 and 2 of `inputs`, then run the convolutional stack."""
        # Conv1d expects channels on axis 1, so axes 1 and 2 are swapped first.
        channels_first = inputs.transpose(1, 2)
        return super().forward(channels_first)


feature_extractor = FeatureExtractor()

# Classification head: flatten the extracted features, project to 6 outputs, apply a softmax.
task_learner_layers = [
    torch.nn.Flatten(),
    torch.nn.LazyLinear(out_features=6),
    torch.nn.Softmax(dim=-1),
]
task_learner = Sequential(*task_learner_layers)

Model Instantiation#

from torch.nn import Sequential

# SOTA tab: instantiate the plain PyTorch baseline.
sota_model = SotaModel()
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel

# Xpdeep tab: parameters passed to the model as `decision_graph_parameters` below.
# NOTE(review): `FeatureExtractionOutputType` is not imported anywhere in the visible tutorial
# code — the matching xpdeep import must be added (exact module path to be confirmed).
model_specifications = ModelDecisionGraphParameters(
    feature_extraction_output_type=FeatureExtractionOutputType.TEMPORAL_MATRIX,
    balancing_weight=0.1,
)

# Xpdeep Model Architecture
# Built from the two torch modules defined earlier (feature extractor and task learner);
# no backbone is used, and the fitted train dataset serves as the example dataset.
xpdeep_model = XpdeepModel.from_torch(
    example_dataset=fit_train_dataset,
    feature_extraction=feature_extractor,
    task_learner=task_learner,
    backbone=None,
    decision_graph_parameters=model_specifications,
)

4. Training#

Training Specification#

from torch import nn

# The SOTA model ends with a linear layer (no softmax), so this loss receives its raw outputs;
# the targets it is compared against are the one-hot encoded labels.
loss_fn = nn.CrossEntropyLoss()
# NOTE(review): the model ends with a LazyLinear layer; the PyTorch lazy-module documentation
# recommends a dry-run forward pass before attaching an optimizer — confirm this order is intended.
optimizer = torch.optim.AdamW(sota_model.parameters(), lr=1e-3)
# `batch_size` is read as a module-level name by `train`; `epochs` drives the training loop.
batch_size = 128
epochs = 20
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from functools import partial
from xpdeep.metric import DictMetrics, TorchGlobalMetric, TorchLeafMetric
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep.trainer.trainer import Trainer
from torchmetrics.classification import MulticlassAccuracy, MulticlassConfusionMatrix

# Metrics to monitor the training.
# Each metric wraps a torchmetrics class as a `partial`, so the metric object itself is
# instantiated later by Xpdeep; `target_size` is used as the number of classes.
metrics = DictMetrics(
    global_multi_class_accuracy=TorchGlobalMetric(
        partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
    ),
    leaf_multi_class_accuracy=TorchLeafMetric(
        partial(MulticlassAccuracy, num_classes=target_size, average="micro"), target_as_indexes=True
    ),
    leaf_confusion_matrix=TorchLeafMetric(
        partial(MulticlassConfusionMatrix, num_classes=target_size, normalize="all"), target_as_indexes=True
    ),
)

# Both callbacks monitor the "Total loss" metric: early stopping with a patience of 15,
# and a ReduceLROnPlateau scheduler (patience of 10) stepped once per epoch.
callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=15),
    Scheduler(
        pre_scheduler=partial(ReduceLROnPlateau, patience=10, mode="min"),
        step_method="epoch",
        monitoring_metric="Total loss",
    ),
]

# The optimizer is passed as a partial object because the model parameters are only
# supplied to it later, when the optimizer is actually instantiated.
# NOTE(review): this rebinds the name `optimizer` used by the SOTA training code — keep the
# two tabs in separate sessions, or rename one of them.
optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)

# NOTE(review): `CrossEntropyLossFromProbabilities` is not imported anywhere in the visible
# tutorial code — the matching import must be added (exact module path to be confirmed).
trainer = Trainer(
    loss=CrossEntropyLossFromProbabilities(reduction="none"),
    optimizer=optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=60,
    metrics=metrics,
)

Model Training#

from sklearn.metrics import accuracy_score
import torch
import time

torch.manual_seed(0)


def train(X_train, y_train, model, loss_fn, optimizer, batch_size=None):
    """Run one training epoch over `X_train` / `y_train` in consecutive mini-batches.

    Batches are taken in order (no shuffling). The last batch may be smaller than
    `batch_size`, so every sample contributes to the epoch. Tensors are moved to the
    device of the model's first parameter (CPU when the model has no parameter).

    Args:
        X_train: Array-like of inputs, indexed by sample on the first axis.
        y_train: Array-like of targets aligned with `X_train`.
        model: Torch module to train; it is switched to training mode.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.
        optimizer: Torch optimizer already bound to the model's parameters.
        batch_size: Number of samples per batch. When ``None``, the module-level
            ``batch_size`` is used if it exists, otherwise 128.

    Returns:
        The mean of the per-batch losses as a float, or ``0.0`` when there is no sample.

    Raises:
        ValueError: If `batch_size` is not strictly positive.
    """
    if batch_size is None:
        # Keep honouring the tutorial's module-level `batch_size` when no value is passed.
        batch_size = globals().get("batch_size", 128)
    if batch_size <= 0:
        raise ValueError(f"batch_size must be strictly positive, got {batch_size}.")

    # Follow the model's own device instead of relying on an undefined global `device`.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    num_samples = len(X_train)
    total_loss = 0.0
    num_batches = 0

    for start in range(0, num_samples, batch_size):
        stop = start + batch_size
        X_batch = torch.as_tensor(X_train[start:stop], dtype=torch.float32).to(device)
        y_batch = torch.as_tensor(y_train[start:stop], dtype=torch.float32).to(device)

        # Compute prediction error
        pred = model(X_batch)
        loss = loss_fn(pred, y_batch)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Avoid a division by zero when there is nothing to train on.
    if num_batches == 0:
        return 0.0
    return total_loss / num_batches


def eval_(X_test, y_test, model, loss_fn):
    """Evaluate `model` on a whole set in a single forward pass, without gradients.

    Tensors are moved to the device of the model's first parameter (CPU when the
    model has no parameter).

    Args:
        X_test: Array-like of inputs.
        y_test: Array-like of one-hot targets, aligned with `X_test`.
        model: Torch module to evaluate; it is switched to evaluation mode.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar tensor.

    Returns:
        A tuple ``(rounded_probabilities, loss, accuracy)``: the softmax of the model
        output rounded to the nearest integer, the loss as a float, and the accuracy
        as a 0-dim tensor (argmax of the prediction compared to argmax of the target).
    """
    # Follow the model's own device instead of relying on an undefined global `device`.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    with torch.no_grad():
        inputs = torch.as_tensor(X_test, dtype=torch.float32).to(device)
        targets = torch.as_tensor(y_test, dtype=torch.float32).to(device)

        pred = model(inputs)
        test_loss = loss_fn(pred, targets).item()

        accuracy = (torch.argmax(pred, 1) == torch.argmax(targets, 1)).float().mean()

        return torch.softmax(pred, dim=-1).round(), test_loss, accuracy


start_time = time.time()

# One pass over the training set per epoch, followed by a validation pass.
for t in range(epochs):

    print(f"\nEpoch {t+1}\n-------------------------------")

    training_loss = train(x_train, y_train, sota_model, loss_fn, optimizer)

    _, val_loss, _ = eval_(x_val, y_val, sota_model, loss_fn)

    print(f"Training Loss: {training_loss}\nValidation Loss: {val_loss}")

# Stop the clock right after the last epoch so the reported training time
# does not include the three evaluation passes below.
training_seconds = time.time() - start_time

# Final accuracies of the trained model on each split.
_, _, accuracy_on_train = eval_(x_train, y_train, sota_model, loss_fn)
_, _, accuracy_on_validation = eval_(x_val, y_val, sota_model, loss_fn)
_, _, accuracy_on_test = eval_(x_test, y_test, sota_model, loss_fn)

print(f"\nTraining time : --- {training_seconds:.2f} seconds --- \n")
print(f"\nAccuracies: "
      f"\nAccuracy on train set      : {accuracy_on_train}"
      f"\nAccuracy on validation set : {accuracy_on_validation}"
      f"\nAccuracy on test set       : {accuracy_on_test}"
      )
# Xpdeep tab: train the explainable model with the trainer configured earlier, using the
# fitted train dataset for training and the fitted validation dataset for validation.
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

5. Explanation Generation#

from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, DistributionStat

# Distribution statistics, computed once on the targets and once on the predictions.
target_distribution = DistributionStat(on="target")
prediction_distribution = DistributionStat(on="prediction")
statistics = DictStats(
    distribution_target=target_distribution,
    distribution_prediction=prediction_distribution,
)

# Quality metrics attached to the explanations.
quality_metrics = [Sensitivity(), Infidelity()]

# The explainer reuses the metrics declared for training.
explainer = Explainer(
    description_representativeness=1000,
    quality_metrics=quality_metrics,
    metrics=metrics,
    statistics=statistics,
)

# Request global explanations for the trained model over the three fitted datasets.
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)

# Print the link under which the explanations can be visualised.
print(model_explanations.visualisation_link)