Log Sklearn Pipelines with MLflow & Deploy 🚀

Jun 25, 2023 · 1242 words · 6 minutes read sklearn mlflow

I am a simple person. I just want to neatly log all the parameters, metrics, and data pertaining to a scikit-learn pipeline to MLflow, including cross-validation. Why? Because sometimes, the metrics are quite similar, but the two candidate models are vastly different w.r.t. how their pipeline pre-processes the data. For example:

  • You don’t know which scaler is the best option given the data—normalise or standardise?

  • You are not sure if you should include a dimensionality reduction technique prior to model fitting (e.g. PCA)

  • You want to try out a variety of estimators in a multitude of settings and record the results in one go.

MLflow allows you to fit and log one model at a time (one can loop), but its automatic logger mlflow.autolog() doesn’t save the results of the intermediate stages when tracking meta-estimators (i.e. sklearn pipelines).

I browsed 20+ tutorials, blog posts, and what-have-you. None of them does it the way I need it.

So here is my solution 🤷

Disclaimer: It remains to be seen if the trouble of resurrecting a defunct R blog to post about model registry in Python after 6 years, on a website written in Hugo, using reticulate in RStudio, is indeed worth it.


Setup & Data

Let’s start with importing the required libraries and regular house-keeping

import mlflow
import requests
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import balanced_accuracy_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.datasets import make_classification

warnings.filterwarnings('ignore') #for aesthetics
RANDOM_STATE = 1895

The only step here is to create a minimalist synthetic dataset and split it into train and test sets

X, y = make_classification(n_features=4, random_state=RANDOM_STATE)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=RANDOM_STATE)

Modelling

Next, a pipeline object to prevent data leakage and make our lives generally easier

estimator = Pipeline([('scaler', StandardScaler()),
                      ('lr', LogisticRegression(random_state=RANDOM_STATE))])

Now, I want to do a hyper-parameter grid search using cross-validation. To illustrate the point succinctly, I include both MinMaxScaler() and StandardScaler(), as well as the option to skip the scaling step altogether with passthrough. I also supply some options for the logistic regression.

param_grid = {
    'scaler': [MinMaxScaler(), StandardScaler(), 'passthrough'],
    'lr__C': [0.01, 0.25, 0.5, 0.75, 1.0],  # C must be strictly positive (0.0 is invalid)
    'lr__penalty': ['l1', 'l2'],
    'lr__solver': ['liblinear', 'saga']
}

Setting some reasonable scoring and CV options

scoring = ['balanced_accuracy', 'f1_weighted', 'precision', 'recall', 'roc_auc']
cv = StratifiedKFold(n_splits=3)

and putting everything together inside GridSearchCV(). With refit='roc_auc', the candidate with the highest mean cross-validated ROC-AUC is refit on the whole training set.

gs = GridSearchCV(
    estimator=estimator,
    param_grid=param_grid,
    scoring=scoring,
    n_jobs=-1,
    refit='roc_auc',
    cv=cv,
    verbose=1,
    return_train_score=True,
)

Sipping coffee while this trains ☕ The grid has 60 candidates and 3 folds, so this is 180 fits plus one final refit; it should be fast.

gs.fit(X_train, y_train)
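
As a quick sanity check on the number of fits, the count is the product of the option counts in the grid times the number of folds. The sketch below uses only those counts (three scalers, five values of C, two penalties, two solvers), so the dictionary grid_sizes is an illustrative stand-in rather than the actual param_grid; scikit-learn's ParameterGrid can do the same count on the real grid.

```python
from math import prod

# Number of options per hyper-parameter, mirroring the grid defined earlier.
# This is an illustrative stand-in, not the real param_grid object.
grid_sizes = {'scaler': 3, 'lr__C': 5, 'lr__penalty': 2, 'lr__solver': 2}
n_splits = 3  # folds used by StratifiedKFold

n_candidates = prod(grid_sizes.values())  # distinct hyper-parameter combinations
n_cv_fits = n_candidates * n_splits       # one fit per candidate per fold
n_total_fits = n_cv_fits + 1              # plus the final refit of the best candidate

print(n_candidates, n_cv_fits, n_total_fits)
```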

Record out-of-sample predictions and the balanced accuracy score, although it could be any other classification scorer.

y_pred = gs.predict(X_test)
round(balanced_accuracy_score(y_test, y_pred), 3)

The CV and the hyper-parameter search results are contained in gs.cv_results_. I only want the columns that contain either the substring mean (all metrics) or param_ (hyper-parameters). I also sort the column names alphabetically to make things easier downstream, i.e. when viewing them later in the MLflow UI.

cv_results = (
    pd.DataFrame(gs.cv_results_)
    .filter(regex=r'mean|param_')
    .sort_index(axis=1)
)

The next bit is a bit messy, but essentially:

  • gs.best_index_ contains the index of the best candidate; e.g. gs.cv_results_['params'][gs.best_index_] gives its hyper-parameters
  • Using that index, we can also pull the best candidate’s averaged metrics as key-value pairs (their keys all contain the string mean)

# For every 'mean' entry, pick the value at the best candidate's index
best_metrics_dict = {k: v[gs.best_index_] for k, v in gs.cv_results_.items() if 'mean' in k}
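
To make the extraction above easier to follow, here is a minimal sketch on a hand-made dictionary shaped like gs.cv_results_ (each key maps to one value per candidate). The numbers and the names toy_results, toy_best_index and toy_best_metrics are invented for illustration; only the selection logic mirrors the post.

```python
# A toy stand-in for gs.cv_results_: one entry per candidate, values made up.
toy_results = {
    'mean_test_roc_auc': [0.81, 0.93, 0.88],
    'std_test_roc_auc': [0.02, 0.01, 0.03],
    'mean_fit_time': [0.004, 0.006, 0.005],
    'params': [{'lr__C': 0.25}, {'lr__C': 0.5}, {'lr__C': 1.0}],
}
toy_best_index = 1  # stand-in for gs.best_index_

# Keep only the 'mean' entries and pick the value at the best candidate's index.
toy_best_metrics = {
    key: values[toy_best_index]
    for key, values in toy_results.items()
    if 'mean' in key
}

print(toy_best_metrics)
```

Note that the substring match also keeps timing entries such as mean_fit_time, so those end up logged as metrics alongside the scores.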

MLflow Tracking

Let’s fire up the MLflow UI from the relevant directory using a terminal. You might be tempted to run !mlflow ui inside a notebook, but that cell will block execution for as long as it serves the UI.

mlflow ui

Alright, finally the fun part. I display the whole chunk first to give the idea. Still feeling dirty about the .iterrows() 😬

experiment_name = 'Sklearn Pipeline Tutorial'
registered_model_name = 'lr_predictor'
description = 'A toy logistic regression model on synthetic data'
tags = {'version': 'v0.1', 'type': 'demo'}

mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.create_experiment(name=experiment_name,
                         tags=tags,
                         artifact_location=Path.cwd().joinpath('mlruns').as_uri())
mlflow.set_experiment(experiment_name=experiment_name)

with mlflow.start_run(run_name='Parent Run', description=description) as run:
    run_id = run.info.run_id
    mlflow.log_params(gs.best_params_)
    mlflow.log_metrics(best_metrics_dict)
    mlflow.log_input(mlflow.data.from_numpy(X), context='features')
    mlflow.log_input(mlflow.data.from_numpy(y), context='labels')
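    # Infer the schema from matching inputs and outputs (both from the training set)
    signature = mlflow.models.infer_signature(X_train, gs.predict(X_train))
    mlflow.sklearn.log_model(gs, 'best_model',
                             signature=signature,
                             input_example=X_train[:1],  # keep it 2-D: one row, four features
                             registered_model_name=registered_model_name)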
    for _, result in cv_results.iterrows():
        with mlflow.start_run(run_name='Child Run',
                              description='CV models',
                              nested=True):
            mlflow.log_params(result.filter(regex='param_').to_dict())
            mlflow.log_metrics(result.filter(regex='mean').to_dict())

Let’s break down what is happening.

  1. We set some hopefully self-explanatory metadata about the experiment.

  2. Next, we point the client at the tracking server with mlflow.set_tracking_uri(). This step is crucial: it links your code to the backend started by mlflow ui.

  3. We create the experiment with mlflow.create_experiment() and set it as active with mlflow.set_experiment(). In both cases, we use the same experiment name.

  4. We use the context manager with mlflow.start_run(): to start logging a series of runs, which is nice (it will close itself). Normally, you would fit the model inside, and MLflow would capture it there. Here, I only want to upload the CV results and deploy the best model online, so I’m OK with fitting the model outside.

  5. We save the run_id because we will need it to make the call for model serving.

  6. Meanwhile, we log the bits we are interested in, including the data (features and labels) with mlflow.log_input().

  7. We log the model using mlflow.sklearn.log_model()—most popular frameworks exist as ‘flavours’ in MLflow. We supply the model signature, which is inferred from the training and predicted data using mlflow.models.infer_signature(). This enforces a contract on the shape of input and output data, so when you invoke the model, you need to adhere to them. Finally, we register the model by giving it a name so that we can refer to it easily during staging.

  8. Next, we loop the child runs under the parent. Note that these have their own context manager. With nested=True, they will be grouped under the parent run in the MLflow UI for easier viewing.
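
One practical wrinkle with step 3: mlflow.create_experiment() raises an error if an experiment with that name already exists, so re-running the chunk fails. A minimal sketch of a guard is below. The helper name get_or_create_experiment is hypothetical, and it takes the mlflow module as an argument purely so the logic can be read (and tried) without a running tracking server; it relies on mlflow.get_experiment_by_name() returning None for an unknown name.

```python
def get_or_create_experiment(tracking, name, **create_kwargs):
    """Return the ID of the experiment called `name`, creating it if needed.

    `tracking` is expected to be the `mlflow` module (or anything exposing
    `get_experiment_by_name` and `create_experiment`). Hypothetical helper.
    """
    existing = tracking.get_experiment_by_name(name)
    if existing is not None:
        # Experiment already exists: reuse it instead of raising.
        return existing.experiment_id
    # First run: create it with whatever tags / artifact location were passed.
    return tracking.create_experiment(name=name, **create_kwargs)


# Intended usage (assumes the variables from the chunk above):
# import mlflow
# experiment_id = get_or_create_experiment(mlflow, experiment_name, tags=tags)
# mlflow.set_experiment(experiment_id=experiment_id)
```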

MLflow Deployment

We want to verify that the model works as intended before deploying it to production, so let’s change its stage to Staging. Of course, we can also do this manually in the UI.

stage = 'Staging'

client = mlflow.MlflowClient()
client.transition_model_version_stage(
    name=registered_model_name,
    version=1,
    stage=stage)

We can now load the model from the registry and start making inferences. The new data point is reshaped to match the inferred signature for input data; that is (-1, 4).

model = mlflow.pyfunc.load_model(model_uri=f'models:/{registered_model_name}/{stage}')
new_data = np.random.uniform(-2, 2, 4).reshape((-1,4))
model.predict(new_data)

Let’s serve the best model at port 5001. Make sure to have the run_id ready to make the call to serve the model; this is why we saved it earlier. We need to supply the path to the model, which is where the MLmodel file is stored. We need yet another terminal for this, as it will stay busy serving the model.

mlflow models serve -m <wd>/mlruns/<run_id>/artifacts/best_model -h 0.0.0.0 -p 5001

Once the model is live, we can make inferences using the command line.

curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{"dataframe_records": [{"a":1, "b":2, "c": 3, "d": 4}]}'

which should return {"predictions": [0]}. In this case, the prediction is a non-event.

If you prefer to do it in Python, here’s an option using requests. This time I picked negative values for all four features, so the predicted outcome should be [1].

host = '127.0.0.1'
port = '5001'

url = f'http://{host}:{port}/invocations'
headers = {'Content-Type': 'application/json'}
data = '{"dataframe_records": [{"a": -1, "b": -2, "c": -2, "d": -1}]}'

r = requests.post(url=url, headers=headers, data=data)

print(f'Predictions: {r.text}')
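
Hand-writing JSON strings gets error-prone once there is more than one row. As a small sketch, the payload can instead be built from plain Python objects with json.dumps. The helper name build_payload is hypothetical, and the column names a to d are simply the ones used in the examples above.

```python
import json


def build_payload(rows, columns=('a', 'b', 'c', 'd')):
    """Serialise rows of feature values into the dataframe_records format."""
    records = [dict(zip(columns, row)) for row in rows]
    return json.dumps({'dataframe_records': records})


payload = build_payload([[-1, -2, -2, -1]])
print(payload)

# The string can then be sent exactly as before, e.g.
# requests.post(url=url, headers=headers, data=payload)
```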

Happy deploying!