SparkQuestions.scala
1. What are pros and cons of NoSQL DB vs. traditional relational DB?
1) Many NoSQL databases are column-oriented (we can read only the 3 columns we need out of 200, which saves I/O and memory).
2) NoSQL is a better fit for unstructured or semi-structured data.
3) SQL scales vertically (for example, upgrade the server's CPU from 3.0 GHz to 4.0 GHz),
while NoSQL scales horizontally (for example, add one or two new servers).
4) Maintenance of a SQL DB can require downtime, while with NoSQL it is often possible to avoid downtime entirely.
5) NoSQL requires additional storage, because field names are stored with every record.
6) SQL DBs offer ACID compliance (Atomicity, Consistency, Isolation, Durability),
while NoSQL generally does not (this is how the flexibility and access speed are achieved).
2. Cassandra vs. HBase - pros and cons?
1) HBase is part of the Hadoop stack.
2) HBase (because of HDFS) has a single point of failure (the NameNode).
3) HBase is strongly consistent, while Cassandra is not (it is eventually consistent by default).
3. How to tune and/or control shuffle behaviour in Spark?
1) Use the `.toDebugString` method to see where shuffles occur in the RDD lineage.
2) Check the Spark Web UI, section "Shuffle Read/Write".
3) It is good to know which operations trigger a shuffle and which do not.
For example, use reduceByKey instead of groupByKey on an RDD (see the sketch below), etc.
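A minimal sketch of points 1) and 3), assuming a local SparkSession and a tiny hypothetical pair RDD:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ShuffleDemo").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// reduceByKey combines values on the map side before the shuffle,
// so far less data crosses the network.
val reduced = pairs.reduceByKey(_ + _)

// groupByKey ships every (key, value) pair across the shuffle
// and only aggregates afterwards, which is usually slower.
val grouped = pairs.groupByKey().mapValues(_.sum)

// The lineage printout marks the shuffle (stage) boundaries.
println(reduced.toDebugString)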
4. Dataset vs. DataFrame vs. RDD in Spark?
Please list all pros and cons from the performance standpoint.
DataFrame (an alias of Dataset[Row]) and Dataset should be used for high-level functionality (SQL joins,
selects, etc.) when we have structured data.
RDD wins on low-level operations with unstructured data, but has no optimizations
for SQL-like operations out of the box at all.
DataFrames and Datasets are faster than RDDs for such queries, because they go through the Catalyst optimizer.
DataFrames and Datasets have similar performance (see the sketch below).
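A minimal sketch of the same aggregation expressed through all three APIs, assuming a SparkSession named `spark` and a hypothetical Sale case class:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ApiComparison").getOrCreate()
import spark.implicits._

case class Sale(customer: String, amount: Double)

// Dataset: typed API, optimized by Catalyst.
val ds = Seq(Sale("alice", 10.0), Sale("bob", 5.0), Sale("alice", 7.0)).toDS()

// DataFrame: untyped rows (Dataset[Row]), same optimizer.
val df = ds.toDF()
df.groupBy($"customer").sum("amount").show()

// RDD: low-level API, no Catalyst optimization for this kind of query.
val rdd = ds.rdd
rdd.map(s => (s.customer, s.amount)).reduceByKey(_ + _).foreach(println)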
5. How does Spark on YARN work?
Please describe what YARN does in each step of the Spark application's life (from spark-submit
to the end of the app).
There are two modes: client and cluster.
We submit the job with parameters (such as deploy mode, memory per executor, etc.); an example is shown below.
YARN starts the Application Master.
The Application Master requests resources for execution.
YARN launches containers.
Each container has its own copy of the application.
Spark executors are started and connect to the driver.
Finally, the execution of our computation starts.
If some container fails, YARN can start a new one and try to continue the execution.
The number of retries for the Application Master is limited (the app is not restarted for every kind of error).
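A sketch of the submission step, with the resource values set programmatically just for illustration (the application name and sizes are hypothetical; in practice they usually come from spark-submit flags such as --master yarn, --deploy-mode cluster, --executor-memory):

import org.apache.spark.sql.SparkSession

// Equivalent to passing these settings on the spark-submit command line.
val spark = SparkSession.builder
  .appName("YarnExample")
  .master("yarn")
  .config("spark.executor.memory", "4g")
  .config("spark.executor.instances", "10")
  .config("spark.executor.cores", "2")
  .getOrCreate()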
6. What is the purpose of Hive on Spark?
The authors want to replace MapReduce with Spark as the execution engine, so that the computation speed of Hive queries increases significantly.
7. Say some AWS EMR cluster with a Spark job 'hangs'.
Please list all possible ways (tools, metrics and so on) to understand what is going on.
1) Spark Web UI.
We can identify the problem stage and view the logs of its executors.
2) Check the YARN logs.
3) Enable the AWS EMR debugging tool. I have not used this tool, because I have not worked with Spark on AWS.
4) Significantly reduce the input data volume and add many debug `println`s.
After reducing, we rerun the job on the small volume of data and try to find the bottlenecks.
8. Write a short example (pseudo-code) of creating a DataFrame from data from a socket using
the Spark data source API.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("ApplicationName")
  .getOrCreate()

// readStream with the "socket" source produces a streaming DataFrame
// with a single `value` column containing the lines read from the socket.
val dataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
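The streaming DataFrame above does nothing until a streaming query is started; a minimal sketch of consuming it (the console sink is chosen just for illustration):

val query = dataFrame.writeStream
  .format("console")      // print incoming lines; any other sink would do
  .outputMode("append")
  .start()

query.awaitTermination()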
9.
// Say there is an AWS EMR cluster with some Spark job (see code below).
// EMR option maximizeResourceAllocation is enabled.
// It computes the amount of purchases for middle-aged customers by each day of month.
// How would you modify the code (make it better from the performance and style
// standpoints)?
// Please describe in detail all of your modifications (any modified/added line).
// BEFORE REFACTORING
val numberOfDaysInMonth = 30 // assume always 30 days in month
val getDayOfMonth = udf { ... get number of day of month from some date ... }
val transactions = sql.read.option(..).load(..).where(..) // transactions for 30 days from DB
val transactionsByDay =
  for (i <- 1 to numberOfDaysInMonth) yield {
    i -> transactions.filter(getDayOfMonth(col("date")) === lit(i))
  }
val customers = sql.read.option(..).load(..)
  .filter(!(col("age") < 30 || col("age") > 50)) // only middle-aged customers from DB
val customerTransactionsByDay = transactionsByDay.map(kv => kv match {
  case (numOfDay, dataframe) =>
    numOfDay -> {
      dataframe.join(
        customers,
        customers("customer") === dataframe("customer"),
        "left_outer"
      )
      .where(dataframe("customer") !== lit(null))
      .drop(dataframe("customer"))
    }
  case _ => ???
}).toMap
var customerTransactionsAllDays = customerTransactionsByDay(0)
for (i <- 1 until customerTransactionsByDay.length) {
  customerTransactionsAllDays =
    customerTransactionsAllDays.unionAll(customerTransactionsByDay(i))
}
customerTransactionsAllDays.groupBy(
  col("customer"),
  getDayOfMonth(col("date")).as("day")
)
.agg(sum("amount").as("amount"))
.write.option(..).save(...) // save to DB
/////// AFTER REFACTORING
import org.apache.spark.sql.functions._ // udf, col, lit, sum, broadcast
import spark.implicits._ // for better syntax: $"col_name" instead of col("col_name")
val getDayOfMonth = udf { ... get number of day of month from some date ... }
val transactions = sql.read.option(..).load(..).where(..) // transactions for 30 days from DB
  .filter($"customer".isNotNull)
  // it's better to filter data on loading (as early as you can), before any join operations,
  // because it reduces the computation load;
  // and if you have many `null` values in the $"customer" column,
  // the app can hang on groupBy($"customer") or when joining on the `null` values.
val customers = sql.read.option(..).load(..)
  .filter($"age" >= 30 && $"age" <= 50) // easier to read than (!(col("age") < 30 || col("age") > 50))
val result = transactions.join(
    broadcast(customers), // customers has to be significantly smaller than transactions;
                          // broadcasting it avoids shuffling the large transactions DataFrame
    customers("customer") === transactions("customer"),
    "left_outer"
  )
  .drop(transactions("customer"))
  .groupBy(
    $"customer",
    getDayOfMonth($"date").as("day")
  )
  .agg(sum("amount").as("amount"))
result.write.option(..).save(...) // save to DB