A quick guide on using Python’s multiprocessing library to parallelize model selection using apply_async
Some common data science tasks take a long time to run, but are embarrassingly parallel. Embarrassingly parallel means that the tasks do not depend on each other and can therefore easily be done at the same time. The best examples are training different models and cross validation. In cross validation, training the model on k-1 folds before testing it on the remaining fold, and training the model on k-1 different folds before testing it on a different remaining fold, are two tasks that are not connected. Because they are not connected, we can hand them to different workers and process them in parallel.
Scikit-learn has parallelization built in via its n_jobs option, but we don’t need to rely on its ecosystem to parallelize model selection. Instead, we will use the multiprocessing library directly.
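As a point of reference, here is a minimal sketch of what the built-in route could look like (using the X and y generated just below); n_jobs=-1 asks scikit-learn to use all available cores. This is not the approach taken in the rest of this guide.
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
# Sketch: scikit-learn parallelizes over the folds itself when n_jobs is set
scores = cross_val_score(LogisticRegression(solver='liblinear'), X, y,
                         cv=3, scoring='roc_auc', n_jobs=-1)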
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import StratifiedKFold
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, random_state=123)
We can use scikit-learn to conveniently generate the training and test indices for a number of cross-validation folds:
splitter = StratifiedKFold(n_splits=3, shuffle=True, random_state=123)
folds = list(splitter.split(X, y))
len(folds)
3
len(folds[0])
2
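Each element of folds is a (train_idx, test_idx) tuple of row-index arrays, which is exactly what the helper function below expects. For example:
# Each fold is a (train_idx, test_idx) pair of row-index arrays
train_idx, test_idx = folds[0]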
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
def benchmark_models(X, y, split):
    """
    Helper function to benchmark models

    X : array
    y : array
    split : tuple
        Training and test indices (train_idx, test_idx)
    """
    X_train, y_train = X[split[0], :], y[split[0]]
    X_test, y_test = X[split[1], :], y[split[1]]

    model_library = {}
    # One candidate model
    model_library["logit"] = LogisticRegression(solver='liblinear')
    # Another candidate model
    model_library["rf"] = RandomForestClassifier(n_estimators=100, min_samples_leaf=20)

    results = {}
    for model_name, model in model_library.items():
        # Train the model
        model.fit(X_train, y_train)
        # Make predictions on the test data
        pred_test = model.predict_proba(X_test)[:, 1]
        # Evaluate the model
        results[model_name] = roc_auc_score(y_test, pred_test)

    return pd.DataFrame(results, index=["ROC-AUC"])
Test the function
benchmark_models(X,y,split=folds[0])
|  | logit | rf |
| --- | --- | --- |
| ROC-AUC | 0.865646 | 0.88949 |
The multiprocessing library is not particularly geared towards statistics the way scikit-learn is, and it took me a while to figure out which parts of its functionality are useful for our problem of parallelizing cross validation. To sum up our requirements, we want to: send each cross-validation fold to its own worker process, keep assigning tasks without waiting for earlier ones to finish, and collect all the results before moving on with the script.
import multiprocessing as mp
We first specify how many processes we want to run in parallel. This number is restricted by the number of cores available. I like to use all but one core on my machine (to avoid programs freezing). On a shared computation server, make sure you understand the policy and be polite by leaving resources for others.
#pool = mp.Pool(3)
# Python can count the available cores for you in most cases
pool = mp.Pool(mp.cpu_count()-1)
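One general multiprocessing caveat worth flagging (not specific to this example): when this code runs as a script on a platform that spawns new processes, such as Windows, the pool setup and task submission need to sit under an if __name__ == "__main__": guard, roughly like this:
# Sketch: guard process-spawning code when running as a script, so child
# processes do not re-execute the pool setup when they import the module
if __name__ == "__main__":
    pool = mp.Pool(mp.cpu_count() - 1)
    # ... submit tasks, then close() and join() as shown below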
The apply_async function can be used to send function calls, including additional arguments, to one of the processes in the pool. In contrast to apply (without the _async), the program will not wait for each call to be completed before moving on. We can therefore assign the first cross-validation iteration and immediately assign the second iteration before the first one is completed.
There is a drawback to apply_async: it does not return the result once the call completes. Instead, it returns another object with a get() method. A more convenient solution is a callback. The callback function is called on the result once the function call has completed, so we specify a list for the results and a callback that appends each result to that list.
results = []

def log_result(x):
    results.append(x)
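For comparison, here is a sketch of the get() route mentioned above (using the same pool and folds): all tasks are submitted first, and only then do we block on each returned object, so no callback is needed.
# Sketch of the get()-based alternative: collect the AsyncResult objects,
# then block on get() for each one after all tasks have been submitted
async_results = [pool.apply_async(benchmark_models, args=(X, y, fold)) for fold in folds]
results_via_get = [res.get() for res in async_results]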
An important intuition for apply and apply_async is that each call assigns a single function call to a worker. In contrast, the map functionality assigns a whole list of tasks to the available workers at once. apply_async calls the workers into your office one by one to explain their task to them.
IMPORTANT: The results will not come back in the same order as we assigned the tasks. If we want to match the results to each fold, then we should pass an identifier to the function.
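For instance, a hypothetical wrapper (not part of the original code) could tag each result with its fold index before it is logged:
# Hypothetical wrapper: attach the fold index so the unordered results
# can be matched back to their folds afterwards
def benchmark_models_tagged(X, y, split, fold_id):
    res = benchmark_models(X, y, split)
    res["fold"] = fold_id
    return res

# It would be submitted with the fold index as an extra argument, e.g.:
# for i, fold in enumerate(folds):
#     pool.apply_async(benchmark_models_tagged, args=(X, y, fold, i), callback=log_result)
Below, we stick with the plain benchmark_models call.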
for fold in folds:
    pool.apply_async(benchmark_models, args=(X, y, fold), callback=log_result)
After assigning each task, the program moves on without waiting for the result from the worker. That was convenient when we assigned the tasks and didn’t want to wait for the first result before assigning the second task. But we usually want to wait for all results before moving on with the script to, for example, average the results.
We tell the program to wait for all workers to complete their tasks using the pool’s join() method. Before we do so, we are required to make sure that no new tasks will be assigned, which we do by calling close() on the pool.
# Close the pool for new tasks
pool.close()
# Wait for all tasks to complete at this point
pool.join()
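As an aside, the same lifecycle can be written with the pool as a context manager (a sketch assuming Python 3.3+). Note that leaving the with block calls terminate(), so close() and join() still belong inside it if every submitted task should be allowed to finish.
# Alternative sketch: the context manager handles terminate() on exit, but
# close() and join() are still needed so every submitted task can finish
results = []
with mp.Pool(mp.cpu_count() - 1) as pool:
    for fold in folds:
        pool.apply_async(benchmark_models, args=(X, y, fold), callback=results.append)
    pool.close()
    pool.join()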
After collecting the results, we can work with the data as usual.
result = pd.concat(results, axis=0, sort=True)
result
|  | logit | rf |
| --- | --- | --- |
| ROC-AUC | 0.843773 | 0.911090 |
| ROC-AUC | 0.865646 | 0.885977 |
| ROC-AUC | 0.893829 | 0.909498 |
For cross validation, we would usually average the results over all splits and then compare our models.
result.index.name = "metric"
result = result.reset_index()
average = result.groupby(['metric']).mean()
average
| metric | logit | rf |
| --- | --- | --- |
| ROC-AUC | 0.867749 | 0.902188 |