Skip to content

Instantly share code, notes, and snippets.

Last active April 2, 2023 10:13
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
Start an embedded kafka and provide simple way to generate simple banking events / published by #d45b327d-934a-4c2c-a312-1b98641a6b44/d98092286ab630ced8e552e9c0dc2b2d3f2f0578
// summary : Start an embedded kafka and provide simple way to generate simple banking events
// keywords : scala, kafka, stream, events
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (
// id : d45b327d-934a-4c2c-a312-1b98641a6b44
// execution : scala ammonite script ( - run as follow 'amm'
// created-on : 2019-10-11T20:51:47+02:00
// managed-by :
THIS SCRIPT starts a kafka server and provide an utility function to
generate banking events.
For best flexibility, copy paste the content of this script into an ammonite REPL session.
(just start amm command)
Once started you can play with those scripts to see what happens when you generate events :
- :
- :
- :
import $ivy.`io.github.embeddedkafka::embedded-kafka:2.6.0`
import $ivy.`org.json4s::json4s-jackson:3.6.10`
import $ivy.`org.json4s::json4s-ext:3.6.10`
import net.manub.embeddedkafka._
import java.time.OffsetDateTime
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write
import org.json4s.Extraction
import org.json4s.ext.JavaTimeSerializers
case class OperationRequest(
timestamp: OffsetDateTime,
operationId: String,
fromAccountId: String,
toAccountId: String,
amount: Double,
object Generator {
val requestTopic="banking-operations"
val dateFormatPattern = "yyyy-MM-dd'T'HH:mm:ss'Z'"
import EmbeddedKafka.publishToKafka
implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort = 4242)
implicit val kafkaSerializer = new StringSerializer
implicit val kafkaDeserializer = new StringDeserializer
implicit val jsonFormats = DefaultFormats.lossless ++ JavaTimeSerializers.all
def start():Unit = EmbeddedKafka.start()
def stop():Unit = EmbeddedKafka.stop()
def operate(operationId:String,fromAccountId:String,toAccountId: String,label:String,amount: Double):Unit = {
val operationRequest = OperationRequest(
timestamp =,
operationId = operationId,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
label = label,
amount = amount,
val jsonMessage = Extraction.decompose(operationRequest)
publishToKafka(requestTopic, write(jsonMessage))
Generator.operate("send", "aaa-bbb", "bbb-ddd", "Give me cash", 32)
println("RUN THIS IN REPL MODE, not in script mode.")
println("Copy and paste this code into a dedicated ammonite console")
// // Enter to exit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment