Notes on Apache Spark

Hadoop commands

hdfs getconf -confKey fs.defaultFS  # Get the NameNode address of the Hadoop cluster (fs.defaultFS is the newer property name)

https://data-flair.training/blogs/top-hadoop-hdfs-commands-tutorial/

Spark SQL

Datasets

  • In Spark 2.0, a DataFrame is a Dataset of Row objects
  • Datasets can also wrap typed data; this is mostly transparent in Python because it is dynamically typed

Shell Access

User-Defined Functions (UDFs): easily create functions and use them as SQL functions
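A minimal sketch of registering a Python UDF for use from SQL; the ratings table and column names are hypothetical, chosen only to match the movie-ratings examples in these notes:

from pyspark.sql.types import IntegerType

def square(x):
    return x * x

# Register the Python function under a name usable in SQL
spark.udf.register("square", square, IntegerType())
spark.sql("SELECT square(rating) AS rating_squared FROM ratings").show()  # 'ratings' is a hypothetical view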

Using Spark 2.0 to find the lowest-rated movies, this time using DataFrames

Spark The Definitive Guide (Oreilly Book)

Introduction

  • Unified computing engine and a set of libraries for parallel data processing on computer clusters.
  • Includes libraries for diverse tasks ranging from SQL to streaming and machine learning
  • An easy system to start with that scales up to big data processing at incredibly large scale
  • Components and Libraries Spark offers: image

Apache Spark's Philosophy

Unified computing engine and a set of libraries for Big data

Unified

  • By unified it means Spark supports a wide range of data analytics tasks, ranging from simple data loading and SQL queries to machine learning and streaming computation, over the same computing engine and with a consistent set of APIs
  • No open source systems tried to provide this type of unified engine for parallel data processing, meaning that users had to stitch together an application out of multiple APIs and systems

Computing Engine

  • Spark handles loading data from storage systems and performing computation on it, not permanent storage as the end itself
  • Can use Spark with a wide variety of persistent storage systems, including cloud storage systems such as Azure Storage and Amazon S3, distributed file systems such as Apache Hadoop, key-value stores such as Apache Cassandra, and message buses such as Apache Kafka
  • Data is expensive to move so Spark focuses on performing computations over the data, no matter where it resides
  • Spark neither stores data long term itself, nor favors one storage system over another

Libraries

  • Spark supports both standard libraries that ship with the engine as well as a wide array of external libraries published as third-party packages by the open source communities
  • One index of external libraries is available at spark-packages.org

Context: The Big Data Problem

  • Computers became faster every year through processor speed increases: the new processors each year could run more instructions per second than the previous year’s
  • Unfortunately, this trend in hardware stopped around 2005: due to hard limits in heat dissipation, hardware developers stopped making individual processors faster, and switched toward adding more parallel CPU cores all running at the same speed
  • Software developed in the past 50 years cannot automatically scale up, and neither can the traditional programming models for data processing applications, creating the need for new programming models.
  • It is this world that Apache Spark was built for.

History of Spark

  • Apache Spark began at UC Berkeley in 2009 as the Spark research project: Research Paper
  • Spark’s core idea of composable APIs has also been refined over time. Early versions of Spark (before 1.0) largely defined this API in terms of functional operations—parallel operations such as maps and reduces over collections of Java objects.
  • Beginning with 1.0, the project added Spark SQL, a new API for working with structured data—tables with a fixed data format that is not tied to Java’s in-memory representation
  • Over time, the project added a plethora of new APIs that build on this more powerful structured foundation, including DataFrames, machine learning pipelines, and Structured Streaming, a high-level, automatically optimized streaming API.

Running Spark

Spark for SQL

./bin/spark-sql
OR
spark-sql

Spark for Scala

spark-shell

Spark for Python

pyspark

Data Used

Gentle Introduction to Spark

  • We will walk through the core architecture of a cluster, Spark Application, and Spark’s structured APIs using DataFrames and SQL
  • A cluster, or group, of computers, pools the resources of many machines together, giving us the ability to use all the cumulative resources as if they were a single computer
  • A group of machines alone is not powerful; you need a framework to coordinate work across them
  • Spark does just that, managing and coordinating the execution of tasks on data across a cluster of computers.
  • The cluster of machines that Spark will use to execute tasks is managed by a cluster manager like Spark’s standalone cluster manager, YARN, or Mesos
  • We then submit Spark Applications to these cluster managers, which will grant resources to our application so that we can complete our work

Spark Applications

  • Spark Applications consist of a driver process and a set of executor processes
  • The driver process runs your main() function, sits on a node in the cluster, and is responsible for three things:
    • maintaining information about the Spark Application
    • responding to a user’s program or input
    • analyzing, distributing, and scheduling work across the executors
  • The driver process is absolutely essential—it’s the heart of a Spark Application and maintains all relevant information during the lifetime of the application.
  • The executors are responsible for actually carrying out the work that the driver assigns them
  • The executors, for the most part, will always be running Spark code. However, the driver can be “driven” from a number of different languages through Spark’s language APIs

image


  • There is a SparkSession object available to the user, which is the entry point to running Spark code.
  • When using Spark from Python or R, you don’t write explicit JVM instructions; instead, you write Python and R code that Spark translates into code that it then can run on the executor JVMs

image


Spark APIs

Spark has two fundamental sets of APIs

  • the low-level “unstructured” APIs, and
  • the higher-level structured APIs (the primary focus here)

SparkSession

  • It is used to send user commands and data to Spark
  • SparkSession is created automatically when you run spark-shell or pyspark, but with spark-submit you have to create it yourself (see the sketch below)
  • There is a one-to-one correspondence between a SparkSession and a Spark Application
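A minimal sketch of creating a SparkSession yourself, e.g. in a script run via spark-submit; the application name is arbitrary:

from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("my-app")\
    .getOrCreate()   # returns the existing SparkSession if one is already running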

Writing your first Spark code in pyspark

# The SparkSession is available as the global variable `spark`
spark

# Create a DataFrame with a single column "number" containing the values 0-999
myRange = spark.range(1000).toDF("number")

DataFrames

  • A DataFrame is the most common Structured API and simply represents a table of data with rows and columns
  • The list that defines the columns and the types within those columns is called the schema
  • You can think of a DataFrame as a spreadsheet with named columns
  • These Spark DataFrames can span thousands of computers
  • The reason for putting the data on more than one computer should be intuitive
    • Either the data is too large to fit on one machine OR
    • It would simply take too long to perform that computation on one machine.

image


Note: Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). The easiest and most efficient are DataFrames.

Partitions

  • To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.
  • A partition is a collection of rows that sit on one physical machine in your cluster.
  • A DataFrame’s partitions represent how the data is physically distributed across the cluster of machines during execution
  • with DataFrames you do not (for the most part) manipulate partitions manually or individually.
  • You simply specify high-level transformations of data in the physical partitions, and Spark determines how this work will actually execute on the cluster.
  • Lower-level APIs do exist (via the RDD interface); a short sketch follows below
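A small sketch (reusing myRange from above) of inspecting and changing partitioning through the RDD interface; the target of 8 partitions is arbitrary:

print(myRange.rdd.getNumPartitions())        # how many partitions back the DataFrame
repartitioned = myRange.repartition(8)       # redistributes rows into 8 partitions (a shuffle)
print(repartitioned.rdd.getNumPartitions())  # now 8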

Running Spark without Hadoop


Transformations

  • In Spark, the core data structures are immutable, meaning they cannot be changed after they’re created.
  • This might seem like a strange concept at first: if you cannot change it, how are you supposed to use it?
  • To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want. These instructions are called transformations.

Using pyspark again:

divisBy2 = myRange.where("number % 2 = 0")   # a transformation: no computation happens yet

divisBy2.show()   # show() is an action; only here does Spark actually run the filter

  • Notice that the where call by itself returns no output. This is because we specified only an abstract transformation, and Spark will not act on transformations until we call an action (such as show)
  • Transformations are the core of how you express your business logic using Spark. There are two types of transformations:
    • those that specify narrow dependencies, and
    • those that specify wide dependencies.
  • Transformations consisting of narrow dependencies (we’ll call them narrow transformations) are those for which each input partition will contribute to only one output partition.
  • A wide dependency (or wide transformation) style transformation will have input partitions contributing to many output partitions (a short sketch follows below)
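A short sketch contrasting the two, reusing the myRange DataFrame from above; the groupBy is there only to force a shuffle:

narrow = myRange.where("number % 2 = 0")     # narrow: each input partition feeds at most one output partition
wide = myRange.groupBy("number").count()     # wide: rows must be shuffled across partitions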

image

image


Lazy Evaluation

  • Lazy evaluation means that Spark will wait until the very last moment to execute the graph of computation instructions
  • In Spark, instead of modifying the data immediately when you express some operation, you build up a plan of transformations that you would like to apply to your source data
  • By waiting until the last minute to execute the code, Spark compiles this plan from your raw DataFrame transformations to a streamlined physical plan that will run as efficiently as possible across the cluster
  • This provides immense benefits because Spark can optimize the entire data flow from end to end

An example of this is something called predicate pushdown on DataFrames. If we build a large Spark job but specify a filter at the end that only requires us to fetch one row from our source data, the most efficient way to execute this is to access the single record that we need. Spark will actually optimize this for us by pushing the filter down automatically.
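A sketch of how to observe this, assuming the Parquet flight data used later in these notes; the FileScan node of the physical plan typically lists the condition under PushedFilters, though the exact output varies by Spark version and data format:

df = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet")
df.where("DEST_COUNTRY_NAME = 'United States'").select("count").explain()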

Actions

  • Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action.
  • An action instructs Spark to compute a result from a series of transformations.
  • There are 3 kinds of actions
    • Actions to view data in the console
    • Actions to collect data to native objects in the respective language
    • Actions to write to output data sources

A simple example of an action is count (in pyspark):

divisBy2.count()
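A sketch showing one action of each kind listed above, reusing divisBy2; the output path is hypothetical:

divisBy2.show(5)                                        # view data in the console
rows = divisBy2.collect()                               # collect to native Python objects (a list of Row)
divisBy2.write.mode("overwrite").csv("/tmp/divisBy2")   # write to an output data source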

Spark UI

Our previous pyspark commands show that, in specifying this action,

  • we started a Spark job that runs our filter transformation (a narrow transformation),
  • then an aggregation (a wide transformation) that performs the counts on a per partition basis,
  • and then a collect, which brings our result to a native object in the respective language

You can see all of this by inspecting the Spark UI at localhost:4040. The Spark UI displays information about the state of your Spark jobs, their environment, and the state of the cluster.


An End to End Example on Spark

flight data

Loading csv into DataFrame

  • We'll use DataFrameReader to read csv file
  • Use Schema Inference to guess schema of our DF, with first row as header
  • Each of these DataFrames (in Scala and Python) has a set of columns with an unspecified number of rows
  • Unspecified because reading data is a transformation, and is therefore a lazy operation
  • Spark peeked at only a couple of rows of data to try to guess what types each column should be
flightData2015 = spark.read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/home/hdoop/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv")

flightData2015.take(3)  # returns the first 3 rows as a list of Row objects
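To check what schema inference actually guessed, printSchema on the DataFrame created above prints the column names and inferred types:

flightData2015.printSchema()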

image


Transformations: Sort a DataFrame

  • Remember, sort does not modify the DataFrame.
  • We use sort as a transformation that returns a new DataFrame by transforming the previous DataFrame.
  • Let’s illustrate what’s happening when we call take on that resulting DataFrame
  • Nothing happens to the data when we call sort because it’s just a transformation
  • We can see that Spark is building up a plan for how it will execute this across the cluster by looking at the explain plan

image

flightData2015.take(3)

flightData2015.sort("count")

flightData2015.sort("count").explain() #  to see the DataFrame’s lineage (execution plan)

Output: 
== Physical Plan ==
*(1) Sort [count#38 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#38 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#109]
   +- FileScan csv [DEST_COUNTRY_NAME#36,ORIGIN_COUNTRY_NAME#37,count#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hdoop/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>



Shuffle Partitions

  • By default, when we perform a shuffle, Spark outputs 200 shuffle partitions. Let’s set this value to 5 to reduce the number of the output partitions from the shuffle
  • The logical plan of transformations that we build up defines a lineage for the DataFrame so that at any given point in time, Spark knows how to recompute any partition by performing all of the operations it had before on the same input data.
  • This sits at the heart of Spark’s programming model—functional programming where the same inputs always result in the same outputs when the transformations on that data stay constant
  • We do not manipulate the physical data; instead, we configure physical execution characteristics through things like the shuffle partitions parameter that we set a few moments ago

image

spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

flightData2015.sort("count").explain()

Only Difference in output: 
Exchange rangepartitioning(count#38 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [id=#133]

Using SQL with DF

  • Spark can run the same transformations, regardless of the language, in the exact same way
  • You can express your business logic in SQL or DataFrames (either in R, Python, Scala, or Java) and Spark will compile that logic down to an underlying plan
  • With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL.
  • There is no performance difference between writing SQL queries or writing DataFrame code, they both “compile” to the same underlying plan that we specify in DataFrame code.
  • We’ll use the spark.sql function (remember, spark is our SparkSession variable) that conveniently returns a new DataFrame
# Register the DataFrame as a table or view (a temporary view)
flightData2015.createOrReplaceTempView("flight_data_2015")

# Query it with SQL and with the DataFrame API; both compile to the same underlying plan
sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME""")
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()
sqlWay.explain()
dataFrameWay.explain()

Stats from data

  • DataFrames (and SQL) in Spark already have a huge number of manipulations available
  • We will use the max function, to establish the maximum number of flights to and from any given location.
  • This just scans each value in the relevant column in the DataFrame and checks whether it’s greater than the previous values that have been seen.
spark.sql("SELECT max(count) FROM flight_data_2015").take(1)

# DataFrame equivalent of the SQL query above
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

Let's find the top five destination countries in the data


maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

  • let’s move to the DataFrame syntax that is semantically similar but slightly different in implementation and ordering.
  • But, as we mentioned, the underlying plans for both of them are the same
# DataFrame version of the same top-five query
from pyspark.sql.functions import desc

flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

  • Now there are seven steps that take us all the way back to the source data
  • This execution plan is a directed acyclic graph (DAG) of transformations, each resulting in a new immutable DataFrame, on which we call an action to generate a result
  • The true execution plan (the one visible in explain) will differ from that shown in Figure because of optimizations in the physical execution;
    • The first step is to read in the data
    • The second step is our grouping; technically when we call groupBy, we end up with a RelationalGroupedDataset, and that now we’re going to perform an aggregation over each one of those keys.
    • the third step is to specify the aggregation. Let’s use the sum aggregation method.
    • The fourth step is a simple renaming. We use the withColumnRenamed method
    • The fifth step sorts the data in descending order of destination_total, so the largest totals sit at the top of the DataFrame
    • The sixth step limits the result to the top five rows
    • The seventh step is the action (show or collect), which actually triggers the computation
  • Explain both the SQL and DataFrame versions and compare their plans

image

# Explain the DataFrame version of the top-five query
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#227L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#36,destination_total#227L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[sum(cast(count#38 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#36, 5), ENSURE_REQUIREMENTS, [id=#520]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#36], functions=[partial_sum(cast(count#38 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#36,count#38] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hdoop/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>

Although this explain plan doesn’t match our exact “conceptual plan,” all of the pieces are there. You can see the limit statement as well as the orderBy (in the first line). You can also see how our aggregation happens in two phases, in the partial_sum calls. This is because summing a list of numbers is commutative, and Spark can perform the sum, partition by partition. Of course we can see how we read in the DataFrame, as well.
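For comparison, the SQL formulation built earlier (maxSql) can be explained the same way; per the note above, it should compile down to the same physical plan:

maxSql.explain()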

  • Naturally, we don’t always need to collect the data
  • We can also write it out to any data source that Spark supports. For instance, suppose we want to store the information in a database
  • We talked about transformations and actions, and how Spark lazily executes a DAG of transformations in order to optimize the execution plan on DataFrames.
  • We also discussed how data is organized into partitions and set the stage for working with more complex transformations


A Tour of Spark’s Toolset

image

  • Spark’s libraries support a variety of different tasks, from graph analysis and machine learning to streaming and integrations with a host of computing and storage systems
  • This section will include
    • Running production applications with spark-submit
    • Datasets: type-safe APIs for structured data
    • Structured Streaming
    • Machine learning and advanced analytics
    • Resilient Distributed Datasets (RDD): Spark’s low level APIs
    • SparkR
    • The third-party package ecosystem

Running Production Application

  • Spark makes it easy to develop and create big data programs and turn your interactive exploration into production applications with spark-submit, a built-in command-line tool
  • spark-submit does one thing: it lets you send your application code to a cluster and launch it to execute there
  • Upon submission, the application will run until it exits (completes the task) or encounters an error.
  • You can do this with all of Spark’s supported cluster managers, including Standalone, Mesos, and YARN

This sample application calculates the digits of pi to a certain level of estimation

./bin/spark-submit \
--master local \
./examples/src/main/python/pi.py 10

By changing the master argument of spark-submit, we can also submit the same application to a cluster running Spark’s standalone cluster manager, Mesos or YARN.

Datasets: Type-Safe Structured APIs

  • Datasets are a type-safe version of Spark’s structured API
  • The Dataset API is not available in Python and R, because those languages are dynamically typed.
  • One great thing about Datasets is that you can use them only when you need or want to
  • This makes it easy to drop down to lower level, perform type-safe coding when necessary, and move higher up to SQL for more rapid analysis
// in Scala: define a case class and convert the DataFrame into a typed Dataset
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flightsDF = spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
  • One final advantage is that when you call collect or take on a Dataset, it will collect objects of the proper type in your Dataset, not DataFrame Rows
// in Scala
// filter and map run on the distributed Dataset; take(5) then collects five typed Flight objects
flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row => flight_row).take(5)

// here take(5) collects first, so the filter and map run on a local Array of Flight objects
flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))

Structured Streaming

  • A high-level API for stream processing that became production-ready in Spark 2.2
  • With Structured Streaming, you can take the same operations that you perform in batch mode using Spark’s structured APIs and run them in a streaming fashion
# in Python
staticDataFrame = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

  • When working with time-series data, it’s worth mentioning how we might go about grouping and aggregating it
  • The window function will include all data from each day in the aggregation
  • This is a helpful tool for manipulating dates and timestamps because we can specify our requirements in a more human form (via intervals)
# For example, let's add a total cost column and see on what days a customer spent the most.
from pyspark.sql.functions import window, column, desc, col

staticDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(5)
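A sketch of the streaming version of the same read, reusing staticSchema from above; maxFilesPerTrigger simply limits how many files are read per trigger, which is useful for demonstration rather than production:

streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("/data/retail-data/by-day/*.csv")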

Structured API Overview

  • The structured APIs are a tool for manipulating all sorts of data, from unstructured log files to semi-structured CSV files and highly structured Parquet files
  • three core types of distributed collection
    • Datasets
    • DataFrames
    • SQL tables and views