Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/b5f0335b7c218b57e413a63633cad9f4 to your computer and use it in GitHub Desktop.
Save dacr/b5f0335b7c218b57e413a63633cad9f4 to your computer and use it in GitHub Desktop.
Enhanced drools example banking application wired to a kafka topic / published by https://github.com/dacr/code-examples-manager #374073d7-78bb-4be6-8ee4-f0bcab68765e/197ca2822bf2c42838b6151614a3c45e5f47e1eb
// summary : Enhanced drools example banking application wired to a kafka topic
// keywords : scala, drools, mvel, ai, knowledgebase, kafka, cep, complex-event-processing
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 374073d7-78bb-4be6-8ee4-f0bcab68765e
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
// created-on : 2019-10-23T22:02:54+02:00
// managed-by : https://github.com/dacr/code-examples-manager
/*
PLEASE FIRST START A REPL SESSION AND EVALUATE THE CONTENT OF
the "drools-kb-banking-events-stream-generator.sc" SCRIPT
to start an embedded Kafka server and get a simple way to generate events
(See : https://gist.github.com/dacr/da70d346f0e477c1a6e78bc15405cfd3)
then make a call such as the following one in THE REPL :
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
and watch the effect on this script
*/
import $ivy.`fr.janalyse::drools-scripting:1.0.11`
import $ivy.`org.apache.kafka::kafka:2.6.0`
import fr.janalyse.droolscripting._
import org.apache.kafka.clients.consumer.KafkaConsumer
/** Outbound hook of the banking rules; at the moment it only reports alerts on standard output. */
class BankingInterface() {

  /** Prints one alert line describing a transfer.
    *
    * @param fromAccountId identifier of the debited account
    * @param toAccountId   identifier of the credited account
    * @param amount        amount of the transfer
    * @param msg           free-text reason attached to the alert
    */
  def alert(fromAccountId: String, toAccountId: String, amount: Double, msg: String): Unit = {
    val alertLine = s"ALERT - $fromAccountId->$toAccountId $amount CFA : $msg"
    System.out.println(alertLine)
    // Placeholder: a REST API call to block would go here (not implemented)
  }
}
// Drools rule source (DRL, package `banking`) handed to the rule engine further down.
// The leading `|` of every line is the margin removed by `stripMargin`.
//
// Declared types:
//   - OperationRequest : an event (@role(event)) time-stamped by its `timestamp` field,
//                        carrying operationId, fromAccountId, toAccountId, label and amount.
//   - Account          : a fact made of an `id` and a `balance`.
//
// Rules:
//   - "init"                    : inserts the accounts "aaa-bbb" and "bbb-ddd", each with a 1000000 balance.
//   - "update balance"          : when the source account balance covers the amount, debits the source,
//                                 credits the destination and prints the new balances.
//   - "Log events"              : prints timestamp, label and amount of every operation.
//   - "Transaction too high !"  : prints a rejection and deletes the latest operation when its amount > 100000.
//   - "Too many important transaction" : deletes the latest operation (amount > 10000) when more than two
//                                 operations > 10000 from the same source account are collected in the
//                                 [0s, 5m] `before` range relative to it.
//   - "Duplicated amounts in a short period of time" : prints a message when another operation from the
//                                 same source account with the same amount lies in the [0s, 10s] `before`
//                                 range relative to the latest one.
//                                 NOTE(review): the printed text says "deleting latest" but this rule has no
//                                 `delete($lastEvent)` call, unlike the neighbouring rules - confirm the intent.
//   - "Too many transactions on same accounts in a short period of time" : deletes the latest operation when
//                                 more than two operations between the same two accounts are collected in
//                                 the [0s, 10s] `before` range relative to it.
//
// The `bankingInterface` global and every call to it are commented out inside the DRL text.
val drl =
"""package banking
|
|import java.util.LinkedList
|
|//global com.truc.BankingInterface bankingInterface;
|
|declare OperationRequest @role(event) @timestamp(timestamp)
| timestamp: java.util.Date
| operationId: String
| fromAccountId: String
| toAccountId: String
| label:String
| amount: double
|end
|
|
|declare Account
| id:String
| balance:double
|end
|
|
|// -------------------------------------------------------------------------------
|rule "init" when then
| insert(new Account("aaa-bbb", 1000000));
| insert(new Account("bbb-ddd", 1000000));
|end
|
|// -------------------------------------------------------------------------------
|
|rule "update balance" no-loop when
| $operation: OperationRequest(
| $timestamp:timestamp,
| $fromId:fromAccountId,
| $toId:toAccountId,
| $amount:amount
| )
| $from:Account(id == $fromId, $fromBalance:balance, $fromBalance-$amount >= 0)
| $to:Account(id == $toId, $toBalance:balance)
|then
| modify($from) {
| setBalance($fromBalance - $amount);
| }
| modify($to) {
| setBalance($toBalance + $amount);
| }
| System.out.println(""+$timestamp+" UPDATE BALANCES : "+$fromId+":"+($fromBalance - $amount)+ " -> "+$toId+":"+($toBalance + $amount));
|end
|
|// -------------------------------------------------------------------------------
|
|rule "Log events"
|when
| $operation:OperationRequest($timestamp:timestamp,$label:label, $amount:amount)
|then
| System.out.println(""+$timestamp+" "+$label+" "+$amount);
|end
|
|
|rule "Transaction too high !"
| salience 1000
|when
| $lastEvent:OperationRequest(
| $timestamp:timestamp,
| $lastAmount:amount,
| $label:label,
| amount > 100000) over windows:length(1)
|then
| System.out.println(""+$timestamp+" REJECTED TOO HIGH "+$label+" "+$lastAmount);
| delete($lastEvent);
|end
|
|
|rule "Too many important transaction"
| salience 1000
|when
| $lastEvent:OperationRequest(
| $timestamp:timestamp,
| $fromAccountId:fromAccountId,
| $lastAmount:amount,
| amount > 10000) over windows:length(1)
| $events:LinkedList(size>2) from collect (
| OperationRequest(
| fromAccountId==$fromAccountId,
| amount > 10000,
| this before[0s, 5m] $lastEvent)
| )
|then
| System.out.println(""+$timestamp+" : TROP DE TRANSACTIONS IMPORTANTES");
| delete($lastEvent);
|end
|
|
|
|
|rule "Duplicated amounts in a short period of time"
| salience 1100
|when
| $lastEvent:OperationRequest(
| $timestamp:timestamp,
| $fromAccountId:fromAccountId,
| $toAccountId:toAccountId,
| $lastAmount:amount) over windows:length(1)
| $previousEvents:LinkedList(size>0) from collect (
| OperationRequest(
| this != $lastEvent,
| fromAccountId==$fromAccountId,
| amount == $lastAmount,
| this before[0s, 10s] $lastEvent)
| )
|then
| System.out.println(""+$timestamp+" TROP DE TRANSACTIONS DUPLIQUEES, deleting latest="+$lastEvent);
| //bankingInterface.alert($fromAccountId, $toAccountId, $lastAmount, "trop de transaction trop rapidement")
| //bankingInterface.suspisciousTransaction($lastEvent, $previousEvents)
|end
|
|
|
|rule "Too many transactions on same accounts in a short period of time"
| salience 1000
|when
| $lastEvent:OperationRequest($timestamp:timestamp,$fromAccountId:fromAccountId, $toAccountId:toAccountId) over windows:length(1)
| $events:LinkedList(size>2) from collect (
| OperationRequest(
| fromAccountId==$fromAccountId,
| toAccountId==$toAccountId,
| this before[0s, 10s] $lastEvent)
| )
|then
| System.out.println(""+$timestamp+" TROP DE TRANSACTIONS TROP RAPIDEMENT, deleting latest="+$lastEvent);
| delete($lastEvent);
| //bankingInterface.alert($fromAccountId, $toAccountId, $lastAmount, "trop de transaction trop rapidement")
|end
|
|""".stripMargin
// Engine settings: Drools logging off, identity-based equality off, pseudo clock off.
val config = DroolsEngineConfig(withDroolsLogging = false, equalsWithIdentity = false, pseudoClock = false)

// Rule engine built from the DRL text held in `drl` and the settings above.
val engine = DroolsEngine(drl, config)
import scala.jdk.CollectionConverters._
import java.time.Duration

// Kafka consumer settings, filled from a small key/value table.
val props = new java.util.Properties()
Seq(
  "bootstrap.servers"  -> "127.0.0.1:4242",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"  -> "latest", // latest message; versus "earliest"
  "group.id"           -> "consumer-group"
).foreach { case (key, value) => props.setProperty(key, value) }

// String-keyed, string-valued consumer subscribed to the single operations topic.
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Collections.singletonList("banking-operations"))

println("Banking expert system is ready to process events")
// Main processing loop: polls the Kafka consumer every second, inserts each received JSON
// payload into the rule engine as a `banking.OperationRequest`, then fires the rules once
// per polled batch (also when the batch is empty).
//
// A payload that cannot be inserted is reported on stderr and skipped, so that a single
// bad message does not stop the whole consumer. Whatever ends the loop, the consumer is
// closed on the way out.
import scala.util.control.NonFatal

try {
  while (true) {
    val polledRecords = consumer.poll(Duration.ofSeconds(1)).asScala
    for (polled <- polledRecords) {
      val json = polled.value()
      println(json)
      try engine.insertJson(json, "banking.OperationRequest")
      catch {
        // NonFatal lets OutOfMemoryError, InterruptedException, ... propagate
        case NonFatal(failure) =>
          System.err.println(s"Skipping operation that could not be inserted ($failure) : $json")
      }
    }
    engine.fireAllRules()
  }
} finally {
  consumer.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment