Last active
June 29, 2016 15:57
-
-
Save l15k4/ba9b6caccb8681ba809420d9c2efd2bd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Number of records the producer stream publishes (it iterates `1 to msgCount`).
val msgCount = 10

// Settings for a String/String Kafka producer pointed at the broker on
// $KafkaHost:9092. "batch.size" is set to "0"; per the Kafka producer
// configuration docs a zero batch size disables batching, so each record is
// sent on its own.
val producerSettings = {
  val bootstrapAddress = s"$KafkaHost:9092"
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapAddress)
    .withProperty("batch.size", "0")
}
// Builds settings for a String/String Kafka consumer talking to the broker on
// $KafkaHost:9092.
//
// `clientId` is used both as the Kafka client id and as the consumer group id.
// `topic` is accepted but not read here; the subscription is chosen by the
// caller when the source is created.
//
// auto.offset.reset is "earliest", and the consumer is polled every 100 ms.
def consumerSettings(topic: String, clientId: String) = {
  val bootstrapAddress = s"$KafkaHost:9092"
  val base = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
  base
    .withBootstrapServers(bootstrapAddress)
    .withClientId(clientId)
    .withPollInterval(100.millis)
    // Left disabled, as in the original experiment:
    // .withProperty("enable.auto.commit", "false")
    // .withProperty("heartbeat.interval.ms", "300")
    .withGroupId(clientId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
// Consumes the first 5 records of "topic1" as group/client "client1", commits
// each record's offset one at a time (mapAsync(1) keeps the commits ordered),
// and completes once the consumer has been shut down.
//
// The previous version blocked for a fixed second and then stopped the
// consumer while `sf` was still being initialised, i.e. before the producer
// further down the script had published anything; it also dropped the
// Sink.head future, so nothing could tell whether any offset was committed.
// Here both materialized values are kept: the control handle and the future
// of the first (and only) group of 5 committed records. Shutdown is chained
// onto that future, so this block no longer blocks the calling thread and
// the consumer stays alive until the 5 commits have actually happened.
//
// If the stream fails or ends without emitting a group, `sf` fails with that
// cause. Requires the implicit materializer and ExecutionContext that the
// original block already relied on.
val sf: Future[Done] = {
  val (control, committedBatch) =
    Consumer
      .committableSource(consumerSettings("topic1", "client1"), Subscriptions.topics("topic1"))
      .take(5)
      .mapAsync(1)(result => result.committableOffset.commitScaladsl().map(_ => result))
      .grouped(5)
      // Fully qualified so no new import is needed at the top of the file.
      .toMat(Sink.head)(akka.stream.scaladsl.Keep.both)
      .run()

  // Only release the consumer once the committed batch has been observed.
  committedBatch.flatMap(_ => control.shutdown())
}
// Pause for half a second before publishing.
// NOTE(review): presumably this gives the consumer time to subscribe first — confirm.
Thread.sleep(500)

// Publish the numbers 1..msgCount, rendered as strings, as record values to
// "topic1" through a plain (non-committing) producer sink.
val outgoingRecords =
  Source(1 to msgCount).map { n =>
    new ProducerRecord[String, String]("topic1", n.toString)
  }
outgoingRecords.runWith(Producer.plainSink(producerSettings))
_____________________________________________________________________ | |
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$] | |
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topic1 --group client1 | |
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true | |
[2016-06-29 17:48:07,632] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) | |
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/client1/offsets/topic1/0. | |
lisak@lisak: ~/src/gwi/gwiq-pipeline/docker on master [+!$] | |
$ JAVA_TOOL_OPTIONS=-Djava.net.preferIPv4Stack=true kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2181 --group client1 --describe | |
Picked up JAVA_TOOL_OPTIONS: -Djava.net.preferIPv4Stack=true | |
No topic available for consumer group provided | |
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment