Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Start an embedded kafka and provide simple way to generate simple banking events / published by https://github.com/dacr/code-examples-manager #d45b327d-934a-4c2c-a312-1b98641a6b44/e7e9069e86ee8c6d9b39b1815f7a9b50d85af841
// summary : Start an embedded kafka and provide simple way to generate simple banking events
// keywords : scala, kafka, stream, events
// publish : gist
// authors : David Crosson
// license : Apache2
// id : d45b327d-934a-4c2c-a312-1b98641a6b44
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
// created-on : 2019-10-11T20:51:47+02:00
/*
THIS SCRIPT starts a kafka server and provide an utility function to
generate banking events.
For best flexibility, copy paste the content of this script into an ammonite REPL session.
(just start amm command)
Once started you can play with those scripts to see what happens when you generate events :
- drools-kb-banking.sc : https://gist.github.com/c56a579cca9daf3290ee2aae4f347f3c
- drools-kb-banking-advanced.sc : https://gist.github.com/b5f0335b7c218b57e413a63633cad9f4
- drools-kb-banking-skeleton.sc : https://gist.github.com/89c2d24d228c955b2da308fc29dd6037
*/
import $ivy.`io.github.embeddedkafka::embedded-kafka:2.6.0`
import $ivy.`org.json4s::json4s-jackson:3.6.10`
import $ivy.`org.json4s::json4s-ext:3.6.10`
import net.manub.embeddedkafka._
import java.time.OffsetDateTime
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write
import org.json4s.Extraction
import org.json4s.ext.JavaTimeSerializers
case class OperationRequest(
timestamp: OffsetDateTime,
operationId: String,
fromAccountId: String,
toAccountId: String,
label:String,
amount: Double,
)
object Generator {
val requestTopic="banking-operations"
val dateFormatPattern = "yyyy-MM-dd'T'HH:mm:ss'Z'"
import EmbeddedKafka.publishToKafka
implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 4242)
implicit val kafkaSerializer = new StringSerializer
implicit val kafkaDeserializer = new StringDeserializer
implicit val jsonFormats = DefaultFormats.lossless ++ JavaTimeSerializers.all
def start():Unit = EmbeddedKafka.start()
def stop():Unit = EmbeddedKafka.stop()
def operate(operationId:String,fromAccountId:String,toAccountId: String,label:String,amount: Double):Unit = {
val operationRequest = OperationRequest(
timestamp = OffsetDateTime.now(),
operationId = operationId,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
label = label,
amount = amount,
)
val jsonMessage = Extraction.decompose(operationRequest)
publishToKafka(requestTopic, write(jsonMessage))
}
}
Generator.start()
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
println("RUN THIS IN REPL MODE, not in script mode.")
println("Copy and paste this code into a dedicated ammonite console")
//System.in.read() // Enter to exit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment