Last active
October 26, 2017 00:20
-
-
Save joswlv/f66e91bb2649f7035b4f54a4d9d0368a to your computer and use it in GitHub Desktop.
KafkaDirectStream 만들기
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import connector.stores.ZooKeeperOffsetsStore | |
import kafka.message.MessageAndMetadata | |
import kafka.serializer.StringDecoder | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.kafka.KafkaUtils | |
/** Builds Kafka direct streams whose consumer offsets are loaded from and saved to a [[ZooKeeperOffsetsStore]]. */
class KafkaStreamSource() {

  /**
   * Creates a direct Kafka stream for `topic` and registers a per-batch output operation that
   * writes each batch's offset ranges back to `zookeeperStore`.
   *
   * If `zookeeperStore.readOffsets()` returns no offsets, the stream is created from the topic set
   * and the connector chooses where to start. Otherwise it starts from the stored offsets.
   *
   * @param ssc             streaming context the stream is created in
   * @param zookeeperStore  store the starting offsets are read from and batch offsets are saved to
   * @param topic           topic to consume
   * @param groupId         value passed to Kafka as `group.id`
   * @param zkHosts         value passed to Kafka as `zookeeper.connect`
   * @param bootstrapServer value passed to Kafka as `bootstrap.servers`
   * @param autoCommit      value passed to Kafka as `enable.auto.commit`
   * @return the stream of (key, message) pairs, wrapped for the Java pair-DStream API
   */
  def kafkaStream(
      ssc: StreamingContext,
      zookeeperStore: ZooKeeperOffsetsStore,
      topic: String,
      groupId: String,
      zkHosts: String,
      bootstrapServer: String,
      autoCommit: String): org.apache.spark.streaming.api.java.JavaPairInputDStream[String, String] = {
    // Each key is bound exactly once: in a Map literal a repeated key silently keeps only
    // its last binding, so a second "enable.auto.commit" entry would be dead and misleading.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> bootstrapServer,
      "zookeeper.connect" -> zkHosts,
      "group.id" -> groupId,
      "enable.auto.commit" -> autoCommit)
    val topicsSet = Set(topic)

    // Stream whose start position is chosen by the connector rather than by the offset store.
    def freshStream() =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    val storedOffsets = zookeeperStore.readOffsets()
    val stream =
      if (storedOffsets.isEmpty) {
        freshStream()
      } else {
        try {
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
            ssc, kafkaParams, storedOffsets, messageHandler)
        } catch {
          // NOTE(review): the fromOffsets overload may not contact Kafka while the stream is being
          // constructed, so an out-of-range stored offset could surface later (when a batch is
          // fetched) instead of here — confirm this fallback is actually reachable.
          case _: kafka.common.OffsetOutOfRangeException => freshStream()
        }
      }

    // NOTE(review): this output operation is registered before any the caller adds, so a batch's
    // offsets are likely saved before the caller's own processing of that batch runs
    // (at-most-once on failure) — confirm this is the intended delivery guarantee.
    stream.foreachRDD(rdd => zookeeperStore.saveOffsets(rdd))

    // Explicit form of the implicit conversion the declared return type relies on.
    org.apache.spark.streaming.api.java.JavaPairInputDStream.fromInputDStream(stream)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kafka.common.TopicAndPartition | |
import kafka.utils.{ZKGroupTopicDirs,ZKStringSerializer,ZkUtils} | |
import org.I0Itec.zkclient.ZkClient | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.kafka.HasOffsetRanges | |
/**
 * Offset store that keeps one consumer group's per-partition offsets in ZooKeeper.
 *
 * Offsets are stored under the directory given by Kafka's `ZKGroupTopicDirs(groupId, topic).consumerOffsetDir`,
 * i.e. `/consumers/<groupId>/offsets/<topic>/<partition>`, one znode per partition holding the offset as a string.
 *
 * @param ssc     not used by this implementation; kept for interface compatibility
 * @param topic   topic whose partition offsets [[readOffsets]] returns
 * @param groupId consumer group the offsets belong to
 * @param zkHosts ZooKeeper connect string
 */
class ZooKeeperOffsetsStore(ssc: StreamingContext, topic: String, groupId: String, zkHosts: String)
  extends OffsetsStore {

  /** Offsets directory for `topic`; each child znode is named after a partition id. */
  val zkPath: String = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir

  // Session and connection timeouts of 10000 ms; znode data is (de)serialized as strings
  // with Kafka's ZKStringSerializer.
  private val zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer)

  /**
   * Reads the saved offset of every partition of `topic`.
   *
   * Partitions with no saved offset are omitted. A partition whose znode cannot be read or parsed
   * is also omitted, and the failure is reported on stderr, so a partial result is returned
   * rather than failing.
   *
   * @return saved offset per topic-partition; empty when nothing has been saved yet
   */
  override def readOffsets(): Map[TopicAndPartition, Long] = {
    val partitionsByTopic = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
    val offsets = for {
      (topicName, partitions) <- partitionsByTopic.toSeq
      offsetDir = new ZKGroupTopicDirs(groupId, topicName).consumerOffsetDir
      partition <- partitions
      offset <- readPartitionOffset(s"$offsetDir/$partition").toSeq
    } yield TopicAndPartition(topicName, partition) -> offset
    offsets.toMap
  }

  /** Offset stored at `offsetPath`, or None if the znode is absent or unreadable. */
  private def readPartitionOffset(offsetPath: String): Option[Long] =
    try {
      // readDataMaybeNull yields None for a missing znode instead of throwing like readData.
      ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1.map(_.trim.toLong)
    } catch {
      case scala.util.control.NonFatal(e) =>
        Console.err.println(s"!!Offset Read Fail!! => $offsetPath: $e")
        None
    }

  /**
   * Persists the end offset of every range in `rdd`.
   *
   * For each offset range, `untilOffset` (the exclusive end of the batch) is written to the
   * znode of that range's own topic and partition, creating the path if needed.
   *
   * @param rdd an RDD produced directly by a Kafka direct stream; it is cast to `HasOffsetRanges`,
   *            so any other RDD fails with a ClassCastException
   */
  override def saveOffsets(rdd: RDD[_]): Unit = {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { range =>
      val offsetDir = new ZKGroupTopicDirs(groupId, range.topic).consumerOffsetDir
      ZkUtils.updatePersistentPath(zkClient, s"$offsetDir/${range.partition}", range.untilOffset.toString)
    }
  }

  /** Closes the ZooKeeper client held by this store; the store must not be used afterwards. */
  def close(): Unit = zkClient.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment