public
Last active

Embedded Kafka broker / producer / simple consumer in a single process useful for testing or for persistent queues.

  • Download Gist
KafkaEmbedded.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
import java.util.Properties
import kafka.server.KafkaServer
import kafka.server.KafkaConfig
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.ProducerData
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.utils.Utils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
 
object KafkaEmbedded extends App {
val props = new Properties()
props.setProperty("hostname", "localhost")
props.setProperty("port", "9090");
props.setProperty("brokerid", "1")
props.setProperty("log.dir", "/tmp/embeddedkafka/")
props.setProperty("enable.zookeeper", "false")
val server = new KafkaServer(new KafkaConfig(props))
server.startup()
val prodProps = new Properties()
prodProps.setProperty("producer.type", "async")
prodProps.setProperty("queue.time", "2000")
prodProps.setProperty("queue.size", "100")
prodProps.setProperty("batch.size", "10")
prodProps.setProperty("broker.list", "1:localhost:9090")
val prodConfig = new ProducerConfig(prodProps)
val prod = (new Producer[String, Message](prodConfig))
for(i <- 1 to 200) {
prod.send(new ProducerData("TEST",new Message("testing 1 2 3".getBytes)))
}
val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
var offset = 0L
var i = 0
while (true) {
val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
for (msg <- cons.fetch(fetchRequest)) {
i = i + 1
println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
offset = msg.offset
}
}
sys.addShutdownHook({
prod.close()
cons.close()
server.shutdown()
server.awaitShutdown()
})
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.