Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
# Purpose: Batch write incremental sales data from S3 to a new Kafka topic
# Use a delay between each message to simulate real-time streaming data
# Author: Gary A. Stafford
# Date: 2021-09-26
import os
import time
import boto3
import pyspark.sql.functions as F
from ec2_metadata import ec2_metadata
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType
# File name of the incremental sales CSV. No complete statement in this copy
# of the file references it; presumably it is used to build the input path in
# read_from_csv() -- confirm against the original source.
sales_data = "sales_incremental_large.csv"
# Kafka topic name. Presumably the destination topic for write_to_kafka();
# the statement that uses it is not visible in this copy -- confirm.
topic_output = "pagila.sales.spark.streaming"
# Intended pause, in seconds, between consecutive messages (see file header).
time_between_messages = 0.5 # 1800 messages * .5 seconds = ~15 minutes
# Export the region reported by the EC2 instance metadata before the client is
# created, so boto3 can resolve a default region for the SSM calls.
os.environ['AWS_DEFAULT_REGION'] = ec2_metadata.region
# Shared Systems Manager client used by get_parameters().
ssm_client = boto3.client("ssm")
def main():
    """Read the incremental sales CSV and publish its rows to Kafka.

    Resolves runtime parameters with get_parameters(), obtains a Spark
    session, reads the CSV with an explicit schema via read_from_csv(), and
    passes the resulting DataFrame to write_to_kafka().
    """
    params = get_parameters()

    # The builder chain has to end with getOrCreate(): it is what actually
    # returns the SparkSession, and without it the trailing line continuation
    # runs into the next statement.
    spark = SparkSession \
        .builder \
        .appName("kafka-incremental-sales") \
        .getOrCreate()

    # Explicit schema for the CSV columns; the third StructField argument is
    # the nullable flag (only city and district may be null).
    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", StringType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_sales = read_from_csv(spark, params, schema)
    write_to_kafka(spark, params, df_sales)
# Read the sales CSV into a DataFrame using the caller-supplied schema and
# return it. The surviving keyword arguments treat the first line as a header
# and use "|" as the field separator.
# NOTE(review): the line between `df_sales = \` and the keyword arguments is
# missing from this copy of the file. It presumably opened a
# spark.read.csv(...) call and supplied the input path -- restore it from the
# original source before running.
def read_from_csv(spark, params, schema):
df_sales = \
schema=schema, header=True, sep="|")
return df_sales
# Publish the rows of df_sales to Kafka one message at a time: each row is
# wrapped in its own single-row DataFrame, its payment_date is replaced with
# the current timestamp, and the row is serialised to a JSON `value` column
# with payment_id (cast to string) as the `key` column.
# NOTE(review): this copy of the function is truncated in two places: most of
# the options_write dictionary (only the fragment " required;" survives, and
# the closing brace is gone), and whatever followed the final
# `.options(**options_write) \` continuation (presumably the terminating
# save() call and the per-message delay). Restore both from the original
# source before running.
def write_to_kafka(spark, params, df_sales):
# Writer options that are unpacked into the Kafka writer below.
options_write = {
" required;",
# Number of rows in the DataFrame, used as the loop bound.
sales_count = df_sales.count()
for r in range(0, sales_count):
# NOTE(review): collect() is re-executed on every iteration, bringing the
# whole DataFrame back to the driver each time just to pick row r. Consider
# collecting once before the loop.
row = df_sales.collect()[r]
# Rebuild a one-row DataFrame that keeps the original schema.
df_message = spark.createDataFrame([row], df_sales.schema)
# Swap the stored payment_date for the current timestamp, then project the
# row into key/value columns and hand it to the Kafka writer.
df_message = df_message \
.drop("payment_date") \
.withColumn("payment_date", F.current_timestamp()) \
.selectExpr("CAST(payment_id AS STRING) AS key",
"to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.options(**options_write) \
# Build and return a dict with the keys "kafka_servers" and
# "kafka_demo_bucket", each populated from an ssm_client.get_parameter call.
# NOTE(review): both get_parameter( calls are cut off in this copy of the
# file: their arguments, any post-processing of the response, and the closing
# brace of the dict are missing. Restore them from the original source before
# running.
def get_parameters():
"""Load parameter values from AWS Systems Manager (SSM) Parameter Store"""
params = {
"kafka_servers": ssm_client.get_parameter(
"kafka_demo_bucket": ssm_client.get_parameter(
return params
if __name__ == "__main__":
    # Run the job only when the file is executed as a script, not on import.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment