Ingest Event Hub Capture data with Autoloader
# Create a text widget holding the Event Hub (i.e. topic) name, with a placeholder default
dbutils.widgets.text("topic", "your--topic--name")
topic = dbutils.widgets.get("topic")
# Import required modules
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Environment-specific settings (replace the placeholders)
# ADLS account where Event Hub Capture writes its files
adls_account = "your--adls--account"
adls_key = "your--adls--key"
queue_sas = "your--storage--account--queue--sas--key"
sp_subscriptionId = "your--subscription--id"
sp_tenantId = "your--tenant--id"
sp_clientId = "your--client--id"
sp_clientKey = "your--client--key"
sp_rgName = "your--resource--group"
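# Register the storage account key for the Blob endpoint (wasbs://) and the ADLS Gen2 DFS endpoint (abfss://)
spark.conf.set("fs.azure.account.key." + adls_account + ".blob.core.windows.net", adls_key)
spark.conf.set("fs.azure.account.key." + adls_account + ".dfs.core.windows.net", adls_key)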
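The two `spark.conf.set` calls register the ADLS account key with Spark. Assuming they target the standard Hadoop-Azure key names for the Blob (`wasbs://`) and DFS (`abfss://`) endpoints, the settings can be sketched as a plain dictionary. `account_key_settings` is a hypothetical helper for illustration, not part of the original notebook.

```python
from typing import Dict


def account_key_settings(account: str, key: str) -> Dict[str, str]:
    """Hypothetical helper: Spark config entries that carry a storage account key.

    Assumes the standard Hadoop-Azure naming fs.azure.account.key.<account>.<endpoint>.
    The .blob endpoint serves wasbs:// paths; the .dfs endpoint serves abfss:// paths.
    """
    return {
        f"fs.azure.account.key.{account}.blob.core.windows.net": key,
        f"fs.azure.account.key.{account}.dfs.core.windows.net": key,
    }
```

In the notebook, each entry would then be applied with `spark.conf.set(name, value)` in a loop over `account_key_settings(adls_account, adls_key).items()`.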
# Schema
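# Capture schema: inferred once from a sample Avro file written by Event Hub Capture
capture_tmp = spark.read.format("avro").load("/mnt/bronze/tmp/{}-sample.avro".format(topic))
capture_schema = capture_tmp.schema
# Payload schema: inferred once from a sample JSON message body
payload_tmp = spark.read.json("/mnt/bronze/tmp/{}-payload-sample.json".format(topic))
payload_schema = payload_tmp.schema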
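Both schemas are read from sample files so the stream can be given an explicit schema up front. As a rough, pure-Python illustration of what JSON schema inference does with a sample payload, the sketch below maps each top-level field to the Spark SQL type name it would most likely get. This is a simplification (it ignores nested field types and merging across records), and `spark_type_name` and `infer_top_level_schema` are hypothetical helpers, not Spark APIs.

```python
import json
from typing import Any, Dict


def spark_type_name(value: Any) -> str:
    """Approximate the Spark SQL type name JSON inference picks for one value."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return "struct"
    if isinstance(value, list):
        return "array"
    return "string"  # JSON null: fields that are only ever null end up as string


def infer_top_level_schema(sample_json: str) -> Dict[str, str]:
    """Map each top-level field of one sample payload to a Spark type name."""
    record = json.loads(sample_json)
    return {name: spark_type_name(value) for name, value in record.items()}
```

For example, `infer_top_level_schema('{"deviceId": "d1", "temp": 21.5}')` gives `{"deviceId": "string", "temp": "double"}`.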
# Auto Loader (cloudFiles) settings: read Avro files and use file-notification mode
# (a storage queue plus the service principal above) instead of directory listing
cloudFilesConf = {
    "cloudFiles.subscriptionId": sp_subscriptionId,
    "cloudFiles.connectionString": queue_sas,
    "cloudFiles.format": "avro",
    "cloudFiles.tenantId": sp_tenantId,
    "cloudFiles.clientId": sp_clientId,
    "cloudFiles.clientSecret": sp_clientKey,
    "cloudFiles.resourceGroup": sp_rgName,
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}
capture_container = "your--capture--container"
eh_namespace = "your--eventhub--namespace"
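By default, Event Hub Capture writes Avro files using the naming format `{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}`, so each event hub's files sit several folder levels below `{Namespace}/{EventHub}/`. That is why the stream below points at the namespace/topic folder and reads recursively. The sketch builds that root URL and splits a Capture blob path back into its parts, assuming the default naming format is in use; `capture_root`, `CaptureBlob` and `parse_capture_path` are hypothetical helpers for illustration.

```python
from dataclasses import dataclass


def capture_root(container: str, account: str, namespace: str, topic: str) -> str:
    """Build the wasbs:// URL of the folder Capture writes for one event hub."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{namespace}/{topic}/"


@dataclass
class CaptureBlob:
    namespace: str
    event_hub: str
    partition_id: str
    timestamp: str  # "YYYY-MM-DD HH:MM:SS" assembled from the folder levels


def parse_capture_path(relative_path: str) -> CaptureBlob:
    """Split a path written with the default Capture naming format (assumed)."""
    parts = relative_path.strip("/").split("/")
    if len(parts) != 9 or not parts[-1].endswith(".avro"):
        raise ValueError(f"unexpected capture path: {relative_path}")
    ns, hub, pid, year, month, day, hour, minute, second_file = parts
    second = second_file[: -len(".avro")]  # drop the file extension
    return CaptureBlob(ns, hub, pid, f"{year}-{month}-{day} {hour}:{minute}:{second}")
```

For example, `parse_capture_path("my-ns/my-hub/0/2021/03/04/10/15/00.avro")` yields partition `"0"` and timestamp `"2021-03-04 10:15:00"`.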
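# Stream the Capture Avro files with Auto Loader
autoloader_df = (spark.readStream.format("cloudFiles")
    .options(**cloudFilesConf)
    .schema(capture_schema)
    .option("recursiveFileLookup", "true")  # Read files from all nested Capture folders without inferring partition columns from folder names
    .load("wasbs://{}@{}.blob.core.windows.net/{}/{}/".format(capture_container, adls_account, eh_namespace, topic)))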
# Capture stores each event payload in the binary Body column; cast it to a string so it can be parsed as JSON
autoloader_df = autoloader_df.withColumn("Body", autoloader_df["Body"].cast(StringType()))
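Casting the binary `Body` column to a string makes Spark interpret the bytes as UTF-8 text. Outside Spark, the same step followed by JSON parsing looks roughly like this, assuming producers send UTF-8 encoded JSON; `decode_body` is a hypothetical helper for illustration.

```python
import json
from typing import Any


def decode_body(body: bytes) -> Any:
    """Decode one Capture Body value as UTF-8 text, then parse it as JSON."""
    text = body.decode("utf-8")  # the equivalent of cast(StringType())
    return json.loads(text)
```

For example, `decode_body(b'{"deviceId": "d1"}')` returns `{"deviceId": "d1"}`. A payload that is not valid UTF-8 raises `UnicodeDecodeError` here, whereas the Spark cast does not fail the stream.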
# Parse the JSON body against the payload schema and preview the stream
payload_df = autoloader_df.select(from_json(col("Body"), payload_schema).alias("json_payload"))
display(payload_df)
The Event Hub Capture files are now parsed by the Autoloader pipeline, and the results are available in `payload_df`.
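`from_json` keeps only the fields declared in `payload_schema`: fields absent from a message come back as null, and malformed input produces nulls instead of failing the stream in the default permissive mode. The pure-Python sketch below is a simplified stand-in for that behaviour (it does not cast types or handle nested schemas, and treats any unparseable body as all-null); `project_onto_schema` is a hypothetical helper, not a Spark API.

```python
import json
from typing import Any, Dict, List, Optional


def project_onto_schema(body_text: str, fields: List[str]) -> Dict[str, Optional[Any]]:
    """Keep only the schema's fields; None when a field is missing or the body is unparseable."""
    try:
        parsed = json.loads(body_text)
    except json.JSONDecodeError:
        parsed = {}
    if not isinstance(parsed, dict):  # e.g. a bare JSON array or number
        parsed = {}
    return {name: parsed.get(name) for name in fields}
```

Extra fields in a message (such as `"extra"` in `{"deviceId": "d1", "extra": 1}`) are simply dropped.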
From here, `payload_df` can be used like any other streaming DataFrame, for example written to a Delta table (after removing the `display` call above):
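# Stream the parsed payload into a Delta table; "your-delta-table-path" is a placeholder
(payload_df.writeStream.format("delta")
    .option("checkpointLocation", "your-checkpoint-location")
    .start("your-delta-table-path"))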