Skip to content

Instantly share code, notes, and snippets.

@mardambey
Created May 10, 2012 02:58
  • Star 21 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save mardambey/2650743 to your computer and use it in GitHub Desktop.
Embedded Kafka broker / producer / simple consumer in a single process, useful for testing or for persistent queues.
import java.util.Properties
import kafka.server.KafkaServer
import kafka.server.KafkaConfig
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.ProducerData
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.utils.Utils
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
/**
 * Runs a Kafka broker, an async producer and a simple consumer inside one JVM.
 *
 * The program starts a broker on localhost:9090 with ZooKeeper disabled,
 * sends 200 copies of a fixed message to the "TEST" topic, and then polls
 * partition 0 of that topic forever, printing every message it fetches.
 *
 * Resources are released by a JVM shutdown hook. The hook is registered
 * before the poll loop because that loop never returns; anything placed
 * after it would be unreachable.
 */
object KafkaEmbedded extends App {

  // Broker configuration: standalone broker (no ZooKeeper) writing its log
  // to /tmp/embeddedkafka/.
  val props = new Properties()
  props.setProperty("hostname", "localhost")
  props.setProperty("port", "9090")
  props.setProperty("brokerid", "1")
  props.setProperty("log.dir", "/tmp/embeddedkafka/")
  props.setProperty("enable.zookeeper", "false")

  val server = new KafkaServer(new KafkaConfig(props))
  server.startup()

  // Producer configuration: async producer pointed directly at the embedded
  // broker through a static broker list ("brokerid:host:port").
  val prodProps = new Properties()
  prodProps.setProperty("producer.type", "async")
  prodProps.setProperty("queue.time", "2000")
  prodProps.setProperty("queue.size", "100")
  prodProps.setProperty("batch.size", "10")
  prodProps.setProperty("broker.list", "1:localhost:9090")

  val prodConfig = new ProducerConfig(prodProps)
  val prod = new Producer[String, Message](prodConfig)

  // Publish 200 identical messages to the "TEST" topic.
  for (_ <- 1 to 200) {
    prod.send(new ProducerData("TEST", new Message("testing 1 2 3".getBytes)))
  }

  val cons = new SimpleConsumer("localhost", 9090, 100, 1024)

  // Register cleanup BEFORE entering the endless poll loop below; the loop
  // never exits, so a hook registered after it would never be installed.
  sys.addShutdownHook({
    prod.close()
    cons.close()
    server.shutdown()
    server.awaitShutdown()
  })

  var offset = 0L // position passed to the next fetch request
  var i = 0       // running count of consumed messages

  // Poll partition 0 of "TEST" forever. There is no delay between fetches,
  // so this loop spins continuously even when no new messages are available.
  while (true) {
    val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    for (msg <- cons.fetch(fetchRequest)) {
      i = i + 1
      println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.toString(msg.message.payload, "UTF-8"))
      // Continue the next fetch from the offset reported with this message.
      offset = msg.offset
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment