Creating a Custom Cross-Validation Function in PySpark
Introduction
Lately, I have been using PySpark in my data processing and modeling pipeline. While Spark is great for most data processing needs, its machine learning component is slightly lacking. Coming from R and Python's scikit-learn, where so many machine learning packages are available, I find this limitation frustrating. Having said that, there are ongoing efforts to improve the machine learning library, so hopefully there will be more functionality in the future.
One of the problems I am solving involves a time series component in the prediction task. As such, the k-fold cross-validation technique available in PySpark would not give an accurate representation of the model's performance. For such problems a rolling window approach to cross-validation is much better, i.e. repeating the process of training the model on a lagged time period and testing its performance on a more recent period.
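To make the idea concrete, here is a minimal sketch of what a rolling-window split could look like on a PySpark DataFrame; the df variable, the month column and the window boundaries are made up purely for illustration and are not part of my actual pipeline.

from pyspark.sql.functions import col

# Each tuple is one fold: train on a lagged window of months, test on the month after it.
windows = [
    ((1, 2, 3), 4),
    ((2, 3, 4), 5),
    ((3, 4, 5), 6),
]

for train_months, test_month in windows:
    train_df = df.filter(col("month").isin(list(train_months)))
    test_df = df.filter(col("month") == test_month)
    # fit the model on train_df, evaluate on test_df, then roll the window forward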
However, other variants of cross-validation are not supported by PySpark. As of PySpark 2.3 it only supports the k-fold version and a simple random split into train / test datasets. Normally, it would be difficult to create a customised algorithm on PySpark as most of the functions call their Scala equivalents, Scala being the native language of Spark. Thankfully, the cross-validation function is largely written using base PySpark functions before being parallelised as tasks and distributed for computation. The rest of this post discusses my implementation of a custom cross-validation class.
Implementation
First, we will use the CrossValidator class as a template to base our new class on. The two main portions that need to be changed are the __init__ and _fit functions. Let's take a look at the __init__ function first.
@keyword_only
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
             seed=None, parallelism=1):
    super(CrossValidator, self).__init__()
    self._setDefault(numFolds=3, parallelism=1)
    kwargs = self._input_kwargs
    self._set(**kwargs)
Rather than the typical self.input = input kind of statements, PySpark uses a decorator (@keyword_only) to assign the inputs as params. This means that we have to define additional params before assigning them as inputs when initialising the class.
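As a minimal sketch of that pattern (with a made-up parameter name, myParam), a new param is declared as a class attribute first, and the @keyword_only decorator then captures the constructor's keyword arguments in self._input_kwargs before they are assigned via _set:

from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters

class MyParams(Params):
    # Declare the custom param on the class before it can be set as an input.
    myParam = Param(Params._dummy(), "myParam", "an example parameter",
                    typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, myParam='default'):
        super(MyParams, self).__init__()
        self._setDefault(myParam='default')
        # @keyword_only stores the keyword arguments in self._input_kwargs
        kwargs = self._input_kwargs
        self._set(**kwargs)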
Now let us examine the _fit function:
def _fit(self, dataset):
    est = self.getOrDefault(self.estimator)
    epm = self.getOrDefault(self.estimatorParamMaps)
    numModels = len(epm)
    eva = self.getOrDefault(self.evaluator)
    nFolds = self.getOrDefault(self.numFolds)
    seed = self.getOrDefault(self.seed)
    h = 1.0 / nFolds
    randCol = self.uid + "_rand"
    df = dataset.select("*", rand(seed).alias(randCol))
    metrics = [0.0] * numModels
    pool = ThreadPool(processes=min(self.getParallelism(), numModels))
    for i in range(nFolds):
        validateLB = i * h
        validateUB = (i + 1) * h
        condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
        validation = df.filter(condition).cache()
        train = df.filter(~condition).cache()
        tasks = _parallelFitTasks(est, train, eva, validation, epm)
        for j, metric in pool.imap_unordered(lambda f: f(), tasks):
            metrics[j] += (metric / nFolds)
        validation.unpersist()
        train.unpersist()
The main thing to note here is the way the value of a parameter is retrieved using the getOrDefault function. We also see how PySpark implements k-fold cross-validation by adding a column of random numbers and using the filter function to select the relevant fold to train and test on. That is the main portion we will change when implementing our custom cross-validation function. In addition, I would also like to print some information on the progress of the task as well as the results of the cross-validation.
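As a toy illustration of that fold-selection mechanism (variable names made up, and assuming some DataFrame some_df already exists), the folds are just intervals on the random column:

from pyspark.sql.functions import rand

nFolds = 3
h = 1.0 / nFolds
df_rand = some_df.select("*", rand(seed=42).alias("_rand"))

i = 0  # first fold
condition = (df_rand["_rand"] >= i * h) & (df_rand["_rand"] < (i + 1) * h)
validation = df_rand.filter(condition)   # rows falling inside the fold's interval
train = df_rand.filter(~condition)       # everything else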
Here's the full custom cross-validation class. It loops through a dictionary of datasets and identifies which rows to train and test on via the cvCol and splitWord inputs. This is actually the second version of my cross-validation class. The first one ran on a merged dataset, but in some cases the union operation messes up the metadata, so I modified the code to take in a dictionary as an input instead.
import numpy as np
from multiprocessing.pool import ThreadPool

# Imports below assume PySpark 2.3; ValidatorParams and _parallelFitTasks are
# internal helpers that live in pyspark.ml.tuning.
from pyspark import keyword_only
from pyspark.ml import Estimator
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasParallelism
from pyspark.ml.tuning import CrossValidatorModel, ValidatorParams, _parallelFitTasks
from pyspark.ml.util import MLReadable, MLWritable
from pyspark.sql.functions import col


class CustomCrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
    """
    Modifies CrossValidator to allow custom train and test datasets to be passed in.
    Bypasses the generation of train/test folds via numFolds;
    instead the train and test sets are user defined.
    """

    splitWord = Param(Params._dummy(), "splitWord",
                      "Tuple to split train and test set e.g. ('train', 'test')",
                      typeConverter=TypeConverters.toListString)
    cvCol = Param(Params._dummy(), "cvCol", "Column name to filter train and test list",
                  typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None,
                 splitWord=('train', 'test'), cvCol='cv', seed=None, parallelism=1):
        super(CustomCrossValidator, self).__init__()
        self._setDefault(parallelism=1)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _fit(self, dataset):
        # `dataset` is a dictionary of DataFrames; each entry is one fold.
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        nFolds = len(dataset)
        seed = self.getOrDefault(self.seed)
        cvCol = self.getOrDefault(self.cvCol)
        trainWord, testWord = self.getOrDefault(self.splitWord)

        metrics = [0.0] * numModels
        # Keep the per-fold score of each param map so it can be printed at the end.
        matrix_metrics = [[0 for x in range(nFolds)] for y in range(len(epm))]
        pool = ThreadPool(processes=min(self.getParallelism(), numModels))

        for i, key in enumerate(dataset.keys()):
            # Rows tagged with the train word form the training set,
            # rows tagged with the test word form the validation set.
            train = dataset[key].filter(col(cvCol) == trainWord).cache()
            validation = dataset[key].filter(col(cvCol) == testWord).cache()
            print('fold {}'.format(i))

            tasks = _parallelFitTasks(est, train, eva, validation, epm)
            for j, metric in pool.imap_unordered(lambda f: f(), tasks):
                matrix_metrics[j][i] = metric
                metrics[j] += (metric / nFolds)
            validation.unpersist()
            train.unpersist()

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        for i in range(len(metrics)):
            print(epm[i], 'Detailed Score {}'.format(matrix_metrics[i]),
                  'Avg Score {}'.format(metrics[i]))
        print('Best Model: ', epm[bestIndex], 'Detailed Score {}'.format(matrix_metrics[bestIndex]),
              'Avg Score {}'.format(metrics[bestIndex]))

        # Do not bother to train on the full dataset, just the latest train set supplied
        bestModel = est.fit(train, epm[bestIndex])
        return self._copyValues(CrossValidatorModel(bestModel, metrics))
Let's test it out on an example similar to the one in the PySpark source code:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
spark = SQLContext(sc)

from CustomCrossValidatorDict import CustomCrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder
d = {}

d['df1'] = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0, 'train'),
     (Vectors.dense([0.4]), 1.0, 'train'),
     (Vectors.dense([0.5]), 0.0, 'train'),
     (Vectors.dense([0.6]), 1.0, 'train'),
     (Vectors.dense([1.0]), 1.0, 'train'),
     (Vectors.dense([0.0]), 0.0, 'test'),
     (Vectors.dense([0.4]), 1.0, 'test'),
     (Vectors.dense([0.5]), 0.0, 'test'),
     (Vectors.dense([0.6]), 1.0, 'test'),
     (Vectors.dense([1.0]), 1.0, 'test')] * 10,
    ["features", "label", 'cv'])

d['df2'] = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0, 'train'),
     (Vectors.dense([0.4]), 1.0, 'train'),
     (Vectors.dense([0.5]), 0.0, 'train'),
     (Vectors.dense([0.6]), 1.0, 'train'),
     (Vectors.dense([1.0]), 1.0, 'train'),
     (Vectors.dense([0.0]), 0.0, 'test'),
     (Vectors.dense([0.4]), 1.0, 'test'),
     (Vectors.dense([0.5]), 0.0, 'test'),
     (Vectors.dense([0.6]), 1.0, 'test'),
     (Vectors.dense([1.0]), 1.0, 'test')] * 10,
    ["features", "label", 'cv'])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1, 5]).build()
evaluator = BinaryClassificationEvaluator()

cv = CustomCrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                          splitWord=('train', 'test'), cvCol='cv', parallelism=4)

cv.extractParamMap()
{Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='parallelism', doc='the number of threads to use when running parallel algorithms (>= 1).'): 4,
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='seed', doc='random seed.'): 7665653429569288359,
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='estimator', doc='estimator to be cross-validated'): LogisticRegression_487fb6aaeb91e051211c,
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 0},
{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 1},
{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 5}],
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='evaluator', doc='evaluator used to select hyper-parameters that maximize the validator metric'): BinaryClassificationEvaluator_44cc9ebbba7a7a85e22e,
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='splitWord', doc="Tuple to split train and test set e.g. ('train', 'test')"): ['train',
'test'],
Param(parent='CustomCrossValidator_4acca941d35632cf8f28', name='cvCol', doc='Column name to filter train and test list'): 'cv'}
cvModel = cv.fit(d)
fold 0
fold 1
{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 0} Detailed Score [0.5, 0.5] Avg Score 0.5
{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 1} Detailed Score [0.8333333333333333, 0.8333333333333333] Avg Score 0.8333333333333333
{Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 5} Detailed Score [0.8333333333333333, 0.8333333333333333] Avg Score 0.8333333333333333
Best Model: {Param(parent='LogisticRegression_487fb6aaeb91e051211c', name='maxIter', doc='max number of iterations (>= 0).'): 1} Detailed Score [0.8333333333333333, 0.8333333333333333] Avg Score 0.8333333333333333
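Since _fit returns a standard CrossValidatorModel, the selected best model can be applied to new data in the usual way. Here is a small sketch reusing the toy data above; the 'test' slice of df2 simply stands in for genuinely unseen data.

holdout = d['df2'].filter(d['df2']['cv'] == 'test')
predictions = cvModel.transform(holdout)   # delegates to the best model found
print(evaluator.evaluate(predictions))     # area under ROC by default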
Concluding Thoughts
Hope this post has been useful! The custom cross-validation class is really quite handy. It can be used for time series problems as well as for times when you want to test a model's performance over different geographical areas or customer segments. It took some time to work through the PySpark source code, but my understanding of it has definitely improved after this episode.