{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Kafka - Scala Example of Kafka Polled Consumer\n",
"\n",
"### Stephan Warren - April 9th, 2016\n",
"\n",
"Stephan Warren - stephanwarren at ymail\n",
"\n",
"Initially we are going to follow the quickstart. (See below for where to find this quickstart)\n",
"\n",
"\n",
"#### Download Kafka from http://kafka.apache.org/downloads.html \n",
"Choose the version that is kafka_2.11-0.9.0.1.tgz\n",
"Verify the MD5 (as required)\n",
"\n",
"> tar -xzf kafka_2.11-0.9.0.0.tgz\n",
"> cd kafka_2.11-0.9.0.0\n",
"\n",
"\n",
"#### Docs\n",
"In the decompressed directory, in the subdirectory called site-docs is a file called kafka_2.11-0.9.0.1-site-docs.tgz, you can find the file ./kafka_2.11-0.9.0.1/site-docs/site-docs/quickstart.html. Some of this file is contained in this Toree Notebook.\n",
"\n",
"#### Start Zookeeper (ZK) & Kafka\n",
"Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.\n",
"```\n",
"$ bin/zookeeper-server-start.sh config/zookeeper.properties\n",
"[2016-04-09 19:57:33,818] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)\n",
"[2016-04-09 19:57:33,821] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)\n",
"[2016-04-09 19:57:33,821] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)\n",
"[2016-04-09 19:57:33,821] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)\n",
"... (etc)\n",
"```\n",
"In another shell, start the Kafka server (excuse this long list - the settings may be interesting at a later point):\n",
"```\n",
"bin/kafka-server-start.sh config/server.properties\n",
"[2016-04-09 20:00:33,936] INFO KafkaConfig values: \n",
"\tadvertised.host.name = null\n",
"\tmetric.reporters = []\n",
"\tquota.producer.default = 9223372036854775807\n",
"\toffsets.topic.num.partitions = 50\n",
"\tlog.flush.interval.messages = 9223372036854775807\n",
"\tauto.create.topics.enable = true\n",
"\tcontroller.socket.timeout.ms = 30000\n",
"\tlog.flush.interval.ms = null\n",
"\tprincipal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder\n",
"\treplica.socket.receive.buffer.bytes = 65536\n",
"\tmin.insync.replicas = 1\n",
"\treplica.fetch.wait.max.ms = 500\n",
"\tnum.recovery.threads.per.data.dir = 1\n",
"\tssl.keystore.type = JKS\n",
"\tdefault.replication.factor = 1\n",
"\tssl.truststore.password = null\n",
"\tlog.preallocate = false\n",
"\tsasl.kerberos.principal.to.local.rules = [DEFAULT]\n",
"\tfetch.purgatory.purge.interval.requests = 1000\n",
"\tssl.endpoint.identification.algorithm = null\n",
"\treplica.socket.timeout.ms = 30000\n",
"\tmessage.max.bytes = 1000012\n",
"\tnum.io.threads = 8\n",
"\toffsets.commit.required.acks = -1\n",
"\tlog.flush.offset.checkpoint.interval.ms = 60000\n",
"\tdelete.topic.enable = false\n",
"\tquota.window.size.seconds = 1\n",
"\tssl.truststore.type = JKS\n",
"\toffsets.commit.timeout.ms = 5000\n",
"\tquota.window.num = 11\n",
"\tzookeeper.connect = localhost:2181\n",
"\tauthorizer.class.name = \n",
"\tnum.replica.fetchers = 1\n",
"\tlog.retention.ms = null\n",
"\tlog.roll.jitter.hours = 0\n",
"\tlog.cleaner.enable = true\n",
"\toffsets.load.buffer.size = 5242880\n",
"\tlog.cleaner.delete.retention.ms = 86400000\n",
"\tssl.client.auth = none\n",
"\tcontrolled.shutdown.max.retries = 3\n",
"\tqueued.max.requests = 500\n",
"\toffsets.topic.replication.factor = 3\n",
"\tlog.cleaner.threads = 1\n",
"\tsasl.kerberos.service.name = null\n",
"\tsasl.kerberos.ticket.renew.jitter = 0.05\n",
"\tsocket.request.max.bytes = 104857600\n",
"\tssl.trustmanager.algorithm = PKIX\n",
"\tzookeeper.session.timeout.ms = 6000\n",
"\tlog.retention.bytes = -1\n",
"\tsasl.kerberos.min.time.before.relogin = 60000\n",
"\tzookeeper.set.acl = false\n",
"\tconnections.max.idle.ms = 600000\n",
"\toffsets.retention.minutes = 1440\n",
"\treplica.fetch.backoff.ms = 1000\n",
"\tinter.broker.protocol.version = 0.9.0.X\n",
"\tlog.retention.hours = 168\n",
"\tnum.partitions = 1\n",
"\tbroker.id.generation.enable = true\n",
"\tlisteners = PLAINTEXT://:9092\n",
"\tssl.provider = null\n",
"\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]\n",
"\tlog.roll.ms = null\n",
"\tlog.flush.scheduler.interval.ms = 9223372036854775807\n",
"\tssl.cipher.suites = null\n",
"\tlog.index.size.max.bytes = 10485760\n",
"\tssl.keymanager.algorithm = SunX509\n",
"\tsecurity.inter.broker.protocol = PLAINTEXT\n",
"\treplica.fetch.max.bytes = 1048576\n",
"\tadvertised.port = null\n",
"\tlog.cleaner.dedupe.buffer.size = 134217728\n",
"\treplica.high.watermark.checkpoint.interval.ms = 5000\n",
"\tlog.cleaner.io.buffer.size = 524288\n",
"\tsasl.kerberos.ticket.renew.window.factor = 0.8\n",
"\tzookeeper.connection.timeout.ms = 6000\n",
"\tcontrolled.shutdown.retry.backoff.ms = 5000\n",
"\tlog.roll.hours = 168\n",
"\tlog.cleanup.policy = delete\n",
"\thost.name = \n",
"\tlog.roll.jitter.ms = null\n",
"\tmax.connections.per.ip = 2147483647\n",
"\toffsets.topic.segment.bytes = 104857600\n",
"\tbackground.threads = 10\n",
"\tquota.consumer.default = 9223372036854775807\n",
"\trequest.timeout.ms = 30000\n",
"\tlog.index.interval.bytes = 4096\n",
"\tlog.dir = /tmp/kafka-logs\n",
"\tlog.segment.bytes = 1073741824\n",
"\tlog.cleaner.backoff.ms = 15000\n",
"\toffset.metadata.max.bytes = 4096\n",
"\tssl.truststore.location = null\n",
"\tgroup.max.session.timeout.ms = 30000\n",
"\tssl.keystore.password = null\n",
"\tzookeeper.sync.time.ms = 2000\n",
"\tport = 9092\n",
"\tlog.retention.minutes = null\n",
"\tlog.segment.delete.delay.ms = 60000\n",
"\tlog.dirs = /tmp/kafka-logs\n",
"\tcontrolled.shutdown.enable = true\n",
"\tcompression.type = producer\n",
"\tmax.connections.per.ip.overrides = \n",
"\tsasl.kerberos.kinit.cmd = /usr/bin/kinit\n",
"\tlog.cleaner.io.max.bytes.per.second = 1.7976931348623157E308\n",
"\tauto.leader.rebalance.enable = true\n",
"\tleader.imbalance.check.interval.seconds = 300\n",
"\tlog.cleaner.min.cleanable.ratio = 0.5\n",
"\treplica.lag.time.max.ms = 10000\n",
"\tnum.network.threads = 3\n",
"\tssl.key.password = null\n",
"\treserved.broker.max.id = 1000\n",
"\tmetrics.num.samples = 2\n",
"\tsocket.send.buffer.bytes = 102400\n",
"\tssl.protocol = TLS\n",
"\tsocket.receive.buffer.bytes = 102400\n",
"\tssl.keystore.location = null\n",
"\treplica.fetch.min.bytes = 1\n",
"\tunclean.leader.election.enable = true\n",
"\tgroup.min.session.timeout.ms = 6000\n",
"\tlog.cleaner.io.buffer.load.factor = 0.9\n",
"\toffsets.retention.check.interval.ms = 600000\n",
"\tproducer.purgatory.purge.interval.requests = 1000\n",
"\tmetrics.sample.window.ms = 30000\n",
"\tbroker.id = 0\n",
"\toffsets.topic.compression.codec = 0\n",
"\tlog.retention.check.interval.ms = 300000\n",
"\tadvertised.listeners = null\n",
"\tleader.imbalance.per.broker.percentage = 10\n",
" (kafka.server.KafkaConfig)\n",
"[2016-04-09 20:00:33,985] INFO starting (kafka.server.KafkaServer)\n",
"[2016-04-09 20:00:33,988] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)\n",
"[2016-04-09 20:00:33,997] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)\n",
"```\n",
"\n"
]
},
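{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch of my own, not part of the quickstart), the following Scala snippet confirms from the notebook that ZooKeeper (port 2181) and the Kafka broker (port 9092) are accepting connections:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import java.net.Socket\n",
"import scala.util.Try\n",
"\n",
"// Returns true if something is accepting TCP connections on host:port\n",
"def isListening(host: String, port: Int): Boolean =\n",
"  Try { new Socket(host, port).close(); true }.getOrElse(false)\n",
"\n",
"println(\"ZooKeeper up: \" + isListening(\"localhost\", 2181))\n",
"println(\"Kafka up:     \" + isListening(\"localhost\", 9092))\n"
]
},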
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setup Kafka\n",
"\n",
"To set-up a topic for testing, use the following (in yet another shell):\n",
"\n",
"```\n",
"./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic_name\n",
"```\n",
"\n",
"##### WARNING: Do not use underscores for a topic - This messed up my system. The console would not echo messages despite the benign warning: \n",
"Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.\n",
"Created topic \"test_topic_name\".\n",
"```\n",
"./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test\n",
"Created topic \"test\".\n",
"\n",
"./bin/kafka-topics.sh --zookeeper localhost:2181 --list\n",
"test\n",
"```\n",
"\n",
"Note: Above, there will be one partition for this topic and one replication. Typically 3 replicants are used in production and each partitions will be serviced by one of the consumers in what's called a consumer group. Ypu can have several consumer groups consuming from the same topic:\n",
"\n",
"See:\n",
"Partititions - http://kafka.apache.org/images/log_anatomy.png\n",
"\n",
"and:\n",
"\n",
"Consumer Groups: http://kafka.apache.org/images/consumer-groups.png\n",
"\n",
"\n",
"\n",
"Let's check the broker is working by sending (producing messages):\n",
"```\n",
"/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test\n",
"This is a message\n",
"This is another message\n",
"This is a 3rd message\n",
"This 4th message is getting tacky \n",
"```\n",
"\n",
"And did we get these messages?\n",
"\n",
"```\n",
"bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning\n",
"```\n",
"Looks like it:\n",
"```\n",
"This is a message\n",
"This is another message\n",
"This is a 3rd message\n",
"This 4th message is getting tacky\n",
"```\n",
"\n",
"\n",
"For setting up more Kafka instances, see the quickstart.\n"
]
},
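{
"cell_type": "markdown",
"metadata": {},
"source": [
"To inspect the partition layout from Scala rather than the shell, here is a minimal sketch (my own, using the 0.9 consumer API) that asks the broker for the partitions of the test topic:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import java.util.Properties\n",
"import org.apache.kafka.clients.consumer.KafkaConsumer\n",
"import scala.collection.JavaConverters._\n",
"\n",
"val metaProps = new Properties()\n",
"metaProps.put(\"bootstrap.servers\", \"localhost:9092\")\n",
"metaProps.put(\"group.id\", \"metadata_check\")\n",
"metaProps.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\")\n",
"metaProps.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\")\n",
"\n",
"val metaConsumer = new KafkaConsumer[String, String](metaProps)\n",
"// partitionsFor returns a java.util.List[PartitionInfo] with the leader and replicas of each partition\n",
"metaConsumer.partitionsFor(\"test\").asScala.foreach(println)\n",
"metaConsumer.close()\n"
]
},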
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"### Consumer for Polled Messages\n",
"Here is the Spark code needed to get this going:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import org.apache.kafka._\n",
"import java.util.{Calendar, Properties}\n",
"import java.util\n",
"import org.apache.kafka.clients.consumer._\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
" def runKafkaConsumer : Unit = {\n",
"// val topic = \"test\"\n",
" val props = new Properties()\n",
" props.put(\"bootstrap.servers\", \"localhost:9092\")\n",
" props.put(\"group.id\", \"test_IntelliJ\")\n",
" props.put(\"enable.auto.commit\", \"true\")\n",
" props.put(\"auto.commit.interval.ms\", \"1000\")\n",
" props.put(\"session.timeout.ms\", \"30000\")\n",
" props.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\")\n",
" props.put(\"value.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\")\n",
" // no partitions yet\n",
" // props.put(\"partition.assignment.strategy\", \"range\");\n",
"\n",
"\n",
" val iterConsumer = new KafkaConsumer[String, String](props)\n",
" iterConsumer.subscribe(util.Arrays.asList(\"test\"))\n",
" var isRunning = true\n",
" while(isRunning) {\n",
" val records = iterConsumer.poll(1000)\n",
" if(!records.isEmpty) {\n",
" println(\"record: \"+ records)\n",
" println(\"count: \"+ records.count)\n",
" println(\"isEmpty: \"+ records.isEmpty)\n",
" println(\"partition: \"+ records.partitions)\n",
" println(\"records: \"+ records.records(\"test\"))\n",
" var i = 0\n",
" val iterRecords = records.iterator\n",
" while (iterRecords.hasNext) {\n",
" val consumerRec = iterRecords.next()\n",
" println(i+ \": Key :: \" + consumerRec.key)\n",
" println(i+ \": Value :: \" + consumerRec.value)\n",
" println(i+ \": Part...:: \" + consumerRec.partition)\n",
" println(i+ \": Topic :: \" + consumerRec.topic)\n",
" println(i+ \": Offset :: \" + consumerRec.offset)\n",
" i += 1\n",
" }\n",
" }\n",
" println(Calendar.getInstance().getTime())\n",
" }\n",
" iterConsumer.close()\n",
" }\n",
"\n"
]
},
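{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loop above relies on enable.auto.commit to record progress. As a variation (a sketch under the same broker and topic assumptions), you can take control of offsets yourself by disabling auto-commit and calling commitSync() only after the records have been processed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import java.util.{Arrays, Properties}\n",
"import org.apache.kafka.clients.consumer.KafkaConsumer\n",
"\n",
"def runManualCommitConsumer(maxPolls: Int): Unit = {\n",
"  val props = new Properties()\n",
"  props.put(\"bootstrap.servers\", \"localhost:9092\")\n",
"  props.put(\"group.id\", \"test_manual_commit\")\n",
"  props.put(\"enable.auto.commit\", \"false\") // we commit explicitly below\n",
"  props.put(\"session.timeout.ms\", \"30000\")\n",
"  props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\")\n",
"  props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\")\n",
"\n",
"  val consumer = new KafkaConsumer[String, String](props)\n",
"  consumer.subscribe(Arrays.asList(\"test\"))\n",
"  for (_ <- 1 to maxPolls) {\n",
"    val records = consumer.poll(1000)\n",
"    val iter = records.iterator\n",
"    while (iter.hasNext) {\n",
"      val rec = iter.next()\n",
"      println(rec.offset + \": \" + rec.value)\n",
"    }\n",
"    // Offsets are only marked consumed once processing has finished\n",
"    if (!records.isEmpty) consumer.commitSync()\n",
"  }\n",
"  consumer.close()\n",
"}\n"
]
},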
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"### Getting some data:\n",
"Bear in mind that the data is being quick keyed with:\n",
"```\n",
"/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test\n",
"```\n",
"\n",
"Here is some sample output (the key is missing with the .sh producer):\n",
"\n",
"```\n",
"Sun Apr 10 20:36:41 PDT 2016\n",
"record: org.apache.kafka.clients.consumer.ConsumerRecords@79ca92b9\n",
"count: 3\n",
"isEmpty: false\n",
"partition: [test-0]\n",
"records: org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable@4387b79e\n",
"0: Key :: null\n",
"0: Value :: 123\n",
"0: Part...:: 0\n",
"0: Topic :: test\n",
"0: Offset :: 55\n",
"1: Key :: null\n",
"1: Value :: 333\n",
"1: Part...:: 0\n",
"1: Topic :: test\n",
"1: Offset :: 56\n",
"2: Key :: null\n",
"2: Value :: 34\n",
"2: Part...:: 0\n",
"2: Topic :: test\n",
"2: Offset :: 57\n",
"Sun Apr 10 20:36:41 PDT 2016\n",
"Sun Apr 10 20:36:42 PDT 2016\n",
"record: org.apache.kafka.clients.consumer.ConsumerRecords@78dd667e\n",
"count: 2\n",
"isEmpty: false\n",
"partition: [test-0]\n",
"records: org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable@10db82ae\n",
"0: Key :: null\n",
"0: Value :: \n",
"0: Part...:: 0\n",
"0: Topic :: test\n",
"0: Offset :: 58\n",
"1: Key :: null\n",
"1: Value :: \n",
"1: Part...:: 0\n",
"1: Topic :: test\n",
"1: Offset :: 59\n",
"Sun Apr 10 20:36:43 PDT 2016\n",
"```\n",
"\n",
"Here are some of the details in the POM file (streaming not used yet):\n",
"```\n",
"\n",
" <dependency>\n",
" <groupId>org.apache.spark</groupId>\n",
" <artifactId>spark-streaming-kafka_2.11</artifactId>\n",
" <version>1.6.1</version>\n",
" </dependency>\n",
"\n",
" <dependency>\n",
" <groupId>org.apache.kafka</groupId>\n",
" <artifactId>kafka_2.11</artifactId>\n",
" <version>0.9.0.1</version>\n",
" </dependency>\n",
" \n",
" <dependency>\n",
" <groupId>org.apache.kafka</groupId>\n",
" <artifactId>kafka-clients</artifactId>\n",
" <version>0.9.0.1</version>\n",
" </dependency>\n",
" \n",
"```\n",
"\n",
"\n",
"\n",
"Since the next steps are proprietary, please see my other Gist on JSON, DataFrames, PostgreSQL & Spark ... \n",
"\n",
"\n",
"The plan is to ingest a JSON file and parse this JSON as a consumable topic into a DataFrame and then insert the JSON data into a database such as Cassandra or PostgreSQL or Hive/HDFS. The following will force a file to load:\n",
"\n",
"```\n",
" kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt\n",
"```\n",
"\n",
"After this ... my other gist can help to get to the next steps.\n",
"\n",
"\n",
"### Further Reading on the Web\n",
"\n",
"Trusting what you read on the web ... some blogs may have cut-n-pasted other blogs' content -- even the wrong parts. Look for a github repo.\n",
"\n",
"That said ... Look for blogs by Jay Kreps, Neha Narkhede, Gwen Shapiro,\n",
"\n",
"Thanks for the pointers from Asim Jalis and Jesse Anderson.\n",
"\n",
"\n",
"### References\n",
"\n",
"Starting with Jesse's Blog: http://www.jesse-anderson.com/2016/04/kafka-0-9-0-changes-for-developers/ \n",
"\n",
"Kafka Quickstart: http://kafka.apache.org/documentation.html#quickstart\n",
"\n",
"API: http://kafka.apache.org/090/javadoc/\n",
"\n",
"Download Kafka: http://kafka.apache.org/downloads.html\n",
"\n",
"Converting Java to Scala: http://javatoscala.com/\n",
"\n",
"\n",
"### How Did I Make This Jupyter Toree Document?\n",
"I used Asim Jalis's blog, https://github.com/asimjalis/apache-toree-quickstart, to install Toree and write this article. BTW - Please do not install Toree and Juypter with sudo, just use (or with pip3):\n",
"\n",
"pip install jupyter\n",
"pip install notebook\n",
"pip install toree\n",
"\n",
"When you get to the step of Toree in Jupyter add the JDBC driver:\n",
"```\n",
"SPARK_OPTS=\"--packages=$SPARK_PKGS\"\n",
"jupyter toree install --spark_home=$SPARK_HOME --spark_opts=\"--driver-class-path=/SomePathTo/postgresql-9.4.1208.jar $SPARK_OPTS\"\n",
"```\n",
"\n",
"\n",
"### Contact:\n",
"You can contact me at stephanwarren on ymail.com"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Toree",
"language": "",
"name": "toree"
},
"language_info": {
"name": "scala"
}
},
"nbformat": 4,
"nbformat_minor": 0
}