Last active
July 28, 2022 14:46
-
-
Save garystafford/643ab7b68f2071da4e248988eba23ee1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Purpose: Amazon EMR Serverless and Amazon MSK Serverless Demo
# Reads messages from Kafka topicA and writes aggregated messages to topicB
#   (these are the default topic names; override with --read_topic / --write_topic)
# Author: Gary A. Stafford
# Date: 2022-07-27
# Note: Requires --bootstrap_servers argument
import argparse

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, \
    StringType, FloatType, TimestampType
from pyspark.sql.window import Window
def main():
    """Run the job: load sales events from Kafka, summarize them, publish the summary.

    Command-line arguments are parsed first, then a Spark session named
    "03-example-kafka" is obtained (or reused) with INFO logging, and the
    DataFrame read from the source topic is handed to ``summarize_sales``.
    """
    cli_args = parse_args()

    spark_session = (
        SparkSession.builder
        .appName("03-example-kafka")
        .getOrCreate()
    )
    spark_session.sparkContext.setLogLevel("INFO")

    summarize_sales(read_from_kafka(spark_session, cli_args), cli_args)
def read_from_kafka(spark, args):
    """Batch-read every message currently in the source Kafka topic.

    The topic is consumed with ``spark.read`` (not a stream), from the
    earliest to the latest available offsets. The connection uses SASL_SSL
    with the AWS MSK IAM login module and callback handler.

    Args:
        spark: Active SparkSession.
        args: Parsed CLI arguments; ``bootstrap_servers`` and ``read_topic``
            are used.

    Returns:
        The DataFrame produced by the Spark Kafka source.
    """
    # IAM authentication settings for Amazon MSK.
    msk_iam_auth = {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    }
    read_options = {
        "kafka.bootstrap.servers": args.bootstrap_servers,
        "subscribe": args.read_topic,
        # Bounded range -> a one-shot batch over the topic's current contents.
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
        **msk_iam_auth,
    }
    return spark.read.format("kafka").options(**read_options).load()
def summarize_sales(df_sales, args):
    """Aggregate sales per country and write the summary to the target Kafka topic.

    Each record ``value`` in ``df_sales`` is cast to a string and parsed as
    JSON using the sales schema below. For every country the job computes:

    - the number of orders (count of non-null ``amount``);
    - the total sales (sum of ``amount``).

    Rows are ordered by total sales, highest first. The totals are formatted
    as strings ("1,234.56" / "12"), each row is serialized to a JSON object
    with keys ``country``, ``sales`` and ``orders``, and the result is
    written to Kafka as the record ``value``.

    Args:
        df_sales: DataFrame returned by ``read_from_kafka``; only its
            ``value`` column is used.
        args: Parsed CLI arguments; ``bootstrap_servers`` and ``write_topic``
            are used.
    """
    options_write = {
        "kafka.bootstrap.servers":
            args.bootstrap_servers,
        "topic":
            args.write_topic,
        "kafka.security.protocol":
            "SASL_SSL",
        "kafka.sasl.mechanism":
            "AWS_MSK_IAM",
        "kafka.sasl.jaas.config":
            "software.amazon.msk.auth.iam.IAMLoginModule required;",
        "kafka.sasl.client.callback.handler.class":
            "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }

    schema = StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("amount", FloatType(), False),
        StructField("payment_date", TimestampType(), False),
        StructField("city", StringType(), True),
        StructField("district", StringType(), True),
        StructField("country", StringType(), False),
    ])

    df_summary = (
        df_sales
        .selectExpr("CAST(value AS STRING)")
        .select(F.from_json("value", schema=schema).alias("data"))
        .select("data.*")
        # One row per country. This replaces the former
        # row_number()-over-window + filter(row == 1) pattern, which produced
        # the same count/sum but needed an extra sorted window pass.
        .groupBy("country")
        .agg(
            F.count(F.col("amount")).alias("orders"),
            F.sum(F.col("amount")).alias("sales"),
        )
        # Sort on the numeric total *before* formatting. The old approach
        # stripped commas from the formatted string and cast it to a 32-bit
        # float, which loses precision and can mis-order large totals.
        .orderBy(F.col("sales").desc())
        .select(
            "country",
            F.format_number("sales", 2).alias("sales"),
            F.format_number("orders", 0).alias("orders"),
        )
        # Collapse to a single partition *after* sorting. orderBy() shuffles
        # into new partitions, so a coalesce(1) placed before it was undone.
        .coalesce(1)
        .select(F.to_json(F.struct("*")).alias("value"))
    )

    df_summary \
        .write \
        .format("kafka") \
        .options(**options_write) \
        .save()
def parse_args(argv=None):
    """Parse argument values from the command line.

    Args:
        argv: Optional list of argument strings. When None (the default),
            argparse reads ``sys.argv[1:]``, so existing callers behave
            exactly as before. Passing a list allows testing without
            touching ``sys.argv``.

    Returns:
        argparse.Namespace with:
        - ``bootstrap_servers`` (required);
        - ``read_topic`` (default "topicA");
        - ``write_topic`` (default "topicB").

    Raises:
        SystemExit: if ``--bootstrap_servers`` is missing or arguments are
            invalid (standard argparse behavior).
    """
    parser = argparse.ArgumentParser(description="Arguments required for script.")
    parser.add_argument("--bootstrap_servers", required=True, help="Kafka bootstrap servers")
    # Optional flags are not required by default; explicit required=False was redundant.
    parser.add_argument("--read_topic", default="topicA", help="Kafka topic to read from")
    parser.add_argument("--write_topic", default="topicB", help="Kafka topic to write to")
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Only run the job when executed as a script, not when imported.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment