Skip to content

Instantly share code, notes, and snippets.

@mardambey
Created October 31, 2011 04:42
Show Gist options
  • Save mardambey/1326933 to your computer and use it in GitHub Desktop.
Save mardambey/1326933 to your computer and use it in GitHub Desktop.
Sample Kafka stdin producer
package com.edate.data.test
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.message.Message
import kafka.producer.ProducerData
import kafka.producer.ProducerData
import kafka.producer.Partitioner
/**
* stream | ./KafkaProducerTest zk-info topic
* where the data is msg
* or
* stream | ./KafkaProducerTest zk-info
* where the data is topic msg
*/
object KafkaProducerTest extends App {
val producerProps = new Properties()
producerProps.put("zk.connect", args(0))
//producerProps.put("partitioner.class", "com.edate.data.test.CustomPartitioner")
//producerProps.put("host", "192.168.100.10")
//producerProps.put("port", "9091")
// we can use the async producer for buffered sends
val producerConfig = new ProducerConfig(producerProps)
val topic = if (args.size > 1) args(1) else null
val producer = new Producer[String, Message](producerConfig)
for( ln <- io.Source.stdin.getLines ) {
if (topic != null) {
producer.send(new ProducerData(topic, new Message(ln.getBytes)))
println(topic + " -> " + ln)
} else {
try {
val t = ln.substring(0, ln.indexOf(" "))
val msg = ln.substring(ln.indexOf(" ") + 1, ln.size)
println(t + " -> " + msg)
producer.send(new ProducerData(t, new Message(msg.getBytes)))
} catch {
case e:Exception => println(e.getMessage)
}
}
}
}
class CustomPartitioner extends Partitioner[Message] {
val r = new scala.util.Random
def partition(data: Message, numPartitions: Int): Int = {
r.nextInt(3) + 1
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment