Make crucial predictions as the data comes

Oct 9, 2019 | By Andrea Spina, Francesco Frontera

If you follow what's hot in IT these days, you've likely heard about Online Machine Learning, i.e. moving AI towards streaming scenarios and combining real-time capabilities with new Artificial Intelligence techniques. You will also notice how little research exists in this area, despite the growing interest in it. Looking a little deeper, we realize that an earlier step is missing: today's well-known streaming applications still don't handle Model Serving properly, and industries still lean on the lambda architecture to achieve the goal. Suppose a bank has a concrete, frequently updated, batch-trained Machine Learning model (e.g. an optimized Gradient Descent applied to past buffer overflow attack attempts) and it wants to deploy the model directly to its own canary distributed IDS, backed by the streaming system, in order to get real-time feedback on the quality of the model.

Ideally, the bank should be able to load the trained model automatically into the IDS and use it in real time to compute predictions on incoming events, achieving persistent and always up-to-date coverage, online fraud detection, and saving a lot of money. Unfortunately, the bank is forced to distribute the model across the infrastructure with a pre-defined layout and, most of the time, has to deploy the weight vector directly and compute predictions by hard-coding math instructions on it. Given this cumbersome reality, the bank falls back on the good old safe parallel batch job, which inspects persisted events as they become available on disk. To fill this gap, we present Flink-JPMML (repo), a freshly made open-source Scala library for serving streaming model predictions at scale on the Apache Flink real-time engine.

Fast as Squirrels

Apache Flink is an open-source, distributed, streaming-first processing engine: it provides high availability and exactly-once consistency as well as real-time complex event processing at ridiculous scale. Flink also provides batch computation as a sub-case of streaming. Radicalbit uses Flink at its core, and it still amazes us with its efficiency, robustness and scalability, which make it a perfect fit for the core of a Kappa architecture.

PMML stands for Predictive Model Markup Language, a well-established standard for persisting Machine Learning models across different systems. PMML is based on a really efficient XML semantic that allows defining trained unsupervised/supervised, probabilistic and even deep learning models, so that a trained model can be persisted independently of its source and imported/exported by any system, no matter which one created it. We employed the JPMML-evaluator library to adopt the standard within Flink-JPMML. At this point, we're ready to get our hands dirty.

 

User-defined Predictions like Flink API

First of all, to run Flink-JPMML, add the following dependency. If you use sbt:

"io.radicalbit" %% "flink-jpmml-scala" % "0.5.0"

For Maven users instead:

<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.5.0</version>
  </dependency>
</dependencies>

You will probably also need to publish the library locally. To do so, follow these steps:

  1. launch sbt interface within the flink- > sbt
  2. jump in flink-jpmml-scala project directory > project flink-jpmml-scala
  3. publish the library at your local repo > publishLocal 

At this point, Flink-JPMML expects the scala-core, flink-streaming and flink-clients libraries to be provided. Let’s go ahead. Wherever your PMML model resides, just provide the path to it.

val sourcePath = "/path/to/your/pmml/model.xml"

This is the only thing you need to worry about: Flink-JPMML automatically checks the distributed backend through Flink by implementing a dedicated ModelReader.

import io.radicalbit.flink.pmml.scala.api.reader.ModelReader

val modelReader = ModelReader(sourcePath)

Now let’s define an input stream.

import org.apache.flink.streaming.api.scala._

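import org.apache.flink.ml.math.{DenseVector, Vector}
case class IrisInput(pLength: Double, pWidth: Double, sLength: Double, sWidth: Double, timestamp: Long, color: Int, prediction: Option[String]) {
  // Flink ML dense vector built from the four Iris measurements
  def toVector: Vector = DenseVector(pLength, pWidth, sLength, sWidth)
}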

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[IrisInput] = yourIrisSource(env)

Here we go. The following import

import io.radicalbit.flink.pmml.scala._


extends Flink DataStream with the evaluate method. Strictly speaking, it gives you the tool that lets you compute streaming predictions in real time.

import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector

val out = events.evaluate(modelReader) { (event, model) =>
  // the Flink PMML model must be evaluated against Flink vectors
  val vectorEvent: Vector = event.toVector

  // now we can call the predict method of model: PmmlModel
  val result = model.predict(vectorEvent)

  // the Prediction container holds the result as an ADT called Score
  result match {
    case Prediction(Score(value)) =>
      // return the event with the updated prediction
      event.copy(prediction = Some(computeKind(value)))
    case Prediction(EmptyScore) =>
      // log and return the event unchanged
      logger.info("It was not possible to predict event {}", event)
      event
  }
}

out.print()

env.execute("Flink JPMML simple execution.")

private def computeKind(value: Double): String = {
    value match {
    case 1.0 => "Iris-setosa"
    case 2.0 => "Iris-versicolor"
    case 3.0 => "Iris-virginica"
    case _ => "other"
    }
}
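
The match above works because the prediction result is modelled as an algebraic data type. As a minimal, library-independent sketch of that pattern (the names `SketchTarget`, `SketchScore`, `SketchEmpty` and `SketchPrediction` are hypothetical stand-ins invented for this example, not the Flink-JPMML classes), it could look roughly like this:

```scala
// Hypothetical stand-ins for the library's Prediction/Score types,
// used only to illustrate the pattern match; not the real Flink-JPMML API.
sealed trait SketchTarget
final case class SketchScore(value: Double) extends SketchTarget
case object SketchEmpty extends SketchTarget

final case class SketchPrediction(target: SketchTarget)

object PredictionSketch {
  // Maps a numeric score to a label, mirroring computeKind above;
  // an empty score yields no label at all.
  def label(prediction: SketchPrediction): Option[String] =
    prediction match {
      case SketchPrediction(SketchScore(1.0)) => Some("Iris-setosa")
      case SketchPrediction(SketchScore(2.0)) => Some("Iris-versicolor")
      case SketchPrediction(SketchScore(3.0)) => Some("Iris-virginica")
      case SketchPrediction(SketchScore(_))   => Some("other")
      case SketchPrediction(SketchEmpty)      => None
    }
}
```

Because the trait is sealed, the compiler can warn when a case is left unhandled, which is presumably why an ADT is a convenient shape for a result that may be missing.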

Now you can take the sample PMML clustering model available here; the only requirement is to add class as an output parameter. So let's simply add

<MiningField name="class" invalidValueTreatment="asIs" usageType="predicted"/>

to the mining fields list. Then, add class as output

<Output>
  <OutputField name="PCluster" optype="class" dataType="integer" targetField="class" feature="entityId"/>
</Output>

At this point we’re ready to execute our job: Flink-JPMML will send you a log message about the loading state:

17/05/24 14:33:11 INFO package$RichDataStream$$anon$1: Model has been read successfully, model name: k-means

Finally, we have the operator output against some random flowers.

IrisInput(5.7,1.8,2.5,0.7, 34, 1495635020923, Some(Other))
IrisInput(5.5,3.8,5.2,4.3, 93, 1495635020233, Some(Iris-setosa))
IrisInput(4.3,2.3,2.0,3.1, 122, 1495635020100, Some(Other))
IrisInput(5.1,5.7,4.8,2.1,255, 1495635020583, Some(Iris-versicolor))
IrisInput(4.2,0.8,0.9,2.6, 0, 1495635020921, Some(Iris-virginica))

Flink-JPMML also provides a shortcut to perform quick predictions over a DataStream of Flink vectors. It works as follows:

val vectorStream = events.map(_.toVector)
val predictions: DataStream[(Prediction, Vector)] = vectorStream.evaluate(modelReader)

This is extremely useful when the user needs to apply specific math preprocessing before the evaluation and only the prediction result is required (e.g. model quality assessment).

What happens behind the scenes?

Behind a simple and easy-to-use API, flink-jpmml tries to exploit all the performance that makes Flink one of the most powerful distributed processing engines today.

The Reader

The ModelReader object retrieves the PMML model from any distributed file system supported by Flink (e.g. HDFS, Alluxio). The model reader instance is shipped to the Task Managers, which use its API only at operator materialization time: that means the model is read lazily.
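
To make the lazy-read idea concrete, here is a minimal sketch under stated assumptions: `LazyPathReader` is a hypothetical class written for this example, it reads from the local file system through `java.nio` instead of a distributed one, and it is not the library's ModelReader. It only shows how a small serializable handle can defer the actual read until first use.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical stand-in for a model reader: it carries only the path,
// so shipping it to workers is cheap; the file is touched on first access.
final class LazyPathReader(val path: String) extends Serializable {
  // @transient keeps the loaded content out of the serialized form;
  // each deserialized copy re-reads the file on its first access.
  @transient lazy val content: String =
    new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)
}
```

Constructing the reader performs no I/O, so the object can be created on the client and the read happens wherever `content` is first requested.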

The Model

The library lets Flink load the model through a singleton loader per Task Manager, so the model is read once regardless of the number of sub-tasks running on each TM. This optimization lets Flink scale model evaluation in a thread-safe way, considering that even fairly simple PMML models can grow to several hundred MBs.
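
The singleton-loader idea can be sketched in plain Scala. This is an illustration under assumptions, not the library's code: `ModelCacheSketch`, `getOrLoad` and the `loads` counter are hypothetical names, and the "model" is just a string. The point is that a Scala object is shared by all threads in a process, and `ConcurrentHashMap.computeIfAbsent` runs the loader at most once per key.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical per-process cache: a Scala object is instantiated once per
// class loader, so all threads (sub-tasks) in the same process share it.
object ModelCacheSketch {
  private val cache = new ConcurrentHashMap[String, String]()
  val loads = new AtomicInteger(0) // counts real loads, for demonstration only

  // computeIfAbsent applies the loader at most once per key, even under contention.
  def getOrLoad(path: String)(load: String => String): String =
    cache.computeIfAbsent(path, new JFunction[String, String] {
      override def apply(p: String): String = {
        loads.incrementAndGet()
        load(p)
      }
    })
}
```

With several threads asking for the same path, only the first call pays the loading cost; the others block briefly and then receive the cached value.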

Evaluation As UDF

The evaluate method is built on an underlying FlatMap implementation, enriched by the user-defined function described above, which the user provides as a partial function. Initially, the idea was to create something à la FlinkML, i.e. a core object shaped by strategy patterns, in order to compute predictions just as you would with typical ML libraries. But at the end of the day we’re performing a streaming task, so the user has the unbounded input event and the model as an instance of PmmlModel. Flink-JPMML only requires the user to compute the prediction, but the UDF allows any kind of custom operation, and any serializable output type is allowed.
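
As a rough illustration of evaluation as a UDF, the sketch below applies a user function to each event together with a shared model, using flatMap over an ordinary Scala collection rather than a Flink DataStream. `ToyModel` and `EvaluateSketch` are hypothetical names, and dropping events whose UDF call throws is an assumption made for this example, not a statement about how the library handles failures.

```scala
// Hypothetical model stand-in; the real PmmlModel wraps a JPMML evaluator.
final case class ToyModel(threshold: Double) {
  def predict(features: Seq[Double]): Option[Double] =
    if (features.isEmpty) None
    else Some(if (features.sum > threshold) 1.0 else 0.0)
}

object EvaluateSketch {
  // Applies the user function to every event together with the shared model.
  // flatMap over an Option lets a throwing UDF call drop the event instead of
  // failing the whole computation (an assumption made for this sketch).
  def evaluate[T, R](events: Seq[T], model: ToyModel)(udf: (T, ToyModel) => R): Seq[R] =
    events.flatMap(event => scala.util.Try(udf(event, model)).toOption)
}
```

The output type `R` is left to the caller, which mirrors the point that the UDF can return the enriched event, the bare prediction, or anything else.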

To the next step and Call to action

We introduced a scalable, lightweight library called Flink-JPMML, which exploits Apache Flink's capabilities as a real-time processing engine and offers a brand-new way to serve any of your Machine Learning models exported with the PMML standard. In the next post, we will discuss how Flink-JPMML lets the user manage NaN values and describe how the library handles failures; we will also explain the reason behind the choice of Flink vectors and point out the steps we expect to follow in order to keep improving this library. We’d be really pleased to welcome new contributors to Flink-JPMML: just check the repository and the open issues.

Waiting for you 😉

flink-jpmml authors
