$$ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \newcommand\rfrac[2]{^{#1}\!/_{#2}} \newcommand{\norm}[1]{\left\lVert#1\right\rVert} $$

Looking under the hood of pipelines

Introduction

The ability to chain together different transformers and predictors is an important feature for any Machine Learning (ML) library. In FlinkML we wanted to provide an intuitive API, and at the same time utilize the capabilities of the Scala language to provide type-safe implementations of our pipelines. What we hope to achieve then is an easy to use API, that protects users from type errors at pre-flight (before the job is launched) time, thereby eliminating cases where long running jobs are submitted to the cluster only to see them fail due to some error in the series of data transformations that commonly happen in an ML pipeline.

In this guide then we will describe the choices we made during the implementation of chainable transformers and predictors in FlinkML, and provide guidelines on how developers can create their own algorithms that make use of these capabilities.

The what and the why

So what do we mean by “ML pipelines”? Pipelines in the ML context can be thought of as chains of operations that have some data as input, perform a number of transformations to that data, and then output the transformed data, either to be used as the input (features) of a predictor function, such as a learning model, or just output the transformed data themselves, to be used in some other task. The end learner can of course be a part of the pipeline as well. ML pipelines can often be complicated sets of operations (in-depth explanation) and can become sources of errors for end-to-end learning systems.

The purpose of ML pipelines is then to create a framework that can be used to manage the complexity introduced by these chains of operations. Pipelines should make it easy for developers to define chained transformations that can be applied to the training data, in order to create the end features that will be used to train a learning model, and then perform the same set of transformations just as easily to unlabeled (test) data. Pipelines should also simplify cross-validation and model selection on these chains of operations.

Finally, by ensuring that the consecutive links in the pipeline chain “fit together” we also avoid costly type errors. Since each step in a pipeline can be a computationally-heavy operation, we want to avoid running a pipelined job, unless we are sure that all the input/output pairs in a pipeline “fit”.

Pipelines in FlinkML

The building blocks for pipelines in FlinkML can be found in the ml.pipeline package. FlinkML follows an API inspired by sklearn which means that we have Estimator, Transformer and Predictor interfaces. For an in-depth look at the design of the sklearn API the interested reader is referred to this paper. In short, the Estimator is the base class from which Transformer and Predictor inherit. Estimator defines a fit method, and Transformer also defines a transform method and Predictor defines a predict method.

The fit method of the Estimator performs the actual training of the model, for example finding the correct weights in a linear regression task, or the mean and standard deviation of the data in a feature scaler. As evident by the naming, classes that implement Transformer are transform operations like scaling the input and Predictor implementations are learning algorithms such as Multiple Linear Regression. Pipelines can be created by chaining together a number of Transformers, and the final link in a pipeline can be a Predictor or another Transformer. Pipelines that end with Predictor cannot be chained any further. Below is an example of how a pipeline can be formed:

// Training data
val input: DataSet[LabeledVector] = ...
// Test data
val unlabeled: DataSet[Vector] = ...

val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()

// Construct the pipeline
val pipeline = scaler
  .chainTransformer(polyFeatures)
  .chainPredictor(mlr)

// Train the pipeline (scaler and multiple linear regression)
pipeline.fit(input)

// Calculate predictions for the testing data
val predictions: DataSet[LabeledVector] = pipeline.predict(unlabeled)

As we mentioned, FlinkML pipelines are type-safe. If we tried to chain a transformer with output of type A to another with input of type B we would get an error at pre-flight time if A != B. FlinkML achieves this kind of type-safety through the use of Scala’s implicits.

Scala implicits

If you are not familiar with Scala’s implicits we can recommend this excerpt from Martin Odersky’s “Programming in Scala”. In short, implicit conversions allow for ad-hoc polymorphism in Scala by providing conversions from one type to another, and implicit values provide the compiler with default values that can be supplied to function calls through implicit parameters. The combination of implicit conversions and implicit parameters is what allows us to chain transform and predict operations together in a type-safe manner.

Operations

As we mentioned, the trait (abstract class) Estimator defines a fit method. The method has two parameter lists (i.e. is a curried function). The first parameter list takes the input (training) DataSet and the parameters for the estimator. The second parameter list takes one implicit parameter, of type FitOperation. FitOperation is a class that also defines a fit method, and this is where the actual logic of training the concrete Estimators should be implemented. The fit method of Estimator is essentially a wrapper around the fit method of FitOperation. The predict method of Predictor and the transform method of Transform are designed in a similar manner, with a respective operation class.

In these methods the operation object is provided as an implicit parameter. Scala will look for implicits in the companion object of a type, so classes that implement these interfaces should provide these objects as implicit objects inside the companion object.

As an example we can look at the StandardScaler class. StandardScaler extends Transformer, so it has access to its fit and transform functions. These two functions expect objects of FitOperation and TransformOperation as implicit parameters, for the fit and transform methods respectively, which StandardScaler provides in its companion object, through transformVectors and fitVectorStandardScaler:

class StandardScaler extends Transformer[StandardScaler] {
  ...
}

object StandardScaler {

  ...

  implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] {
    override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T])
      : Unit = {
        ...
      }

  implicit def transformVectors[T <: Vector: VectorConverter: TypeInformation: ClassTag] = {
      new TransformOperation[StandardScaler, T, T] {
        override def transform(
          instance: StandardScaler,
          transformParameters: ParameterMap,
          input: DataSet[T])
        : DataSet[T] = {
          ...
        }

}

Note that StandardScaler does not override the fit method of Estimator or the transform method of Transformer. Rather, its implementations of FitOperation and TransformOperation override their respective fit and transform methods, which are then called by the fit and transform methods of Estimator and Transformer. Similarly, a class that implements Predictor should define an implicit PredictOperation object inside its companion object.

Types and type safety

Apart from the fit and transform operations that we listed above, the StandardScaler also provides fit and transform operations for input of type LabeledVector. This allows us to use the algorithm for input that is labeled or unlabeled, and this happens automatically, depending on the type of the input that we give to the fit and transform operations. The correct implicit operation is chosen by the compiler, depending on the input type.

If we try to call the fit or transform methods with types that are not supported we will get a runtime error before the job is launched. While it would be possible to catch these kinds of errors at compile time as well, the error messages that we are able to provide the user would be much less informative, which is why we chose to throw runtime exceptions instead.

Chaining

Chaining is achieved by calling chainTransformer or chainPredictor on an object of a class that implements Transformer. These methods return a ChainedTransformer or ChainedPredictor object respectively. As we mentioned, ChainedTransformer objects can be chained further, while ChainedPredictor objects cannot. These classes take care of applying fit, transform, and predict operations for a pair of successive transformers or a transformer and a predictor. They also act recursively if the length of the chain is larger than two, since every ChainedTransformer defines a transform and fit operation that can be further chained with more transformers or a predictor.

It is important to note that developers and users do not need to worry about chaining when implementing their algorithms, all this is handled automatically by FlinkML.

How to Implement a Pipeline Operator

In order to support FlinkML’s pipelining, algorithms have to adhere to a certain design pattern, which we will describe in this section. Let’s assume that we want to implement a pipeline operator which changes the mean of your data. Since centering data is a common pre-processing step in many analysis pipelines, we will implement it as a Transformer. Therefore, we first create a MeanTransformer class which inherits from Transformer

class MeanTransformer extends Transformer[MeanTransformer] {}

Since we want to be able to configure the mean of the resulting data, we have to add a configuration parameter.

class MeanTransformer extends Transformer[MeanTransformer] {
  def setMean(mean: Double): this.type = {
    parameters.add(MeanTransformer.Mean, mean)
    this
  }
}

object MeanTransformer {
  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  def apply(): MeanTransformer = new MeanTransformer
}

Parameters are defined in the companion object of the transformer class and extend the Parameter class. Since the parameter instances are supposed to act as immutable keys for a parameter map, they should be implemented as case objects. The default value will be used if no other value has been set by the user of this component. If no default value has been specified, meaning that defaultValue = None, then the algorithm has to handle this situation accordingly.

We can now instantiate a MeanTransformer object and set the mean value of the transformed data. But we still have to implement how the transformation works. The workflow can be separated into two phases. Within the first phase, the transformer learns the mean of the given training data. This knowledge can then be used in the second phase to transform the provided data with respect to the configured resulting mean value.

The learning of the mean can be implemented within the fit operation of our Transformer, which it inherited from Estimator. Within the fit operation, a pipeline component is trained with respect to the given training data. The algorithm is, however, not implemented by overriding the fit method but by providing an implementation of a corresponding FitOperation for the correct type. Taking a look at the definition of the fit method in Estimator, which is the parent class of Transformer, reveals what why this is the case.

trait Estimator[Self] extends WithParameters with Serializable {
  that: Self =>

  def fit[Training](
      training: DataSet[Training],
      fitParameters: ParameterMap = ParameterMap.Empty)
      (implicit fitOperation: FitOperation[Self, Training]): Unit = {
    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
    fitOperation.fit(this, fitParameters, training)
  }
}

We see that the fit method is called with an input data set of type Training, an optional parameter list and in the second parameter list with an implicit parameter of type FitOperation. Within the body of the function, first some machine learning types are registered and then the fit method of the FitOperation parameter is called. The instance gives itself, the parameter map and the training data set as a parameters to the method. Thus, all the program logic takes place within the FitOperation.

The FitOperation has two type parameters. The first defines the pipeline operator type for which this FitOperation shall work and the second type parameter defines the type of the data set elements. If we first wanted to implement the MeanTransformer to work on DenseVector, we would, thus, have to provide an implementation for FitOperation[MeanTransformer, DenseVector].

val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
    import org.apache.flink.ml.math.Breeze._
    val meanTrainingData: DataSet[DenseVector] = input
      .map{ x => (x.asBreeze, 1) }
      .reduce{
        (left, right) =>
          (left._1 + right._1, left._2 + right._2)
      }
      .map{ p => (p._1/p._2).fromBreeze }
  }
}

A FitOperation[T, I] has a fit method which is called with an instance of type T, a parameter map and an input DataSet[I]. In our case T=MeanTransformer and I=DenseVector. The parameter map is necessary if our fit step depends on some parameter values which were not given directly at creation time of the Transformer. The FitOperation of the MeanTransformer sums the DenseVector instances of the given input data set up and divides the result by the total number of vectors. That way, we obtain a DataSet[DenseVector] with a single element which is the mean value.

But if we look closely at the implementation, we see that the result of the mean computation is never stored anywhere. If we want to use this knowledge in a later step to adjust the mean of some other input, we have to keep it around. And here is where the parameter of type MeanTransformer which is given to the fit method comes into play. We can use this instance to store state, which is used by a subsequent transform operation which works on the same object. But first we have to extend MeanTransformer by a member field and then adjust the FitOperation implementation.

class MeanTransformer extends Transformer[Centering] {
  var meanOption: Option[DataSet[DenseVector]] = None

  def setMean(mean: Double): Mean = {
    parameters.add(MeanTransformer.Mean, mu)
  }
}

val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
  override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
    import org.apache.flink.ml.math.Breeze._

    instance.meanOption = Some(input
      .map{ x => (x.asBreeze, 1) }
      .reduce{
        (left, right) =>
          (left._1 + right._1, left._2 + right._2)
      }
      .map{ p => (p._1/p._2).fromBreeze })
  }
}

If we look at the transform method in Transformer, we will see that we also need an implementation of TransformOperation. A possible mean transforming implementation could look like the following.

val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] {
  override def transform(
      instance: MeanTransformer,
      transformParameters: ParameterMap,
      input: DataSet[DenseVector])
    : DataSet[DenseVector] = {
    val resultingParameters = parameters ++ transformParameters

    val resultingMean = resultingParameters(MeanTransformer.Mean)

    instance.meanOption match {
      case Some(trainingMean) => {
        input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean")
      }
      case None => throw new RuntimeException("MeanTransformer has not been fitted to data.")
    }
  }
}

class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] {
  var trainingMean: DenseVector = null

  override def open(parameters: Configuration): Unit = {
    trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
  }

  override def map(vector: DenseVector): DenseVector = {
    import org.apache.flink.ml.math.Breeze._

    val result = vector.asBreeze - trainingMean.asBreeze + resultingMean

    result.fromBreeze
  }
}

Now we have everything implemented to fit our MeanTransformer to a training data set of DenseVector instances and to transform them. However, when we execute the fit operation

val trainingData: DataSet[DenseVector] = ...
val meanTransformer = MeanTransformer()

meanTransformer.fit(trainingData)

we receive the following error at runtime: "There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]". The reason is that the Scala compiler could not find a fitting FitOperation value with the right type parameters for the implicit parameter of the fit method. Therefore, it chose a fallback implicit value which gives you this error message at runtime. In order to make the compiler aware of our implementation, we have to define it as an implicit value and put it in the scope of the MeanTransformer's companion object.

object MeanTransformer{
  implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ...

  implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ...
}

Now we can call fit and transform of our MeanTransformer with DataSet[DenseVector] as input. Furthermore, we can now use this transformer as part of an analysis pipeline where we have a DenseVector as input and expected output.

val trainingData: DataSet[DenseVector] = ...

val mean = MeanTransformer.setMean(1.0)
val polyFeatures = PolynomialFeatures().setDegree(3)

val pipeline = mean.chainTransformer(polyFeatures)

pipeline.fit(trainingData)

It is noteworthy that there is no additional code needed to enable chaining. The system automatically constructs the pipeline logic using the operations of the individual components.

So far everything works fine with DenseVector. But what happens, if we call our transformer with LabeledVector instead?

val trainingData: DataSet[LabeledVector] = ...

val mean = MeanTransformer()

mean.fit(trainingData)

As before we see the following exception upon execution of the program: "There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]". It is noteworthy, that this exception is thrown in the pre-flight phase, which means that the job has not been submitted to the runtime system. This has the advantage that you won’t see a job which runs for a couple of days and then fails because of an incompatible pipeline component. Type compatibility is, thus, checked at the very beginning for the complete job.

In order to make the MeanTransformer work on LabeledVector as well, we have to provide the corresponding operations. Consequently, we have to define a FitOperation[MeanTransformer, LabeledVector] and TransformOperation[MeanTransformer, LabeledVector, LabeledVector] as implicit values in the scope of MeanTransformer’s companion object.

object MeanTransformer {
  implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ...

  implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ...
}

If we wanted to implement a Predictor instead of a Transformer, then we would have to provide a FitOperation, too. Moreover, a Predictor requires a PredictOperation which implements how predictions are calculated from testing data.