Skip to content

Instantly share code, notes, and snippets.

@l15k4
Last active Jun 29, 2016
Embed
What would you like to do?
val msgCount = 10
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(s"$KafkaHost:9092")
.withProperty("batch.size", "0")
def consumerSettings(topic: String, clientId: String) =
ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(s"$KafkaHost:9092")
.withClientId(clientId)
.withPollInterval(100.millis)
// .withProperty("enable.auto.commit", "false")
// .withProperty("heartbeat.interval.ms", "300")
.withGroupId(clientId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val sf: Future[Done] = {
val control =
Consumer.committableSource(consumerSettings("topic1", "client1"), Subscriptions.topics("topic1"))
.take(5)
.mapAsync(1)( result => result.committableOffset.commitScaladsl().map(_ => result))
.grouped(5)
.to(Sink.head).run()
Thread.sleep(1000)
control.stop.flatMap(_ => control.shutdown())
}
Thread.sleep(500)
Source(1 to msgCount)
.map(msg => new ProducerRecord[String, String]("topic1", msg.toString))
.runWith(Producer.plainSink(producerSettings))
_____________________________________________________________________
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$]
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic1 --group client1
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true
[2016-06-29 17:48:07,632] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/client1/offsets/topic1/0.
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$]
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2181 --group client1 --describe
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true
No topic available for consumer group provided
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment