Flink-JPMML – Streaming Machine Learning Model Serving on Flink (Part 1)

 

Make crucial predictions as the data comes in

If you walk the hottest IT streets these days, you have likely heard about Online Machine Learning, i.e. moving AI towards streaming scenarios and exploiting real-time capabilities along with new Artificial Intelligence techniques. You will also hear about the lack of research in this area, even though its audience is expanding quickly.

If we investigate a little deeper, we realize that an earlier step is missing: today, well-known streaming applications still don't handle the concept of Model Serving properly, and industries still lean on the lambda architecture to achieve the goal.

Suppose a bank has a concrete, frequently updated, batch-trained Machine Learning model (e.g. an optimized Gradient Descent model applied to past buffer overflow attack attempts) and wants to deploy it directly to its own canary Distributed IDS, backed by the streaming system, in order to get real-time feedback about the quality of the model.


Ideally, the bank should be able to load the trained model to the IDS automatically and exploit it in real time to compute predictions on incoming events, achieving persistent and always up-to-date coverage, online fraud detection, and significant savings. Unfortunately, in practice the bank is forced to distribute the model across the infrastructure with a pre-defined layout, and most of the time you have to deploy your weight vector directly and compute predictions by hand-coding the math on top of it. Given this cumbersome reality, the bank falls back on the good old safe parallel batch job, which inspects persisted events as they become available on disk.

To close this gap, we present Flink-JPMML (repo), a freshly made open-source Scala library aimed at achieving streaming model serving predictions at scale on top of the Apache Flink real-time engine.

Fast as Squirrels

Apache Flink is an open-source, streaming-first distributed processing engine: it provides high availability and exactly-once consistency, as well as real-time complex event processing at ridiculous scale. Flink also offers batch computation as a special case of streaming. Radicalbit employs Flink at its core, and it keeps amazing us with its efficiency, robustness and scalability, which make it a perfect fit for the core of a Kappa architecture.

PMML stands for Predictive Model Markup Language, and it is a well-established standard for persisting Machine Learning models across different systems. PMML is based on an efficient XML schema which allows defining trained unsupervised/supervised, probabilistic and even deep learning models, so that a source-independent trained model can be imported/exported by any system, no matter which one created it. We employ the JPMML-Evaluator library to adopt the standard within flink-jpmml.
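To give a flavor of the standard, a heavily trimmed PMML document looks roughly like this (the version and field names here are illustrative, not taken from the model used later in this post):

```xml
<PMML xmlns="http://www.dmg.org/PMML-4_2" version="4.2">
  <Header description="Example model exported by some training system"/>
  <DataDictionary numberOfFields="4">
    <DataField name="sepal_length" optype="continuous" dataType="double"/>
    <!-- remaining DataFields omitted -->
  </DataDictionary>
  <!-- a model element such as ClusteringModel follows here,
       carrying its MiningSchema and the trained parameters -->
</PMML>
```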

At this point, we're ready to get our hands dirty.

User-Defined Predictions with a Flink-like API

First of all, in order to run Flink-JPMML add the following dependency.

If you're an sbt-er:

"io.radicalbit" %% "flink-jpmml-scala" % "0.5.0"

For Maven users instead:

<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.5.0</version>
  </dependency>
</dependencies>

You'll probably also need to publish the library locally; to do that, follow these steps:

1. launch the sbt interface within the flink-jpmml project

> sbt

2. switch to the flink-jpmml-scala sub-project

> project flink-jpmml-scala

3. publish the library to your local repository

> publishLocal

 

Note that Flink-JPMML expects the scala-core, flink-streaming and flink-clients libraries as provided dependencies.

Let's go ahead.

Wherever your PMML model resides, just provide the path to it.

val sourcePath = "/path/to/your/pmml/model.xml"

This is the only thing you need to worry about: Flink-JPMML automatically checks the distributed backend configured for Flink by means of a dedicated ModelReader.

import io.radicalbit.flink.pmml.scala.api.reader.ModelReader

val modelReader = ModelReader(sourcePath)

Now let's define an input stream.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}

case class IrisInput(pLength: Double, pWidth: Double, sLength: Double, sWidth: Double, timestamp: Long, color: Int, prediction: Option[String]) {
  def toVector: Vector = DenseVector(pLength, pWidth, sLength, sWidth)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[IrisInput] = yourIrisSource(env)
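The post leaves `yourIrisSource` undefined; for local experiments, one minimal way to back it is to generate a bounded collection of random events and feed it to `env.fromCollection`. The sketch below is plain Scala with a simplified event type (no Flink `Vector`), so `IrisEvent` and `randomIrisEvents` are illustrative names, not part of the library:

```scala
import scala.util.Random

// Simplified stand-in for the IrisInput of the example (no Flink Vector),
// used only to sketch how a bounded test source could be generated.
final case class IrisEvent(
  pLength: Double, pWidth: Double,
  sLength: Double, sWidth: Double,
  timestamp: Long, color: Int
)

// Hypothetical helper backing `yourIrisSource`: in a real job the resulting
// sequence could become a DataStream via `env.fromCollection(randomIrisEvents(100))`.
def randomIrisEvents(n: Int, seed: Long = 42L): Seq[IrisEvent] = {
  val rnd = new Random(seed)
  Seq.fill(n) {
    IrisEvent(
      pLength = 1.0 + rnd.nextDouble() * 6.0, // petal length in [1.0, 7.0)
      pWidth  = 0.1 + rnd.nextDouble() * 2.4, // petal width  in [0.1, 2.5)
      sLength = 4.0 + rnd.nextDouble() * 4.0, // sepal length in [4.0, 8.0)
      sWidth  = 2.0 + rnd.nextDouble() * 2.5, // sepal width  in [2.0, 4.5)
      timestamp = System.currentTimeMillis(),
      color = rnd.nextInt(256)
    )
  }
}
```

A fixed seed keeps local runs reproducible; swap it for a true random seed if you want fresh flowers on every run.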

Here we go. The following import

import io.radicalbit.flink.pmml.scala._

extends the Flink DataStream with the evaluate method. Strictly speaking, it provides the tool that lets us achieve streaming predictions in real time.

import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector 

val out = events.evaluate(modelReader) { (event, model) =>

  // flink-jpmml models are evaluated against Flink Vectors
  val vectorEvent: Vector = event.toVector

  // now we can call the predict method of model: PmmlModel
  val prediction = model.predict(vectorEvent)

  // the Prediction container owns the prediction result as an ADT called Score
  prediction match {
    case Prediction(Score(value)) =>
      // return the event with the prediction filled in
      event.copy(prediction = Some(computeKind(value)))
    case Prediction(EmptyScore) =>
      // return just the event
      logger.info("It was not possible to predict event {}", event); event
  }
}

out.print()

env.execute("Flink JPMML simple execution.")

private def computeKind(value: Double): String =
  value match {
    case 1.0 => "Iris-setosa"
    case 2.0 => "Iris-versicolor"
    case 3.0 => "Iris-virginica"
    case _   => "other"
  }

Now you can take the sample PMML clustering model available here; the only change needed is to add class as an output parameter. So let's simply add

<MiningField name="class" invalidValueTreatment="asIs" usageType="predicted"/>

to the mining fields list.

Then, add class as an output field:

<Output>
    <OutputField name="PCluster" optype="class" dataType="integer" targetField="class" feature="entityId"/>
</Output>

At this point we're ready to execute our job: Flink-JPMML will log a message about the model loading state:

17/05/24 14:33:11 INFO package$RichDataStream$$anon$1: Model has been read successfully, model name: k-means

Finally, here is the operator output against some random flowers.

IrisInput(5.7,1.8,2.5,0.7, 1495635020923, 34, Some(other))
IrisInput(5.5,3.8,5.2,4.3, 1495635020233, 93, Some(Iris-setosa))
IrisInput(4.3,2.3,2.0,3.1, 1495635020100, 122, Some(other))
IrisInput(5.1,5.7,4.8,2.1, 1495635020583, 255, Some(Iris-versicolor))
IrisInput(4.2,0.8,0.9,2.6, 1495635020921, 0, Some(Iris-virginica))

Flink-JPMML also provides a shortcut to perform quick predictions over a DataStream of Flink vectors:

val vectorStream = events.map(_.toVector)
val predictions: DataStream[(Prediction, Vector)] = vectorStream.evaluate(modelReader)

This comes in extremely useful when the user needs to apply heavy mathematical preprocessing before the evaluation and only the prediction result is required (e.g. for model quality assessment).
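As a toy illustration of such an assessment, one could simply measure how often the model actually produced a score. The sketch below is plain Scala, with `Option[Double]` standing in for the library's `Prediction`/`Score` ADT (a simplification, not the real types):

```scala
// Toy model-quality metric: fraction of events for which the model
// emitted a concrete score (Some) rather than an empty one (None).
def scoredFraction(predictions: Seq[Option[Double]]): Double =
  if (predictions.isEmpty) 0.0
  else predictions.count(_.isDefined).toDouble / predictions.size
```

In a real job, the pairs produced by `vectorStream.evaluate` would first be collected or windowed before computing such a metric.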

What happens behind the scenes?

Behind its simple and easy-to-use API, flink-jpmml aims to exploit all the performance that makes Flink one of the most powerful distributed processing engines available today.

The Reader

The ModelReader object retrieves the PMML model from any distributed system supported by Flink; in other words, it can load from any supported distributed file system (e.g. HDFS, Alluxio). The model reader instance is shipped to the Task Managers, which use its API only at operator materialization time: this means the model is read lazily.

The Model

The library lets Flink load the model through a singleton loader per Task Manager, so the model is read only once per TM, independently of the number of sub-tasks running on it. This optimization lets Flink scale the model evaluation in a thread-safe way, which matters because even fairly simple PMML models can grow to several hundreds of MBs.
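This is not the library's actual implementation, but the per-JVM singleton pattern it describes can be sketched in plain Scala as follows (`ModelCache` and `LoadedModel` are illustrative names):

```scala
// Illustrative sketch of a per-JVM singleton loader (not the real
// flink-jpmml internals): all sub-tasks in the same Task Manager JVM
// share one lazily initialized model instance.
object ModelCache {
  // Hypothetical stand-in for a parsed, potentially very large PMML model.
  final case class LoadedModel(name: String)

  @volatile private var cached: Option[LoadedModel] = None

  // Double-checked locking: the expensive parse happens at most once per
  // JVM, no matter how many operator sub-tasks ask for the model.
  def getOrLoad(path: String): LoadedModel =
    cached.getOrElse {
      synchronized {
        cached.getOrElse {
          val model = LoadedModel(path) // stand-in for the expensive PMML parse
          cached = Some(model)
          model
        }
      }
    }
}
```

The `@volatile` field plus the synchronized re-check is the classic way to make lazy one-time initialization both cheap on the hot path and safe under concurrent first access.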

Evaluation As UDF

The evaluate method is backed by an underlying FlatMap implementation, enriched with the user-defined function described above, which the user supplies as a function over the event and the model.
Initially, the idea was to create something a-la-FlinkML, i.e. a core object shaped by strategy patterns to compute predictions just as you would with typical ML libraries. But at the end of the day we are performing a streaming task, so the user gets the unbounded input event and the model as an instance of PmmlModel. Flink-JPMML only demands that the user compute the prediction, but the UDF allows applying any kind of custom operation, and any serializable output type is allowed.

To the next step and Call to action

We introduced a scalable, lightweight library called Flink-JPMML which exploits the capabilities of Apache Flink as a real-time processing engine and offers a brand-new way to serve any Machine Learning model exported in the PMML standard.
In the next post we will discuss how Flink-JPMML lets the user manage NaN values and how the library handles failures; we will also explain the reasons behind the choice of Flink vectors and point out the steps we plan to take to make this library better.

We'd be really pleased to welcome new contributors to Flink-JPMML; just check out the repository and the open issues.

Waiting for you 😉

flink-jpmml authors

Francesco Frontera
francesco.frontera@radicalbit.io
Andrea Spina
andrea.spina@radicalbit.io