Batch scoring on big data using scikit-learn and Spell workflows

One of the most common tasks in production ML is batch scoring.

Batch scoring is the process of using a trained ML model to generate predictions on a concrete dataset. If you’ve ever scored a model on a test set to evaluate its performance, or saved a model's predictions to disk to serve offline later, you’ve performed batch scoring.

The simplest approach is to embed the scoring job directly into the model training job. However, mature pipelines on larger datasets almost inevitably manage batch scoring as a separate step in a pipeline. This is the principle of separation of concerns in action: it allows the training and serving steps to be developed independently, makes each job easier to scale, and makes the pipeline easier to debug if something goes wrong.

In this article, we will learn how to implement a scikit-learn batch scoring pipeline using Spell workflows. Workflows are Spell’s pipelines feature: they allow you to construct and execute a DAG of individual Spell runs. Throughout this article we will also discuss various tips and tricks for getting the most out of your batch scoring jobs. These tools and techniques are, taken together, the secret to running high-performance, scalable batch scoring jobs on Spell.

Note that this article will assume familiarity with Spell runs.

Storing and training on "big data"

For purposes of demonstration, we will use the wta-matches dataset from Kaggle. This dataset records matches played on the WTA women’s tennis tour in the years 2000 through 2016. Here is Kaggle's preview of this dataset:

A common technique when working with "big data" is training on just a subset of the data. This is because when the dataset is extremely large, it’s rarely necessary to train the model on the entire thing. Usually a small (but representative) subset of the data can achieve the same performance in a fraction of the total training time. This relationship between converged model accuracy and the number of training set samples is called the learning curve of the model, and it’s a function of the learning capacity of the model modulo the noisiness of the dataset.

Deep learning models famously have very long learning curves: you really can’t feed them too much data. Classical machine learning techniques like regression and support vector machines, meanwhile, have much more limited learning capacity, and will level out much sooner.

To learn more about learning curves, check out my Kaggle notebook on the subject. In this article, we will demonstrate subset training in a pipeline that fits a scikit-learn LogisticRegression model on just the 2015 match data.
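
If you want to eyeball a learning curve yourself, here is a minimal sketch (toy data, not taken from the notebook or from the demo pipeline) using scikit-learn's learning_curve helper. The idea is simply to watch where validation accuracy levels off as the training set grows:

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import learning_curve

# Toy, noisy binary classification data standing in for a real training set.
X = np.random.randn(5000, 1)
y = (X[:, 0] + np.random.randn(5000) > 0).astype(int)

# Cross-validated accuracy at several training set sizes.
sizes, train_scores, valid_scores = learning_curve(
    LogisticRegression(), X, y,
    train_sizes=np.linspace(0.1, 1.0, 5), cv=5
)
print(dict(zip(sizes, valid_scores.mean(axis=1))))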

Another common big data technique is partitioning. Partitioning a dataset makes it easier to store and manage. The WTA matches dataset is an example of a dataset partitioned on year — each wta_matches_*.csv file corresponds with a single year of play on the tour.

If you know that your dataset will often be grouped by a certain key, it makes sense to partition the dataset by that key, allowing a cluster computing framework (like Spark or Dask) to bucket data by that key "for free". E.g. if a user asks for every tennis match from 2000, the system can just read and return the contents of the wta_matches_2000.csv file. If we had partitioned the dataset using a different key, or not at all, the system would have to scan through the entire dataset to find and extract every single record from 2000.

This obviously has significant performance implications for large-scale data queries. As a result, the manner in which you partition your datasets is a key component of your data warehousing strategy.
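
As a concrete sketch (using the bucket path we set up later in this article, and assuming s3fs is installed), here is the difference between a partition-aware read and a full scan with Dask:

import dask.dataframe as dd

# Partitioned on year: a query for matches from 2000 touches exactly one file.
matches_2000 = dd.read_csv(
    "s3://spell-datasets-share/wta-matches/wta_matches_2000.csv",
    assume_missing=True
)

# Without that partitioning, every file would have to be read and filtered.
all_matches = dd.read_csv(
    "s3://spell-datasets-share/wta-matches/wta_matches_*.csv",
    assume_missing=True
)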

wta-matches is not a "big data" dataset. However, it has already been partitioned for us, making it a convenient use case for our demo.

One final thing to note is file format. In this demo we will be using CSV for both input and output. CSV is a simple, human-readable format, making it an appropriate choice for simple datasets like this one, but it has no built-in compression (beyond any gzip applied in transit) or other performance-oriented features, so it’s not a very fast file format.

Almost all "big data" pipelines these days make heavy use of Parquet instead. Apache Parquet is a highly compressed columnar data storage format, part of the larger Hadoop ecosystem, that has better read/write performance (thanks to its columnar layout) and smaller on-disk size (thanks to its use of compression) than raw CSV files. To keep things simple, we will not be using Parquet here, but keep in mind that Parquet is the data format of choice at scale.
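
For reference, converting the CSV partitions to Parquet takes only a couple of lines in Dask. This is not part of the demo pipeline, and it assumes a Parquet engine such as pyarrow is installed:

import dask.dataframe as dd

df = dd.read_csv("wta-matches/wta_matches_*.csv", assume_missing=True)
# Writes compressed, columnar Parquet files to the output directory.
df.to_parquet("wta-matches-parquet/", compression="snappy")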

Preparing the dataset

Spell uses S3 object storage as its storage layer. To prepare the data, we will download the dataset (using Kaggle's Python API client) and upload it to an S3 bucket we own:

$ kaggle datasets download "gmadevs/wta-matches"
$ unzip wta-matches.zip -d wta-matches/
$ rm wta-matches.zip
$ aws s3 sync wta-matches/ s3://spell-datasets-share/wta-matches/
$ rm -rf wta-matches/

In the aws s3 sync command above, replace s3://spell-datasets-share with the name of a bucket you own. Next, mount the bucket into your Spell organization, if you haven't already:

$ spell cluster add-bucket --bucket spell-datasets-share

This will give your Spell cluster access to this bucket, allowing you to mount objects from this bucket into any of your Spell runs.

Preparing the training script

Here is the training script we will use:

import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from joblib import dump

# Train on a single partition of the dataset (subset training).
matches = pd.read_csv("/mnt/wta-matches/wta_matches_2015.csv")

# Feature: difference in ranking points between the match winner and loser.
point_diff = (matches.winner_rank_points - matches.loser_rank_points).dropna()
X = point_diff.values[:, np.newaxis]
# Target: whether the player with more ranking points won.
y = (point_diff > 0).values.astype(int).reshape(-1, 1)

# Sort the samples by point differential.
sort_order = np.argsort(X[:, 0])
X = X[sort_order, :]
y = y[sort_order, :]

clf = LogisticRegression()
clf.fit(X, y.ravel())

# Serialize the fitted model to disk.
dump(clf, 'wta-matches-model.joblib')

This training script loads wta_matches_2015.csv from the /mnt/wta-matches/ directory, trains a LogisticRegression model on it, and writes the fitted model to disk using joblib (if you're not familiar with joblib, it's the library sklearn uses for training parallelization and model I/O). Because we’re training on the 2015 partition of the dataset only, this is an example of subset training in action.

To execute this training on Spell, you would run:

$ spell run \
    --machine-type cpu \
    --pip pandas --pip scikit-learn \
    --mount s3://spell-datasets-share/wta-matches/:/mnt/wta-matches/ \
    -- python train.py

Preparing the scoring script

Next, we will write the scoring script. This script loads the dataset passed via --filename off of disk, runs a scoring job on it, and writes the result back to disk:

import numpy as np
import argparse
from joblib import load

parser = argparse.ArgumentParser()
parser.add_argument('--filename', type=str, dest='filename', help='path to the dataset to be scored')
args = parser.parse_args()

if __name__ == "__main__":
    from distributed import Client, LocalCluster
    from dask_ml.wrappers import ParallelPostFit
    import dask.dataframe as dd

    # Spin up a single-machine Dask cluster, sized to the machine's CPUs.
    cluster = LocalCluster()
    client = Client(cluster)

    # Load the trained model and wrap it so scoring can be parallelized.
    clf = load('wta-matches-model.joblib')
    clf = ParallelPostFit(clf)

    # Read the dataset partition with Dask and rebuild the point-differential
    # feature used at training time.
    matches = dd.read_csv(args.filename, assume_missing=True)
    point_diff = (matches.winner_rank_points - matches.loser_rank_points).dropna()
    X_test = point_diff.compute().values[:, np.newaxis]

    # Score and save the predictions to disk.
    y_test_pred = clf.predict(X_test)
    np.save("predictions.npy", y_test_pred)

Notice that this script is using Dask to do the heavy lifting. Dask is a cluster computing framework that lets you execute Python code on larger-than-memory datasets across one or many machines.

We’re big fans of Dask at Spell. Whether working alone on CPU or paired with NVIDIA’s RAPIDS on GPU, Dask makes it easy to scale ML jobs on huge workloads.

In this script we initialize a single-node Dask cluster. Then, after loading the LogisticRegression model artifact into memory, we wrap it in Dask's ParallelPostFit and score it on a dataset read using dask.dataframe.read_csv (instead of e.g. pandas.read_csv). This does two important things for us:

  1. It parallelizes the scoring process. Dask will launch a separate scoring task for each vCPU on the machine, greatly speeding up the scoring process (this is, in essence, all ParallelPostFit does — hence the name!).
  2. It removes the dataset size bottleneck. If we were to use an all-at-once dataset reader like pandas.read_csv instead, we would have to be cognizant of the memory limits of the machine, as the entire dataset needs to fit into memory for read_csv to succeed. Dask reads data one chunk at a time, allowing us to scale the process to larger-than-memory data with ease (see the sketch just below this list).
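
Under the hood, ParallelPostFit works by mapping the wrapped estimator's predict over the chunks of a Dask collection. Here is a minimal standalone sketch of that pattern (toy data, not the article's model):

import numpy as np
import dask.array as da
from dask_ml.wrappers import ParallelPostFit
from sklearn.linear_model import LogisticRegression

# Fit on a small in-memory sample...
X_small = np.random.randn(1000, 1)
y_small = (X_small[:, 0] > 0).astype(int)
clf = ParallelPostFit(LogisticRegression()).fit(X_small, y_small)

# ...then score a chunked (potentially larger-than-memory) Dask array.
X_big = da.random.random((1_000_000, 1), chunks=(100_000, 1))
y_pred = clf.predict(X_big)   # lazy; one scoring task per chunk
y_pred.compute()              # triggers the parallel computation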

To learn more about Dask, check out some of our previous articles on the subject: "Getting started in the RAPIDS distributed ML ecosystem, part 1: ETL" and "Getting started with large-scale ETL jobs using Dask and AWS EMR".

Here’s how we would run this code on Spell:

$ spell run --machine-type cpu \
    --docker-image residentmario/dask-cpu-workspace:latest \
    --mount runs/285/wta-matches-model.joblib \
    --mount s3://spell-datasets-share/wta-matches/wta_matches_2015.csv \
    -- python score.py --filename wta_matches_2015.csv

This spell run command mounts the model saved to disk by our previous (training) run, along with the 2015 partition of the dataset, into the new run, then executes our scoring job with that model on that data.

Combining training and scoring into a workflow

Now that we have working prototypes of our training and scoring runs, we are ready to combine the two into a pipeline using a Spell workflow!

Spell workflows are defined using the Spell Python API and launched from the CLI using the spell workflow command. Here is our complete demo workflow—we’ll walk through how it works step-by-step:

import spell.client
client = spell.client.from_environment()

train = client.runs.new(
    machine_type="cpu",
    github_url="https://github.com/ResidentMario/spell-batch.git",
    pip_packages=["pandas", "scikit-learn"],
    attached_resources={
        "s3://spell-datasets-share/wta-matches/": "/mnt/wta-matches/"
    },
    command="python train.py"
)
train.wait_status(*client.runs.FINAL)
train.refresh()
if train.status != client.runs.COMPLETE:
    raise OSError(f"Failed at training run {train.id}.")

test = []
for partition in range(2000, 2017):
    r = client.runs.new(
        machine_type="cpu",
        github_url="https://github.com/ResidentMario/spell-batch.git",
        docker_image="residentmario/dask-cpu-workspace:latest",
        attached_resources={
            f"runs/{train.id}/wta-matches-model.joblib": "wta-matches-model.joblib",
            f"s3://spell-datasets-share/wta-matches/wta_matches_{partition}.csv": \
                f"wta_matches_{partition}.csv"
        },
        command=f"python score.py --filename wta_matches_{partition}.csv"
    )
    test.append(r)

for run in test:
    run.wait_status(*client.runs.FINAL)
    run.refresh()
    if run.status != client.runs.COMPLETE:
        raise OSError(f"Failed at scoring run {run.id}.")

combine = client.runs.new(
    machine_type="cpu",
    github_url="https://github.com/ResidentMario/spell-batch.git",
    docker_image="residentmario/dask-cpu-workspace:latest",
    attached_resources={
        f"runs/{run.id}/predictions.npy": f"/spell/data/{run.id}.npy" for run in test
    },
    command="python combine.py"
)
combine.wait_status(*client.runs.FINAL)
combine.refresh()
if combine.status != client.runs.COMPLETE:
    raise OSError(f"Failed at combining run {combine.id}.")

print("Finished workflow!")

The first thing this script does is initialize our training run using the client.runs.new method, which is the Python API equivalent of the spell run CLI command.

We can't proceed to scoring until the training run has completed successfully. The next section of the script blocks until the training run completes, then checks the status of the run to ensure that it completed successfully:

train.wait_status(*client.runs.FINAL)
train.refresh()
if train.status != client.runs.COMPLETE:
    raise OSError(f"Failed at training run {train.id}.")

Assuming the run is successful, the workflow moves onto the next task: launching the scoring jobs!

Notice that the code launches a new model scoring job for each partition in the dataset (e.g. for every year from 2000 through 2016 inclusive). At the end of the workflow, a combine run (code omitted; see here if you’re curious) stitches the results of the individual scoring runs back together and saves the combined output to disk for long-term storage or upstream consumption.
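
The combine script is not reproduced in this article, but it might look something like the following hypothetical sketch: load each per-partition predictions file mounted into /spell/data/ and concatenate them into a single array.

import glob
import numpy as np

# One .npy file per scoring run, mounted at /spell/data/<run id>.npy.
parts = [np.load(path) for path in sorted(glob.glob("/spell/data/*.npy"))]
predictions = np.concatenate(parts)
np.save("predictions.npy", predictions)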

Astute readers will notice that we’ve basically just implemented a MapReduce job of Spell runs. 🙂 This workflow demonstrates the power of using Spell for horizontal scaling: splitting the scoring job among many runs to speed up the scoring process. You can also just as easily use Spell to perform vertical scaling: improving the speed of the scoring job by moving it onto a more powerful instance.

For example, in this run we scored our data using a baseline cpu instance. It would be trivially easy to call client.runs.new(machine_type="cpu-large", ...) or client.runs.new(machine_type="t4", ...) instead, moving the job onto a bigger CPU instance or a GPU instance, respectively.
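
Concretely, the scoring run from the workflow above could be moved onto a GPU machine by changing nothing but the machine_type argument; every other argument stays the same as before:

r = client.runs.new(
    machine_type="t4",   # was "cpu"
    github_url="https://github.com/ResidentMario/spell-batch.git",
    docker_image="residentmario/dask-cpu-workspace:latest",
    attached_resources={
        f"runs/{train.id}/wta-matches-model.joblib": "wta-matches-model.joblib",
        f"s3://spell-datasets-share/wta-matches/wta_matches_{partition}.csv":
            f"wta_matches_{partition}.csv"
    },
    command=f"python score.py --filename wta_matches_{partition}.csv"
)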

Which combination of horizontal and vertical scaling works best for you will depend on your exact use case. Spell is flexible enough to support either (or both).

Executing the workflow

With all of the code done, we can execute our workflow using the following spell workflow CLI command:

$ spell workflow create -- python workflow.py

You can monitor the progress of the workflow on the workflows page in the web console.

Conclusion

And that’s it — that’s our complete scikit-learn batch scoring workflow on Spell!

Although this tutorial uses scikit-learn, the same basic principles discussed here apply regardless of framework. For example, Dask has a recipe for scaling batch scoring using PyTorch, and XGBoost has native support for training and scoring on Dask that works pretty much out-of-the-box.
