Ingest Event Hub Capture data with Autoloader
# Import widgets for Event Hub (i.e. topic) name
# Set default
dbutils.widgets.text("topic", "your--topic--name")
topic = dbutils.widgets.get("topic")

# Import required modules
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Localize environment
# ADLS where Event Hub Capture is configured
adls_account = "your--adls--account"
adls_key = "your--adls--key"
queue_sas = "your--storage--account--queue--sas--key"
sp_subscriptionId = "your--subscription--id"
sp_tenantId = "your--tenant--id"
sp_clientId = "your--client--id"
sp_clientKey = "your--client--key"
sp_rgName = "your--resource--group"

# Set configuration objects
spark.conf.set("fs.azure.account.key." + adls_account + ".dfs.core.windows.net", adls_key)
spark.conf.set("fs.azure.account.key." + adls_account + ".blob.core.windows.net", adls_key)

# Schema
# Capture Schema
capture_tmp = spark.read.format("avro").load("/mnt/bronze/tmp/{}-sample.avro".format(topic))
capture_schema = capture_tmp.schema

# Payload Schema
payload_tmp = spark.read.json("/mnt/bronze/tmp/{}-payload-sample.json".format(topic))
payload_schema = payload_tmp.schema
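
# Optional reference: the Event Hub Capture Avro envelope has a fixed shape, so the
# capture schema could also be declared explicitly instead of being inferred from the
# sample file above. The field list below is a best-effort sketch (the map value types
# in particular are an assumption); the pipeline keeps using the inferred capture_schema.
capture_schema_explicit = StructType([
    StructField("SequenceNumber", LongType(), False),
    StructField("Offset", StringType(), False),
    StructField("EnqueuedTimeUtc", StringType(), False),
    StructField("SystemProperties", MapType(StringType(), StringType()), True),
    StructField("Properties", MapType(StringType(), StringType()), True),
    StructField("Body", BinaryType(), True),
])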

# cloudFile settings
cloudFilesConf = {
    "cloudFiles.subscriptionId": sp_subscriptionId,
    "cloudFiles.connectionString": queue_sas,
    "cloudFiles.format": "avro",
    "cloudFiles.tenantId": sp_tenantId,
    "cloudFiles.clientId": sp_clientId,
    "cloudFiles.clientSecret": sp_clientKey,
    "cloudFiles.resourceGroup": sp_rgName,
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}

capture_container = "your--capture--container"
eh_namespace = "your--eventhub--namespace"

autoloader_df = (spark.readStream.format("cloudFiles")
    .options(**cloudFilesConf)
.option("recursiveFileLookup", "true") # This lets us ignore folder level partitioning into the incoming Dataframe | |
    .schema(capture_schema)
    .load("wasbs://{}@{}.blob.core.windows.net/{}/{}/".format(capture_container, adls_account, eh_namespace, topic))
)

# Convert Body to String for our upcoming transformations
autoloader_df = autoloader_df.withColumn("Body", autoloader_df["Body"].cast(StringType()))

payload_df = autoloader_df \
    .select(from_json(col("Body"), payload_schema).alias("json_payload")) \
    .select("json_payload.*")

display(payload_df)
""" | |
We now have our Event Hub Capture being parsed by the Autoloader pipeline, available in `payload_df` | |
At this point, we're free to do whatever we want with our `payload_df`, such as writing to Delta Tables (after removing the `display` above of course): | |
(payload_df.writeStream | |
.format("delta") | |
.trigger(once=True) | |
.outputMode("append") | |
.option("checkpointLocation", "your-checkpoint-location") | |
.start("your-delta-table-path") | |
) | |
""" |