Last active
October 3, 2021 18:34
-
-
Save garystafford/e776d84e5c7cfbaa672bd27fe22aaa56 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pipeline driver: pull the sales Avro schema from the registry, turn it into
# a Spark StructType, load the incremental sales CSV from S3, and publish the
# rows to Kafka.
csv_sales_schema = get_schema("pagila.sales.csv")
schema = struct_from_json(spark, csv_sales_schema)
df_sales = read_from_csv(
    spark,
    "sales_incremental_large.csv",
    schema,
    "|",  # pipe-delimited source file
)
write_to_kafka(spark, df_sales)
def get_schema(artifact_id):
    """Fetch an Avro schema (as a JSON string) from the Apicurio Registry.

    Args:
        artifact_id: Registry artifact identifier, e.g. "pagila.sales.csv".

    Returns:
        The schema document decoded as a UTF-8 JSON string.

    Raises:
        requests.HTTPError: if the registry returns a 4xx/5xx response.
    """
    response = requests.get(
        f"{params['schema_registry_url']}/apis/registry/v2/groups/default/artifacts/{artifact_id}")
    # Fail fast on an error response; otherwise an HTML/JSON error page would
    # be returned as if it were the schema and break parsing downstream.
    response.raise_for_status()
    return response.content.decode("utf-8")
def struct_from_json(spark, json_format_schema):
    """Convert an Avro schema string into a pyspark.sql.types.StructType.

    Binds an Avro reader to the supplied schema and calls load() with no
    path, so Spark infers the structure from the schema alone; the inferred
    schema is printed for debugging before being returned.
    """
    reader = spark.read.format("avro").option("avroSchema", json_format_schema)
    schema_df = reader.load()
    schema_df.printSchema()
    return schema_df.schema
def read_from_csv(spark, source_data, schema, sep):
    """Load a delimited file from the demo S3 bucket into a DataFrame.

    Args:
        spark: Active SparkSession.
        source_data: File name under the bucket's spark/ prefix.
        schema: pyspark StructType applied to the file.
        sep: Field delimiter character.

    Returns:
        The Spark DataFrame read from S3 (first row treated as a header).
    """
    source_path = f"s3a://{params['kafka_demo_bucket']}/spark/{source_data}"
    return spark.read.csv(path=source_path, schema=schema, header=True, sep=sep)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment