@dacr
Last active April 2, 2023 10:13
Start an embedded kafka and provide simple way to generate simple banking events / published by https://github.com/dacr/code-examples-manager #d45b327d-934a-4c2c-a312-1b98641a6b44/d98092286ab630ced8e552e9c0dc2b2d3f2f0578
// summary : Start an embedded kafka and provide simple way to generate simple banking events
// keywords : scala, kafka, stream, events
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : d45b327d-934a-4c2c-a312-1b98641a6b44
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
// created-on : 2019-10-11T20:51:47+02:00
// managed-by : https://github.com/dacr/code-examples-manager
/*
THIS SCRIPT starts an embedded kafka server and provides a utility function to
generate banking events.
For best flexibility, copy and paste the content of this script into an ammonite REPL session
(just start the amm command).
Once started, you can play with these scripts to see what happens when you generate events:
- drools-kb-banking.sc : https://gist.github.com/c56a579cca9daf3290ee2aae4f347f3c
- drools-kb-banking-advanced.sc : https://gist.github.com/b5f0335b7c218b57e413a63633cad9f4
- drools-kb-banking-skeleton.sc : https://gist.github.com/89c2d24d228c955b2da308fc29dd6037
*/
import $ivy.`io.github.embeddedkafka::embedded-kafka:2.6.0`
import $ivy.`org.json4s::json4s-jackson:3.6.10`
import $ivy.`org.json4s::json4s-ext:3.6.10`
import net.manub.embeddedkafka._
import java.time.OffsetDateTime
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write
import org.json4s.Extraction
import org.json4s.ext.JavaTimeSerializers
case class OperationRequest(
  timestamp: OffsetDateTime,
  operationId: String,
  fromAccountId: String,
  toAccountId: String,
  label: String,
  amount: Double,
)
object Generator {
  val requestTopic = "banking-operations"
  val dateFormatPattern = "yyyy-MM-dd'T'HH:mm:ss'Z'"

  import EmbeddedKafka.publishToKafka

  // The embedded broker listens on port 4242; messages are exchanged as plain strings
  implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 4242)
  implicit val kafkaSerializer = new StringSerializer
  implicit val kafkaDeserializer = new StringDeserializer
  // JSON formats with java.time support, needed for the OffsetDateTime timestamp
  implicit val jsonFormats = DefaultFormats.lossless ++ JavaTimeSerializers.all

  def start(): Unit = EmbeddedKafka.start()
  def stop(): Unit = EmbeddedKafka.stop()

  // Builds a timestamped banking operation and publishes it as JSON to requestTopic
  def operate(operationId: String, fromAccountId: String, toAccountId: String, label: String, amount: Double): Unit = {
    val operationRequest = OperationRequest(
      timestamp = OffsetDateTime.now(),
      operationId = operationId,
      fromAccountId = fromAccountId,
      toAccountId = toAccountId,
      label = label,
      amount = amount,
    )
    val jsonMessage = Extraction.decompose(operationRequest)
    publishToKafka(requestTopic, write(jsonMessage))
  }
}
Generator.start()
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
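// Optional sanity check, a minimal sketch that is not part of the original script:
// read back the first message from the topic to confirm that the event above was published.
// It assumes the consumeFirstStringMessageFrom helper of embedded-kafka with its default timeout;
// the config is passed explicitly because the implicit one is defined inside Generator.
// The name firstEvent is only illustrative.
val firstEvent = EmbeddedKafka.consumeFirstStringMessageFrom(Generator.requestTopic)(Generator.kafkaConfig)
println(s"First event read back from ${Generator.requestTopic}: $firstEvent")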
println("RUN THIS IN REPL MODE, not in script mode.")
println("Copy and paste this code into a dedicated ammonite console")
//System.in.read() // Enter to exit