HDP3 - Spark structured streaming Kafka integration
A) Spark structured streaming Kafka integration - SASL_PLAINTEXT
1) Prerequisites
[consumer-user@c220-node1 sslss]$ ll
-rw------- 1 consumer-user root 144 Apr 21 08:56 consumer-user.keytab
-rw-rw-r-- 1 consumer-user consumer-user 229 Apr 21 09:40 kafka_client_jaas.conf
[consumer-user@c220-node1 sslss]$ cat kafka_client_jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  principal="consumer-user@HWX.COM"
  useKeyTab=true
  serviceName="kafka"
  keyTab="consumer-user.keytab"
  client=true;
};
2) Start spark-shell [Kafka SASL_PLAINTEXT port - 6667]
cd ~/sslss
/usr/hdp/current/spark2-client/bin/spark-shell --master yarn \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files ./kafka_client_jaas.conf,./consumer-user.keytab
3) Paste the following code into spark-shell
spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6667").
  option("subscribe", "test1").
  option("kafka.security.protocol", "SASL_PLAINTEXT").
  load().
  selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").
  as[(String, String)].
  writeStream.
  outputMode("update").
  format("console").
  start()
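Optionally, a one-off batch read of the same topic can be used to confirm that the Kerberos login and broker connectivity work before relying on the stream. This is only a small sketch; it reuses the broker, topic, and protocol from the example above, and startingOffsets/endingOffsets simply bound the read:

// One-off batch read of whatever is currently in the topic
// (assumes the same JAAS setup shipped via --files as above).
val batchDf = spark.read.
  format("kafka").
  option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6667").
  option("subscribe", "test1").
  option("kafka.security.protocol", "SASL_PLAINTEXT").
  option("startingOffsets", "earliest").
  option("endingOffsets", "latest").
  load()

// Show the decoded keys/values plus the Kafka metadata columns.
batchDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset").show(false)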
4) From another host/terminal, run kafka-console-producer.sh and send test messages:
kinit -kt /etc/security/keytabs/producer-user.keytab producer-user
[root@c220-node2 bin]# ./kafka-console-producer.sh --broker-list c220-node3.squadron-labs.com:6668 --producer.config ~/sslnew/client-ssl.properties --topic test1
5) Sample output from Spark shell
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
| key|value|
+----+-----+
|null| fs|
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
| key|value|
+----+-----+
|null| ffs|
+----+-----+
B) Spark structured streaming Kafka integration - SASL_SSL
1) Prerequisites
[consumer-user@c220-node1 sslss]$ ll
-rw-r--r-- 1 root root 997 Apr 21 08:54 client.truststore.jks
-rw------- 1 consumer-user root 144 Apr 21 08:56 consumer-user.keytab
-rw-rw-r-- 1 consumer-user consumer-user 229 Apr 21 09:40 kafka_client_jaas.conf
[consumer-user@c220-node1 sslss]$ cat kafka_client_jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useTicketCache=false
  principal="consumer-user@HWX.COM"
  useKeyTab=true
  serviceName="kafka"
  keyTab="consumer-user.keytab"
  client=true;
};
2) Start spark-shell [Kafka SASL_SSL port - 6668]
cd ~/sslss
/usr/hdp/current/spark2-client/bin/spark-shell --master yarn \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 \
  --conf spark.security.credentials.hiveserver2.enabled=false \
  --conf spark.driver.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --conf spark.executor.extraJavaOptions="-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files ./kafka_client_jaas.conf,./consumer-user.keytab,./client.truststore.jks
3) Paste the following code into spark-shell
spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6668").
  option("subscribe", "test1").
  option("kafka.security.protocol", "SASL_SSL").
  option("kafka.ssl.truststore.location", "client.truststore.jks").
  option("kafka.ssl.truststore.password", "yourpassword").
  load().
  selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").
  as[(String, String)].
  writeStream.
  outputMode("update").
  format("console").
  start()
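The console sink above is meant for quick testing. As a variation, the same stream can be written back to Kafka over SASL_SSL. The sketch below is only illustrative: the output topic "test1-copy" and the checkpoint directory are placeholders, and the Kafka sink needs a checkpointLocation plus a string or binary value column:

// Illustrative only: copy records from "test1" into a placeholder topic over SASL_SSL.
val copyQuery = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6668").
  option("subscribe", "test1").
  option("kafka.security.protocol", "SASL_SSL").
  option("kafka.ssl.truststore.location", "client.truststore.jks").
  option("kafka.ssl.truststore.password", "yourpassword").
  load().
  // The Kafka sink expects columns named key/value (string or binary).
  selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value").
  writeStream.
  format("kafka").
  option("kafka.bootstrap.servers", "c220-node1.squadron-labs.com:6668").
  option("kafka.security.protocol", "SASL_SSL").
  option("kafka.ssl.truststore.location", "client.truststore.jks").
  option("kafka.ssl.truststore.password", "yourpassword").
  // Placeholder output topic and checkpoint directory; adjust for your cluster.
  option("topic", "test1-copy").
  option("checkpointLocation", "/tmp/ss-kafka-ckpt").
  start()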
4) From another host/terminal, run kafka-console-producer.sh and send test messages:
kinit -kt /etc/security/keytabs/producer-user.keytab producer-user
[root@c220-node2 bin]# ./kafka-console-producer.sh --broker-list c220-node3.squadron-labs.com:6668 --producer.config ~/sslnew/client-ssl.properties --topic test1
>SSL TEST
5) Sample output from Spark shell
-------------------------------------------
Batch: 2
-------------------------------------------
+----+--------+
| key| value|
+----+--------+
|null|SSL TEST|
+----+--------+
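Before exiting spark-shell, the running queries can be stopped cleanly. A small sketch using the StreamingQueryManager that spark-shell exposes as spark.streams:

// List the active streaming queries and stop each one before quitting the shell.
spark.streams.active.foreach { q =>
  println(s"Stopping query id=${q.id}")
  q.stop()
}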
Happy Coding :)