Last active: February 5, 2019
SparkQuestions.scala
1. What are pros and cons of NoSQL DB vs. traditional relational DB?
1) Many NoSQL stores are column-oriented: reading 3 columns out of 200 touches only those columns' data, which saves I/O.
2) NoSQL is a better fit for unstructured or semi-structured data.
3) SQL databases scale vertically (e.g., upgrade the server's CPU from 3.0 GHz to 4.0 GHz),
   while NoSQL scales horizontally (e.g., add one or two new servers).
4) Maintenance of a SQL DB can require downtime, while many NoSQL stores can be maintained without any.
5) Schemaless NoSQL stores need additional storage per record for the field names.
6) SQL DBs offer ACID compliance (Atomicity, Consistency, Isolation, Durability),
   while most NoSQL stores do not (this is how their flexibility and access speed are achieved).
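The column-orientation point in (1) can be sketched in plain Scala, with no real database involved; the tiny table and all field names below are invented for illustration:

```scala
// Plain-Scala sketch: row-oriented vs column-oriented layout.
// Reading one column from a row store still walks whole records,
// while a column store touches exactly that column's data.
object ColumnVsRow {
  // Row-oriented: each record carries every field.
  val rows: Seq[Map[String, Any]] = Seq(
    Map("id" -> 1, "name" -> "Ann", "age" -> 34, "city" -> "Oslo"),
    Map("id" -> 2, "name" -> "Bob", "age" -> 41, "city" -> "Riga")
  )

  // Column-oriented: each column is stored (and read) independently.
  val columns: Map[String, Seq[Any]] = Map(
    "id"   -> Seq(1, 2),
    "name" -> Seq("Ann", "Bob"),
    "age"  -> Seq(34, 41),
    "city" -> Seq("Oslo", "Riga")
  )

  // Same answer either way, but the row store deserializes full rows...
  def agesFromRows: Seq[Any] = rows.map(_("age"))
  // ...while the column store reads only the "age" column.
  def agesFromColumns: Seq[Any] = columns("age")
}
```

With 200 columns instead of 4, the row store would still materialize all 200 fields per record to answer the same query.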
2. Cassandra vs. HBase - pros and cons?
1) HBase is part of the Hadoop stack.
2) HBase (because of HDFS) has a single point of failure: the NameNode.
3) HBase is strongly consistent, while Cassandra is eventually consistent (with tunable consistency levels).
3. How to tune and/or control shuffle behaviour in Spark?
1) Use the `.toDebugString` method on an RDD to see where shuffles occur in its lineage.
2) Check the Spark Web UI, "Shuffle Read/Write" columns on the Stages page.
3) It is good to know which operations trigger a shuffle and which do not.
   For example, prefer reduceByKey over groupByKey on an RDD, since it combines values on the map side before shuffling.
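The reduceByKey-vs-groupByKey advice above can be illustrated without Spark; the sketch below is plain Scala (the object, values, and the two hand-made "partitions" are all invented) and only simulates map-side combining by counting how many records would cross the shuffle boundary:

```scala
// Plain-Scala sketch of why reduceByKey shuffles less than groupByKey:
// each partition combines values per key locally before anything is
// sent over the network.
object ShuffleSketch {
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq("a" -> 1, "b" -> 1, "a" -> 1),
    Seq("a" -> 1, "b" -> 1, "b" -> 1)
  )

  // groupByKey-style: every (key, value) pair is shuffled as-is.
  def shuffledWithoutCombine: Int = partitions.map(_.size).sum

  // reduceByKey-style: map-side combine first, then shuffle at most
  // one record per key per partition.
  def shuffledWithCombine: Int =
    partitions.map(p => p.groupMapReduce(_._1)(_._2)(_ + _).size).sum

  // Either way the final aggregation result is identical:
  def totals: Map[String, Int] =
    partitions.flatten.groupMapReduce(_._1)(_._2)(_ + _)
}
```

Here 6 records would be shuffled without the combine but only 4 with it; on skewed real data the gap is usually far larger.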
4. Dataset vs. DataFrame vs. RDD in Spark?
Please list all pros and cons from the performance standpoint.
DataFrame (an alias of Dataset[Row]) and Dataset should be used for high-level functionality (SQL joins,
selects, etc.) on structured data: their queries go through the Catalyst optimizer.
RDD wins on low-level operations over unstructured data, but has no optimization
for SQL-like operations out of the box at all.
DF and DS are therefore usually faster than RDD for relational workloads,
and DF and DS have similar performance to each other.
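Why a declarative DataFrame/Dataset query can be optimized while an opaque RDD lambda cannot: the engine sees the query as a plan it can rewrite. The sketch below is a toy model of one Catalyst-style rewrite rule in plain Scala (all names are invented; this is not Spark's actual optimizer):

```scala
// Toy query plan with one optimizer rule: push a filter below a
// projection so fewer rows are projected. With an RDD, the filter
// would be an arbitrary closure the engine cannot reorder.
object PlanSketch {
  sealed trait Plan
  case class Scan(rows: Seq[Map[String, Int]]) extends Plan
  case class Project(cols: Seq[String], child: Plan) extends Plan
  case class Filter(col: String, min: Int, child: Plan) extends Plan

  // Rule: Filter(Project(...)) => Project(Filter(...)) when the
  // filtered column survives the projection.
  def pushDown(p: Plan): Plan = p match {
    case Filter(c, m, Project(cols, child)) if cols.contains(c) =>
      Project(cols, pushDown(Filter(c, m, child)))
    case other => other
  }

  // A naive interpreter, to show the rewrite preserves results.
  def run(p: Plan): Seq[Map[String, Int]] = p match {
    case Scan(rows)        => rows
    case Project(cols, ch) => run(ch).map(_.filter { case (k, _) => cols.contains(k) })
    case Filter(c, m, ch)  => run(ch).filter(_(c) >= m)
  }

  val sample: Seq[Map[String, Int]] = Seq(
    Map("id" -> 1, "age" -> 20, "junk" -> 0),
    Map("id" -> 2, "age" -> 40, "junk" -> 0)
  )
  // As written: project every row first, then filter.
  val naive: Plan = Filter("age", 30, Project(Seq("id", "age"), Scan(sample)))
}
```

`pushDown(naive)` yields the cheaper `Project(Filter(...))` shape with the same result, which is the essence of why DF/DS usually beat hand-written RDD pipelines.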
5. How does Spark on YARN work?
Please describe what YARN does in each step of the Spark application life cycle (from spark-submit
to the end of the app).
There are two deploy modes: client and cluster.
1) We submit the job with parameters (deploy mode, memory per executor, etc.).
2) YARN starts the ApplicationMaster.
3) The ApplicationMaster requests resources (containers) for execution.
4) YARN launches the containers; each container gets its own copy of the application's jars.
5) Spark executors start inside the containers and connect to the driver.
6) Finally, the driver schedules our computation on the executors.
If a container fails, YARN can start a new one and continue execution.
The number of retries for the ApplicationMaster is limited (the app is not restarted for every kind of error).
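Step (1) above might look like the following spark-submit invocation for YARN cluster mode; the class name, jar name, and resource numbers are placeholders for illustration, not values from the original text:

```shell
# Illustrative only: submit a Spark app to YARN in cluster mode.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --num-executors 4 \
  --executor-memory 4G \
  --executor-cores 2 \
  my-app.jar arg1 arg2
```

In client mode (`--deploy-mode client`) the driver runs in the submitting process instead of inside a YARN container.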
6. What is the purpose of Hive on Spark?
The authors want to replace MapReduce with Spark as Hive's execution engine (`SET hive.execution.engine=spark;`), so that the computation speed of Hive queries increases significantly.
7. Say some AWS EMR cluster with a Spark job 'hangs'.
Please list all possible ways (tools, metrics and so on) to understand what is going on.
1) Spark Web UI: identify the problem stage and view the logs of its executors.
2) Check the YARN logs.
3) Enable the AWS EMR debugging tool. (I have not used this tool myself, because I did not work with Spark on AWS.)
4) Significantly reduce the input data volume and add debug `println`s;
   then rerun the job on the small volume and look for bottlenecks.
8. Write short example (pseudo-code) of creating DataFrame from data from socket using
Spark data source API.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("ApplicationName")
  .getOrCreate()

// `readStream` with the "socket" source yields a streaming DataFrame;
// nothing is read until a query is started with `writeStream ... .start()`.
val dataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
9.
// Say there is AWS EMR cluster with some Spark job ( see code below ).
// EMR option maximizeResourceAllocation is enabled.
// It computes amount of purchases for middle-aged customers by each day of month.
// How would you modify the code ( make it better from performance and style
// standpoints ) ?
// Please describe in details all of your modifications ( any modified/added line ).
// BEFORE REFACTORING
val numberOfDaysInMonth = 30// assume always 30 days in month
val getDayOfMonth = udf {... get number of day of month from some date .. }
val transactions = sql.read.option(..).load(..).where (..)// transactions for 30 days from DB
val transactionsByDay =
  for (i <- 1 to numberOfDaysInMonth ) yield {
    i -> transactions.filter(getDayOfMonth(col("date"))===lit(i))
  }
val customers = sql.read.option(..).load(..)
  .filter(!(col("age")<30|| col("age") >50))// only middle-aged customers from DB
val customerTransactionsByDay = transactionsByDay.map( kv=> kv match{
  case (numOfDay, dataframe ) =>
    numOfDay-> {
      dataframe .join (
        customers,
        customers("customer") === dataframe("customer"),
        "left_outer "
      )
      .where(dataframe ("customer")!== lit( null))
      .drop(dataframe("customer"))
    }
  case _=> ???
}).toMap
var customerTransactionsAllDays = customerTransactionsByDay(0)
for(i <- 1 until customerTransactionsByDay.length ){
  customerTransactionsAllDays =
    customerTransactionsAllDays .unionAll(customerTransactionsByDay(i))
}
customerTransactionsAllDays .groupBy(
  col("customer"),
  getDayOfMonth(col("date")).as("day")
)
.agg (sum("amount").as("amount"))
.write .option (..). save (...)// save to DB
/////// AFTER refactoring
import spark.implicits._ // for better syntax style: $"col_name" instead of col("col_name")
import org.apache.spark.sql.functions.{broadcast, sum}

val getDayOfMonth = udf {... get number of day of month from some date ...}

val transactions = sql.read.option(..).load(..).where (..)
  .filter($"customer".isNotNull)
  // It's better to filter data on loading (as early as you can), before any join,
  // because it reduces the computation load.
  // Note that `=!= lit(null)` never matches anything: comparisons with NULL
  // yield NULL, so `isNotNull` must be used instead.
  // And if you have many null values in $"customer",
  // your app can hang on groupBy($"customer") or on joining on the null value.

val customers = sql.read.option(..).load(..)
  .filter($"age" >= 30 && $"age" <= 50) // easier to read than !(col("age") < 30 || col("age") > 50)

val result = transactions
  .join(
    broadcast(customers), // broadcast-join hint: customers must be much smaller than transactions
    transactions("customer") === customers("customer")
    // the default inner join already drops transactions without a matching customer,
    // so the original "left_outer" + null-filter combination is unnecessary
  )
  .drop(customers("customer"))
  .groupBy(
    $"customer",
    getDayOfMonth($"date").as("day")
  )
  .agg(sum("amount").as("amount"))

result.write.option(..).save(...) // save to DB