Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:10
Show Gist options
  • Save dacr/c56a579cca9daf3290ee2aae4f347f3c to your computer and use it in GitHub Desktop.
Save dacr/c56a579cca9daf3290ee2aae4f347f3c to your computer and use it in GitHub Desktop.
Drools banking example application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #c7c25a3a-9167-4950-b390-42187623e086/851299fb2f8e81747800934dfbe574765040e6ec
// summary : Drools banking example application wired to a kafka topic
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : c7c25a3a-9167-4950-b390-42187623e086
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
// created-on : 2019-10-03T08:46:39+02:00
// managed-by : https://github.com/dacr/code-examples-manager
/*
PLEASE FIRST START A REPL SESSION AND EVALUATE THE CONTENT OF
the "drools-kb-banking-events-stream-generator.sc" SCRIPT
to start an embedded Kafka server and get a simple way to generate events
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3)
then quickly make a call such as the following one in THE REPL :
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
and watch the effect on this script
*/
import $ivy.`fr.janalyse::drools-scripting:1.0.11`
import $ivy.`org.apache.kafka::kafka:2.6.0`
import fr.janalyse.droolscripting._
import org.apache.kafka.clients.consumer.KafkaConsumer
// Drools knowledge base (DRL source) for the banking example.
//
// Declared types:
//  - OperationRequest : an event (timestamped by its `timestamp` field, expiring after 10s)
//                       describing a transfer of `amount` from one account to another.
//  - Account          : a plain fact holding an account `id` and its `balance`.
//
// Rules:
//  - "init"                : inserts the two demo accounts "aaa-bbb" and "bbb-ddd".
//  - "operation received"  : logs the timestamp and label of every incoming request.
//  - "two many operations…": when a second request from the same source account arrives
//                            within 10s after another one, logs a warning and deletes it.
//  - "update balance"      : moves the amount between the two accounts, only when the
//                            source balance minus the amount stays strictly positive.
//
// The margin character `|` is removed by `stripMargin`; every consequence statement is
// terminated with `;` so the right-hand sides stay valid whatever dialect compiles them.
val drl =
"""package banking
|
|global org.slf4j.Logger logger
|
|declare OperationRequest @role(event) @timestamp(timestamp) @expires(10s)
| timestamp: java.util.Date
| operationId: String
| fromAccountId: String
| toAccountId: String
| label:String
| amount: double
|end
|
|declare Account
| id:String
| balance:double
|end
|// -------------------------------------------------------------------------------
|rule "init" when then
| insert(new Account("aaa-bbb", 100000));
| insert(new Account("bbb-ddd", 100000));
|end
|// -------------------------------------------------------------------------------
|rule "operation received" when
| OperationRequest($timestamp:timestamp, $label:label)
|then
| logger.info($timestamp+" "+$label);
|end
|// -------------------------------------------------------------------------------
|rule "two many operations in a short period of time"
|when
| $first:OperationRequest($from:fromAccountId, $to:toAccountId)
| $second:OperationRequest(this after[0,10s] $first, this != $first, fromAccountId == $from)
|then
| logger.warn("OPERATION CANCELLED - TOO NEAR FROM PREVIOUS ONE");
| delete($second);
|end
|// -------------------------------------------------------------------------------
|rule "update balance" no-loop when
| $operation: OperationRequest(
| $fromId:fromAccountId,
| $toId:toAccountId,
| $amount:amount
| )
| $from:Account(id == $fromId, $fromBalance:balance, $fromBalance-$amount>0)
| $to:Account(id == $toId, $toBalance:balance)
|then
| modify($from) {
| setBalance($fromBalance-$amount);
| }
| modify($to) {
| setBalance($toBalance+$amount);
| }
|end
|// -------------------------------------------------------------------------------
|""".stripMargin
// Engine settings for this example; the three flags are passed by name so their
// order does not matter.
// NOTE(review): `pseudoClock = false` presumably selects the real-time session clock,
// which the 10s event windows of the rules depend on — confirm against drools-scripting.
val config = DroolsEngineConfig(
  pseudoClock = false,
  equalsWithIdentity = false,
  withDroolsLogging = true
)

// Builds the rule engine from the DRL source held in `drl` with the settings above.
val engine = DroolsEngine(drl, config)
// Kafka consumer settings: a local broker, string keys and values.
val props = new java.util.Properties()
props.put("bootstrap.servers", "127.0.0.1:4242")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// "latest": only consume messages published from now on (the alternative is "earliest").
props.put("auto.offset.reset", "latest")
props.put("group.id", "consumer-group")

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import java.time.Duration

val consumer = new KafkaConsumer[String, String](props)

// Endless processing loop: every polled message is inserted into the engine as a
// `banking.OperationRequest`, then the rules are fired once per poll — including
// polls that returned nothing, exactly as before.
// The loop only ends with an exception (or a script interruption); the `finally`
// clause makes sure the consumer is closed in that case instead of being leaked.
try {
  consumer.subscribe(List("banking-operations").asJavaCollection)
  while (true) {
    val records = consumer.poll(Duration.ofSeconds(1)).asScala
    records.foreach { data =>
      val json = data.value()
      // A single message that cannot be turned into an event must not stop the
      // whole stream: report it and carry on with the next one.
      try engine.insertJson(json, "banking.OperationRequest")
      catch {
        case NonFatal(err) =>
          Console.err.println(s"Ignoring unprocessable operation message '$json' : $err")
      }
    }
    engine.fireAllRules()
  }
} finally {
  consumer.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment