Creating a Kafka DirectStream
import connector.stores.ZooKeeperOffsetsStore
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

class KafkaStreamSource() {

  def kafkaStream(ssc: StreamingContext,
                  zookeeperStore: ZooKeeperOffsetsStore,
                  topic: String,
                  groupId: String,
                  zkHosts: String,
                  bootstrapServer: String,
                  autoCommit: String): InputDStream[(String, String)] = {

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> bootstrapServer,
      "zookeeper.connect" -> zkHosts,
      "group.id" -> groupId,
      "enable.auto.commit" -> autoCommit)

    val topicsSet = Set(topic)

    // Read the offsets previously committed to ZooKeeper; an empty map means a fresh start.
    val storedOffsets = zookeeperStore.readOffsets()

    val kafkaStream = storedOffsets match {
      case map if map.isEmpty =>
        // No saved offsets: start from the default position for this consumer group.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      case map =>
        try {
          // Resume from the saved offsets, emitting (key, message) pairs.
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, map, messageHandler)
        } catch {
          // If the saved offsets are no longer valid, fall back to the default position.
          case _: kafka.common.OffsetOutOfRangeException =>
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
        }
    }

    // Save the offsets of every processed batch back to ZooKeeper.
    kafkaStream.foreachRDD(rdd => zookeeperStore.saveOffsets(rdd))
    kafkaStream
  }
}
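
The OffsetsStore trait that ZooKeeperOffsetsStore extends (and that KafkaStreamSource calls through readOffsets/saveOffsets) is not included in this gist. A minimal sketch of what it could look like, inferred from those two calls and labeled as an assumption:

import kafka.common.TopicAndPartition
import org.apache.spark.rdd.RDD

// Hypothetical sketch: this trait is not part of the original gist.
// Its shape is inferred from the readOffsets/saveOffsets calls above.
trait OffsetsStore {
  // Last committed offset per partition; empty when nothing has been saved yet.
  def readOffsets(): Map[TopicAndPartition, Long]
  // Persist the offsets of a processed Kafka-backed RDD.
  def saveOffsets(rdd: RDD[_]): Unit
}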
import kafka.common.TopicAndPartition
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

class ZooKeeperOffsetsStore(ssc: StreamingContext, topic: String, groupId: String, zkHosts: String) extends OffsetsStore {

  // ZooKeeper node that holds the committed offsets for this consumer group and topic.
  val zkPath = "/consumers/" + groupId + "/offsets/" + topic

  private val zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer)

  // Read the last committed offset of every partition of the topic from ZooKeeper.
  override def readOffsets(): Map[TopicAndPartition, Long] = {
    val topics = Seq(topic)
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicAndPartition, Long]
    val partitionMap = ZkUtils.getPartitionsForTopics(zkClient, topics)
    partitionMap.foreach { case (topicName, partitions) =>
      val zKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicName)
      partitions.foreach { partition =>
        val offsetPath = zKGroupTopicDirs.consumerOffsetDir + "/" + partition
        try {
          val offsetStatTuple = ZkUtils.readData(zkClient, offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(TopicAndPartition(topicName, partition), offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception => print("!!Offset Read Fail!! => " + e)
        }
      }
    }
    topicPartOffsetMap.toMap
  }

  // Persist the until-offset of every partition of the processed batch back to ZooKeeper.
  override def saveOffsets(rdd: RDD[_]): Unit = {
    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach { offsetRange =>
      val partitionPath = zkPath + "/" + offsetRange.partition
      ZkUtils.updatePersistentPath(zkClient, partitionPath, offsetRange.untilOffset.toString)
    }
  }
}
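
A minimal driver sketch showing how the two classes above could be wired together. The app name, topic, group id, and broker/ZooKeeper addresses are placeholders, not part of the original gist:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  def main(args: Array[String]): Unit = {
    // All connection values below are hypothetical examples.
    val conf = new SparkConf().setAppName("KafkaDirectStreamExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    val topic = "my-topic"
    val groupId = "my-group"
    val zkHosts = "zk1:2181,zk2:2181"
    val bootstrapServer = "broker1:9092,broker2:9092"

    val offsetsStore = new ZooKeeperOffsetsStore(ssc, topic, groupId, zkHosts)
    val stream = new KafkaStreamSource().kafkaStream(
      ssc, offsetsStore, topic, groupId, zkHosts, bootstrapServer, autoCommit = "false")

    // Process each (key, message) pair; offsets are saved back to ZooKeeper once per batch.
    stream.foreachRDD(rdd => rdd.foreach { case (key, message) => println(s"$key -> $message") })

    ssc.start()
    ssc.awaitTermination()
  }
}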