Assessment

I mainly used Python for the whole exercise. I also created sample data to test all the code snippets in pyspark and spark-shell.

Create Sample Data

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
part_df = sqlContext.createDataFrame([
        (1, 'bob', '2015-01-13', 14),
        (2, 'alice', '2015-04-23', 10),
        (3, 'john', '2015-04-23', 12)
    ], ['partkey','name','comment', 'size'])

partsupp_df = sqlContext.createDataFrame([
    (1, 100),
    (2, 23),
    (1, 120),
    (2, 28)
    ], ['partkey','supplycost'])

QUESTION #1: Joins in Core Spark

Join in dataframes

# Assuming the CSV files carry a header row so the column names (e.g. 'partkey') are available for the join.
part_df = spark.read.csv('hdfs:///data-sets/tpch/data/part', header=True)
partsupp_df = spark.read.csv('hdfs:///data-sets/tpch/data/partsupp', header=True)

df = part_df.join(partsupp_df, "partkey")
df.show()

Join in RDDs

part_rdd = part_df.rdd.map(lambda row: (row[0], (row[1], row[2], row[3])))
partsupp_rdd = partsupp_df.rdd.map(lambda row: (row[0], row[1]))

rdd = part_rdd.join(partsupp_rdd)

def printx(my_list):
    for row in my_list:
        print(row)

printx(rdd.take(20))

Join in Datasets

This one is in Scala, since the Dataset API does not exist in Python.

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._  // needed for the .as[T] encoders (auto-imported in spark-shell)

val part_df = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(
        Row(1L, "bob", "2015-01-13", 14),
        Row(2L, "alice", "2015-04-23", 10),
        Row(3L, "john", "2015-04-23", 12)
    )),
    StructType(List(
        StructField("partkey", LongType),
        StructField("name", StringType),
        StructField("comment", StringType),
        StructField("size", IntegerType)
    ))
)

val partsupp_df = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(
        Row(1L, 100),
        Row(2L, 23),
        Row(1L, 120),
        Row(2L, 28)
    )),
    StructType(List(
        StructField("partkey", LongType),
        StructField("supplycost", IntegerType)
    ))
)

case class Part(partkey: Long, name: String, comment: String, size: Int)
case class PartSupp(partkey: Long, supplycost: Int)
val part_ds = part_df.as[Part]
val partsupp_ds = partsupp_df.as[PartSupp]
part_ds.join(partsupp_ds, Seq("partkey")).show()

QUESTION #2: Joins in Spark SQL

sqlContext.registerDataFrameAsTable(part_df, "part")
sqlContext.registerDataFrameAsTable(partsupp_df, "partsupp")

res = sqlContext.sql("""
    SELECT part.*, partsupp.supplycost
    FROM part
    JOIN partsupp ON part.partkey = partsupp.partkey
""")
res.show()

QUESTION #3: Alternate Data Formats

A more suitable storage format for frequent scans and reads of the dataset with Spark is Parquet. Parquet is a columnar storage format that is especially efficient when only certain columns are loaded instead of all of them, which makes it a particularly good fit for Spark SQL.

import time

part_df.write.save("hdfs:///data-sets/tpch/data/part.parquet", format="parquet")

start = time.time()
df = spark.sql("SELECT name FROM parquet.`hdfs:///data-sets/tpch/data/part.parquet`")
df.count()
print(time.time() - start)

start = time.time()
part_df = spark.read.csv('hdfs:///data-sets/tpch/data/part')
part_df.count()
print(time.time() - start)

QUESTION #4: Spark Executors Allocation

We can influence how many executors are used under dynamic allocation by setting a number of Spark configurations:

  1. spark.dynamicAllocation.initialExecutors: the initial number of executors to run.
  2. spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors: the lower and upper limits on the number of executors.

Spark also uses heuristics to decide when to request and remove executors. These heuristics are controlled by another set of configurations (a sketch of setting all of these follows the list):

  1. spark.dynamicAllocation.schedulerBacklogTimeout: if tasks have been backlogged for more than this duration, new executors are requested.
  2. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: the same as the previous one, but for subsequent executor requests.
  3. spark.dynamicAllocation.executorIdleTimeout: an executor is removed when it has been idle for more than this duration.
  4. spark.dynamicAllocation.cachedExecutorIdleTimeout: if an executor holding cached data blocks has been idle for more than this duration, it is removed.
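
For illustration, a minimal sketch of setting these options when building the session (the values are made up, not recommendations):

from pyspark.sql import SparkSession

# Illustrative values only -- tune them to the cluster and the workload.
spark = (SparkSession.builder
    .appName("dynamic-allocation-example")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")  # external shuffle service, so shuffle files survive executor removal
    .config("spark.dynamicAllocation.initialExecutors", "4")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
    .getOrCreate())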

Pros

  1. Maximize resource utilization.
  2. Prevent starvation when multiple applications share resources in the Spark cluster. In this way, each application is guaranteed to finish in reasonable time without overusing the resources.
  3. No need to estimate up front how many resources the application needs.

Cons

  1. An external shuffle service must be set up on each worker node in the same cluster to keep the shuffle files after removing executors.
  2. Executors containing cached data are removed when they have been idle for spark.dynamicAllocation.cachedExecutorIdleTimeout seconds. After that, the application has to recompute the lost data. If this happens regularly, it can hurt the execution time.

QUESTION #5: Create Tables Programmatically

spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

spark.sql("DROP TABLE IF EXISTS customers")

sqlContext.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS customers (
        key BIGINT,
        name STRING,
        birth_date STRING
    ) PARTITIONED BY (year INT, month INT)
    STORED AS PARQUET
    LOCATION 'hdfs:///data-sets/tpch/data/customers'
    """)

my_df = sqlContext.createDataFrame([
        (1, 'bob', '2015-01-13'),
        (2, 'alice', '2015-04-23'),
        (3, 'john', '2015-04-23')
    ], ['key','name','birth_date'])

from pyspark.sql.functions import expr

(my_df
    .withColumn("year", expr("year(birth_date)"))
    .withColumn("month", expr("month(birth_date)"))
    .write
    .format("parquet")
    .option("compression", "snappy")
    .mode("overwrite")
    .insertInto("customers"))

spark.sql("SELECT * FROM customers").show()

QUESTION #6: Update Only Affected Partitions

This solution makes sure that the affected partitions are overwritten with the new data while all other partitions are left untouched.

new_df = sqlContext.createDataFrame([
        (1, 'clair', '2018-01-13'),
        (2, 'dani', '2018-01-23'),
        (3, 'grace', '2018-01-23'),
        (2, 'Jack', '2018-02-23'),
        (3, 'Derek', '2018-02-23')
    ], ['key','name','birth_date'])

new_df.createOrReplaceTempView("new_df")
spark.sql("""
  INSERT OVERWRITE TABLE customers
  PARTITION(year, month)
  SELECT new_df.*, year(birth_date) AS year, month(birth_date) AS month
  FROM new_df
""")

QUESTION #7: ETL Pipeline Automated Testing

  1. Automate the deployment process using continuous-integration tools such as Jenkins, which later helps in automating the test process itself in terms of when, where, and how the tests are executed.
  2. Have documentation and flowchart diagrams before starting to build the tests, as they help cover the main paths in the business process.
  3. Build test cases based on the documentation and flowchart diagrams.
  4. Automate test data creation using data-generation tools such as CA Test Data Manager.
  5. Automate the test execution process: the tests should be executed on each deployment.

The automated testing should test for:

  1. Data completeness: ensures that all expected data is loaded (a minimal sketch of such checks follows this list).
  2. Data cleanliness: ensures that all unnecessary columns are removed.
  3. Data quality: exhaustive data validation that makes sure the data passes sanity tests and business rules.
  4. Performance: the operations in the ETL pipeline do not take longer than expected.
  5. Regression testing: ensures existing functionality remains intact each time a new release of the code is completed.
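
A minimal sketch of what such checks could look like, written as pytest-style tests (staging_orders and dw_orders are hypothetical source and target tables, and spark is assumed to be provided as a fixture):

# Hypothetical completeness and quality checks for an ETL load.
def test_data_completeness(spark):
    source_count = spark.table("staging_orders").count()
    target_count = spark.table("dw_orders").count()
    # Completeness: every source row should have been loaded into the target.
    assert source_count == target_count

def test_data_quality(spark):
    target = spark.table("dw_orders")
    # Sanity/business rules: keys are never null and amounts are non-negative.
    assert target.filter("order_key IS NULL").count() == 0
    assert target.filter("amount < 0").count() == 0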

We can measure test coverage with tools such as scoverage for Scala (https://github.com/scoverage/scalac-scoverage-plugin) or coverage.py for Python (http://coverage.readthedocs.io/en/latest/).

QUESTION #8: Performance Tuning

Data format:

Make sure to use an efficient binary format (I would suggest Parquet for the reasons mentioned above) instead of plain-text formats such as JSON or CSV.

Serialization:

Switch object serialization to Kryo, which is faster and more compact than the default Java serializer.
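
A minimal sketch of enabling Kryo when creating the context (the registration setting shown is optional):

from pyspark import SparkConf, SparkContext

# Use Kryo instead of the default Java serializer.
conf = (SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "false"))
sc = SparkContext(conf=conf)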

Caching and Persisting:

  1. Persist or cache RDDs that will be used more than once.
  2. If there is a need to cache large amounts of processed data, change the storage level from MEMORY_ONLY to MEMORY_AND_DISK. The former may drop partitions and recalculate them later, while the latter spills overflow partitions to disk (a short sketch follows this list).
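
A short sketch of point 2, reusing the sample DataFrames created at the beginning of this document:

from pyspark import StorageLevel

# Cache with spill-to-disk so partitions that do not fit in memory are written to disk instead of recomputed.
joined_df = part_df.join(partsupp_df, "partkey")
joined_df.persist(StorageLevel.MEMORY_AND_DISK)
joined_df.count()  # materializes the cache; later actions reuse it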

Job Optimization:

  1. Drop columns that are not used in the aggregations or the report from the loading process. This makes loading faster when a columnar storage format is used; the data also consumes less memory and shuffles become faster.
  2. Optimize the job by removing unnecessary transformations and picking more efficient ones. The goal is to minimize the shuffling needed.
  3. If a small table like the NATION table is part of the join, we can broadcast the RDD/DataFrame containing it to all executors before executing the join. This way no shuffling is needed for the large tables (SUPPLIER and CUSTOMER) that are joined with the small one (a sketch follows this list).
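
A sketch of point 3, assuming hypothetical customer_df and nation_df DataFrames for the CUSTOMER and NATION tables:

from pyspark.sql.functions import broadcast

# Ship the small NATION table to every executor so the large CUSTOMER table is not shuffled for the join.
result = customer_df.join(broadcast(nation_df), "nationkey")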

Resource Allocation:

If the execution is on YARN, make sure that the resources requested by Spark fit within the resources available to YARN (yarn.nodemanager.resource.memory-mb, yarn.nodemanager.resource.cpu-vcores).

I noticed that the number of executors is very large, with tiny resources for each. There also aren't enough cores for 1,000 executors (12 * 16 = 192). When there aren't enough resources for the requested number of executors, only the ones that fit will be created. Extra resources in the cluster should also be left for Hadoop, YARN, the OS, etc. For memory, the Spark job would consume 1000 * 1 GB + 30 GB = 1,030 GB against 12 * 112 GB = 1,344 GB available, so we are fine on memory but not on cores.

My suggestion is to use 160 cores with 2 cores per executor, which gives 80 executors, each with 15 GB of memory.
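
As a sketch, with a fixed (non-dynamic) allocation these numbers translate into the following settings (illustrative only):

from pyspark.sql import SparkSession

# 80 executors x 2 cores = 160 cores; 80 x 15 GB stays within the 1,344 GB available.
# Leave some headroom per node for the OS, Hadoop, and spark.executor.memoryOverhead.
spark = (SparkSession.builder
    .appName("report-job")
    .config("spark.executor.instances", "80")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "15g")
    .getOrCreate())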
