- According to Apache, Spark is "a fast and general engine for large-scale data processing"
- Since it runs on a cluster, it is very scalable
- It has a built-in cluster manager, but it can also run on top of a Hadoop cluster, which would then use YARN
- According to Apache, Spark can "run programs up to 100x faster than Hadoop
MapReduce in memory, or 10x faster on disk"
- This may be hyperbolic; the instructor has seen speed improvements closer to 3-4x
- DAG (directed acyclic graph) engine optimizes workflows
- Very popular in industry
- Can code in Python, Java, or Scala
- Built around one main concept: the Resilient Distributed Dataset (RDD)
- Components of Spark
- Spark Core
- Spark Streaming
- Spark SQL
- MLLib
- GraphX
- Spark Core
- Why Python?
- No need to compile, manage dependencies, etc.
- Less coding overhead
- Many people already know Python
- But...
- Scala is probably a more popular choice with Spark
- Spark is built in Scala, so coding in Scala is "native" to Spark
- New features, libraries tend to be Scala first
- Python code in Spark looks a lot like Scala code
- The SparkContext is created by your driver program
- It is responsible for making RDDs resilient and distributed
- It creates RDDs
- The Spark shell creates an 'sc' object for you
- Can create RDDs from:
- Text files
- JDBC
- Cassandra
- HBase
- Elasticsearch
- JSON, CSV, sequence files, object files, various compressed formats
- Most common transformations:
- map: allows you to take a set of data and transform it to another set of data using a function that operates on each element
- Has a 1-to-1 relationship between input and output (i.e. you'll get the same number of elements before and after the map operation)
- flatMap: similar to map, but it can produce multiple (or zero) output elements for every input element
- filter: filters out information you don't need; takes a function that returns a boolean
- distinct: gets the unique values in an RDD
- sample: pulls a random sample from an RDD
- union, intersection, subtract, cartesian: operations that combine multiple RDDs
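- A minimal sketch of a few of these transformations (not from the lecture; the dataset is made up), assuming the sc object the Spark shell provides:

```python
# Illustrative only: a small in-memory dataset instead of a real text file
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana"])

longWords = words.filter(lambda w: len(w) > 5)   # keep only words longer than 5 characters
uniqueWords = words.distinct()                   # drop duplicate values
someWords = words.sample(False, 0.5)             # random sample, without replacement
```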
- Most common actions:
- collect: returns all values in an RDD (to print them out, for example)
- count: counts the number of items in an RDD
- countByValue: gives a breakdown of how many times each unique value appears
- take: lets you sample a few values from an RDD
- top: similar to take, but returns the largest values from an RDD
- reduce: lets you write a function that combines all the values into a single result
- Nothing actually happens in your driver program until an action is called (Lazy Evaluation)
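- A small illustrative sketch of lazy evaluation (not from the lecture): the map below does nothing until an action forces it, assuming sc already exists:

```python
nums = sc.parallelize([1, 2, 3, 4, 5])
doubled = nums.map(lambda x: x * 2)          # transformation only - not evaluated yet

print(doubled.count())                       # action: triggers the job, prints 5
print(doubled.take(3))                       # action: [2, 4, 6]
print(doubled.reduce(lambda x, y: x + y))    # action: 30
```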
- RDDs can hold key/value pairs
- Simple one-line function to map pairs of data into the RDD
- For example, to pair each value with a count of 1 (for the purpose of counting by key, perhaps) you could use the following statement:
- keyValuePairs = rdd.map(lambda x: (x, 1))
- It's ok to have lists as values as well
- Key/Value Functions:
- reduceByKey(): combines values with the same key using some function
- rdd.reduceByKey(lambda x, y: x + y) adds up all values by key
- groupByKey(): groups values into a list by key
- sortByKey(): sorts RDD by key values
- keys(), values(): creates an RDD of just the keys or just the values
- You can do SQL-style joins on two key/value RDDs
- join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
- With key/value data, use mapValues() or flatMapValues() if your transformation
doesn't affect the keys
- It's more efficient because it allows Spark to maintain the partitioning from your original RDD
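- A hedged sketch tying these together (a simple word count; the data is made up), assuming sc already exists:

```python
words = sc.parallelize(["spark", "hadoop", "spark", "hive", "spark"])

pairs = words.map(lambda w: (w, 1))               # ("spark", 1), ("hadoop", 1), ...
counts = pairs.reduceByKey(lambda x, y: x + y)    # ("spark", 3), ("hadoop", 1), ("hive", 1)

# mapValues leaves the keys alone, so Spark can preserve the partitioning
labeled = counts.mapValues(lambda n: "seen %d times" % n)
print(labeled.sortByKey().collect())
```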
- map will transform each element of an RDD into one new element in a new RDD, i.e. it has a 1-to-1 relationship between the input and output RDDs
- Example: capitalizing every character in a line of text
- flatMap will transform each element of an RDD into zero, one, or many new elements
- Example: breaking up a line of text into individual words
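- A quick illustrative contrast (not from the lecture), assuming sc already exists:

```python
lines = sc.parallelize(["the quick brown fox", "jumped over"])

upper = lines.map(lambda line: line.upper())      # 2 lines in, 2 lines out (1-to-1)
words = lines.flatMap(lambda line: line.split())  # 2 lines in, 6 words out

print(upper.count())   # 2
print(words.count())   # 6
```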
- Spark allows you to send information one time to all the executor nodes in
your cluster and keep it there
- This would be useful for lookup tables or anything that needs to be referenced within your script, especially if the script is massive
- This is called broadcasting, and it is as simple as calling sc.broadcast()
- You then use .value to get the object back
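- A minimal broadcast sketch (the lookup table here is made up), assuming sc already exists:

```python
# Ship a small lookup table to every executor once
movieNames = sc.broadcast({1: "Toy Story", 2: "GoldenEye"})

ratings = sc.parallelize([(1, 4.0), (2, 3.5), (1, 5.0)])
named = ratings.map(lambda pair: (movieNames.value[pair[0]], pair[1]))
print(named.collect())
```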
- Breadth-first search (BFS): a way to traverse through nodes in a graph, searching all neighbor nodes emanating from a root node and working outward
- An accumulator allows many executors to increment a shared variable across a cluster, basically a counter that is maintained and synchronized across all the nodes in your cluster
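- A minimal accumulator sketch (illustrative), assuming sc already exists:

```python
hitCounter = sc.accumulator(0)   # shared counter, readable from the driver

def countEvens(x):
    if x % 2 == 0:
        hitCounter.add(1)        # executors can only add to it

sc.parallelize(range(10)).foreach(countEvens)
print(hitCounter.value)          # the driver reads the final value: 5
```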
- Example: find similarities between movies based on user ratings
- Find every pair of movies that were watched by the same person
- Measure the similarity of their ratings across all users who watched both
- Sort by movie, then by similarity strength
- In this example, you would query the final RDD of movie similarities multiple
times
- Any time you will perform more than one action on an RDD, you should cache it
- Otherwise, Spark might re-evaluate the entire RDD all over again
- Use .cache() or .persist() to do this
- Difference: .persist() optionally lets you cache it to disk instead of memory, just in case a node fails
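- A short sketch of caching before multiple actions (the RDD contents are placeholders), assuming sc already exists:

```python
from pyspark import StorageLevel

similarities = sc.parallelize([("movieA", 0.9), ("movieB", 0.4)])   # stand-in for real results
similarities.persist(StorageLevel.MEMORY_AND_DISK)                  # or similarities.cache() for memory only

print(similarities.count())   # first action: computes and caches the RDD
print(similarities.take(1))   # second action: served from the cache
```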
- Amazon service
- Quick and easy way to rent time on a cluster
- Sets up a default spark configuration for you on top of Hadoop's YARN cluster manager
- Spark also has a built-in standalone cluster manager and scripts to set up
its own EC2-based cluster
- But the AWS console is even easier
- Spark on EMR isn't really expensive, but it's not free
- Unlike MapReduce with MRJob, Spark uses m3.xlarge instances
- You have to remember to shut down your clusters when you're done, or you'll be charged for that time
- You'll also want to make sure things run locally on a subset of your data
first
- Can use RDD operations such as top or sample to create smaller datasets to test on
- You still need to know your data to run Spark jobs efficiently, it's not magic
- Spark does not distribute automatically on its own
- Use .partitionBy() on an RDD before running a large operation that benefits
from partitioning
- cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup() preserve your partitioning in their result
- With any large RDDs, you should check to see if you have partitioned the RDD before calling any of these methods
- Choosing a partition size
- Too few partitions won't take full advantage of your cluster
- Too many partitions results in too much overhead from shuffling data
- You should use at least as many partitions as you have cores, or executors that fit within your available memory
- partitionBy(100) is usually a reasonable place to start for large operations
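- A hedged sketch of partitioning before an expensive join (the data is illustrative), assuming sc already exists:

```python
# (movieID, (userID, rating)) pairs - illustrative only
ratings = sc.parallelize([(1, (101, 5.0)), (1, (102, 3.0)), (2, (101, 4.0))])

partitioned = ratings.partitionBy(100)    # ~100 partitions as a starting point
joined = partitioned.join(partitioned)    # join preserves the partitioning
print(joined.count())
```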
- If you use an empty SparkConf when using EMR, it will use all the defaults as set up by AWS. You can override these defaults with command line arguments
Logs
- If you're running on your own cluster (i.e. not on AWS), logs are shown on the web UI
- In YARN though, the logs are distributed - you need to collect them after the fact using yarn logs -applicationId
- While your driver script runs, it will log errors like executors failing to
issue heartbeats
- This generally means you are asking too much of each executor
- You may need more executors, i.e. more machines on your cluster
- Each executor may need more memory
- Or use partitionBy() to demand less work from individual executors by using smaller partitions
Managing Dependencies
- Executors aren't necessarily on the same box as your driver script
- Use broadcast variables to share data outside of RDDs
- Need a Python package that's not pre-loaded on EMR?
- Set up a step in EMR to run pip for what you need on each worker machine
- Or use --py-files with spark-submit to add individual libraries that are on the master
- Try to avoid using obscure packages you don't need in the first place - time is money on an EMR cluster (and your own cluster for that matter)
- Spark SQL extends RDDs to a "DataFrame" object
- Dataframes:
- Contain Row objects
- Can run SQL queries
- Have a schema (leading to more efficient storage)
- Read & write to JSON, Hive, Parquet
- Communicate with JDBC/ODBC, Tableau
- In Spark 2.0, a DataFrame is really a DataSet of Row objects
- DataSets can wrap known, typed data too. But this is mostly transparent in Python, since Python is untyped (more important in Java & Scala)
- It doesn't matter as much with Python, but the Spark 2.0 way is to use DataSets instead of DataFrames when you can
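- A minimal DataFrame sketch in the Spark 2.0 style (the file name is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
people = spark.read.json("people.json")      # schema is inferred from the JSON

people.createOrReplaceTempView("people")
teens = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teens.show()
```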
- You can extend the SQL syntax yourself to do specialized operations
- Example of a UDF to square a number:
from pyspark.sql.types import IntegerType
hiveCtx.registerFunction("square", lambda x: x*x, IntegerType())
df = hiveCtx.sql("SELECT square(someNumericField) FROM tableName")
- Feature extraction
- Term Frequency/Inverse Document Frequency useful for search
- Basic statistics
- Chi-squared test, Pearson or Spearman correlation, min, max, mean, variance
- Linear regression, logistic regression
- Support Vector Machines
- Naive Bayes classifier
- Decision trees
- K-means clustering
- Principal component analysis, singular value decomposition
- Recommendations using Alternating Least Squares (winner of Netflix prize)
- Special MLLib data types:
- Vector (dense or sparse)
- LabeledPoint
- Rating
- ALS is very sensitive to the parameters chosen. It takes more work to find
optimal parameters for a dataset than to run the recommendations
- Can use "train/test" to evaluate various permutations of parameters
- But this could lead to overfitting
- Can use "train/test" to evaluate various permutations of parameters
- No way to be sure the algorithm is working properly internally
- Don't always want to just put your faith in a black box
- Complicated isn't always better
- Never blindly trust results when analyzing big data
- Small problems in algorithms become big ones
- Very often the quality of your input data is the real issue
- Analyzes continual streams of data
- Common example: processing log data from a website or server
- Data is aggregated and analyzed at some given interval
- Can take data fed to a port, Amazon Kinesis, HDFS, Kafka, Flume, and others
- "Checkpointing" stores state to disk periodically for fault tolerance
- A "Dstream" object breaks up the stream into distinct RDDs
- Not technically "real-time" - it uses these Dstream microbatches
- A common batch increment is 1 second
- Not technically "real-time" - it uses these Dstream microbatches
- RDDs only contain one little chunk of incoming data
- "Windowed operations" cancombine results from mlutiple batches over some
sliding time window
- window(), reduceByWindow(), reduceByKeyAndWindow()
- updateStateByKey(): lets you maintain a state across many batches as time goes on
- For example, running counts of some event
- See stateful_network_wordcount.py example in Spark SDK
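- A hedged DStream sketch (a streaming word count over 1-second batches; the host and port are illustrative), assuming sc already exists:

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)                    # 1-second batch interval
lines = ssc.socketTextStream("localhost", 9999)  # data fed to a port

counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```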
- Instead of using Dstreams and discrete RDDs, structured streaming models the stream as a Dataframe that just keeps expanding
- Streaming code ends up looking a lot like non-streaming code
- Dataframes provide interoperability with MLLib
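- A minimal structured streaming sketch (the input directory is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
lines = spark.readStream.text("logs/")       # a DataFrame that keeps growing

counts = lines.groupBy("value").count()      # running count per distinct line
query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .start())
query.awaitTermination()
```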
- This deals with graphs in the computer science/network sense, e.g. a graph of a social network
- Currently (as of the time of the recording) it is Scala only
- It is only really useful for specific applications
- It can measure things like "connectedness", degree distribution, average path length, triangle counts - high level measures of a graph
- It can also join graphs together and transform graphs quickly
- It introduces a couple of new datatypes under the hood: VertexRDD & EdgeRDD
- Otherwise, GraphX code looks like any other Spark code for the most part