Demonstrate JOIN issues with Spark 2.4
- put all messages onto the topics

    KAFKA_HOME=/opt/products/kafka_2.11-2.2.1
    for i in A B C D
    do
      $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $i < $i.msgs
    done

- run the test program

    SPARK_HOME=/opt/products/spark-2.4.3-bin-hadoop2.7
    SPARK_CONF='--conf spark.default.parallelism=1 --conf spark.sql.shuffle.partitions=1 --conf spark.sql.streaming.multipleWatermarkPolicy=max'
    SPARK_PKGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 --exclude-packages net.jpountz.lz4:lz4'
    $SPARK_HOME/bin/spark-submit $SPARK_PKGS $SPARK_CONF --master local --driver-memory 4g simple_joins.py
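For reference, the same settings could also be applied when the SparkSession is built instead of on the spark-submit command line. This is only an illustrative sketch, not how the gist runs it; simple_joins.py itself sets the first two options via spark.conf.set:

    # illustrative only: configure the session in code instead of via --conf flags
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .appName("simple_joins")
        .config("spark.default.parallelism", 1)
        .config("spark.sql.shuffle.partitions", 1)
        .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
        .getOrCreate())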
The test program contains six different combinations of joins. The expectations and the observed behaviour are as follows:
1) A innerjoin B: to topic A_B
   expected: 10 inner joined messages
   observation: OK
2) A innerjoin B outerjoin C: to topic A_B_outer_C
   expected: 9 inner joined messages, the last one still held back by the watermark
   observation: OK
3) A innerjoin B outerjoin C outerjoin D: to topic A_B_outer_C_outer_D
   expected: 9 inner/outer joined messages, 3 matching C, 1 matching D, the last one still held back by the watermark
   observation: NOK, only the 3 messages where the C outer join matches are produced
4) B outerjoin C outerjoin D: to topic B_outer_C_outer_D
   expected: 9 outer joined messages, 3 matching C, 1 matching D, the last one still held back by the watermark
   observation: NOK, same as 3)
5) A innerjoin B, aggregated on a field of A: to topic A_inner_B_agg
   expected: 3 groups with a total of 9 inner joined messages, the last one still held back by the watermark
   observation: OK
6) A innerjoin B outerjoin C, aggregated on a field of A: to topic A_inner_B_outer_C_agg
   expected: 3 groups with a total of 9 inner joined messages, the last one still held back by the watermark
   observation: NOK, 3 groups with 1 message each, only where the C outer join matches
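To check what each query actually wrote, the output topics can be read back with a plain batch query. This is a hypothetical verification snippet, not part of the gist; it assumes the same broker and the spark-sql-kafka package used above:

    # hypothetical check: read an output topic back as a batch DataFrame and count the rows
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("check_output").getOrCreate()

    out = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "A_B_outer_C_outer_D")
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

    print(out.count())
    out.show(truncate=False)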
A.msgs:
{"A_ID":"A1","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B1","GROUP_FK":"P1"} | |
{"A_ID":"A2","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B2","GROUP_FK":"P1"} | |
{"A_ID":"A3","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B3","GROUP_FK":"P1"} | |
{"A_ID":"A4","A_LAST_MOD":"2019-06-01T17:40:09.723Z","B_FK":"B4","GROUP_FK":"P1"} | |
{"A_ID":"A5","A_LAST_MOD":"2019-06-01T17:40:12.826Z","B_FK":"B5","GROUP_FK":"P2"} | |
{"A_ID":"A6","A_LAST_MOD":"2019-06-01T17:40:12.826Z","B_FK":"B6","GROUP_FK":"P2"} | |
{"A_ID":"A7","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B7","GROUP_FK":"P3"} | |
{"A_ID":"A8","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B8","GROUP_FK":"P3"} | |
{"A_ID":"A9","A_LAST_MOD":"2019-06-01T18:37:28.769Z","B_FK":"B9","GROUP_FK":"P3"} | |
{"A_ID":"A10","A_LAST_MOD":"2019-06-01T23:58:49.774Z","B_FK":"B10","GROUP_FK":"P4"} |
B.msgs:
{"B_ID":"B1","B_LAST_MOD":"2019-06-01T17:40:09.723Z"} | |
{"B_ID":"B2","B_LAST_MOD":"2019-06-01T17:40:09.723Z"} | |
{"B_ID":"B3","B_LAST_MOD":"2019-06-01T17:40:09.723Z","C_FK":"C3"} | |
{"B_ID":"B4","B_LAST_MOD":"2019-06-01T17:40:09.723Z"} | |
{"B_ID":"B5","B_LAST_MOD":"2019-06-01T17:40:12.826Z"} | |
{"B_ID":"B6","B_LAST_MOD":"2019-06-01T17:40:12.826Z","C_FK":"C6"} | |
{"B_ID":"B7","B_LAST_MOD":"2019-06-01T18:37:28.769Z"} | |
{"B_ID":"B8","B_LAST_MOD":"2019-06-01T18:37:28.769Z","D_FK":"D8"} | |
{"B_ID":"B9","B_LAST_MOD":"2019-06-01T18:37:28.769Z","C_FK":"C9"} | |
{"B_ID":"B10","B_LAST_MOD":"2019-06-01T23:58:49.774Z"} |
C.msgs:
{"C_LAST_MOD":"2019-06-01T17:40:09.723Z","C_ID":"C3"} | |
{"C_LAST_MOD":"2019-06-01T17:40:12.826Z","C_ID":"C6"} | |
{"C_LAST_MOD":"2019-06-01T18:37:28.769Z","C_ID":"C9"} |
D.msgs:
{"D_LAST_MOD":"2019-06-01T18:37:28.769Z","D_ID":"D8"} |
simple_joins.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, expr, to_json, struct, col, collect_list, window, count, min
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark_builder = SparkSession.builder.appName("simple_joins")
spark = spark_builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1)
spark.conf.set("spark.default.parallelism", 1)
spark.conf.set("spark.sql.session.timeZone", "UTC")

bootstrapServers = "localhost:9092"
# define the schemas of the input topics
A_schema = StructType([
    StructField("A_ID", StringType(), False),
    StructField("A_LAST_MOD", TimestampType(), False),
    StructField("B_FK", StringType(), True),
    StructField("GROUP_FK", StringType(), True),
])
B_schema = StructType([
    StructField("B_ID", StringType(), False),
    StructField("B_LAST_MOD", TimestampType(), False),
    StructField("C_FK", StringType(), True),
    StructField("D_FK", StringType(), True),
])
C_schema = StructType([
    StructField("C_ID", StringType(), False),
    StructField("C_LAST_MOD", TimestampType(), False),
])
D_schema = StructType([
    StructField("D_ID", StringType(), False),
    StructField("D_LAST_MOD", TimestampType(), False),
])
# open all 4 input streams, doing basic conversion and watermarking
A_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "A")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", A_schema).alias("v")).select("v.*")
    .withWatermark("A_LAST_MOD", "30 seconds")
)
B_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "B")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", B_schema).alias("v")).select("v.*")
    .withWatermark("B_LAST_MOD", "30 seconds")
)
C_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "C")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", C_schema).alias("v")).select("v.*")
    .withWatermark("C_LAST_MOD", "30 seconds")
)
D_stream = (spark.readStream
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", "D")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING)").select(from_json("value", D_schema).alias("v")).select("v.*")
    .withWatermark("D_LAST_MOD", "30 seconds")
)
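# Optional debugging aid (not part of the original script, just a sketch): before wiring up
# the joins, any one of the parsed input streams can be dumped to the console to confirm
# that the JSON parsing, schema and event-time column look as expected.
# debug_query = (A_stream
#     .writeStream
#     .outputMode("append")
#     .format("console")
#     .option("truncate", "false")
#     .trigger(processingTime='5 seconds')
#     .start())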
# first use case: A innerjoin B
# OK
inner = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
)
inner.printSchema()
inner_kafka = (inner
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B")
    .option("checkpointLocation", "A_B")
    .queryName("A_B")
    .trigger(processingTime='5 seconds')
    .start())
# second use case: A innerjoin B leftouter C
# OK
one_outer = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
)
one_outer.printSchema()
one_outer_kafka = (one_outer
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B_outer_C")
    .option("checkpointLocation", "A_B_outer_C")
    .queryName("A_B_outer_C")
    .trigger(processingTime='5 seconds')
    .start())
# third use case: A innerjoin B leftouter C leftouter D
# NOK
two_outer = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .join(D_stream, expr("D_FK = D_ID and B_LAST_MOD = D_LAST_MOD"), "leftOuter")
)
two_outer.printSchema()
two_outer_kafka = (two_outer
    .select(col("A_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_B_outer_C_outer_D")
    .option("checkpointLocation", "A_B_outer_C_outer_D")
    .queryName("A_B_outer_C_outer_D")
    .trigger(processingTime='5 seconds')
    .start())
# 4th use case: B leftouter C leftouter D
# NOK
two_outer_no_inner = (B_stream
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .join(D_stream, expr("D_FK = D_ID and B_LAST_MOD = D_LAST_MOD"), "leftOuter")
)
two_outer_no_inner.printSchema()
two_outer_no_inner_kafka = (two_outer_no_inner
    .select(col("B_ID").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "B_outer_C_outer_D")
    .option("checkpointLocation", "B_outer_C_outer_D")
    .queryName("B_outer_C_outer_D")
    .trigger(processingTime='5 seconds')
    .start())
# 5th use case: A innerjoin B, aggregated on a field of A
# OK
inner_agg = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .groupBy("GROUP_FK", window("A_LAST_MOD", "2 seconds"))
    .agg(
        collect_list(struct("*")).alias("records"),
        count("A_ID").alias("number_of_records"),
        min("A_LAST_MOD").alias("first_record_tsd")
    )
)
inner_agg.printSchema()
inner_agg_kafka = (inner_agg
    .select(col("GROUP_FK").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_inner_B_agg")
    .option("checkpointLocation", "A_inner_B_agg")
    .queryName("A_inner_B_agg")
    .trigger(processingTime='5 seconds')
    .start())
# 6th use case: A innerjoin B outerjoin C, aggregated on a field of A
# NOK
inner_outer_agg = (A_stream
    .join(B_stream, expr("B_FK = B_ID and A_LAST_MOD = B_LAST_MOD"))
    .join(C_stream, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .groupBy("GROUP_FK", window("A_LAST_MOD", "2 seconds"))
    .agg(
        collect_list(struct("*")).alias("records"),
        count("A_ID").alias("number_of_records"),
        min("A_LAST_MOD").alias("first_record_tsd")
    )
)
inner_outer_agg.printSchema()
inner_outer_agg_kafka = (inner_outer_agg
    .select(col("GROUP_FK").alias("key"), to_json(struct("*")).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "A_inner_B_outer_C_agg")
    .option("checkpointLocation", "A_inner_B_outer_C_agg")
    .queryName("A_inner_B_outer_C_agg")
    .trigger(processingTime='5 seconds')
    .start())
# wait for the streams to finish
inner_kafka.awaitTermination()
one_outer_kafka.awaitTermination()
two_outer_kafka.awaitTermination()
two_outer_no_inner_kafka.awaitTermination()
inner_agg_kafka.awaitTermination()
inner_outer_agg_kafka.awaitTermination()
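As a side note, the progress of the individual queries (including the current event-time watermark each one has reached) can be inspected via lastProgress. A hypothetical monitoring loop, which would have to run instead of the blocking awaitTermination() calls above, might look like this:

# hypothetical monitoring loop: print watermark and input-row counts per query
import time

while any(q.isActive for q in spark.streams.active):
    for q in spark.streams.active:
        p = q.lastProgress
        if p:
            print(q.name, p.get("eventTime", {}).get("watermark"), p.get("numInputRows"))
    time.sleep(5)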