Ingest Event Hub Capture data with Autoloader
# Create a text widget for the Event Hub (i.e. topic) name
# Set default
dbutils.widgets.text("topic", "your--topic--name")
topic = dbutils.widgets.get("topic")
# Import required modules
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Localize environment
# ADLS where Event Hub Capture is configured
adls_account = "your--adls--account"
adls_key = "your--adls--key"
queue_sas = "your--storage--account--queue--sas--key"
sp_subscriptionId = "your--subscription--id"
sp_tenantId = "your--tenant--id"
sp_clientId = "your--client--id"
sp_clientKey = "your--client--key"
sp_rgName = "your--resource--group"
# Set configuration objects
# Register the storage account key so Spark can read from the Capture container
# (assumes the standard wasbs account-key config; adjust if you mount via OAuth)
spark.conf.set(
    "fs.azure.account.key.{}.blob.core.windows.net".format(adls_account),
    adls_key
)
# Schema
# Capture Schema
capture_tmp = spark.read.format("avro").load("/mnt/bronze/tmp/{}-sample.avro".format(topic))
capture_schema = capture_tmp.schema
# Payload Schema
payload_tmp = spark.read.json("/mnt/bronze/tmp/{}-payload-sample.json".format(topic))
payload_schema = payload_tmp.schema
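For reference, the Avro files Event Hub Capture writes share a fixed envelope, so the schema inferred from the sample file above should expose roughly these top-level fields (a sketch based on the standard Capture format; verify against your own files):

```python
# Top-level fields typically present in an Event Hub Capture Avro record
# (assumed standard Capture envelope; the actual event payload lives in "Body")
capture_fields = [
    "SequenceNumber",    # long: position of the event within the partition
    "Offset",            # string: byte offset within the partition
    "EnqueuedTimeUtc",   # string: when Event Hubs received the event
    "SystemProperties",  # map: broker-set properties
    "Properties",        # map: user-set properties
    "Body",              # bytes: the event payload we parse below
]
```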
# cloudFile settings
cloudFilesConf = {
    "cloudFiles.subscriptionId": sp_subscriptionId,
    "cloudFiles.connectionString": queue_sas,
    "cloudFiles.format": "avro",
    "cloudFiles.tenantId": sp_tenantId,
    "cloudFiles.clientId": sp_clientId,
    "cloudFiles.clientSecret": sp_clientKey,
    "cloudFiles.resourceGroup": sp_rgName,
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}
capture_container = "your--capture--container"
eh_namespace = "your--eventhub--namespace"
autoloader_df = (spark.readStream.format("cloudFiles")
    .options(**cloudFilesConf)
    .option("recursiveFileLookup", "true")  # This lets us ignore folder-level partitioning in the incoming DataFrame
    .schema(capture_schema)
    .load("wasbs://{}@{}.blob.core.windows.net/{}/{}/".format(capture_container, adls_account, eh_namespace, topic)))
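The `recursiveFileLookup` option matters because Capture writes its Avro files into a deep, time-partitioned folder tree. A minimal sketch of the default naming convention (assuming the stock `{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}` capture file name format; the date values below are illustrative):

```python
# Default Event Hub Capture folder layout (stock naming convention);
# Autoloader with recursiveFileLookup picks files up at any depth.
template = "{ns}/{hub}/{partition}/{y}/{m}/{d}/{h}/{min}/{s}"
example = template.format(
    ns="your--eventhub--namespace",
    hub="your--topic--name",
    partition=0, y=2023, m="01", d="15", h="09", min="30", s="00",
)
print(example)  # your--eventhub--namespace/your--topic--name/0/2023/01/15/09/30/00
```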
# Convert Body to String for our upcoming transformations
autoloader_df = autoloader_df.withColumn("Body", autoloader_df["Body"].cast(StringType()))
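The cast above turns the Avro `Body` column (raw bytes) into a UTF-8 string so `from_json` can parse it. In plain Python terms, the per-record step is equivalent to the following (the sample payload is hypothetical):

```python
# Equivalent of casting Body from bytes to string for a single record
# (illustration only; Spark applies cast(StringType()) column-wise)
raw_body = b'{"deviceId": "sensor-1", "temperature": 21.5}'
body_str = raw_body.decode("utf-8")
print(body_str)  # {"deviceId": "sensor-1", "temperature": 21.5}
```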
payload_df = autoloader_df \
    .select(from_json(col("Body"), payload_schema).alias("json_payload"))
display(payload_df)
Our Event Hub Capture data is now being parsed by the Autoloader pipeline and is available in `payload_df`.
At this point, we're free to do whatever we want with our `payload_df`, such as writing to Delta Tables (after removing the `display` above of course):
payload_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "your-checkpoint-location") \
    .start("your--delta--table--path")