Gist by @natewave, created September 17, 2015 08:49
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

import scala.util.{Failure, Success, Try}

// Project-specific types (ConsumerConfig, VA, OffsetsCoordinator, OffsetsOperations,
// GenericRecordDecoder, KafkaListener, commitOffsets, DEFAULT_MAX_RATE) are assumed to be in scope.

def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
              (keyRule: GenericRecord => VA[A], valueRule: GenericRecord => VA[B]): DStream[(A, B)] = {
  val sparkConf = streamingContext.sparkContext.getConf
  val appName = sparkConf.get("spark.app.name")

  val offsetsCoordinator = OffsetsCoordinator.get(
    channel = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
    clientId = consumerConfig.clientId,
    groupId = consumerConfig.groupId)

  offsetsCoordinator.acquireAndGet { coordinator =>
    // Resolve the partitions of the topic and fetch the last committed offset for each one.
    val topicPartitions: Seq[TopicAndPartition] =
      OffsetsOperations.getTopicPartitions(consumerConfig.topic, coordinator.broker, consumerConfig.clientId)

    val lastOffsets: Try[Map[TopicAndPartition, Long]] =
      OffsetsOperations.fetch(
        topicsAndPartitions = topicPartitions,
        groupId = consumerConfig.groupId,
        clientId = consumerConfig.clientId,
        coordinator = coordinator)

    lastOffsets match {
      case Success(offsets: Map[TopicAndPartition, Long]) =>
        // Set maxRate if not already set; otherwise Spark will try to load all the data into one RDD.
        if (!sparkConf.contains("spark.streaming.kafka.maxRatePerPartition")) {
          sparkConf.set("spark.streaming.kafka.maxRatePerPartition", maxRate.toString)
        }

        // Create a direct stream starting from the fetched offsets, keeping both key and value.
        val params = consumerConfig.params
        val messageHandler = (mmd: MessageAndMetadata[GenericRecord, GenericRecord]) => (mmd.key, mmd.message)
        val genericKafkaStream: DStream[(GenericRecord, GenericRecord)] =
          KafkaUtils.createDirectStream[GenericRecord, GenericRecord, GenericRecordDecoder, GenericRecordDecoder, (GenericRecord, GenericRecord)](
            streamingContext, params, offsets, messageHandler)

        // Validate every key and value against the supplied rules.
        val validatedStream: DStream[(VA[A], VA[B])] = genericKafkaStream.transform { rdd =>
          rdd.map(record => (keyRule(record._1), valueRule(record._2)))
        }

        // Spark listener that commits the offset ranges of registered Kafka RDDs back through the coordinator.
        val listener = new KafkaListener(offsetRanges =>
          commitOffsets(offsetRanges, consumerConfig.groupId, consumerConfig.clientId, coordinator))
        streamingContext.sparkContext.addSparkListener(listener)

        validatedStream.transform { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          val rddId = rdd.id
          // Keep only the records whose key and value both validated successfully.
          val successRDD = rdd.filter(kv => kv._1.isSuccess && kv._2.isSuccess).map(kv => (kv._1.get, kv._2.get))
          listener.registerKafkaRDD(rddId, offsetRanges)
          successRDD
        }

      case Failure(reason) =>
        throw new RuntimeException("Could not fetch offsets. Refusing to start from the beginning.", reason)
    }
  }
}
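
For reference, a call site for read might look roughly like the minimal sketch below. Only the read signature above comes from the gist; the batch interval, the String key / GenericRecord value type parameters, and the placeholder values marked with ??? are assumptions for illustration.

import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReadExample {
  def main(args: Array[String]): Unit = {
    // Local-testing SparkConf; app name is read by read() via spark.app.name.
    val conf = new SparkConf().setAppName("kafka-avro-read-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical inputs: how ConsumerConfig is built and how the VA validation
    // rules are written is project-specific and not shown in the gist.
    val consumerConfig: ConsumerConfig = ???
    val keyRule: GenericRecord => VA[String] = ???
    val valueRule: GenericRecord => VA[GenericRecord] = ???

    // read wires up the direct Kafka stream, validation, and offset committing.
    val stream = read(consumerConfig, ssc)(keyRule, valueRule)
    stream.foreachRDD(rdd => rdd.take(10).foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }
}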