@chessman
Last active September 16, 2016 15:47
Kafka 0.9.0.1 Consumer
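import java.util.Properties
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.kafka.clients.consumer.{ConsumerConfig => KafkaConsumerConf, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Assumption: KafkaConsumerConf is an alias for Kafka's ConsumerConfig.
// Logging, MyDeserializer and MyEventStruct are project-specific and not shown here.
class MyConsumer(
  groupId: String,
  broker: String
) extends Logging {
  val props = new Properties()
  props.put(KafkaConsumerConf.BOOTSTRAP_SERVERS_CONFIG, broker)
  props.put(KafkaConsumerConf.GROUP_ID_CONFIG, groupId)
  props.put(KafkaConsumerConf.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Keys are raw bytes; values are decoded by the project's own deserializer.
  val consumer = new KafkaConsumer(
    props,
    new ByteArrayDeserializer(),
    new org.example.kafka.MyDeserializer
  )
  consumer.subscribe(List("mytopic").asJava)

  // Polls for up to 100 ms and returns whatever records arrived.
  def read(): Seq[ConsumerRecord[Array[Byte], Try[MyEventStruct]]] = {
    consumer.listTopics() // <- workaround: read() DOESN'T WORK WITHOUT IT
    consumer.poll(100).iterator.asScala.toSeq
  }
}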