Last active
April 2, 2023 10:10
-
-
Save dacr/b5f0335b7c218b57e413a63633cad9f4 to your computer and use it in GitHub Desktop.
Enhanced drools example banking application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #374073d7-78bb-4be6-8ee4-f0bcab68765e/197ca2822bf2c42838b6151614a3c45e5f47e1eb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Enhanced drools example banking application wired to a kafka topic | |
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 374073d7-78bb-4be6-8ee4-f0bcab68765e | |
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc' | |
// created-on : 2019-10-23T22:02:54+02:00 | |
// managed-by : https://github.com/dacr/code-examples-manager | |
/*
PLEASE FIRST START A REPL SESSION AND EVALUATE THE CONTENT OF
the "drools-kb-banking-events-stream-generator.sc" SCRIPT
to start an embedded Kafka server and get a simple way to generate events
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3)
then quickly make a call such as the following in THE REPL :
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
and watch the effect in the output of this script.
*/
import $ivy.`fr.janalyse::drools-scripting:1.0.11` | |
import $ivy.`org.apache.kafka::kafka:2.6.0` | |
import fr.janalyse.droolscripting._ | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
/** Callback facade intended to be exposed to the rules (see the commented-out
  * `global` declaration in the DRL). It currently only reports alerts on
  * standard output.
  */
class BankingInterface() {

  /** Prints an alert line describing a suspicious transfer.
    *
    * @param fromAccountId identifier of the debited account
    * @param toAccountId   identifier of the credited account
    * @param amount        transferred amount (printed with the "CFA" suffix)
    * @param msg           human readable reason for the alert
    */
  def alert(fromAccountId: String, toAccountId: String, amount: Double, msg: String): Unit = {
    System.out.println(s"ALERT - $fromAccountId->$toAccountId $amount CFA : $msg")
    // CALL to API rest to block
  }
}
// Drools rule source (DRL) for the `banking` package, handed to the engine below.
// It declares:
//   - OperationRequest : an event type (@role(event)) timestamped by its `timestamp` field
//   - Account          : a plain fact holding an id and a balance
// and seven rules: account initialisation, balance update, event logging and
// four rules that report operations considered suspicious.
//
// NOTE(review): the rule "Duplicated amounts in a short period of time" prints
// "deleting latest=" but, unlike the other rejection rules, never calls
// delete($lastEvent) — confirm whether that is intended.
val drl =
  """package banking
    |
    |import java.util.LinkedList
    |
    |//global com.truc.BankingInterface bankingInterface;
    |
    |declare OperationRequest @role(event) @timestamp(timestamp)
    | timestamp: java.util.Date
    | operationId: String
    | fromAccountId: String
    | toAccountId: String
    | label:String
    | amount: double
    |end
    |
    |
    |declare Account
    | id:String
    | balance:double
    |end
    |
    |
    |// -------------------------------------------------------------------------------
    |rule "init" when then
    | insert(new Account("aaa-bbb", 1000000));
    | insert(new Account("bbb-ddd", 1000000));
    |end
    |
    |// -------------------------------------------------------------------------------
    |
    |rule "update balance" no-loop when
    | $operation: OperationRequest(
    | $timestamp:timestamp,
    | $fromId:fromAccountId,
    | $toId:toAccountId,
    | $amount:amount
    | )
    | $from:Account(id == $fromId, $fromBalance:balance, $fromBalance-$amount >= 0)
    | $to:Account(id == $toId, $toBalance:balance)
    |then
    | modify($from) {
    | setBalance($fromBalance - $amount);
    | }
    | modify($to) {
    | setBalance($toBalance + $amount);
    | }
    | System.out.println(""+$timestamp+" UPDATE BALANCES : "+$fromId+":"+($fromBalance - $amount)+ " -> "+$toId+":"+($toBalance + $amount));
    |end
    |
    |// -------------------------------------------------------------------------------
    |
    |rule "Log events"
    |when
    | $operation:OperationRequest($timestamp:timestamp,$label:label, $amount:amount)
    |then
    | System.out.println(""+$timestamp+" "+$label+" "+$amount);
    |end
    |
    |
    |rule "Transaction too high !"
    | salience 1000
    |when
    | $lastEvent:OperationRequest(
    | $timestamp:timestamp,
    | $lastAmount:amount,
    | $label:label,
    | amount > 100000) over windows:length(1)
    |then
    | System.out.println(""+$timestamp+" REJECTED TOO HIGH "+$label+" "+$lastAmount);
    | delete($lastEvent);
    |end
    |
    |
    |rule "Too many important transaction"
    | salience 1000
    |when
    | $lastEvent:OperationRequest(
    | $timestamp:timestamp,
    | $fromAccountId:fromAccountId,
    | $lastAmount:amount,
    | amount > 10000) over windows:length(1)
    | $events:LinkedList(size>2) from collect (
    | OperationRequest(
    | fromAccountId==$fromAccountId,
    | amount > 10000,
    | this before[0s, 5m] $lastEvent)
    | )
    |then
    | System.out.println(""+$timestamp+" : TROP DE TRANSACTIONS IMPORTANTES");
    | delete($lastEvent);
    |end
    |
    |
    |
    |
    |rule "Duplicated amounts in a short period of time"
    | salience 1100
    |when
    | $lastEvent:OperationRequest(
    | $timestamp:timestamp,
    | $fromAccountId:fromAccountId,
    | $toAccountId:toAccountId,
    | $lastAmount:amount) over windows:length(1)
    | $previousEvents:LinkedList(size>0) from collect (
    | OperationRequest(
    | this != $lastEvent,
    | fromAccountId==$fromAccountId,
    | amount == $lastAmount,
    | this before[0s, 10s] $lastEvent)
    | )
    |then
    | System.out.println(""+$timestamp+" TROP DE TRANSACTIONS DUPLIQUEES, deleting latest="+$lastEvent);
    | //bankingInterface.alert($fromAccountId, $toAccountId, $lastAmount, "trop de transaction trop rapidement")
    | //bankingInterface.suspisciousTransaction($lastEvent, $previousEvents)
    |end
    |
    |
    |
    |rule "Too many transactions on same accounts in a short period of time"
    | salience 1000
    |when
    | $lastEvent:OperationRequest($timestamp:timestamp,$fromAccountId:fromAccountId, $toAccountId:toAccountId) over windows:length(1)
    | $events:LinkedList(size>2) from collect (
    | OperationRequest(
    | fromAccountId==$fromAccountId,
    | toAccountId==$toAccountId,
    | this before[0s, 10s] $lastEvent)
    | )
    |then
    | System.out.println(""+$timestamp+" TROP DE TRANSACTIONS TROP RAPIDEMENT, deleting latest="+$lastEvent);
    | delete($lastEvent);
    | //bankingInterface.alert($fromAccountId, $toAccountId, $lastAmount, "trop de transaction trop rapidement")
    |end
    |
    |""".stripMargin
// Engine settings passed to drools-scripting; every option is given by name.
// NOTE(review): pseudoClock = false presumably selects the wall clock, so the
// temporal operators in the rules are evaluated in real time — confirm against
// the drools-scripting documentation.
val config = DroolsEngineConfig(
  withDroolsLogging = false,
  equalsWithIdentity = false,
  pseudoClock = false
)

// Build the rule engine from the DRL source above with that configuration.
val engine = DroolsEngine(drl, config)
// Kafka consumer settings: local broker on port 4242, keys and values read as plain strings.
val props = new java.util.Properties()
props.put("bootstrap.servers", "127.0.0.1:4242")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest") // latest message; versus "earliest"
props.put("group.id", "consumer-group")

import scala.jdk.CollectionConverters._
import java.time.Duration

val consumer = new KafkaConsumer[String, String](props)

// Endless processing loop: each polled record value is printed, inserted into the
// engine as a banking.OperationRequest, then the rules are fired once per poll
// (including polls that returned nothing).
// The try/finally releases the consumer if the loop is left through an exception.
try {
  consumer.subscribe(List("banking-operations").asJavaCollection)
  println("Banking expert system is ready to process events")
  while (true) {
    val records = consumer.poll(Duration.ofSeconds(1)).asScala
    for (record <- records) {
      val json = record.value()
      println(json)
      engine.insertJson(json, "banking.OperationRequest")
    }
    engine.fireAllRules()
  }
} finally {
  consumer.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment