@rajkrrsingh
Spark Kafka Consumer in a secure (Kerberos) environment

Sample Applications

using createDirectStream
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkKafkaConsumer2 {

  def main(args: Array[String]) {

    // args: <broker list> <consumer group> <comma-separated topics> <num threads>
    // (numThreads is not used by the direct stream; it is kept for a uniform argument list)
    val Array(brokerlist, group, topics, numThreads) = args

    // Kafka consumer configuration; PLAINTEXTSASL enables the Kerberos (SASL) channel
    val kafkaParams = Map(
      "bootstrap.servers" -> brokerlist,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> group,
      "security.protocol" -> "PLAINTEXTSASL",
      "auto.offset.reset" -> "smallest"
    )

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(100))

    // Direct (receiver-less) stream over the given topics
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics.split(",").toSet)

    // Write each batch of (key, value) pairs out; the output path is hardcoded for this sample
    kafkaStream.saveAsTextFiles("/tmp/streaming_output")
    ssc.start()
    ssc.awaitTermination()
  }
}
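The stream above simply persists the raw (key, value) pairs. If an actual word count is wanted, as the KafkaWordCount app name suggests, a minimal sketch of the transformation could replace the saveAsTextFiles call; the output action here is only illustrative:

// Hypothetical replacement for the saveAsTextFiles call above
val wordCounts = kafkaStream
  .map(_._2)               // drop the Kafka key, keep the message value
  .flatMap(_.split(" "))   // tokenize on whitespace
  .map(word => (word, 1L))
  .reduceByKey(_ + _)      // per-batch counts
wordCounts.print()         // or wordCounts.saveAsTextFiles("/tmp/streaming_output")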
using createStream
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka_Word_Count {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("KafkaWordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.driver.allowMultipleContexts", "true")

    val ssc = new StreamingContext(conf, Seconds(3))
    val groupID = "test"
    val numThreads = "2"
    val topic = "kafkatopic"
    // map of topic -> number of receiver threads
    val topicMap = topic.split(",").map((_, numThreads.toInt)).toMap

    // The receiver-based consumer connects through ZooKeeper;
    // PLAINTEXTSASL enables the Kerberos (SASL) channel
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "rks253secure.hdp.local:2181",
      "group.id" -> groupID,
      "zookeeper.connection.timeout.ms" -> "10000",
      "security.protocol" -> "PLAINTEXTSASL"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap,
      StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
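Both samples rely on the Spark Streaming Kafka 0.8 integration and are submitted as a fat jar (SparkKafkaSampleApp-1.0-SNAPSHOT-jar-with-dependencies.jar below, which suggests a Maven assembly build). An equivalent build.sbt sketch is shown here for reference; the Spark and Scala versions are assumptions and should be aligned with the cluster:

// build.sbt sketch -- version numbers are illustrative assumptions
name := "SparkKafkaSampleApp"
version := "1.0-SNAPSHOT"
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.1" % "provided",
  // the Kafka integration is not on the cluster classpath, so it must ship with the app jar
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
)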

kafka_jaas.conf (for spark local mode)

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
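When running in local mode without spark-submit (for example from an IDE), the same JAAS file can also be supplied through the standard java.security.auth.login.config system property before the StreamingContext is created; a minimal sketch, assuming the file lives at /etc/kafka/conf/kafka_jaas.conf:

// Set before the Kafka stream is created; paths are assumptions matching the config above
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf")
// Optional: point the JVM at the Kerberos config if it is not in the default location
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")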

kafka_jaas.conf (for spark yarn client mode)

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   serviceName="kafka"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="kafka/rks253secure.hdp.local@EXAMPLE.COM";
};

Spark Submit command

Run kinit as the kafka user before submitting; in local mode the KafkaClient section above relies on the ticket cache.

spark-submit \
  --files /etc/kafka/conf/kafka_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=/etc/kafka/conf/kafka_jaas.conf" \
  --class SparkKafkaConsumer2 \
  --master local[2] \
  /tmp/SparkKafkaSampleApp-1.0-SNAPSHOT-jar-with-dependencies.jar \
  "rks253secure.hdp.local:6667" test kafkatopic 1