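I mainly used Python for the whole exercise. I also created sample data to test all the code snippets in pyspark and spark-shell.
# In the pyspark shell, spark and sc already exist; getOrCreate() reuses them,
# so the snippets also work as a standalone script.
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)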
part_df = sqlContext.createDataFrame([
(1, 'bob', '2015-01-13', 14),
(2, 'alice', '2015-04-23', 10),
(3, 'john', '2015-04-23', 12)
], ['partkey','name','comment', 'size'])
partsupp_df = sqlContext.createDataFrame([
(1, 100),
(2, 23),
(1, 120),
(2, 28)
], ['partkey','supplycost'])
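# Alternatively, load the real data set. spark.read.csv without header=True names
# the columns _c0, _c1, ..., so the key column (assumed to be the first one, as in
# the TPC-H schema) is renamed to match the joins below.
part_df = spark.read.csv('hdfs:///data-sets/tpch/data/part').withColumnRenamed('_c0', 'partkey')
partsupp_df = spark.read.csv('hdfs:///data-sets/tpch/data/partsupp').withColumnRenamed('_c0', 'partkey')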
df = part_df.join(partsupp_df, "partkey")
df.show()
part_rdd = part_df.rdd.map(lambda row: (row[0], (row[1], row[2], row[3])))
partsupp_rdd = partsupp_df.rdd.map(lambda row: (row[0], row[1]))
rdd = part_rdd.join(partsupp_rdd)
def printx(my_list):
    for row in my_list:
        print(row)

printx(rdd.take(20))
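To make the shape of the RDD join result concrete, here is a minimal plain-Python sketch (no Spark needed) of the inner-join semantics of RDD.join on the same sample pairs. The helper pair_join is hypothetical, and Spark does not guarantee the order of the joined pairs.

```python
from collections import defaultdict


def pair_join(left, right):
    """Inner join of two lists of (key, value) pairs, mimicking RDD.join.

    Every left value is paired with every right value sharing its key,
    giving (key, (left_value, right_value)); keys found on only one side
    are dropped.
    """
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


part = [(1, ('bob', '2015-01-13', 14)),
        (2, ('alice', '2015-04-23', 10)),
        (3, ('john', '2015-04-23', 12))]
partsupp = [(1, 100), (2, 23), (1, 120), (2, 28)]

for row in pair_join(part, partsupp):
    print(row)
```

Part 3 ('john') has no supply rows, so it disappears from the result, just as it does in the Spark join.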
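This part is in Scala, since the typed Dataset API does not exist in Python.
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._  // encoders needed by .as[Part] and .as[PartSupp]; spark-shell imports this automatically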
val part_df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row(1L, "bob", "2015-01-13", 14),
Row(2L, "alice", "2015-04-23", 10),
Row(3L, "john", "2015-04-23", 12)
)),
StructType(List(
StructField("partkey", LongType),
StructField("name", StringType),
StructField("comment", StringType),
StructField("size", IntegerType)
))
)
val partsupp_df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row(1L, 100),
Row(2L, 23),
Row(1L, 120),
Row(2L, 28)
)),
StructType(List(
StructField("partkey", LongType),
StructField("supplycost", IntegerType)
))
)
case class Part(partkey: Long, name: String, comment: String, size: Int)
case class PartSupp(partkey: Long, supplycost: Int)
val part_ds = part_df.as[Part]
val partsupp_ds = partsupp_df.as[PartSupp]
part_ds.join(partsupp_ds, Seq("partkey")).show()
part_df.createOrReplaceTempView("part")
partsupp_df.createOrReplaceTempView("partsupp")
res = spark.sql("""
SELECT part.*, partsupp.supplycost
FROM part
JOIN partsupp ON part.partkey = partsupp.partkey
""")
res.show()
A more suitable storage format for frequent scans and reads of the data set with Spark is Parquet. Parquet is a columnar storage format, which is especially efficient when only some of the columns are loaded instead of all of them; this makes it a good fit for Spark SQL.
import time
part_df.write.save("hdfs:///data-sets/tpch/data/part.parquet", format="parquet")
start = time.time()
df = spark.sql("SELECT name FROM parquet.`hdfs:///data-sets/tpch/data/part.parquet`")
df.count()
print(time.time() - start)
start = time.time()
part_df = spark.read.csv('hdfs:///data-sets/tpch/data/part')
part_df.count()
print(time.time() - start)
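A deliberately simplified, plain-Python model of why the columnar layout helps a query such as SELECT name: it only counts how many stored values each layout has to touch. Real Parquet files add per-column encoding, compression, and statistics on top of this, which the sketch ignores.

```python
# Sample rows with the same columns as part_df.
rows = [
    (1, 'bob', '2015-01-13', 14),
    (2, 'alice', '2015-04-23', 10),
    (3, 'john', '2015-04-23', 12),
]
columns = ['partkey', 'name', 'comment', 'size']
name_index = columns.index('name')

# Row layout: records are stored one after another, so reading one field
# means scanning every value of every record.
row_values_read = sum(len(record) for record in rows)
names_from_rows = [record[name_index] for record in rows]

# Columnar layout: values of the same column are stored together,
# so only the 'name' column has to be read.
columnar = {col: [record[i] for record in rows] for i, col in enumerate(columns)}
names_from_columns = columnar['name']
columnar_values_read = len(names_from_columns)

print(row_values_read, columnar_values_read)
```

The gap grows with the number of columns: TPC-H tables are wide, and a query touching one or two columns skips the rest entirely.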
We can influence the number of executors used with dynamic allocation through a few Spark configurations:
- spark.dynamicAllocation.initialExecutors: the initial number of executors to run.
- spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors: the lower and upper limits on the number of executors.
Spark also uses heuristics to decide when to request and remove executors. These heuristics are controlled by another set of configurations:
- spark.dynamicAllocation.schedulerBacklogTimeout: if tasks have been pending (backlogged) for more than this duration, new executors are requested.
- spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: the same as the previous one, but for subsequent executor requests while the backlog persists.
- spark.dynamicAllocation.executorIdleTimeout: an executor is removed when it has been idle for more than this duration.
- spark.dynamicAllocation.cachedExecutorIdleTimeout: an executor that holds cached data blocks is removed when it has been idle for more than this duration.
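As an illustration, the sizing settings could be collected and passed to spark-submit as --conf arguments. The numbers below are hypothetical placeholders, not recommendations; spark.dynamicAllocation.enabled and spark.shuffle.service.enabled are the switches that turn on dynamic allocation and the external shuffle service.

```python
# Hypothetical example values; size them for the actual cluster.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.initialExecutors": "10",
    "spark.dynamicAllocation.minExecutors": "5",
    "spark.dynamicAllocation.maxExecutors": "80",
}


def check_bounds(conf):
    """Our own sanity check (not a Spark API): min <= initial <= max."""
    low = int(conf["spark.dynamicAllocation.minExecutors"])
    initial = int(conf["spark.dynamicAllocation.initialExecutors"])
    high = int(conf["spark.dynamicAllocation.maxExecutors"])
    if not low <= initial <= high:
        raise ValueError(f"expected {low} <= {initial} <= {high}")


def to_submit_args(conf):
    """Turn a config dict into a list of spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


check_bounds(dynamic_allocation_conf)
print(" ".join(to_submit_args(dynamic_allocation_conf)))
```

The same key/value pairs can also go into spark-defaults.conf or be set with SparkConf.set before the context is created.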
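The request side of these heuristics can be sketched as a small plain-Python model. It follows the request policy described in the Spark job-scheduling documentation: the first request fires once tasks have been pending for schedulerBacklogTimeout, further requests follow every sustainedSchedulerBacklogTimeout while the backlog persists, and each round adds twice as many executors as the previous one (1, 2, 4, 8, ...). This is a simplification: the real allocation manager also caps the target by the number of pending tasks and removes idle executors, which the model ignores. The function name executors_after_backlog is hypothetical.

```python
def executors_after_backlog(backlog_seconds, backlog_timeout, sustained_timeout,
                            start_executors, max_executors):
    """Simplified model of Spark's dynamic-allocation request policy.

    Returns the executor count after a backlog lasting backlog_seconds.
    """
    executors = start_executors
    if backlog_seconds < backlog_timeout:
        return executors  # backlog too short: no request yet
    # First round at backlog_timeout, then one round per sustained_timeout.
    rounds = 1 + int((backlog_seconds - backlog_timeout) // sustained_timeout)
    to_add = 1
    for _ in range(rounds):
        executors = min(executors + to_add, max_executors)
        to_add *= 2  # each round requests twice as many as the previous one
    return executors


for seconds in (0.5, 1, 2, 3, 4, 10):
    print(seconds, executors_after_backlog(seconds, 1, 1, 0, 80))
```

With both timeouts at 1 second (Spark's defaults) and starting from 0 executors, a 4-second backlog gives four rounds (1 + 2 + 4 + 8), i.e. 15 executors, and longer backlogs are capped at maxExecutors.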
Advantages of dynamic allocation:
- Maximizes resource utilization.
- Prevents starvation when multiple applications share the resources of the Spark cluster. This way, each application gets a chance to finish in reasonable time without overusing the resources.
- There is no need to estimate in advance how many resources the application needs.
Disadvantages:
- An external shuffle service must be set up on each worker node of the cluster to keep the shuffle files after executors are removed.
- Executors containing cached data are removed when they have been idle for longer than spark.dynamicAllocation.cachedExecutorIdleTimeout (infinite by default, so this only happens when the timeout is set). After that, the application has to recompute the lost data. If this happens regularly, it can affect the execution time.
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("DROP TABLE IF EXISTS customers")
sqlContext.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS customers (
key LONG,
name STRING,
birth_date STRING
) PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION 'hdfs:///data-sets/tpch/data/customers'
""")
my_df = sqlContext.createDataFrame([
(1, 'bob', '2015-01-13'),
(2, 'alice', '2015-04-23'),
(3, 'john', '2015-04-23')
], ['key','name','birth_date'])
from pyspark.sql.functions import expr
(my_df
    .withColumn("year", expr("year(birth_date)"))
    .withColumn("month", expr("month(birth_date)"))
    .write.format("parquet")
    .option("compression", "snappy")
    .mode("overwrite")
    .insertInto("customers"))
spark.sql("SELECT * FROM customers").show()
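With dynamic partitioning, the partition values come from the trailing year and month columns of each row, and each (year, month) pair is written to its own key=value directory inside the table location (Hive-style layout). This plain-Python sketch only reproduces that directory mapping for the sample rows; partition_path is a hypothetical helper, not a Spark API.

```python
from datetime import date

# Table location taken from the CREATE EXTERNAL TABLE statement above.
LOCATION = "hdfs:///data-sets/tpch/data/customers"


def partition_path(location, birth_date):
    """Return the Hive-style partition directory for one row."""
    d = date.fromisoformat(birth_date)
    # year and month are INT partition columns, so there is no zero padding.
    return f"{location}/year={d.year}/month={d.month}"


customers = [
    (1, 'bob', '2015-01-13'),
    (2, 'alice', '2015-04-23'),
    (3, 'john', '2015-04-23'),
]

for key, name, birth_date in customers:
    print(name, "->", partition_path(LOCATION, birth_date))
```

Alice and John share a birth month, so they end up in the same partition directory.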
The following solution makes sure that only the partitions present in the new data are overwritten, while all other partitions are left untouched.
new_df = sqlContext.createDataFrame([
(1, 'clair', '2018-01-13'),
(2, 'dani', '2018-01-23'),
(3, 'grace', '2018-01-23'),
(2, 'Jack', '2018-02-23'),
(3, 'Derek', '2018-02-23')
], ['key','name','birth_date'])
new_df.createOrReplaceTempView("new_df")
spark.sql("""
INSERT OVERWRITE TABLE customers
PARTITION(year, month)
SELECT new_df.*, year(birth_date) AS year, month(birth_date) AS month
FROM new_df
""")
- Automate the deployment process: use continuous integration tools such as Jenkins, which later also help automate the test process itself (when, where, and how the tests are executed).
- Prepare documentation and flowchart diagrams before starting to build the tests, since they help cover the main paths of the business process.
- Build test cases based on the documentation and flowchart diagrams.
- Automate test data creation: generate test data with data generation tools such as CA Test Data Manager.
- Automate test execution: the tests should run on each deployment.
The automated testing should test for:
- Data completeness: ensures that all expected data is loaded.
- Data cleanliness: ensures that all unnecessary columns are removed.
- Data quality: exhaustive data validation that makes sure the data passes sanity checks and business rules.
- Performance: ensures that the operations in the ETL pipeline do not take longer than expected.
- Regression testing: ensures that existing functionality remains intact each time a new release of code is completed.
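A minimal plain-Python sketch of the first three checks, on a loaded table represented as a list of dicts. The column names come from the sample data used earlier; the business rules (non-negative supplycost, non-empty name) are hypothetical examples. In practice such functions would be wrapped in a test framework such as pytest and run against the real pipeline output.

```python
def missing_keys(source_rows, loaded_rows, key="partkey"):
    """Data completeness: keys present in the source but not in the target."""
    return {r[key] for r in source_rows} - {r[key] for r in loaded_rows}


def unexpected_columns(loaded_rows, expected_columns):
    """Data cleanliness: columns that should have been dropped during loading."""
    seen = set()
    for row in loaded_rows:
        seen.update(row)  # iterating a dict yields its column names
    return seen - set(expected_columns)


def rule_violations(loaded_rows):
    """Data quality: rows breaking (hypothetical) business rules."""
    problems = []
    for row in loaded_rows:
        if row["supplycost"] < 0:
            problems.append((row["partkey"], "negative supplycost"))
        if not row["name"]:
            problems.append((row["partkey"], "empty name"))
    return problems


source = [{"partkey": 1}, {"partkey": 2}, {"partkey": 3}]
loaded = [
    {"partkey": 1, "name": "bob", "supplycost": 100},
    {"partkey": 2, "name": "alice", "supplycost": 23},
]
expected = ["partkey", "name", "supplycost"]

print("missing:", missing_keys(source, loaded))
print("unexpected columns:", unexpected_columns(loaded, expected))
print("rule violations:", rule_violations(loaded))
```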
We can measure test coverage using tools such as scoverage for Scala (https://github.com/scoverage/scalac-scoverage-plugin) or coverage.py for Python (http://coverage.readthedocs.io/en/latest/).
Make sure to use an efficient storage format (I would suggest Parquet, for the reasons mentioned before) instead of plain-text formats such as JSON or CSV.
Switch object serialization to Kryo (set spark.serializer to org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the default Java serializer.
- Persist or cache RDDs that will be used more than once.
- If large amounts of processed data need to be cached, change the storage level from MEMORY_ONLY (the default for RDD.cache()) to MEMORY_AND_DISK. With the first storage level, partitions that do not fit in memory are dropped and recomputed when needed, while the second one spills them to disk.
- Drop columns that are not used in the aggregations or the report while loading the data. This makes loading faster when a columnar storage format is used; the data also consumes less memory and shuffles are faster.
- Optimize the job by removing unnecessary transformations and choosing more efficient ones (for example, reduceByKey instead of groupByKey followed by a reduction). The goal is to minimize shuffling.
- If a small table such as 'NATION' is one of the joined tables, we can broadcast it to all executors before executing the join (the small DataFrame, or a collected copy of the small RDD). This way, no shuffling is needed for the large tables ('SUPPLIER' and 'CUSTOMER') that are joined with the small one.
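The broadcast idea can be sketched in plain Python: the small table becomes an in-memory dictionary that every partition of the large table looks up locally, so no rows of the large table move between partitions. The nation and supplier rows below are a small made-up sample and the function name is hypothetical. In PySpark itself, a broadcast join is requested with pyspark.sql.functions.broadcast (for example large_df.join(broadcast(small_df), ...)), and Spark also broadcasts automatically when a table is smaller than spark.sql.autoBroadcastJoinThreshold.

```python
def broadcast_hash_join(large_partitions, small_table):
    """Join every partition of a large table with a broadcast lookup table.

    large_partitions: list of partitions, each a list of (key, row) pairs.
    small_table: dict mapping key -> row; stands in for the broadcast copy
    that every executor holds. Each partition is joined on its own, so the
    large table is never repartitioned (no shuffle).
    """
    return [
        [(key, (row, small_table[key])) for key, row in partition if key in small_table]
        for partition in large_partitions
    ]


# Made-up sample: nationkey -> nation name (the small table).
nation = {0: "ALGERIA", 1: "ARGENTINA", 2: "BRAZIL"}

# Supplier rows keyed by nationkey, already spread over two partitions.
supplier_partitions = [
    [(1, "supp_a"), (0, "supp_b")],
    [(2, "supp_c"), (7, "supp_d")],  # nationkey 7 has no match and is dropped
]

for i, rows in enumerate(broadcast_hash_join(supplier_partitions, nation)):
    print(i, rows)
```

Each output partition corresponds to one input partition, which is exactly why the large side needs no shuffle.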
If the job runs on YARN, make sure that the resources requested by Spark fit into the resources available to YARN (yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores).
I noticed that the number of executors is very large, with tiny resources for each. Also, there aren't enough cores for 1,000 executors (12*16 = 192 cores in total). If there aren't enough resources for the requested number of executors, only the ones that fit will be created. Some of the cluster's resources should also be left for Hadoop, YARN, the OS, etc. For memory, the Spark job will consume 1000*1 + 30 = 1,030 GB, while 112*12 = 1,344 GB is available, so memory is fine but cores are not.
My suggestion is to use 160 cores with 2 cores per executor, which gives 80 executors and leaves 32 of the 192 cores for other services. Each executor will have 15G of memory (80*15 = 1,200 GB in total).
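The arithmetic above can be reproduced with a few lines of Python. The cluster figures (12 nodes with 16 cores and 112 GB each) and the extra 30 GB come from the calculation above; one core per executor in the original layout is an assumption. As an optional extra, the sketch can add YARN's default per-executor memory overhead of max(384 MB, 10% of executor memory), which the calculation above leaves out.

```python
# Cluster figures from the calculation above.
NODES = 12
CORES_PER_NODE = 16
MEMORY_PER_NODE_GB = 112
EXTRA_MEMORY_GB = 30  # the additional 30 GB included in the calculation above


def total_request(num_executors, cores_per_executor, memory_gb,
                  include_overhead=False):
    """Return (cores, memory in GB) requested by an executor layout.

    With include_overhead=True each executor also gets YARN's default memory
    overhead of max(384 MB, 10% of executor memory).
    """
    per_executor_gb = memory_gb
    if include_overhead:
        per_executor_gb += max(0.375, 0.10 * memory_gb)  # 384 MB = 0.375 GB
    cores = num_executors * cores_per_executor
    memory = num_executors * per_executor_gb + EXTRA_MEMORY_GB
    return cores, memory


available_cores = NODES * CORES_PER_NODE        # 192
available_memory = NODES * MEMORY_PER_NODE_GB   # 1,344 GB

original = total_request(1000, 1, 1)            # assumes 1 core per executor
suggested = total_request(80, 2, 15)
suggested_with_overhead = total_request(80, 2, 15, include_overhead=True)

print("available:", available_cores, "cores,", available_memory, "GB")
print("original:", original)
print("suggested:", suggested)
print("suggested + overhead:", suggested_with_overhead)
```

With the overhead included, the suggested layout needs about 1,350 GB, slightly more than 1,344 GB. If 112 GB per node is everything YARN can hand out, the executor memory may need to be trimmed a little and checked against yarn.nodemanager.resource.memory-mb.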