import scala.util.{Failure, Success, Try}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Project-specific types (ConsumerConfig, OffsetsCoordinator, OffsetsOperations, VA,
// RuleLike, AvroValue, KafkaCustomDecoder, KafkaListener, commitOffsets, DEFAULT_MAX_RATE)
// are assumed to be in scope.

def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
              (implicit keyRule: RuleLike[AvroValue, A], valueRule: RuleLike[AvroValue, B]): DStream[(VA[A], VA[B])] = {
  val sparkConf = streamingContext.sparkContext.getConf
  val appName = sparkConf.get("spark.app.name")

  val offsetsCoordinator = OffsetsCoordinator.get(
    channel = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
    clientId = consumerConfig.clientId,
    groupId = consumerConfig.groupId)
  offsetsCoordinator.acquireAndGet { coordinator =>
    val topicPartitions: Seq[TopicAndPartition] =
      OffsetsOperations.getTopicPartitions(consumerConfig.topic, coordinator.broker, consumerConfig.clientId)

    val lastOffsets: Try[Map[TopicAndPartition, Long]] =
      OffsetsOperations.fetch(
        topicsAndPartitions = topicPartitions,
        groupId = consumerConfig.groupId,
        clientId = consumerConfig.clientId,
        coordinator = coordinator)
    lastOffsets match {
      case Success(offsets: Map[TopicAndPartition, Long]) =>
        // Set maxRate if not already set; otherwise Spark will try to load all the data into one RDD.
        if (!sparkConf.contains("spark.streaming.kafka.maxRatePerPartition")) {
          sparkConf.set("spark.streaming.kafka.maxRatePerPartition", maxRate.toString)
        }
        // Build the direct stream starting from the fetched offsets, decoding keys and
        // values with the custom Avro decoders.
        val params = consumerConfig.params ++ Map("schema.registry.url" -> "localhost:8081")
        val messageHandler = (mmd: MessageAndMetadata[VA[A], VA[B]]) => (mmd.key, mmd.message)
        val streamKafka: InputDStream[(VA[A], VA[B])] =
          KafkaUtils.createDirectStream[VA[A], VA[B], KafkaCustomDecoder[A], KafkaCustomDecoder[B], (VA[A], VA[B])](
            streamingContext, params, offsets, messageHandler)
        // Commit offsets back to the coordinator through a Spark listener that tracks
        // the Kafka RDDs registered below.
        val listener = new KafkaListener(offsetRanges =>
          commitOffsets(offsetRanges, consumerConfig.groupId, consumerConfig.clientId, coordinator))
        streamingContext.sparkContext.addSparkListener(listener)

        // Register each generated RDD's offset ranges with the listener before passing
        // the RDD downstream unchanged.
        streamKafka.transform { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          val rddId = rdd.id
          listener.registerKafkaRDD(rddId, offsetRanges)
          rdd
        }
      case Failure(reason) =>
        throw new RuntimeException("Could not fetch offsets. Refusing to start from the beginning.", reason)
    }
  }
}
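
// ------------------------------------------------------------------
// A minimal usage sketch (not part of the original gist), assuming a
// case-class-like ConsumerConfig whose fields match the accesses above,
// and implicit RuleLike[AvroValue, String] instances in scope for the
// key and value. Topic, broker, and group names below are made up.
// ------------------------------------------------------------------
object ReadExample {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("kafka-avro-reader").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Hypothetical configuration; field names follow the accesses in `read`.
    val consumerConfig = ConsumerConfig(
      host = "localhost",
      port = 9092,
      clientId = "reader-client",
      groupId = "reader-group",
      topic = "events",
      params = Map("metadata.broker.list" -> "localhost:9092"))

    val stream = read[String, String](consumerConfig, ssc)
    stream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}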