Skip to content

Instantly share code, notes, and snippets.

@garystafford
Last active October 4, 2021 00:03
Show Gist options
Save garystafford/067fab6d8a3afccf09705123a93015aa to your computer and use it in GitHub Desktop.
def get_schema(artifact_id, timeout=30):
    """Fetch a schema artifact's content from the schema registry.

    Issues a GET against the registry's v2 REST API for ``artifact_id`` in the
    ``default`` group. The registry base URL is read from the module-level
    ``params['schema_registry_url']``.

    Args:
        artifact_id: Registry artifact ID, e.g. ``"pagila.sales.avro-key"``.
        timeout: Seconds to wait for the registry to respond. The original
            call had no timeout and could block indefinitely.

    Returns:
        The response body decoded as UTF-8 text. For the artifacts used in
        this file this is passed to ``to_avro`` as a JSON-format Avro schema
        (presumably; TODO confirm the registry stores them that way).

    Raises:
        requests.HTTPError: if the registry answers with a 4xx/5xx status.
        requests.Timeout: if the registry does not respond within ``timeout``.
    """
    url = (f"{params['schema_registry_url']}"
           f"/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    response = requests.get(url, timeout=timeout)
    # Fail fast: without this, an error body (e.g. 404 for an unknown
    # artifact) would be returned as a "schema" and only blow up later,
    # with an obscure parse error, inside to_avro.
    response.raise_for_status()
    return response.content.decode("utf-8")
def write_to_kafka(spark, df_sales):
    """Avro-encode ``df_sales`` and write it to Kafka as a batch job.

    The ``key`` column is the row's ``customer_id``, encoded with the
    ``pagila.sales.avro-key`` schema. The ``value`` column is the entire row
    (all columns packed with ``F.struct("*")``), encoded with the
    ``pagila.sales.avro-value`` schema. Both schemas are fetched with
    ``get_schema``. Sink options come from the module-level ``options_write``
    dict; presumably it holds the Kafka bootstrap servers and target topic
    (TODO confirm).

    Args:
        spark: Active SparkSession. Not used by this function; kept so
            existing callers' signatures still match.
        df_sales: DataFrame of sales rows to publish. Must contain a
            ``customer_id`` column.
    """
    sales_schema_key = get_schema("pagila.sales.avro-key")
    sales_schema_value = get_schema("pagila.sales.avro-value")

    # Publish the caller's DataFrame. The previous body referenced an
    # undefined ``df_message`` and silently ignored ``df_sales``.
    df_sales \
        .select(to_avro("customer_id", sales_schema_key).alias("key"),
                to_avro(F.struct("*"), sales_schema_value).alias("value")) \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment