@jammann
Created June 10, 2019 22:03
Demonstrate JOIN issues with Spark 2.4
- put all messages onto the input topics:
KAFKA_HOME=/opt/products/kafka_2.11-2.2.1
for i in A B C D
do
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $i < $i.msgs
done
- run the test program:
SPARK_HOME=/opt/products/spark-2.4.3-bin-hadoop2.7
SPARK_CONF='--conf spark.default.parallelism=1 --conf spark.sql.shuffle.partitions=1 --conf spark.sql.streaming.multipleWatermarkPolicy=max'
SPARK_PKGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 --exclude-packages net.jpountz.lz4:lz4'
$SPARK_HOME/bin/spark-submit $SPARK_PKGS $SPARK_CONF --master local --driver-memory 4g simple_joins.py
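- optionally, check the results by reading back the output topics (a sketch using the stock console consumer; topic names as in the cases below)
for t in A_B A_B_outer_C A_B_outer_C_outer_D B_outer_C_outer_D A_inner_B_agg A_inner_B_outer_C_agg
do
echo "=== $t ==="
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $t --from-beginning --timeout-ms 10000
done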
The test program contains 6 different combinations of joins. The following are the expectations and the observed behaviour:
1) A innerjoin B: to topic A_B
expected: 10 inner joined messages
observation: OK
2) A innerjoin B outerjoin C: to topic A_B_outer_C
expected: 9 joined messages; the last one is still held back by the watermark
observation: OK
3) A innerjoin B outerjoin C outerjoin D: to topic A_B_outer_C_outer_D
expected: 9 inner/outer joined messages, 3 of them matching C and 1 matching D; the last one is still held back by the watermark
observation: NOK, only the 3 messages where the C outer join matches are produced
4) B outerjoin C outerjoin D: to topic B_outer_C_outer_D
expected: 9 outer joined messages, 3 of them matching C and 1 matching D; the last one is still held back by the watermark
observation: NOK, same as case 3
5) A innerjoin B, agg on a field of A: to topic A_inner_B_agg
expected: 3 groups with a total of 9 inner joined messages; the last group is still held back by the watermark
observation: OK
6) A innerjoin B outerjoin C, agg on a field of A: to topic A_inner_B_outer_C_agg
expected: 3 groups with a total of 9 joined messages; the last group is still held back by the watermark
observation: NOK, 3 groups with only 1 message each, containing just the rows where the C outer join matches
(In every case the last message is expected to be held back because the latest event time in the test data is 2019-06-01T23:58:49.774Z; with a 30-second watermark delay and no later input, the watermark never passes that row, so it stays buffered in state.)
{"A_ID":"A1","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B1","GROUP_FK":"P1"}
{"A_ID":"A2","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B2","GROUP_FK":"P1"}
{"A_ID":"A3","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B3","GROUP_FK":"P1"}
{"A_ID":"A4","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B4","GROUP_FK":"P1"}
{"A_ID":"A5","A_LAST_MOD":"2019-06-01T17:40:12.826Z","B_FK":"B5","GROUP_FK":"P2"}
{"A_ID":"A6","A_LAST_MOD":"2019-06-01T17:40:12.826Z","B_FK":"B6","GROUP_FK":"P2"}
{"A_ID":"A7","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B7","GROUP_FK":"P3"}
{"A_ID":"A8","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B8","GROUP_FK":"P3"}
{"A_ID":"A9","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B9","GROUP_FK":"P3"}
{"A_ID":"A10","A_LAST_MOD":"2019-06-01T23:58:49.774Z","B_FK":"B10","GROUP_FK":"P4"}
{"B_ID":"B1","B_LAST_MOD":"2019-06-01T17:40:09.723Z"}
{"B_ID":"B2","B_LAST_MOD":"2019-06-01T17:40:09.723Z"}
{"B_ID":"B3","B_LAST_MOD":"2019-06-01T17:40:09.723Z","C_FK":"C3"}
{"B_ID":"B4","B_LAST_MOD":"2019-06-01T17:40:09.723Z"}
{"B_ID":"B5","B_LAST_MOD":"2019-06-01T17:40:12.826Z"}
{"B_ID":"B6","B_LAST_MOD":"2019-06-01T17:40:12.826Z","C_FK":"C6"}
{"B_ID":"B7","B_LAST_MOD":"2019-06-01T18:37:28.769Z"}
{"B_ID":"B8","B_LAST_MOD":"2019-06-01T18:37:28.769Z","D_FK":"D8"}
{"B_ID":"B9","B_LAST_MOD":"2019-06-01T18:37:28.769Z","C_FK":"C9"}
{"B_ID":"B10","B_LAST_MOD":"2019-06-01T23:58:49.774Z"}
{"C_LAST_MOD":"2019-06-01T17:40:09.723Z","C_ID":"C3"}
{"C_LAST_MOD":"2019-06-01T17:40:12.826Z","C_ID":"C6"}
{"C_LAST_MOD":"2019-06-01T18:37:28.769Z","C_ID":"C9"}
{"D_LAST_MOD":"2019-06-01T18:37:28.769Z","D_ID":"D8"}
simple_joins.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, expr, to_json, struct, col, collect_list, window, count, min
from pyspark.sql.types import StringType, StructField, StructType, TimestampType
spark_builder = SparkSession.builder.appName("simple_joins")
spark = spark_builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1)
spark.conf.set("spark.default.parallelism", 1)
spark.conf.set("spark.sql.session.timeZone", "UTC")
bootstrapServers = "localhost:9092"
# define the schemas of the input topics
A_schema = StructType([
    StructField("A_ID", StringType(), False),
    StructField("A_LAST_MOD", TimestampType(), False),
    StructField("B_FK", StringType(), True),
    StructField("GROUP_FK", StringType(), True),
])
B_schema = StructType([
    StructField("B_ID", StringType(), False),
    StructField("B_LAST_MOD", TimestampType(), False),
    StructField("C_FK", StringType(), True),
    StructField("D_FK", StringType(), True),
])
C_schema = StructType([
    StructField("C_ID", StringType(), False),
    StructField("C_LAST_MOD", TimestampType(), False),
])
D_schema = StructType([
    StructField("D_ID", StringType(), False),
    StructField("D_LAST_MOD", TimestampType(), False),
])
# open all 4 input streams, doing basic conversion and watermarking
A_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "A")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", A_schema).alias("v")).select("v.*")
    .withWatermark("A_LAST_MOD", "30 seconds")
)
B_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "B")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", B_schema).alias("v")).select("v.*")
    .withWatermark("B_LAST_MOD", "30 seconds")
)
C_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "C")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", C_schema).alias("v")).select("v.*")
    .withWatermark("C_LAST_MOD", "30 seconds")
)
D_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "D")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", D_schema).alias("v")).select("v.*")
    .withWatermark("D_LAST_MOD", "30 seconds")
)
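# Note: with spark.sql.streaming.multipleWatermarkPolicy=max (set via --conf above),
# the global watermark used for state eviction is the maximum of the per-stream
# watermarks; the default policy is min.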
# first use case A innerjoin B
# OK
inner = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
)
inner.printSchema()
inner_kafka = (inner
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B")
    .option("checkpointLocation", "A_B")
    .queryName("A_B")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# second use case A innerjoin B leftouter C
# OK
one_outer = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
)
one_outer.printSchema()
one_outer_kafka = (one_outer
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B_outer_C")
    .option("checkpointLocation", "A_B_outer_C")
    .queryName("A_B_outer_C")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# third use case A innerjoin B leftouter C leftouter D
# NOK
two_outer = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .join(D_stream, expr("D_FK = D_ID and B_LAST_MOD = D_LAST_MOD"), "leftOuter")
)
two_outer.printSchema()
two_outer_kafka = (two_outer
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B_outer_C_outer_D")
    .option("checkpointLocation", "A_B_outer_C_outer_D")
    .queryName("A_B_outer_C_outer_D")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# 4th use case: B leftouter C leftouter D
# NOK
two_outer_no_inner = (B_stream
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .join(D_stream, expr("D_FK = D_ID and B_LAST_MOD = D_LAST_MOD"), "leftOuter")
)
two_outer_no_inner.printSchema()
two_outer_no_inner_kafka = (two_outer_no_inner
    .select(col("B_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "B_outer_C_outer_D")
    .option("checkpointLocation", "B_outer_C_outer_D")
    .queryName("B_outer_C_outer_D")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# 5th use case: A innerjoin B agg
# OK
inner_agg = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .groupBy("GROUP_FK", window("A_LAST_MOD", "2 seconds"))
    .agg(
        collect_list(struct("*")).alias("records"),
        count("A_ID").alias("number_of_records"),
        min("A_LAST_MOD").alias("first_record_tsd")
    )
)
inner_agg.printSchema()
inner_agg_kafka = (inner_agg
    .select(col("GROUP_FK").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_inner_B_agg")
    .option("checkpointLocation", "A_inner_B_agg")
    .queryName("A_inner_B_agg")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# 6th use case: A innerjoin B outerjoin C agg
# NOK
inner_outer_agg = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .groupBy("GROUP_FK", window("A_LAST_MOD", "2 seconds"))
    .agg(
        collect_list(struct("*")).alias("records"),
        count("A_ID").alias("number_of_records"),
        min("A_LAST_MOD").alias("first_record_tsd")
    )
)
inner_outer_agg.printSchema()
inner_outer_agg_kafka = (inner_outer_agg
    .select(col("GROUP_FK").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_inner_B_outer_C_agg")
    .option("checkpointLocation", "A_inner_B_outer_C_agg")
    .queryName("A_inner_B_outer_C_agg")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start())
# wait for the streams to finish
inner_kafka.awaitTermination()
one_outer_kafka.awaitTermination()
two_outer_kafka.awaitTermination()
two_outer_no_inner_kafka.awaitTermination()
inner_agg_kafka.awaitTermination()
inner_outer_agg_kafka.awaitTermination()
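# Note: these awaitTermination() calls block on the first query indefinitely; to
# return as soon as any of the queries stops, spark.streams.awaitAnyTermination()
# could be used instead.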