Created: November 9, 2019 09:41
Save KeesCBakker/6bf95ad2f392e9dadcf991853be807f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def infer_topic_schema_json(topic):
    """Infer and return the JSON schema of a Kafka topic's values.

    Reads the topic in batch mode from earliest to latest offset, keeps
    only the most recent (highest-offset) value per key, parses those
    values as JSON, drops corrupt records, and returns the inferred
    Spark schema serialized as a JSON string.

    NOTE(review): relies on `spark`, `kafka_broker`, `expr` and `col`
    being defined in the enclosing notebook/module scope.

    :param topic: name of the Kafka topic to subscribe to
    :return: the inferred schema as a JSON string
    """
    df_json = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        # batch-read the whole topic: earliest through latest offsets
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        # cast the binary Kafka value to string and filter out empty values
        .withColumn("value", expr("string(value)"))
        .filter(col("value").isNotNull())
        # keep the latest version of each record: max(offset, value) per key
        .select("key", expr("struct(offset, value) r"))
        .groupBy("key").agg(expr("max(r) r"))
        .select("r.value"))
    # decode the json values
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)
    # drop corrupt records: keep only rows where _corrupt_record is null.
    # (BUGFIX: the original used isNotNull(), which kept ONLY the corrupt
    # rows and discarded every valid record.)
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
            .filter(col("_corrupt_record").isNull())
            .drop("_corrupt_record"))
    return df_read.schema.json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.