Created
February 14, 2022 16:25
-
-
Save mdrakiburrahman/8b12450a51dd6abeefb0699f210dab94 to your computer and use it in GitHub Desktop.
Spark Streaming in Synapse from Kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Kafka Topic ➡ Delta tables: `scene` via Spark Streaming | |
// `SOURCE`: Connect to Kafka topic as a streaming Dataframe: `raw_DF` | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.sql._ | |
import org.apache.spark.sql.types.StructType | |
import org.apache.spark.sql.functions.from_json | |
// Connection settings for the Event Hubs Kafka-compatible endpoint.
// SECURITY(review): EH_SASL embeds a SharedAccessKey directly in source.
// Pull from Key Vault (e.g. mssparkutils.credentials.getSecret) for non-sandbox use.
val TOPIC = "scene"
val BOOTSTRAP_SERVERS = "fraudkafka.servicebus.windows.net:9093"
// JAAS config string: Event Hubs authenticates with username "$ConnectionString"
// and the namespace connection string as the password.
val EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://fraudkafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...C2xw=\";"
// `SOURCE`: subscribe to the Kafka topic as a streaming DataFrame `raw_DF`.
val raw_DF = (spark.readStream
  .format("kafka")
  .option("subscribe", TOPIC)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("kafka.sasl.mechanism", "PLAIN")      // Event Hubs Kafka endpoint: PLAIN over SASL_SSL
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "60000")
  .option("failOnDataLoss", "false")            // tolerate missing offsets instead of failing the query
  .option("startingOffsets", "earliest")        // first run replays the full retained topic
  .load()
  // Kafka `value` arrives as binary; decode it to a JSON string for parsing downstream.
  .selectExpr("offset", "timestamp", "CAST(value AS STRING)")
  .filter(not(col("value").contains("error")))) // filter errors from Mockaroo API
// Schema inference from a sample JSON payload.
// Can be used to dynamically infer schema from significantly nested payloads;
// keeps the notebook free of a large hand-written StructType.
val jsData = Seq(
  ("""{
  "scene_id": "984c9fc6-ad75-42ae-8df9-da8339fa8b5f",
  "first_name": "Fraser",
  "last_name": "Lewerenz",
  "bank": "Some Bank",
  "origin_state": "Ontario",
  "job_title": "Biostatistician II",
  "email_address": "flewerenz0@eventbrite.com",
  "card_type": "americanexpress",
  "card_number": "372301219799089",
  "card_use_frequency": "Yearly",
  "trx_amount": 31.99,
  "trx_timestamp": "2021-05-30 16:56:15",
  "trx_latitude": 46.4969139,
  "trx_longitude": -84.3457671,
  "trx_purchase_type": "Games"
}
""")
)
// Infer the schema by reading the one-row sample as a JSON Dataset.
val schema: StructType = spark.read.json(jsData.toDS).schema
// Parse the JSON `value` column against the inferred schema, then flatten
// the struct so each payload field becomes a top-level column alongside
// the Kafka `offset` and `timestamp` metadata.
val parsed_DF = raw_DF
  .select($"*", from_json(col("value"), schema).as("data"))
  .select("offset", "timestamp", "data.*")
parsed_DF.printSchema() // visual sanity check of the flattened schema
// ADLS Gen2 access configuration.
// Azure storage access info for the Synapse workspace default storage account.
val adls_account_name = "aiaohmdwdevadls"
val adls_container_name = "synapse"
val adls_relative_path = "kafka/"
val linked_service_name = "aia-oh-mdw-dev-WorkspaceDefaultStorage"
// Grab a SAS token (or connection string credentials) from the workspace linked service.
val adls_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
// Configure Spark to access the account via the DFS endpoint.
// `s` interpolator: no printf-style format specifiers are used, so `f` is unnecessary.
val abfss_path = s"abfss://$adls_container_name@$adls_account_name.dfs.core.windows.net/$adls_relative_path"
// NOTE(review): registers the SAS token under the per-container fs.azure.sas key —
// confirm this key form is honored for the dfs (ABFS) endpoint in this Synapse runtime.
spark.conf.set(s"fs.azure.sas.$adls_container_name.$adls_account_name.dfs.core.windows.net", adls_sas_token)
// `SINK`: continuously append parsed records to a Delta table on ADLS.
// Reuse `abfss_path` ("abfss://synapse@aiaohmdwdevadls.dfs.core.windows.net/kafka/")
// instead of duplicating the hard-coded account/container strings.
parsed_DF.writeStream
  .format("delta")
  .outputMode("append")
  // Checkpoint dir gives exactly-once progress tracking across restarts.
  .option("checkpointLocation", abfss_path + "scene_raw_Checkpoints/")
  .start(abfss_path + "scene_raw")
  .awaitTermination() // block this job indefinitely; the query runs until cancelled
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment