Spark ML Model Tuning

The purpose of this post is to provide some basic guidelines and supporting code for the tuning of an ML pipeline based on a random forest algorithm using cross-validation along with some final consideration about the computational cost involved and possibility to train/tuning the model in parallel.

Tuning

A random forest algorithm takes few hyperparameters. The most important ones are probably numTrees and maxDepth (but in code example there is also some reference to maxBins). The number of trees in the forest (numTrees) is inversely proportional to the variance in predictions: more trees generally improve the accuracy but also the computational load of the training phase increase proportionally. Analogously, a deep tree generally lead to better metrics but also increase the risk of overfitting and the training computational cost. Tuning the model is the process of working out the optimal values for those hyperparameters and striking the right balance between costs and benefits.

Cross Validation

The idea is to split a given labelled dataset in chunks (folds) then iterates through them and, at each iteration, one chunk will be used as a validation set and the remaining as a training set. For example, with fold = 3, the process splits the data in three folds: a, b, c and then iterates three times:

1st iteration -> a + b = training, c = validation
2nd iteration -> a + c = training, b = validation
3rd iteration -> b + c = training, a = validation

To note that the whole process is indeed training and evaluating three times (as in this example folds = 3).

An Estimator can then be plugged in the cross validation process with the intention of keeping track of the relationship between certain hyperparameters values and some metrics and pick up the best performing model.

Data and Code

Code can be found here. Inspired by the Cambridge Analytica scandal, and as a warning that great power comes with great responsibility, the example uses a completely fictional and auto generated dataset where made up citizens are listed with the party they voted for (the label) along with some features (the usual suspects: age, income, gender, etc.):

+----+------+---+----+------+---------+-------+
|id  |gender|age|area|income|education|vote   |
+----+------+---+----+------+---------+-------+
|650 |female|99 |A   |57000 |1        |tory   |
|1523|male  |24 |B   |5500  |3        |labour |
|434 |male  |82 |D   |5500  |3        |liberal|
|174 |male  |26 |C   |69000 |1        |liberal|
+----+------+---+----+------+---------+-------+

Road Map

As mentioned, the data in the example is not only fictional but some statistical correlation has been artificially introduced in the dataset and the features engineering part is simplified but on the other hand that is all what is needed in order to illustrate the advantages of tuning. The focus need to be on the different outcome between a tuned versus a not tuned model. Quite obviously, a tuned model is expected to perform better than a non tuned one and the following road map should prove the point:

  1. Given a training dataset, train the model without any tuning
  2. Using the same training dataset, train the model using cross validation and an evaluator to work out the optimal hyperparameters values
  3. Train a new model using the optimal hyperparameters values and same training dataset at point 1
  4. Evaluate the model at point 1 (not tuned) and model at point 3 (tuned) against a given validation set and compare the results

Evaluation will be based on f1 metric (which takes into consideration precision and recall). Besides being the default, f1 seems the most natural choice for the example.

Tuning the Model

First point in the road map is to train a random forest model without any specific tuning (ie. default value for numTrees and maxDepth will be used)1. In the example that is done in a training method that takes as a parameter a given training set.

val vectorAssembler = new VectorAssembler().setInputCols(Array("f_gender", "age", "f_area", "income", "f_education"))
    .setOutputCol("features")
val genderIndexer = new StringIndexer().setInputCol("gender").setOutputCol("f_gender")
val areaIndexer = new StringIndexer().setInputCol("area").setOutputCol("f_area")
val educationIndexer = new StringIndexer().setInputCol("education").setOutputCol("f_education")
val voteIntentionIndexer = new StringIndexer().setInputCol("vote").setOutputCol("label")

def training(trainingSet: DataFrame): PipelineModel = {
  val transformationPipeline = new Pipeline()
  val classifier = new RandomForestClassifier().setSeed(5043) // set the seed for reproducibility
  val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
  trainingPipeline.fit(trainingSet)
}

Then the model can be evaluated against a given validation set and its performance expressed in terms of f1:

def validation(model: PipelineModel, validation: DataFrame) = {
    // Make predictions.
    val predictions = model.transform(validation)

    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("f1") // f1 is the default anyway

    val f1: Double = evaluator.evaluate(predictions)
    println("@@ f1 ====> " + f1)
  }

The whole flow is implemented in a test in VoteIntentionTrainingTest called train and evaluate model. Once executed it should return a value for f1 of 0.6736260921544567 and this will the benchmark to compare the tuned version against.

Better Hyperparameters Values

Figuring out the optimal hyperparameters value should be more interesting. A ParamGridBuilder allows to pass different candidates for each hyperparameter. Considering the Spark default values (numTrees = 20, maxDepth = 5), a good first approximation would be two values surrounding the default. For example numTrees is 20 hence passing an Array(15, 20, 25) will give an idea about whether better metrics can be obtained with values greater or smaller than the default one.

def tuning(trainingSet: DataFrame): CrossValidatorModel = {

  val rfClassifier = new RandomForestClassifier()
  val pipeline= new Pipeline().setStages(
    Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, rfClassifier))

  val nFolds: Int = 8
  val metric: String = "f1"
  val paramGrid = new ParamGridBuilder()
    .addGrid(rfClassifier.numTrees, Array(15, 20, 25))
    .addGrid(rfClassifier.maxDepth, Array(4, 5, 6))
    .build()
  val evaluator = new MulticlassClassificationEvaluator() .setLabelCol("label") .setPredictionCol("prediction") .setMetricName(metric)

  val cv: CrossValidator = new CrossValidator()
    .setEstimator(pipeline)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(nFolds)
    .setParallelism(3)

  val fittedPipeline: CrossValidatorModel = cv.fit(trainingSet)
  val best: Model[_] = fittedPipeline.bestModel
  val stages: Seq[Transformer] = best.asInstanceOf[PipelineModel].stages.toList


  val paramMap = fittedPipeline.getEstimatorParamMaps
    .zip(fittedPipeline.avgMetrics)
    .maxBy(_._2)
    ._1

  println(s"@@@@ best num trees:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getNumTrees}")
  println(s"@@@@ best max depth:     ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxDepth}")
  println(s"@@@@ best max bins:      ${stages(5).asInstanceOf[RandomForestClassificationModel].getMaxBins}")

  println(s"@@@@ best paramMap:      ${paramMap}")

  fittedPipeline
}

Once the pipeline is fitted we grab the best model, find out the optimal hyperparameters values (among the ones provided in ParamGridBuilder) and print them in the console. As a note, we are interested in stage 6 (so the 5th element in the array of stages) because the pipeline (as defined in this example) has 6 stages, the last one being the classifier (the one we are interested in) but paramMap returns the same results. The whole process is implemented in VoteIntentionTrainingTest as the test tuning a model and save it in $TunedModelPath and if executed should return the following optimal values:

num trees:     25
max depth:     6

We could refine those results further and for each hyperparameter pass new candidates in an interval around the optimal value. For example, in the numTrees case an Array(23, 24, 25, 26, 27) could be passed to the ParamGridBuilder with a new set candidates to fulfil the role of optimal value. I’ll omit that here but that should return the following values:

num trees:     24
max depth:     6

Tuned vs Not Tuned Model

The optimal values can now be used to override the default ones:

  def training(trainingSet: DataFrame): PipelineModel = {
    val transformationPipeline = new Pipeline()
    val classifier = new RandomForestClassifier()
      .setNumTrees(24) // <- Tuned value
      .setMaxDepth(6) // <- Tuned value
      .setSeed(5043) // set the seed for reproducibility
    val trainingPipeline: transformationPipeline.type = transformationPipeline.setStages(
      Array(genderIndexer, areaIndexer, educationIndexer, voteIntentionIndexer, vectorAssembler, classifier))
    trainingPipeline.fit(trainingSet)
  }

And an evaluation of the new model against the same validation dataset, should return an f1 of 0.6919821403245205 which is an improvement if compared to the previous value of f1 (0.6736260921544567) obtained with the default hyperparameters value.

numTrees = 20, maxDepth = 5 -> f1 = 0.6736260921544567 (default values)
numTrees = 24, maxDepth = 6 -> f1 = 0.6919821403245205

Training and Parallelism

It is worth noting that cross validation and the parameter grid increase substantially the computational cost of fitting a model. The first tuning example:

.addGrid(rfClassifier.numTrees, Array(15, 20, 25))
.addGrid(rfClassifier.maxDepth, Array(4, 5, 6))

will evaluate 9 different parameter combinations (3 * 3) and this on the top of the cross validation cost (8 folds). In this fictional example we are dealing with dataset intentionally small but in a real case scenario Spark and a distributed architecture are likely to make a big difference and we can take advantage of this via the setParallelism method.

val cv: CrossValidator = new CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramGrid)
      .setEvaluator(evaluator)
      .setNumFolds(nFolds)
      .setParallelism(3)

That would set the max number of threads to use when running parallel algorithms (default is 1 for serial execution).

———————————

1. [In Spark 2.4.3 the default values are: numTrees = 20, maxDepth = 5, maxBins = 32]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s