Skip to content

Instantly share code, notes, and snippets.

@bzz
Forked from mardambey/KafkaEmbedded.scala
Last active January 15, 2018 16:05
Show Gist options
  • Save bzz/936648c2af94d1fac4ee to your computer and use it in GitHub Desktop.
Save bzz/936648c2af94d1fac4ee to your computer and use it in GitHub Desktop.
Embedded Kafka 0.8.2.0 (without Zookeeper) and producer/consumer in Scala with Gradle for tests
apply plugin: 'scala'
dependencies {
compile "org.scala-lang:scala-library:2.10.4"
compile ("org.apache.kafka:kafka_2.10:0.8.2.0") {
exclude group: "com.sun.jmx", module: "jmxri"
exclude group: "com.sun.jdmk", module: "jmxtools"
exclude group: "javax.jms", module: "jms"
}
compile ("org.apache.kafka:kafka_$scalaVersion:0.8.2.0:test") {
exclude group: "com.sun.jmx", module: "jmxri"
exclude group: "com.sun.jdmk", module: "jmxtools"
exclude group: "javax.jms", module: "jms"
}
}
import java.util.Properties
import scala.util.control.Breaks._
import kafka.server.KafkaServer
import kafka.server.KafkaConfig
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.KeyedMessage
import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
import kafka.utils.Utils
import kafka.api.FetchRequestBuilder
object KafkaEmbedded extends App {
runEmbeddedKafka()
putSampleDataToKafka()
readSmapleData()
def runEmbeddedKafka() = {
val props = new Properties()
props.setProperty("hostname", "localhost")
props.setProperty("port", "9090");
props.setProperty("brokerid", "1")
props.setProperty("log.dir", "/tmp/embeddedkafka/")
props.setProperty("enable.zookeeper", "false")
//TODO: run EmbeddedZookeeper from i.e like ZooKeeperTestHarness
//props.setProperty("zookeeper.connect", zkConnect.toString)
val server = new KafkaServer(new KafkaConfig(props))
server.startup()
}
def putSampleDataToKafka() = {
//TODO: create topic first, like kafka.utils.TestUtils.createTopic()
val prodProps = new Properties()
prodProps.setProperty("producer.type", "async")
prodProps.setProperty("queue.time", "2000")
prodProps.setProperty("queue.size", "100")
prodProps.setProperty("batch.size", "10")
prodProps.setProperty("broker.list", "1:localhost:9090")
prodProps.setProperty("key.serializer.class", "kafka.serializer.StringEncoder")
val prodConfig = new ProducerConfig(prodProps)
val prod = (new Producer[String, Message](prodConfig))
for(i <- 1 to 200) {
prod.send(new KeyedMessage("TEST", new Message("testing 1 2 3".getBytes)))
}
}
def readSmapleData() = {
val cons = new SimpleConsumer("localhost", 9090, 100, 1024, "simpleConsumer")
val topic = "TEST"
val partition = 0
var offset = 0L
var i = 0
while (true) {
var numRead = 0
val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024).build()
val fetchResponse = cons.fetch(fetchRequest)
for (msg <- fetchResponse.messageSet(topic, partition)) {
numRead += 1
println("consumed [ " + i + "]: offset = " + msg.offset + ", payload = " + Utils.readString(msg.message.payload))
offset = msg.nextOffset
i += 1
}
if (numRead == 0) {
break
}
}
}
sys.addShutdownHook({
prod.close()
cons.close()
server.shutdown()
server.awaitShutdown()
})
}
@desavera
Copy link

desavera commented Jan 15, 2018

This looks good ... specially for those maintaining old versions of Kafka ! Just a note , you have to declare :

var server:KafkaServer = _
var prod:Producer[String, Message] = _
var cons:SimpleConsumer = _

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment