Spark Streaming in Synapse from Kafka
// Kafka topic `scene` ➡ Delta table via Spark Streaming
// `SOURCE`: Connect to the Kafka topic as a streaming DataFrame: `raw_DF`
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._ // for .toDS and the $"col" syntax (pre-imported in Synapse notebooks)
// Pull these secrets from Key Vault outside of sandbox environments (see sketch below)
val TOPIC = "scene"
val BOOTSTRAP_SERVERS = "fraudkafka.servicebus.windows.net:9093"
val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://fraudkafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...C2xw=\";"
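// A minimal sketch of the Key Vault approach mentioned above, assuming a vault named
// "fraud-kv" holding the Event Hubs connection string under the secret "eh-connstring"
// (both names hypothetical):
//
// val ehConnString = mssparkutils.credentials.getSecret("fraud-kv", "eh-connstring")
// val EH_SASL = s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$ehConnString";"""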
// Read the raw DataFrame from Kafka
val raw_DF = (spark.readStream
  .format("kafka")
  .option("subscribe", TOPIC)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "60000")
  .option("failOnDataLoss", "false")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("offset", "timestamp", "CAST(value AS STRING)")
  .filter(not(col("value").contains("error")))) // filter error rows emitted by the Mockaroo API
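// Before wiring up the Delta sink, the source can be sanity-checked with a
// short-lived console writer (a debugging sketch, not part of the pipeline):
//
// raw_DF.writeStream
//   .format("console")
//   .option("truncate", "false")
//   .start()
//   .awaitTermination(30000) // give up after ~30 seconds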
// Schema inference from a sample JSON payload.
// The same approach scales to deeply nested payloads, avoiding hand-written StructTypes.
val jsData = Seq(
  ("""{
    "scene_id": "984c9fc6-ad75-42ae-8df9-da8339fa8b5f",
    "first_name": "Fraser",
    "last_name": "Lewerenz",
    "bank": "Some Bank",
    "origin_state": "Ontario",
    "job_title": "Biostatistician II",
    "email_address": "flewerenz0@eventbrite.com",
    "card_type": "americanexpress",
    "card_number": "372301219799089",
    "card_use_frequency": "Yearly",
    "trx_amount": 31.99,
    "trx_timestamp": "2021-05-30 16:56:15",
    "trx_latitude": 46.4969139,
    "trx_longitude": -84.3457671,
    "trx_purchase_type": "Games"
  }""")
)
val schema: StructType = spark.read.json(jsData.toDS).schema
val parsed_DF = raw_DF
  .select($"*", from_json(col("value"), schema).as("data"))
  .select("offset", "timestamp", "data.*")
parsed_DF.printSchema()
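// Expected output, roughly (spark.read.json sorts inferred fields alphabetically
// and reads unquoted JSON numbers as double; the Kafka offset/timestamp columns
// come through first):
// root
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- bank: string (nullable = true)
//  |-- card_number: string (nullable = true)
//  ...
//  |-- trx_amount: double (nullable = true)
//  |-- trx_latitude: double (nullable = true)
//  |-- trx_longitude: double (nullable = true)
//  |-- trx_purchase_type: string (nullable = true)
//  |-- trx_timestamp: string (nullable = true)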
// Azure storage access info for the ADLS Gen2 sink
val adls_account_name = "aiaohmdwdevadls"
val adls_container_name = "synapse"
val adls_relative_path = "kafka/"
val linked_service_name = "aia-oh-mdw-dev-WorkspaceDefaultStorage"
// Grab a SAS token from the workspace's linked service
val adls_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
// Configure Spark to access from DFS endpoint
val abfss_path = f"abfss://$adls_container_name@$adls_account_name.dfs.core.windows.net/$adls_relative_path"
spark.conf.set(f"fs.azure.sas.$adls_container_name.$adls_account_name.dfs.core.windows.net",adls_sas_token)
// `SINK`: Streaming write to the Delta table
parsed_DF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"${abfss_path}scene_raw_Checkpoints/")
  .start(f"${abfss_path}scene_raw")
  .awaitTermination()
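// Once the stream has committed a few batches, the sink can be queried back as a
// regular Delta table from another session (a minimal verification sketch):
//
// val scene_DF = spark.read.format("delta")
//   .load("abfss://synapse@aiaohmdwdevadls.dfs.core.windows.net/kafka/scene_raw")
// scene_DF.groupBy("trx_purchase_type").count().show()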