Kees C. Bakker (KeesCBakker)

# Append an empty DataFrame with the new schema to the Delta table;
# mergeSchema evolves the table schema without writing any rows.
(spark
    .createDataFrame([], df.schema)
    .write
    .option("mergeSchema", "true")
    .format("delta")
    .mode("append")
    .save(delta_location))
df = read_stream_kafka_topic(topic, topic_schema)
def read_stream_kafka_topic(topic, schema):
    # Stream the Kafka topic and parse the JSON payload with the given schema.
    return (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
        # filter out empty values
        .withColumn("value", expr("string(value)"))
        .filter(col("value").isNotNull())
        # parse the JSON string into columns using the stored schema
        # (assumed completion; the original snippet is truncated here)
        .select(from_json(col("value"), schema).alias("value"))
        .select("value.*"))
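Note: these snippets show how df is read, but not how it is written to the Delta table. A minimal sketch of that write, assuming the delta_location and checkpoint_location defined further below and an append-only stream:

# Hypothetical sketch, not part of the original gist: write the streaming
# DataFrame to the Delta table, tracking progress in checkpoint_location.
(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_location)
    .outputMode("append")
    .start(delta_location))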
# Rebuild the Spark schema from the stored JSON text.
topic_schema = StructType.fromJson(json.loads(topic_schema_txt))
print(topic_schema)
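As an aside, a tiny round-trip example (hypothetical fields, not from the gist) shows the format topic_schema_txt is expected to have: the JSON produced by StructType.json() parses back into an equal StructType via StructType.fromJson.

# Hypothetical example: round-trip a schema through its JSON representation.
example_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType())
])
schema_txt = example_schema.json()
assert StructType.fromJson(json.loads(schema_txt)) == example_schema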
# Only re-infer the schema when requested; otherwise try to load the stored one.
infer_schema = update_kafka_schema
if not infer_schema:
    try:
        topic_schema_txt = dbutils.fs.head(schema_location)
    except Exception:
        # no stored schema yet, fall back to inferring it
        infer_schema = True

if infer_schema:
    # Assumed completion (the original snippet is truncated here): infer the
    # schema from the topic and persist it so later runs can reuse it.
    topic_schema_txt = infer_topic_schema_json(topic)
    dbutils.fs.put(schema_location, topic_schema_txt, True)
def infer_topic_schema_json(topic):
    # Read the whole topic once as a batch job to sample its JSON payloads.
    df_json = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        # filter out empty values
        .withColumn("value", expr("string(value)"))
        .filter(col("value").isNotNull()))
    # Assumed completion (the original snippet is truncated here): let Spark
    # infer a schema from the JSON strings and return it as JSON text.
    df_read = spark.read.json(df_json.rdd.map(lambda x: x.value))
    return df_read.schema.json()
import json, os, re
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
bucket = "/mnt/{}{}-delta".format(bucket_prefix, topic)
delta_location = bucket + "/delta-table"
checkpoint_location = bucket + "/checkpoints"
schema_location = bucket + "/kafka_schema.json"

if debug:
    delta_location += "_debug"
    checkpoint_location += "_debug"
    schema_location += "_debug.json"
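For example, with a hypothetical bucket_prefix of "company-" and topic of "orders", delta_location resolves to /mnt/company-orders-delta/delta-table and schema_location to /mnt/company-orders-delta/kafka_schema.json (with the _debug suffixes appended when debug is set).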
# Notebook arguments (Databricks widgets)
debug = str_to_bool(getArgument("debug"))
topic = validate_required_argument_and_return_value("topic")
streaming = str_to_bool(getArgument("streaming"))
update_kafka_schema = str_to_bool(getArgument("update_kafka_schema"))
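str_to_bool and validate_required_argument_and_return_value are referenced but not included in these snippets; a minimal sketch of what they might look like (assumed implementations, not the author's):

# Assumed helper implementations; the originals are not part of these snippets.
def str_to_bool(value):
    # treat common truthy strings as True, everything else as False
    return str(value).strip().lower() in ("1", "true", "yes", "y")

def validate_required_argument_and_return_value(name):
    # read the notebook argument and fail fast when it is missing or empty
    value = getArgument(name)
    if not value:
        raise ValueError("Argument '{}' is required.".format(name))
    return value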