Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Drools banking skeleton application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #cf0d98c9-9383-4297-b6f3-ff1fed217e23/77bdf833d53719c889c950c053656ad90e2d602e
// summary : Drools banking skeleton application wired to a kafka topic
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing
// publish : gist
// authors : David Crosson
// license : Apache2
// id : cf0d98c9-9383-4297-b6f3-ff1fed217e23
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
// created-on : 2019-10-03T08:46:39+02:00
/*
PLEASE FIRST START A REPL SESSION AND EVAL THE CONTENT OF
the "drools-kb-banking-events-stream-generator.sc" SCRIPT
to start an embedded kafka server and get simple way to generate events
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3)
then quickly makes such call on THE REPL :
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
And see the effect on this script
*/
import $ivy.`fr.janalyse::drools-scripting:1.0.11`
import $ivy.`org.apache.kafka::kafka:2.6.0`
import fr.janalyse.droolscripting._
import org.apache.kafka.clients.consumer.KafkaConsumer
val drl =
"""package banking
|
|declare OperationRequest @role(event) @timestamp(timestamp)
| timestamp: java.util.Date
| operationId: String
| fromAccountId: String
| toAccountId: String
| label:String
| amount: double
|end
|
|rule "Log events"
|when
| OperationRequest($label:label, $amount:amount)
|then
| System.out.println(""+$label+" "+$amount);
|end
|
|""".stripMargin
val config = DroolsEngineConfig(
withDroolsLogging = false,
equalsWithIdentity = false,
pseudoClock = false
)
val engine = DroolsEngine(drl,config)
val props = new java.util.Properties()
props.put("bootstrap.servers", "127.0.0.1:4242")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest") // latest message; versus "earliest"
props.put("group.id", "consumer-group")
import scala.jdk.CollectionConverters._
import java.time.Duration
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("banking-operations").asJavaCollection)
while (true) {
val record = consumer.poll(Duration.ofSeconds(1)).asScala
for (data <- record.iterator) {
val json = data.value()
engine.insertJson(json, "banking.OperationRequest")
}
engine.fireAllRules()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment