# Spark Streaming with Cassandra (Azure Cosmos DB)
1. https://github.com/mspnp/azure-databricks-streaming-analytics
```scala
// TOPIC, BOOTSTRAP_SERVERS, EH_SASL and GROUP_ID are assumed to be defined
// earlier with the values for your Event Hubs namespace.

// Read from your Event Hub!
val df = spark.readStream
  .format("kafka")
  .option("subscribe", TOPIC)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "60000")
  .option("kafka.group.id", GROUP_ID)
  .option("failOnDataLoss", "false")
  .load()

// Use the DataFrame like normal (in this example, write to the console)
val df_write = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
```
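
The Kafka source delivers each record's `key` and `value` as binary columns, so using the DataFrame "like normal" usually starts with a cast. The following is a minimal sketch, assuming the payloads are UTF-8 text; `KafkaDecoding` and `decodeKafkaRecords` are hypothetical names that do not appear in the original sample.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper (not part of the original sample).
object KafkaDecoding {
  // Casts the binary Kafka key/value columns to strings.
  // Assumption: the producers wrote UTF-8 text payloads.
  def decodeKafkaRecords(raw: DataFrame): DataFrame =
    raw.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value")
    )
}
```

With the stream above, `KafkaDecoding.decodeKafkaRecords(df)` could be passed to `writeStream` in place of `df` so the console shows readable text rather than raw bytes.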
## Write to Event Hubs for Kafka
Writing to a Kafka-enabled Event Hub uses the same Kafka sink you would use for any other Kafka cluster. Just make sure to update the BOOTSTRAP_SERVERS and EH_SASL variables with the information from your Event Hubs namespace. Check out our example Spark producer for the full sample.
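```scala
import org.apache.spark.sql.DataFrame

// Placeholder: replace ??? with your streaming DataFrame. The Kafka sink
// expects a "value" column (string or binary); a "key" column is optional.
val df: DataFrame = ???

// Write to your Event Hub!
df.writeStream
  .format("kafka")
  .option("topic", TOPIC)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("checkpointLocation", "./checkpoint")
  .start()
```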
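
The snippets above use BOOTSTRAP_SERVERS and EH_SASL without showing how they are built. The sketch below is one way to derive them, assuming the usual Event Hubs Kafka endpoint on port 9093 and SASL PLAIN authentication with the namespace connection string. `EventHubsKafkaConfig` and its method names are hypothetical, and on Databricks runtimes the login module class may need a `kafkashaded.` prefix, which is why it is left as a parameter.

```scala
// Hypothetical helper (not part of the original sample).
object EventHubsKafkaConfig {
  // Event Hubs exposes its Kafka-compatible endpoint on port 9093.
  def bootstrapServers(namespace: String): String =
    s"$namespace.servicebus.windows.net:9093"

  // With SASL PLAIN, the username is the literal string "$ConnectionString"
  // and the password is the namespace connection string.
  // Assumption: the unshaded Kafka client class name; override it if your
  // runtime shades the Kafka client.
  def saslJaasConfig(
      connectionString: String,
      loginModule: String = "org.apache.kafka.common.security.plain.PlainLoginModule"
  ): String =
    s"""$loginModule required username="$$ConnectionString" password="$connectionString";"""
}
```

For example, `val BOOTSTRAP_SERVERS = EventHubsKafkaConfig.bootstrapServers("mynamespace")` and `val EH_SASL = EventHubsKafkaConfig.saslJaasConfig(connectionString)`, where `mynamespace` is a placeholder and `connectionString` is read from a secret store rather than hard-coded.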