Apache Kafka Scala Polled Example
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Kafka - Scala Example of Kafka Polled Consumer\n", | |
"\n", | |
"### Stephan Warren - April 9th, 2016\n", | |
"\n", | |
"Stephan Warren - stephanwarren at ymail\n", | |
"\n", | |
"Initially we are going to follow the quickstart. (See below for where to find this quickstart)\n", | |
"\n", | |
"\n", | |
"#### Download Kafka from http://kafka.apache.org/downloads.html \n", | |
"Choose the version that is kafka_2.11-0.9.0.1.tgz\n", | |
"Verify the MD5 (as required)\n", | |
"\n", | |
"> tar -xzf kafka_2.11-0.9.0.0.tgz\n", | |
"> cd kafka_2.11-0.9.0.0\n", | |
"\n", | |
"\n", | |
"#### Docs\n", | |
"In the decompressed directory, in the subdirectory called site-docs is a file called kafka_2.11-0.9.0.1-site-docs.tgz, you can find the file ./kafka_2.11-0.9.0.1/site-docs/site-docs/quickstart.html. Some of this file is contained in this Toree Notebook.\n", | |
"\n", | |
"#### Start Zookeeper (ZK) & Kafka\n", | |
"Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.\n", | |
"```\n", | |
"$ bin/zookeeper-server-start.sh config/zookeeper.properties\n", | |
"[2016-04-09 19:57:33,818] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)\n", | |
"[2016-04-09 19:57:33,821] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)\n", | |
"[2016-04-09 19:57:33,821] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)\n", | |
"[2016-04-09 19:57:33,821] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)\n", | |
"... (etc)\n", | |
"```\n", | |
"In another shell, start the Kafka server (excuse this long list - the settings may be interesting at a later point):\n", | |
"```\n", | |
"bin/kafka-server-start.sh config/server.properties\n", | |
"[2016-04-09 20:00:33,936] INFO KafkaConfig values: \n", | |
"\tadvertised.host.name = null\n", | |
"\tmetric.reporters = []\n", | |
"\tquota.producer.default = 9223372036854775807\n", | |
"\toffsets.topic.num.partitions = 50\n", | |
"\tlog.flush.interval.messages = 9223372036854775807\n", | |
"\tauto.create.topics.enable = true\n", | |
"\tcontroller.socket.timeout.ms = 30000\n", | |
"\tlog.flush.interval.ms = null\n", | |
"\tprincipal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder\n", | |
"\treplica.socket.receive.buffer.bytes = 65536\n", | |
"\tmin.insync.replicas = 1\n", | |
"\treplica.fetch.wait.max.ms = 500\n", | |
"\tnum.recovery.threads.per.data.dir = 1\n", | |
"\tssl.keystore.type = JKS\n", | |
"\tdefault.replication.factor = 1\n", | |
"\tssl.truststore.password = null\n", | |
"\tlog.preallocate = false\n", | |
"\tsasl.kerberos.principal.to.local.rules = [DEFAULT]\n", | |
"\tfetch.purgatory.purge.interval.requests = 1000\n", | |
"\tssl.endpoint.identification.algorithm = null\n", | |
"\treplica.socket.timeout.ms = 30000\n", | |
"\tmessage.max.bytes = 1000012\n", | |
"\tnum.io.threads = 8\n", | |
"\toffsets.commit.required.acks = -1\n", | |
"\tlog.flush.offset.checkpoint.interval.ms = 60000\n", | |
"\tdelete.topic.enable = false\n", | |
"\tquota.window.size.seconds = 1\n", | |
"\tssl.truststore.type = JKS\n", | |
"\toffsets.commit.timeout.ms = 5000\n", | |
"\tquota.window.num = 11\n", | |
"\tzookeeper.connect = localhost:2181\n", | |
"\tauthorizer.class.name = \n", | |
"\tnum.replica.fetchers = 1\n", | |
"\tlog.retention.ms = null\n", | |
"\tlog.roll.jitter.hours = 0\n", | |
"\tlog.cleaner.enable = true\n", | |
"\toffsets.load.buffer.size = 5242880\n", | |
"\tlog.cleaner.delete.retention.ms = 86400000\n", | |
"\tssl.client.auth = none\n", | |
"\tcontrolled.shutdown.max.retries = 3\n", | |
"\tqueued.max.requests = 500\n", | |
"\toffsets.topic.replication.factor = 3\n", | |
"\tlog.cleaner.threads = 1\n", | |
"\tsasl.kerberos.service.name = null\n", | |
"\tsasl.kerberos.ticket.renew.jitter = 0.05\n", | |
"\tsocket.request.max.bytes = 104857600\n", | |
"\tssl.trustmanager.algorithm = PKIX\n", | |
"\tzookeeper.session.timeout.ms = 6000\n", | |
"\tlog.retention.bytes = -1\n", | |
"\tsasl.kerberos.min.time.before.relogin = 60000\n", | |
"\tzookeeper.set.acl = false\n", | |
"\tconnections.max.idle.ms = 600000\n", | |
"\toffsets.retention.minutes = 1440\n", | |
"\treplica.fetch.backoff.ms = 1000\n", | |
"\tinter.broker.protocol.version = 0.9.0.X\n", | |
"\tlog.retention.hours = 168\n", | |
"\tnum.partitions = 1\n", | |
"\tbroker.id.generation.enable = true\n", | |
"\tlisteners = PLAINTEXT://:9092\n", | |
"\tssl.provider = null\n", | |
"\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]\n", | |
"\tlog.roll.ms = null\n", | |
"\tlog.flush.scheduler.interval.ms = 9223372036854775807\n", | |
"\tssl.cipher.suites = null\n", | |
"\tlog.index.size.max.bytes = 10485760\n", | |
"\tssl.keymanager.algorithm = SunX509\n", | |
"\tsecurity.inter.broker.protocol = PLAINTEXT\n", | |
"\treplica.fetch.max.bytes = 1048576\n", | |
"\tadvertised.port = null\n", | |
"\tlog.cleaner.dedupe.buffer.size = 134217728\n", | |
"\treplica.high.watermark.checkpoint.interval.ms = 5000\n", | |
"\tlog.cleaner.io.buffer.size = 524288\n", | |
"\tsasl.kerberos.ticket.renew.window.factor = 0.8\n", | |
"\tzookeeper.connection.timeout.ms = 6000\n", | |
"\tcontrolled.shutdown.retry.backoff.ms = 5000\n", | |
"\tlog.roll.hours = 168\n", | |
"\tlog.cleanup.policy = delete\n", | |
"\thost.name = \n", | |
"\tlog.roll.jitter.ms = null\n", | |
"\tmax.connections.per.ip = 2147483647\n", | |
"\toffsets.topic.segment.bytes = 104857600\n", | |
"\tbackground.threads = 10\n", | |
"\tquota.consumer.default = 9223372036854775807\n", | |
"\trequest.timeout.ms = 30000\n", | |
"\tlog.index.interval.bytes = 4096\n", | |
"\tlog.dir = /tmp/kafka-logs\n", | |
"\tlog.segment.bytes = 1073741824\n", | |
"\tlog.cleaner.backoff.ms = 15000\n", | |
"\toffset.metadata.max.bytes = 4096\n", | |
"\tssl.truststore.location = null\n", | |
"\tgroup.max.session.timeout.ms = 30000\n", | |
"\tssl.keystore.password = null\n", | |
"\tzookeeper.sync.time.ms = 2000\n", | |
"\tport = 9092\n", | |
"\tlog.retention.minutes = null\n", | |
"\tlog.segment.delete.delay.ms = 60000\n", | |
"\tlog.dirs = /tmp/kafka-logs\n", | |
"\tcontrolled.shutdown.enable = true\n", | |
"\tcompression.type = producer\n", | |
"\tmax.connections.per.ip.overrides = \n", | |
"\tsasl.kerberos.kinit.cmd = /usr/bin/kinit\n", | |
"\tlog.cleaner.io.max.bytes.per.second = 1.7976931348623157E308\n", | |
"\tauto.leader.rebalance.enable = true\n", | |
"\tleader.imbalance.check.interval.seconds = 300\n", | |
"\tlog.cleaner.min.cleanable.ratio = 0.5\n", | |
"\treplica.lag.time.max.ms = 10000\n", | |
"\tnum.network.threads = 3\n", | |
"\tssl.key.password = null\n", | |
"\treserved.broker.max.id = 1000\n", | |
"\tmetrics.num.samples = 2\n", | |
"\tsocket.send.buffer.bytes = 102400\n", | |
"\tssl.protocol = TLS\n", | |
"\tsocket.receive.buffer.bytes = 102400\n", | |
"\tssl.keystore.location = null\n", | |
"\treplica.fetch.min.bytes = 1\n", | |
"\tunclean.leader.election.enable = true\n", | |
"\tgroup.min.session.timeout.ms = 6000\n", | |
"\tlog.cleaner.io.buffer.load.factor = 0.9\n", | |
"\toffsets.retention.check.interval.ms = 600000\n", | |
"\tproducer.purgatory.purge.interval.requests = 1000\n", | |
"\tmetrics.sample.window.ms = 30000\n", | |
"\tbroker.id = 0\n", | |
"\toffsets.topic.compression.codec = 0\n", | |
"\tlog.retention.check.interval.ms = 300000\n", | |
"\tadvertised.listeners = null\n", | |
"\tleader.imbalance.per.broker.percentage = 10\n", | |
" (kafka.server.KafkaConfig)\n", | |
"[2016-04-09 20:00:33,985] INFO starting (kafka.server.KafkaServer)\n", | |
"[2016-04-09 20:00:33,988] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)\n", | |
"[2016-04-09 20:00:33,997] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)\n", | |
"```\n", | |
"\n" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"#### Setup Kafka\n", | |
"\n", | |
"To set-up a topic for testing, use the following (in yet another shell):\n", | |
"\n", | |
"```\n", | |
"./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic_name\n", | |
"```\n", | |
"\n", | |
"##### WARNING: Do not use underscores for a topic - This messed up my system. The console would not echo messages despite the benign warning: \n", | |
"Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.\n", | |
"Created topic \"test_topic_name\".\n", | |
"```\n", | |
"./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test\n", | |
"Created topic \"test\".\n", | |
"\n", | |
"./bin/kafka-topics.sh --zookeeper localhost:2181 --list\n", | |
"test\n", | |
"```\n", | |
"\n", | |
"Note: Above, there will be one partition for this topic and one replication. Typically 3 replicants are used in production and each partitions will be serviced by one of the consumers in what's called a consumer group. Ypu can have several consumer groups consuming from the same topic:\n", | |
"\n", | |
"See:\n", | |
"Partititions - http://kafka.apache.org/images/log_anatomy.png\n", | |
"\n", | |
"and:\n", | |
"\n", | |
"Consumer Groups: http://kafka.apache.org/images/consumer-groups.png\n", | |
"\n", | |
"\n", | |
"\n", | |
"Let's check the broker is working by sending (producing messages):\n", | |
"```\n", | |
"/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test\n", | |
"This is a message\n", | |
"This is another message\n", | |
"This is a 3rd message\n", | |
"This 4th message is getting tacky \n", | |
"```\n", | |
"\n", | |
"And did we get these messages?\n", | |
"\n", | |
"```\n", | |
"bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning\n", | |
"```\n", | |
"Looks like it:\n", | |
"```\n", | |
"This is a message\n", | |
"This is another message\n", | |
"This is a 3rd message\n", | |
"This 4th message is getting tacky\n", | |
"```\n", | |
"\n", | |
"\n", | |
"For setting up more Kafka instances, see the quickstart.\n" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": { | |
"collapsed": true | |
}, | |
"source": [ | |
"### Consumer for Polled Messages\n", | |
"Here is the Spark code needed to get this going:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import org.apache.kafka._\n", | |
"import java.util.{Calendar, Properties}\n", | |
"import java.util\n", | |
"import org.apache.kafka.clients.consumer._\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
" def runKafkaConsumer : Unit = {\n", | |
"// val topic = \"test\"\n", | |
" val props = new Properties()\n", | |
" props.put(\"bootstrap.servers\", \"localhost:9092\")\n", | |
" props.put(\"group.id\", \"test_IntelliJ\")\n", | |
" props.put(\"enable.auto.commit\", \"true\")\n", | |
" props.put(\"auto.commit.interval.ms\", \"1000\")\n", | |
" props.put(\"session.timeout.ms\", \"30000\")\n", | |
" props.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\")\n", | |
" props.put(\"value.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\")\n", | |
" // no partitions yet\n", | |
" // props.put(\"partition.assignment.strategy\", \"range\");\n", | |
"\n", | |
"\n", | |
" val iterConsumer = new KafkaConsumer[String, String](props)\n", | |
" iterConsumer.subscribe(util.Arrays.asList(\"test\"))\n", | |
" var isRunning = true\n", | |
" while(isRunning) {\n", | |
" val records = iterConsumer.poll(1000)\n", | |
" if(!records.isEmpty) {\n", | |
" println(\"record: \"+ records)\n", | |
" println(\"count: \"+ records.count)\n", | |
" println(\"isEmpty: \"+ records.isEmpty)\n", | |
" println(\"partition: \"+ records.partitions)\n", | |
" println(\"records: \"+ records.records(\"test\"))\n", | |
" var i = 0\n", | |
" val iterRecords = records.iterator\n", | |
" while (iterRecords.hasNext) {\n", | |
" val consumerRec = iterRecords.next()\n", | |
" println(i+ \": Key :: \" + consumerRec.key)\n", | |
" println(i+ \": Value :: \" + consumerRec.value)\n", | |
" println(i+ \": Part...:: \" + consumerRec.partition)\n", | |
" println(i+ \": Topic :: \" + consumerRec.topic)\n", | |
" println(i+ \": Offset :: \" + consumerRec.offset)\n", | |
" i += 1\n", | |
" }\n", | |
" }\n", | |
" println(Calendar.getInstance().getTime())\n", | |
" }\n", | |
" iterConsumer.close()\n", | |
" }\n", | |
"\n" | |
] | |
}, | |
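{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"For completeness, here is a matching producer written against the same 0.9 client API. This is my own minimal sketch, not part of the quickstart; the key \"k1\" and value \"v1\" are made-up sample data:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"import java.util.Properties\n", | |
"import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}\n", | |
"\n", | |
"def runKafkaProducer : Unit = {\n", | |
"  val props = new Properties()\n", | |
"  props.put(\"bootstrap.servers\", \"localhost:9092\")\n", | |
"  // string serializers, mirroring the string deserializers in the consumer above\n", | |
"  props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\")\n", | |
"  props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\")\n", | |
"  val producer = new KafkaProducer[String, String](props)\n", | |
"  // send() is asynchronous; get() blocks until the broker acknowledges the write\n", | |
"  producer.send(new ProducerRecord[String, String](\"test\", \"k1\", \"v1\")).get()\n", | |
"  producer.close()\n", | |
"}\n" | |
] | |
}, | |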
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"\n", | |
"### Getting some data:\n", | |
"Bear in mind that the data is being quick keyed with:\n", | |
"```\n", | |
"/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test\n", | |
"```\n", | |
"\n", | |
"Here is some sample output (the key is missing with the .sh producer):\n", | |
"\n", | |
"```\n", | |
"Sun Apr 10 20:36:41 PDT 2016\n", | |
"record: org.apache.kafka.clients.consumer.ConsumerRecords@79ca92b9\n", | |
"count: 3\n", | |
"isEmpty: false\n", | |
"partition: [test-0]\n", | |
"records: org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable@4387b79e\n", | |
"0: Key :: null\n", | |
"0: Value :: 123\n", | |
"0: Part...:: 0\n", | |
"0: Topic :: test\n", | |
"0: Offset :: 55\n", | |
"1: Key :: null\n", | |
"1: Value :: 333\n", | |
"1: Part...:: 0\n", | |
"1: Topic :: test\n", | |
"1: Offset :: 56\n", | |
"2: Key :: null\n", | |
"2: Value :: 34\n", | |
"2: Part...:: 0\n", | |
"2: Topic :: test\n", | |
"2: Offset :: 57\n", | |
"Sun Apr 10 20:36:41 PDT 2016\n", | |
"Sun Apr 10 20:36:42 PDT 2016\n", | |
"record: org.apache.kafka.clients.consumer.ConsumerRecords@78dd667e\n", | |
"count: 2\n", | |
"isEmpty: false\n", | |
"partition: [test-0]\n", | |
"records: org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable@10db82ae\n", | |
"0: Key :: null\n", | |
"0: Value :: \n", | |
"0: Part...:: 0\n", | |
"0: Topic :: test\n", | |
"0: Offset :: 58\n", | |
"1: Key :: null\n", | |
"1: Value :: \n", | |
"1: Part...:: 0\n", | |
"1: Topic :: test\n", | |
"1: Offset :: 59\n", | |
"Sun Apr 10 20:36:43 PDT 2016\n", | |
"```\n", | |
"\n", | |
"Here are some of the details in the POM file (streaming not used yet):\n", | |
"```\n", | |
"\n", | |
" <dependency>\n", | |
" <groupId>org.apache.spark</groupId>\n", | |
" <artifactId>spark-streaming-kafka_2.11</artifactId>\n", | |
" <version>1.6.1</version>\n", | |
" </dependency>\n", | |
"\n", | |
" <dependency>\n", | |
" <groupId>org.apache.kafka</groupId>\n", | |
" <artifactId>kafka_2.11</artifactId>\n", | |
" <version>0.9.0.1</version>\n", | |
" </dependency>\n", | |
" \n", | |
" <dependency>\n", | |
" <groupId>org.apache.kafka</groupId>\n", | |
" <artifactId>kafka-clients</artifactId>\n", | |
" <version>0.9.0.1</version>\n", | |
" </dependency>\n", | |
" \n", | |
"```\n", | |
"\n", | |
"\n", | |
"\n", | |
"Since the next steps are proprietary, please see my other Gist on JSON, DataFrames, PostgreSQL & Spark ... \n", | |
"\n", | |
"\n", | |
"The plan is to ingest a JSON file and parse this JSON as a consumable topic into a DataFrame and then insert the JSON data into a database such as Cassandra or PostgreSQL or Hive/HDFS. The following will force a file to load:\n", | |
"\n", | |
"```\n", | |
" kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt\n", | |
"```\n", | |
"\n", | |
"After this ... my other gist can help to get to the next steps.\n", | |
"\n", | |
"\n", | |
"### Further Reading on the Web\n", | |
"\n", | |
"Trusting what you read on the web ... some blogs may have cut-n-pasted other blogs' content -- even the wrong parts. Look for a github repo.\n", | |
"\n", | |
"That said ... Look for blogs by Jay Kreps, Neha Narkhede, Gwen Shapiro,\n", | |
"\n", | |
"Thanks for the pointers from Asim Jalis and Jesse Anderson.\n", | |
"\n", | |
"\n", | |
"### References\n", | |
"\n", | |
"Starting with Jesse's Blog: http://www.jesse-anderson.com/2016/04/kafka-0-9-0-changes-for-developers/ \n", | |
"\n", | |
"Kafka Quickstart: http://kafka.apache.org/documentation.html#quickstart\n", | |
"\n", | |
"API: http://kafka.apache.org/090/javadoc/\n", | |
"\n", | |
"Download Kafka: http://kafka.apache.org/downloads.html\n", | |
"\n", | |
"Converting Java to Scala: http://javatoscala.com/\n", | |
"\n", | |
"\n", | |
"### How Did I Make This Jupyter Toree Document?\n", | |
"I used Asim Jalis's blog, https://github.com/asimjalis/apache-toree-quickstart, to install Toree and write this article. BTW - Please do not install Toree and Juypter with sudo, just use (or with pip3):\n", | |
"\n", | |
"pip install jupyter\n", | |
"pip install notebook\n", | |
"pip install toree\n", | |
"\n", | |
"When you get to the step of Toree in Jupyter add the JDBC driver:\n", | |
"```\n", | |
"SPARK_OPTS=\"--packages=$SPARK_PKGS\"\n", | |
"jupyter toree install --spark_home=$SPARK_HOME --spark_opts=\"--driver-class-path=/SomePathTo/postgresql-9.4.1208.jar $SPARK_OPTS\"\n", | |
"```\n", | |
"\n", | |
"\n", | |
"### Contact:\n", | |
"You can contact me at stephanwarren on ymail.com" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Toree", | |
"language": "", | |
"name": "toree" | |
}, | |
"language_info": { | |
"name": "scala" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |