Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Write messages from a CSV file to Kafka topicC
# to simulate real-time streaming sales data
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers and --s3_bucket arguments
import argparse
import time
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
StringType, FloatType
def main():
args = parse_args()
spark = SparkSession \
.builder \
.appName("04-stream-sales-to-kafka") \
schema = StructType([
StructField("payment_id", IntegerType(), False),
StructField("customer_id", IntegerType(), False),
StructField("amount", FloatType(), False),
StructField("payment_date", StringType(), False),
StructField("city", StringType(), True),
StructField("district", StringType(), True),
StructField("country", StringType(), False),
df_sales = read_from_csv(spark, schema, args)
write_to_kafka(spark, df_sales, args)
def read_from_csv(spark, schema, args):
df_sales = \
schema=schema, header=True, sep="|")
return df_sales
def write_to_kafka(spark, df_sales, args):
options_write = {
" required;",
sales_count = df_sales.count()
for r in range(0, sales_count):
row = df_sales.collect()[r]
df_message = spark.createDataFrame([row], df_sales.schema)
df_message = df_message \
.drop("payment_date") \
.withColumn("payment_date", F.current_timestamp())
df_message \
.selectExpr("CAST(payment_id AS STRING) AS key",
"to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.options(**options_write) \
def parse_args():
"""Parse argument values from command-line"""
parser = argparse.ArgumentParser(description="Arguments required for script.")
parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
parser.add_argument("--s3_bucket", required=True, help="Amazon S3 bucket")
parser.add_argument("--write_topic", default="topicC", required=False, help="Kafka topic to write to")
parser.add_argument("--sample_data_file", default="sales_incremental_large.csv", required=False, help="data file")
parser.add_argument("--message_delay", default=0.5, required=False, help="message publishing delay")
args = parser.parse_args()
return args
if __name__ == "__main__":
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment