Skip to content

Instantly share code, notes, and snippets.

@l15k4
Last active June 29, 2016 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/ba9b6caccb8681ba809420d9c2efd2bd to your computer and use it in GitHub Desktop.
Save l15k4/ba9b6caccb8681ba809420d9c2efd2bd to your computer and use it in GitHub Desktop.
val msgCount = 10
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(s"$KafkaHost:9092")
.withProperty("batch.size", "0")
def consumerSettings(topic: String, clientId: String) =
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(s"$KafkaHost:9092")
.withClientId(clientId)
.withPollInterval(100.millis)
// .withProperty("enable.auto.commit", "false")
// .withProperty("heartbeat.interval.ms", "300")
.withGroupId(clientId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val sf: Future[Done] = {
val control =
Consumer.committableSource(consumerSettings("topic1", "client1"), Subscriptions.topics("topic1"))
.take(5)
.mapAsync(1)( result => result.committableOffset.commitScaladsl().map(_ => result))
.grouped(5)
.to(Sink.head).run()
Thread.sleep(1000)
control.stop.flatMap(_ => control.shutdown())
}
Thread.sleep(500)
Source(1 to msgCount)
.map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
.runWith(Producer.plainSink(producerSettings))
_____________________________________________________________________
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$]
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic1 --group client1
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true
[2016-06-29 17:48:07,632] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/client1/offsets/topic1/0.
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$]
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2181 --group client1 --describe
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true
No topic available for consumer group provided
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment