Skip to content

From a pytorch model to a deep explainable model#

As a quick introduction to the Xpdeep APIs, this section demonstrates, using the Bike dataset, how to adapt the PyTorch code of a standard deep model in order to design an explainable deep model instead.

We will review the key steps involved in designing a deep model, from architecture specification and training to generating explanations (the latter being specific to Xpdeep).

For each step in building a deep model, we provide:

  • Tabs labeled "SOTA and Xpdeep" for code that is identical for both the SOTA deep model and the Xpdeep explainable model.
  • Tabs labeled "Xpdeep" for code specific to the Xpdeep explainable model.

1. Project Setup#

Setup Api Key and URL#

from xpdeep import init

# Authenticate against the Xpdeep platform before any other API call.
# The placeholder strings must be replaced with real credentials.
init(api_key="MY_API_KEY", api_url="MY_API_URL")

Create a Project#

from xpdeep import set_project
from xpdeep.project import Project

# Create the "Bike Tutorial" project (or reuse it if it already exists) and
# register it as the project used by the subsequent API calls.
set_project(Project.create_or_get(name="Bike Tutorial"))

2. Data preparation#

Read Raw Data#

import pandas as pd

# Load the raw Bike splits from disk.
train_val_data = pd.read_csv("train.csv")
test_data = pd.read_csv("test.csv")

# Drop columns not used as inputs; "casual" and "registered" are removed from
# the train/validation file only.
test_data = test_data.drop(columns=["atemp"])
train_val_data = train_val_data.drop(columns=["casual", "atemp", "registered"])

# Expand the raw timestamp into calendar features, then discard it.
for frame in [test_data, train_val_data]:
    frame["datetime"] = pd.to_datetime(frame["datetime"])
    for part in ("year", "month", "hour", "weekday"):
        frame[part] = getattr(frame["datetime"].dt, part)
    frame.drop(columns=["datetime"], inplace=True)  # noqa: PD002

Split Data#

from sklearn.model_selection import train_test_split

# Split the data into training and validation sets
# (80/20 split; random_state pinned for reproducibility).
train_data, val_data = train_test_split(train_val_data, test_size=0.2, random_state=42)

Conversion to Parquet Format#

# Conversion to Parquet

import pyarrow as pa
import pyarrow.parquet as pq
from xpdeep.dataset.upload import upload

# Convert each split to a pyarrow Table (the pandas index is preserved).
train_table = pa.Table.from_pandas(train_data, preserve_index=True)
val_table = pa.Table.from_pandas(val_data, preserve_index=True)
test_table = pa.Table.from_pandas(test_data, preserve_index=True)

# Persist every split as a ".parquet" file.
for table, destination in (
    (train_table, "train.parquet"),
    (val_table, "val.parquet"),
    (test_table, "test.parquet"),
):
    pq.write_table(table, destination)

Upload#

import boto3
# Fix: `Config` was referenced below without being imported, raising a
# NameError at runtime. It lives in botocore, which boto3 depends on.
from botocore.config import Config

# S3 client pointing at the dataset bucket endpoint.
# NOTE(review): the S3_DATASET_* constants are not defined in this file —
# confirm they are set earlier (e.g. from environment variables).
client = boto3.client(
    service_name="s3",
    endpoint_url=S3_DATASET_ENDPOINT_URL,
    aws_access_key_id=S3_DATASET_ACCESS_KEY_ID,
    aws_secret_access_key=S3_DATASET_SECRET_ACCESS_KEY,
    config=Config(signature_version="s3v4"),
)

# Upload each parquet split under the "bike/" prefix of the dataset bucket.
client.upload_file("train.parquet", S3_DATASET_BUCKET_NAME, "bike/train.parquet")
client.upload_file("val.parquet", S3_DATASET_BUCKET_NAME, "bike/val.parquet")
client.upload_file("test.parquet", S3_DATASET_BUCKET_NAME, "bike/test.parquet")

Preprocess Data#

from sklearn.preprocessing import OneHotEncoder, StandardScaler
import numpy as np

# Feature groups and the regression target.
numerical_features = ["temp", "humidity", "windspeed"]
categorical_features = ["season", "holiday", "workingday", "weather", "year", "month", "hour", "weekday"]
target_feature = "count"

# All preprocessors are fitted on the training split only.
numerical_features_standard_scaler = StandardScaler().fit(train_data[numerical_features])

# One dense one-hot encoder per categorical feature.
categorical_features_encoders = {
    feature: OneHotEncoder(sparse_output=False).fit(train_data[[feature]])
    for feature in categorical_features
}

target_feature_encoder = StandardScaler().fit(train_data[[target_feature]])

# Transform data: scaled numerical columns first, then the one-hot encoded
# categorical columns, concatenated in declaration order. The same pipeline
# is applied to every split, so it is factored into one helper instead of
# being written out three times.
def _encode_inputs(frame):
    """Return the model input matrix for one dataframe split.

    Uses the preprocessors fitted on the training split above; the output
    column order is numerical features followed by categorical one-hots.
    """
    blocks = [numerical_features_standard_scaler.transform(frame[numerical_features])]
    blocks += [
        categorical_features_encoders[feature].transform(frame[[feature]])
        for feature in categorical_features
    ]
    return np.concatenate(blocks, axis=1)


x_train = _encode_inputs(train_data)
y_train = target_feature_encoder.transform(train_data[[target_feature]])

x_test = _encode_inputs(test_data)
y_test = target_feature_encoder.transform(test_data[[target_feature]])

x_val = _encode_inputs(val_data)
y_val = target_feature_encoder.transform(val_data[[target_feature]])


# input and output sizes
input_size = x_train.shape[1]
target_size = y_train.shape[1]
# fsspec/s3fs-style credentials the datasets use to read the parquet splits
# directly from the S3 bucket.
storage_options={
        "key": S3_DATASET_ACCESS_KEY_ID,
        "secret": S3_DATASET_SECRET_ACCESS_KEY,
        "client_kwargs": {
            "endpoint_url": S3_DATASET_ENDPOINT_URL,
        },
        "s3_additional_kwargs": {"addressing_style": "path"},
    }
# NOTE(review): ParquetDataset and FittedParquetDataset are used below but no
# import for them is visible in this file — confirm the snippet importing them
# (presumably from the xpdeep dataset package) precedes this one.
train_dataset = ParquetDataset(
name="bike_train_set",
path=f"s3://{S3_DATASET_BUCKET_NAME}/bike/train.parquet",
storage_options=storage_options,
)

# Analyze the training split with "count" as the target column, then fit the
# resulting schema on it.
analyzed_train_dataset = train_dataset.analyze(target_names=["count"])
print(analyzed_train_dataset.analyzed_schema)

fit_train_dataset = analyzed_train_dataset.fit()

# Test and validation sets reuse the schema fitted on the training split so
# all three splits share the same preprocessing.
fit_test_dataset = FittedParquetDataset(
    name="bike_test_set",
    path=f"s3://{S3_DATASET_BUCKET_NAME}/bike/test.parquet",
    storage_options=storage_options,
    fitted_schema=fit_train_dataset.fitted_schema,
)

fit_val_dataset = FittedParquetDataset(
    name="bike_validation_set",
    path=f"s3://{S3_DATASET_BUCKET_NAME}/bike/val.parquet",
    storage_options=storage_options,
    fitted_schema=fit_train_dataset.fitted_schema,
)

# Sizes derived from the fitted schema (these rebind the values computed from
# the numpy matrices earlier in the file).
input_size = fit_train_dataset.fitted_schema.input_size[1]
target_size = fit_train_dataset.fitted_schema.target_size[1]

3. Model Construction#

Architecture Specification#

import torch
from torch.nn import Sequential

# SOTA architecture: one flat list of layers for a plain MLP regressor.
layers = [
    torch.nn.Linear(input_size, 512),
    torch.nn.ReLU(),
    torch.nn.Linear(512, 128),
    torch.nn.ReLU(),
    torch.nn.Linear(128, target_size),
]

# Xpdeep splits the same MLP in two parts: a feature extractor...
feature_extractor = Sequential(
    torch.nn.Linear(input_size, 512),
    torch.nn.ReLU(),
    torch.nn.Linear(512, 128),
    torch.nn.ReLU(),
)

# ...and a task learner mapping the extracted features to the target.
task_learner = Sequential(torch.nn.Linear(128, target_size))

Model Instantiation#

from torch.nn import Sequential

# SOTA baseline: plain sequential network assembled from `layers`.
sota_model = Sequential(*layers)
from xpdeep.model.model_builder import ModelDecisionGraphParameters
from xpdeep.model.xpdeep_model import XpdeepModel

# Model specifications and hyperparameters.
# NOTE(review): FeatureExtractionOutputType is referenced below but no import
# for it is visible in this file — confirm it is imported alongside
# ModelDecisionGraphParameters.
model_specifications = ModelDecisionGraphParameters(
    feature_extraction_output_type=FeatureExtractionOutputType.VECTOR,
)

# Xpdeep Model Architecture: assembled from the feature extractor and task
# learner defined above, using the schema fitted on the training dataset.
xpdeep_model = XpdeepModel.from_torch(
    example_dataset=fit_train_dataset,
    feature_extraction=feature_extractor,
    task_learner=task_learner,
    backbone=None,
    decision_graph_parameters=model_specifications,
)

4. Training#

Training Specification#

from torch import nn

# --- SOTA training hyper-parameters --------------------------------------
loss_fn = nn.MSELoss()
optimizer = torch.optim.AdamW(sota_model.parameters(), lr=1e-3)
batch_size = 128
epochs = 60

# --- Xpdeep training specification ---------------------------------------
from xpdeep.trainer.callbacks import EarlyStopping, Scheduler
from functools import partial
from xpdeep.metric import DictMetrics, TorchGlobalMetrics, TorchLeafMetric
from torch.optim.lr_scheduler import ReduceLROnPlateau
from xpdeep.trainer.trainer import Trainer
from torchmetrics import MeanSquaredError

# MSE tracked both globally and per decision-graph leaf, on raw data.
metrics = DictMetrics(
    mse=TorchGlobalMetric(metric=partial(MeanSquaredError), on_raw_data=True),
    leaf_metric_mse=TorchLeafMetric(metric=partial(MeanSquaredError), on_raw_data=True),
)

callbacks = [
    EarlyStopping(monitoring_metric="Total loss", mode="minimize", patience=5),
    Scheduler(pre_scheduler=partial(ReduceLROnPlateau), step_method="epoch", monitoring_metric="Total loss"),
]

# Optimizer is a partial object as pytorch needs to give the model as optimizer parameter.
# Fix: this was previously bound to `optimizer`, shadowing the SOTA AdamW
# instance above — the SOTA loop further below would then have called
# `.zero_grad()` on a functools.partial and crashed. Bind it to its own name.
xpdeep_optimizer = partial(torch.optim.AdamW, lr=0.001, foreach=False, fused=False)

trainer = Trainer(
    loss=torch.nn.MSELoss(reduction="none"),
    optimizer=xpdeep_optimizer,
    callbacks=callbacks,
    start_epoch=0,
    max_epochs=120,
    metrics=metrics,
)

Model Training#

from sklearn.metrics import mean_squared_error, root_mean_squared_error
import torch
import time

device = "cpu"
torch.manual_seed(0)  # reproducible weight initialisation

def train(X_train, y_train, model, loss_fn, optimizer, batch_size=128):
    """Run one optimisation epoch and return the average mini-batch loss.

    Args:
        X_train: 2-D array of input features (indexable as ``X[a:b, :]``).
        y_train: 2-D array of targets aligned with ``X_train``.
        model: torch module to optimise (switched to train mode).
        loss_fn: criterion returning a scalar loss.
        optimizer: torch optimizer bound to ``model``'s parameters.
        batch_size: mini-batch size; defaults to 128, matching the
            module-level hyper-parameter used by the SOTA tutorial code
            (previously read implicitly from that global).

    Note: trailing samples that do not fill a whole batch are skipped,
    mirroring the original behaviour.
    """
    size = len(X_train)
    num_batches = size // batch_size
    if num_batches == 0:
        # Fewer samples than one batch: nothing to optimise (avoids the
        # previous ZeroDivisionError on tiny inputs).
        return 0.0

    model.train()
    total_loss = 0.0

    for batch in range(num_batches):
        start = batch * batch_size
        stop = start + batch_size
        X_batch = torch.tensor(X_train[start:stop, :], dtype=torch.float32).to(device)
        y_batch = torch.tensor(y_train[start:stop, :], dtype=torch.float32).to(device)

        # Compute prediction error
        pred = model(X_batch)
        loss = loss_fn(pred, y_batch)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / num_batches


def eval_(X_test, y_test, model, loss_fn):
    """Evaluate `model` on a full split, without gradients.

    Returns a tuple ``(predictions_raw, test_loss, mse, rmse)`` where the
    predictions and error metrics are expressed in the original target scale
    (inverse-transformed by the target encoder fitted on the training split).
    """
    model.eval()
    with torch.no_grad():
        features = torch.tensor(X_test, dtype=torch.float32).to(device)
        targets = torch.tensor(y_test, dtype=torch.float32).to(device)

        predictions = model(features)
        test_loss = loss_fn(predictions, targets).item()

        # Undo the target scaling once, then score on raw values.
        targets_raw = target_feature_encoder.inverse_transform(targets)
        predictions_raw = target_feature_encoder.inverse_transform(predictions)
        mse = mean_squared_error(targets_raw, predictions_raw)
        rmse = root_mean_squared_error(targets_raw, predictions_raw)

        return predictions_raw, test_loss, mse, rmse

# Train the SOTA baseline, tracking validation loss every epoch.
start_time = time.time()

for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}\n-------------------------------")

    training_loss = train(x_train, y_train, sota_model, loss_fn, optimizer)
    _, val_loss, _, _ = eval_(x_val, y_val, sota_model, loss_fn)

    print(f"Training Loss: {training_loss}\nValidation Loss: {val_loss}")

# Final raw-scale metrics on every split.
_, _, mse_on_train, rmse_on_train = eval_(x_train, y_train, sota_model, loss_fn)
_, _, mse_on_validation, rmse_on_validation = eval_(x_val, y_val, sota_model, loss_fn)
_, _, mse_on_test, rmse_on_test = eval_(x_test, y_test, sota_model, loss_fn)

elapsed = time.time() - start_time
print(f"\nTraining time : --- {elapsed:.2f} seconds --- \n")
print(
    f"\nMSEs: "
    f"\nMSE on train set       : {mse_on_train}"
    f"\nMSE on validation set  : {mse_on_validation}"
    f"\nMSE on test set        : {mse_on_test}"
)
# Train the explainable model on the fitted parquet splits; the returned
# trained model is what the explainer consumes below.
trained_model = trainer.train(
    model=xpdeep_model,
    train_set=fit_train_dataset,
    validation_set=fit_val_dataset,
    batch_size=128,
)

5. Explanation Generation#

from xpdeep.explain.explainer import Explainer
from xpdeep.explain.quality_metrics import Infidelity, Sensitivity
from xpdeep.explain.statistic import DictStats, HistogramStat, VarianceStat

# Statistics computed per decision-graph node, on raw (inverse-transformed)
# data: distributions of targets, predictions and prediction errors, plus
# target/prediction variances.
statistics = DictStats(
    histogram_target=HistogramStat(on="target", num_bins=20, num_items=1000, on_raw_data=True),
    histogram_prediction=HistogramStat(on="prediction", num_bins=20, num_items=1000, on_raw_data=True),
    histogram_error=HistogramStat(on="prediction_error", num_bins=20, num_items=1000, on_raw_data=True),
    variance_target=VarianceStat(on="target", on_raw_data=True),
    variance_prediction=VarianceStat(on="prediction", on_raw_data=True),
)

# Explanation quality is assessed with sensitivity and infidelity scores.
quality_metrics = [Sensitivity(), Infidelity()]

# Reuses the training `metrics` defined earlier in the file.
explainer = Explainer(
    description_representativeness=1000, quality_metrics=quality_metrics, metrics=metrics, statistics=statistics
)

# Build model-level (global) explanations over all three splits; the result
# exposes a link to the interactive visualisation.
model_explanations = explainer.global_explain(
    trained_model,
    train_set=fit_train_dataset,
    test_set=fit_test_dataset,
    validation_set=fit_val_dataset,
)
print(model_explanations.visualisation_link)