Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Embedded Kafka broker / producer / simple consumer in a single process useful for testing or for persistent queues.
import java.util.Properties
import kafka.server.KafkaServer
import kafka.server.KafkaConfig
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.ProducerData
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.utils.Utils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
object KafkaEmbedded extends App {
val props = new Properties()
props.setProperty("hostname", "localhost")
props.setProperty("port", "9090");
props.setProperty("brokerid", "1")
props.setProperty("log.dir", "/tmp/embeddedkafka/")
props.setProperty("enable.zookeeper", "false")
val server = new KafkaServer(new KafkaConfig(props))
server.startup()
val prodProps = new Properties()
prodProps.setProperty("producer.type", "async")
prodProps.setProperty("queue.time", "2000")
prodProps.setProperty("queue.size", "100")
prodProps.setProperty("batch.size", "10")
prodProps.setProperty("broker.list", "1:localhost:9090")
val prodConfig = new ProducerConfig(prodProps)
val prod = (new Producer[String, Message](prodConfig))
for(i <- 1 to 200) {
prod.send(new ProducerData("TEST",new Message("testing 1 2 3".getBytes)))
}
val cons = new SimpleConsumer("localhost", 9090, 100, 1024)
var offset = 0L
var i = 0
while (true) {
val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
for (msg <- cons.fetch(fetchRequest)) {
i = i + 1
println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
offset = msg.offset
}
}
sys.addShutdownHook({
prod.close()
cons.close()
server.shutdown()
server.awaitShutdown()
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.