Skip to content

Instantly share code, notes, and snippets.

@KeesCBakker
Created November 9, 2019 09:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KeesCBakker/6bf95ad2f392e9dadcf991853be807f0 to your computer and use it in GitHub Desktop.
def infer_topic_schema_json(topic):
    """Infer the schema of the JSON values stored in a Kafka topic.

    Reads the full topic (earliest to latest offsets), keeps only the latest
    value per key, parses those values as JSON, and returns the inferred
    schema serialized as a JSON string.

    NOTE(review): relies on module-level names defined elsewhere in the
    file/notebook: `spark` (SparkSession), `kafka_broker` (bootstrap servers
    string), and `expr`/`col` from `pyspark.sql.functions`.

    :param topic: name of the Kafka topic to subscribe to.
    :return: the inferred schema as a JSON string (``StructType.json()``).
    """
    df_json = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        # don't fail if some offsets were already purged from the broker
        .option("failOnDataLoss", "false")
        .load()
        # cast binary Kafka payload to string and filter out empty values
        .withColumn("value", expr("string(value)"))
        .filter(col("value").isNotNull())
        # get latest version of each record: max(offset, value) struct per key
        .select("key", expr("struct(offset, value) r"))
        .groupBy("key").agg(expr("max(r) r"))
        .select("r.value"))

    # decode the json values; unparseable rows land in _corrupt_record
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)

    # drop corrupt records: keep only rows where _corrupt_record is NULL.
    # BUG FIX: the original used isNotNull(), which kept ONLY the corrupt
    # rows and discarded every valid record — the filter was inverted.
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
            .filter(col("_corrupt_record").isNull())
            .drop("_corrupt_record"))

    return df_read.schema.json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment