Last active September 21, 2016
"text": "%md\n\nFirst thing you need to do to run this notebook is to make sure Zeppelin knows how to pull in MapR Streams maven dependencies.\n\nAdd this artifact to the Spark interpreter (update the version as needed):\n\n`org.apache.kafka:kafka-clients:`\n\nAlso make sure that the MapR repository is set up:\n\n\n\nFollow the [Zeppelin documentation for dependency management]( for instructions on to do this.\n",
First thing you need to do to run this notebook is to make sure Zeppelin knows how to pull in MapR Streams maven dependencies.

Add this artifact to the Spark interpreter (update the version as needed):

`org.apache.kafka:kafka-clients:`

Also make sure that the MapR repository is set up:



Follow the Zeppelin documentation for dependency management for instructions on how to do this.
"text": "%sh\nmaprcli stream info -path /tmp/stream || maprcli stream create -path /tmp/stream ",
produceperm defaultpartitions topicperm consumeperm autocreate clientcompression logicalsize path copyperm compression ttl numtopics physicalsize adminperm 
u:mapr 1 u:mapr u:mapr true true 57344 /tmp/stream u:mapr lz4 604800 1 106496 u:mapr
"text": " import java.util.Properties\n import org.apache.kafka.clients.producer.KafkaProducer\n import org.apache.kafka.clients.producer.RecordMetadata\n import org.apache.kafka.clients.producer.Callback\n\n class PrintAckCallback extends Callback {\n def onCompletion(metadata: RecordMetadata, exception: Exception) {\n if (exception != null) {\n println(\"Send callback returns the following exception\", exception)\n } else {\n println(\"The offset of the record we just sent is: \", metadata.offset())\n }\n }\n }\n \n val callback = new PrintAckCallback\n val props = new Properties()\n //props.put(\"bootstrap.servers\", \"localhost:9092\")\n\n props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\")\n props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\")\n props.put(\"acks\", \"all\")\n\n val TOPIC = \"/tmp/stream:test\"\n val producer = new KafkaProducer[String, String](props)\n\n for (i <- 1 to 50) {\n val record = new ProducerRecord(TOPIC, \"key\", s\"hello $i\")\n producer.send(record, callback)\n }\n producer.close()",
import java.util.Properties

import org.apache.kafka.clients.producer.KafkaProducer

import org.apache.kafka.clients.producer.RecordMetadata

import org.apache.kafka.clients.producer.Callback

defined class PrintAckCallback

callback: PrintAckCallback = $iwC$$iwC$PrintAckCallback@3cdc6479

props: java.util.Properties = {}

res104: Object = null

res105: Object = null

res106: Object = null

TOPIC: String = /tmp/stream:test

producer: org.apache.kafka.clients.producer.KafkaProducer[String,String] = org.apache.kafka.clients.producer.KafkaProducer@6eaa49fe
"text": "%sh\nmaprcli stream info -path /tmp/stream\nmaprcli stream delete -path /tmp/stream",
produceperm defaultpartitions topicperm consumeperm autocreate clientcompression logicalsize path copyperm compression ttl numtopics physicalsize adminperm 
u:mapr 1 u:mapr u:mapr true true 57344 /tmp/stream u:mapr lz4 604800 1 106496 u:mapr
