Kees C. Bakker (KeesCBakker) — gists for streaming a Kafka topic into a Delta table with PySpark.
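These snippets read like they come from a Databricks notebook: spark and dbutils are the notebook-provided session and utilities, and variables such as kafka_broker, topic, prefix, delta_location, checkpoint_location, schema_location, streaming and update_kafka_schema are assumed to be defined elsewhere. The imports below cover everything the snippets use:

import json
from pyspark.sql.functions import expr, col, from_json
from pyspark.sql.types import StructType
from delta.tables import DeltaTable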
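Reading the resulting Delta table back into a DataFrame. The scraped line had literal {prefix} and {topic} placeholders in the path; below it is written as an f-string, on the assumption that both are variables holding the mount naming parts: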
df_delta = (spark.read
            .format("delta")
            .load(f"/mnt/{prefix}-{topic}-delta/delta-table"))
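Starting the write stream into the Delta table. foreachBatch routes every micro-batch through the upsertToDelta function below; when streaming is false, trigger(once=True) drains the backlog once and stops: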
w = df.writeStream
if not streaming:
    # process everything that is available once, then stop
    w = w.trigger(once=True)

(w.format("delta")
 .option("checkpointLocation", checkpoint_location)
 .foreachBatch(upsertToDelta)
 .outputMode("update")
 .start(delta_location))
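The merge function used by foreachBatch. Each micro-batch is upserted into the Delta table on kafka_key, so re-delivered Kafka records update existing rows instead of duplicating them: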
def upsertToDelta(df, batch_id):
    # merge the micro-batch into the Delta table, matching on the Kafka key
    (DeltaTable
     .forPath(spark, delta_location)
     .alias("t")
     .merge(df.alias("s"), "s.kafka_key = t.kafka_key")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())
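Creating the Delta table up front by appending an empty DataFrame with the stream's schema; the mergeSchema option lets a later run add new columns without failing: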
(spark
 .createDataFrame([], df.schema)
 .write
 .option("mergeSchema", "true")
 .format("delta")
 .mode("append")
 .save(delta_location))
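Opening the stream for the topic with the schema resolved below: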
df = read_stream_kafka_topic(topic, topic_schema)
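The read helper. The snippet as scraped breaks off after the string conversion; the last few lines below are an assumed completion that exposes the message key as kafka_key (the column the merge above matches on) and unpacks the JSON payload with the given schema: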
def read_stream_kafka_topic(topic, schema):
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_broker)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", "false")
            .load()
            # filter out empty values
            .withColumn("value", expr("string(value)"))
            .filter(col("value").isNotNull())
            # assumed completion: expose the message key as kafka_key
            # and unpack the JSON payload using the supplied schema
            .withColumn("kafka_key", expr("string(key)"))
            .withColumn("value", from_json(col("value"), schema))
            .select("kafka_key", "value.*"))
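Turning the cached schema text back into a StructType: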
topic_schema = StructType.fromJson(json.loads(topic_schema_txt))
print(topic_schema)
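Deciding whether to (re)infer the schema: unless update_kafka_schema forces it, try the cached schema file first and fall back to inference only when the file is missing. The scraped snippet stops at "if infer_schema:"; the body below is an assumed completion that infers the schema and caches it with dbutils.fs.put (the third argument overwrites an existing file):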
infer_schema = update_kafka_schema
if not infer_schema:
    try:
        topic_schema_txt = dbutils.fs.head(schema_location)
    except Exception:
        # no cached schema file yet, fall back to inferring one
        infer_schema = True

if infer_schema:
    # assumed completion: infer the schema from the topic and cache it
    topic_schema_txt = infer_topic_schema_json(topic)
    dbutils.fs.put(schema_location, topic_schema_txt, True)
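Schema inference: a one-off batch read of the whole topic, letting Spark derive a schema from the JSON values. This snippet was also cut off; the final lines are an assumed completion that parses the values as JSON and returns the inferred schema as a JSON string: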
def infer_topic_schema_json(topic):
    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull()))

    # assumed completion: let Spark infer a schema from the JSON payloads
    # and return it as a JSON string suitable for caching
    df_read = spark.read.json(df_json.select("value").rdd.map(lambda r: r.value))
    return df_read.schema.json()
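Read bottom-up (the gists are listed newest first), the snippets compose into one pipeline: infer the topic schema or load the cached copy, parse it into a StructType, open the Kafka read stream, create the Delta table if needed, and start the foreachBatch stream that upserts each micro-batch.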