
Apache Spark Learning Journey

This document outlines the structured content of my learning journey through Apache Spark, covering various topics from installation to advanced data processing techniques.

Course name: Spark Programming in Python for Beginners with Apache Spark 3

History

image

GFS --> HDFS; Google MapReduce --> Hadoop MapReduce

image

Data Lake

image

Before HDFS and MapReduce, what was there? Data warehouses like Teradata and Exadata.

image

Processing framework: the core development framework, e.g. Apache Spark. Orchestration: creating the cluster, scaling it up/down, and managing the cluster, e.g. Hadoop YARN, Kubernetes, and Mesos.

image

Introduction to Apache Spark

image

The Spark Core layer has 2 parts: the Spark Engine and the Spark Core API.

image

You need a cluster manager (also known as a resource manager or container orchestrator); it could be YARN, Kubernetes, or Mesos.

Spark also does not have its own storage system, so you use HDFS, S3, GCS, or CFS (Cassandra File System).

image

Spark can run data processing workloads.

image

The Spark compute engine: 1/ breaks the work into tasks, 2/ schedules the tasks, 3/ monitors the tasks, 4/ provides fault tolerance.

Spark Core is the programming interface API (supporting Java, Scala, Python, and R); these APIs use RDDs (Resilient Distributed Datasets).

image

A set of libraries and packages built on top of the Spark Core API. They fall into 4 categories used for data crunching:

  1. Spark SQL and Spark DataFrames
  2. Streaming libraries - allow processing of continuous data streams
  3. MLlib - machine learning
  4. GraphX - graph computation
image

Unified platform (batch, stream, structured, unstructured, semi-structured, graph processing, machine learning, deep learning).

Spark code is easy compared to Hadoop MapReduce code.

Spark Environment

image

If you install Anaconda and PyCharm, you can run and test Spark programs locally.

Databricks Community Cloud environment

image

You can create only 1 cluster in Databricks Community; if you don't use it for 2 hours, it will be deleted.

image

Jupyter Notebook

Used by developers and data scientists.

image

Anaconda Individual Edition (the community edition) gives you a Python environment and the Spark engine as well. It also lets you create Jupyter notebooks.

image

Apache Spark Execution Methods

What is Spark? A distributed computing platform: we write programs and execute them on a Spark cluster.

image

Spark Processing Model

image

When you run spark-submit, the application and its containers are created. Spark asks the underlying cluster manager for a container to start the driver process. The driver then asks for more containers and starts the executor processes.

image

A1 and A2 are 2 independent applications; each has its own driver and executors.

Spark Local and Execution Modes

image

How does Spark run on a local machine?

image

Spark runs locally as a multi-threaded or single-threaded application. In single-threaded mode you have only the driver and no executors, so the driver does everything. If you have 3 threads, one thread acts as the driver and 2 act as executor threads.

image

Client mode: the driver runs on the client machine. If you quit or log off the client machine, the executors die in the absence of the driver, so this is not good for long-running jobs.

image

image

Cluster mode: the driver runs on the cluster; good for long-running jobs.

image

Execution model - When to use what?

Cluster manager: local = IDE or local machine; YARN = a real cluster, either 1/ on-premise or 2/ on-cloud.

image

Spark cluster in GCP Dataproc cluster

Google Dataproc is an on-demand YARN cluster that comes with Spark set up.

image

1 master and 3 data nodes.

Application Logs

image

YARN is going to collect all the log files from all the nodes.

Spark is made to work with Log4j; the Python logger is not compatible with it.

image

Spark Session

Each program will create a driver, and the driver will create executors to run the program.

image

Creating a SparkSession object is the first thing you do in Spark. Think of the SparkSession as your driver.

image

In the shell and notebooks, a SparkSession is already created and available as the variable spark.

image

In a program you have to create the SparkSession yourself. It is a singleton object; there should be only one per application.

image

Spark is highly configurable (appName, master, and config for other settings).

Let's create a SparkSession and stop it at the end:
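A minimal sketch in PySpark; the app name and local[3] master are illustrative choices, not the course's exact code:

```python
# Minimal sketch: create a SparkSession and stop it at the end
# (the app name and local[3] master are illustrative choices).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HelloSpark")   # shows up in the Spark UI
    .master("local[3]")      # 1 driver thread + 2 executor threads
    .getOrCreate()           # returns the existing session if one already exists (singleton)
)

# ... transformations and actions go here ...

spark.stop()                 # release the driver and executor resources
```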

image

Spark Configuration

image

All configs can be overridden by application code.

image

When to use which config? Leave the environment variables and spark-defaults.conf to the cluster admins; as a developer, use the spark-submit command line or application code.
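A minimal sketch of setting configs from application code; the key/value choices are illustrative:

```python
# Sketch: setting Spark configs from application code (app name and values are assumed).
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.app.name", "HelloSparkConf")
conf.set("spark.master", "local[3]")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.sparkContext.getConf().toDebugString())  # inspect the effective configuration
spark.stop()
```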

image

Spark Data Frame

A Spark DataFrame is a distributed data structure.

The DataFrameReader can read many file types; it is an abstraction over the data file formats. The DataFrameReader needs to know the schema of a CSV file to read it correctly; use the option method to tell it how.

image

image

A Spark DataFrame has a fixed schema and, like a database table, is two-dimensional.

You either have to specify the data types of the CSV columns explicitly, or use the inferSchema option to let Spark make an intelligent guess.
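A minimal sketch of reading a CSV with the DataFrameReader, assuming the spark session created earlier and an illustrative file path:

```python
# Sketch: reading a CSV file (assumes an active `spark` session; path and options are illustrative).
flight_df = (
    spark.read
    .format("csv")
    .option("header", "true")        # first line contains the column names
    .option("inferSchema", "true")   # let Spark guess the column types
    .load("data/flight-time.csv")    # assumed path
)
flight_df.printSchema()
```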

image

Spark Data Frame Partitions

image

A Spark DataFrame is a distributed structure that enables distributed data processing.

image

Your data file will be stored as partitions across the distributed nodes.

image

Each DataFrame is an in-memory representation of the data partitions stored in HDFS (or GCS or S3).

image

We ask the driver to read the data file; the driver reaches out to the cluster manager (YARN) and the storage manager (HDFS) to learn about the data partitions and creates a logical in-memory structure, the DataFrame, but the data is not loaded immediately.

image

The Spark session can be configured to create n executors with the desired memory and CPU configuration. Say you configured 5 executors with 10 GB of memory and 5 CPU cores each. The driver reaches out to the cluster manager to ask for containers. Once the containers are allocated, the driver starts the executors; each executor is a JVM process with 5 CPU cores and 10 GB of memory.

image

Now the driver is ready to distribute work to these executors. The driver assigns some of the DataFrame partitions to each JVM core.

These executor cores then load their own data partitions to work on.

image

Spark will also try to minimize network bandwidth: while assigning partitions to executors, it assigns each partition to the closest executor.

Spark Transformation and Actions

A DataFrame is internally a distributed data structure composed of a bunch of partitions. While developing, you work with the DataFrame as a 2D table. A Spark DataFrame is an immutable data structure.

Spark Transformation

You give instructions to the driver to make changes to a DataFrame; these are called transformations (filter, where). The driver works with the executors to carry them out.

image

We are creating a graph of operations using transformations.

image

So we create a DAG of activities of 2 types: transformations and actions.

A transformation like where works on an input DataFrame and creates a new output DataFrame; the original input DataFrame remains unchanged.
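A minimal sketch, assuming the spark session from earlier and an illustrative survey.csv with Age and Country columns:

```python
# Sketch: transformations return new DataFrames; the input stays unchanged
# (assumes an active `spark` session; the file path and column names are illustrative).
survey_df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/survey.csv")

filtered_df = survey_df.where("Age < 40")           # narrow transformation
count_df = filtered_df.groupBy("Country").count()   # wide transformation (needs a shuffle)
# survey_df is still the original, unfiltered DataFrame
```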

image

Narrow vs Wide Dependencies Transformations

Narrow transformations: transformations that can be performed within a single partition, like where; they don't depend on other partitions to produce a correct result.

image

image

The result is valid after combining each partition's result.

image

Wide transformations: transformations that require data from other partitions to produce correct results, like groupBy, orderBy, join, and distinct.

image

groupBy spans partition boundaries; it needs data from both partitions. We don't get a correct result by simply combining each partition's groupBy output.

image

How do we fix it? We can repartition the grouped data: combine all the partitions and create new partitions such that all the data for the same group lands in a single partition. This is called the shuffle/sort operation. Spark does the shuffle/sort internally; you do not need to do it yourself.

image

Lazy Evaluations:

We use the builder pattern to create a DAG of transformations that is terminated and triggered by an action; the driver then creates an execution plan that the executors run.

image

Until an action is called, the DAG will not execute.

image

Transformations are lazy; actions execute immediately.
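A minimal sketch of lazy evaluation, assuming the spark session and an illustrative CSV with Age, Gender, and Country columns:

```python
# Sketch: transformations only build the DAG; the action triggers execution
# (assumes an active `spark` session; path and column names are illustrative).
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/sample.csv")

transformed = (
    df.where("Age < 40")                   # lazy transformation
      .select("Age", "Gender", "Country")  # lazy transformation
      .groupBy("Country")
      .count()                             # still lazy: just builds the DAG
)

transformed.show()  # action: only now does Spark execute the plan
```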

image

image

Spark jobs, stages and tasks

Read the data file and infer the schema (the result is a DataFrame), apply a chain of operations (the result is another DataFrame), then show the result.

image

Spark takes the code, generates low-level Spark code, and prepares the execution plan.

The show method is a utility method to display the data.

When you load a CSV file you get a single partition, so you repartition it into 2 (a transformation); you then have 2 partitions, and the next operations execute on those 2 partitions. We forced the repartitioning.

image

The groupBy/count causes a shuffle/sort, and we don't know how many partitions will be created after it. You can use the Spark app config to control the number of shuffle/sort partitions.
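A minimal sketch, assuming the spark session and the illustrative CSV from above:

```python
# Sketch: forcing a repartition and capping the shuffle partitions
# (assumes an active `spark` session; values, path, and column names are illustrative).
spark.conf.set("spark.sql.shuffle.partitions", 2)   # partitions produced by shuffle operations

df = spark.read.option("header", "true").csv("data/sample.csv")
count_df = df.repartition(2).groupBy("Country").count()

print(count_df.rdd.getNumPartitions())   # -> 2
count_df.show()
```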

image

So the whole chain of transformations produces only 2 partitions.

The Spark UI is available only during the life of the application; use it to see the plan.

The Spark UI shows jobs; each job is broken down into stages. We have 5 stages here.

image

Each stage is broken down into tasks. Stages 3 and 4 have 2 tasks each; the rest have 1 task.

image

A Spark application generates internal jobs, stages, and tasks.

image

Spark Execution Plan

Just loading the CSV data file into a DataFrame is an action.

Each Spark action is converted into a Spark job. This job is caused by the csv action.

image

Every job has 1 stage, and each stage has 1 task.

image

Each stage has a DAG of internal operations. The sequence shown in the DAG is the compiled code generated by Spark. Spark acts as a compiler, compiling high-level code into low-level code.

image

spark.read: reading the physical file is an action that creates one more job.

image

Now we have 2 jobs.

image

2 stages and 2 tasks in total.

Reading the file and inferring the schema are 2 internal jobs.

image

Spark breaks a job into stages separated by shuffle operations.

image

3 stages.

image

The 1st stage worked on 1 partition, created 2 partitions, and wrote them to an internal buffer called the exchange.

The 2nd stage ran 2 tasks (since there are 2 partitions), finished with the groupBy, and wrote to the exchange.

The 3rd stage reads from the exchange and works on the 2 partitions as tasks.

Finally, collect runs in the driver to gather the final results.

image

Check all this in the Spark UI: 3 jobs.

image

This DAG covers all 3 jobs; note the 3 stages.

image

Chapter 5: Spark Structured API Foundation

Spark APIs

RDD APIs: at the core; you can use them to develop applications, but they are the most challenging to learn and use. The Spark community recommends not using RDDs directly.

Catalyst Optimizer: your code passes through Catalyst, which decides how to create the execution plan.

Spark SQL, DataFrame API, and Dataset API: use Spark SQL wherever you can. A sophisticated data pipeline uses the DataFrame API. The Dataset API is the language-native API for Scala and Java; it works with strongly typed objects and can't be used in Python.

image

RDDs are raw: you have to handle the file format, compression, and a lot of other work yourself. A DataFrame offers SQL-like operations on top of RDDs, which makes things easy.

image

Spark SQL over Dataframe

image

image

image
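For illustration, a minimal sketch of the same aggregation done with the DataFrame API and with Spark SQL over a temporary view (assumes the spark session and the illustrative survey.csv from earlier):

```python
# Sketch: the same aggregation via the DataFrame API and via Spark SQL
# (assumes an active `spark` session; path and column names are illustrative).
survey_df = spark.read.option("header", "true").option("inferSchema", "true").csv("data/survey.csv")

# 1. DataFrame API
count_df = survey_df.where("Age < 40").groupBy("Country").count()

# 2. Spark SQL over the same data: register a temporary view first
survey_df.createOrReplaceTempView("survey_tbl")
count_sql_df = spark.sql(
    "SELECT Country, count(1) AS count FROM survey_tbl WHERE Age < 40 GROUP BY Country"
)

count_df.show()
count_sql_df.show()
```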

Spark SQL Engine

The Spark SQL engine is the compiler that optimizes your code and generates the final code.

image

If you use the Spark SQL API then you get:

  • Analysis: runtime errors surface during the analysis phase.

  • Logical optimization: such as predicate pushdown, Boolean expression simplification, and constant folding.

  • Physical planning: the physical plan is a set of RDD operations.

  • Code generation: efficient Java bytecode is generated to run on each executor.

Summary

Use Spark SQL, but for data-crunching problems use the DataFrame API; do not use RDDs. The Spark SQL engine automatically applies the Catalyst optimizer.

image

Chapter 6 : Spark Data Sources and Sinks

Spark data sources / sinks

Spark is used to process big data.

image

Data sources for Spark can be of 2 types: 1. internal and 2. external data sources.

image

All external data sources must be read into DataFrames to be processed in Spark.

image

So you use one of 2 approaches: 1/ bring the external data into the data lake, or 2/ use the Spark data source API to connect directly to the external system.

1/ Move data from the external data sources to your data lake and store it in distributed storage using data integration tools like Kafka Connect, Informatica, AWS Glue, etc.

image

image

  • Batch operations: option 1, bring the data to the data lake and then process it in Spark.
  • Data stream operations: option 2, connect Spark data processing directly to the external data source.

Batch Processing Requirement for external data sources

Use a data integration tool to bring the data into distributed storage and then start processing it.

image

In batch processing, why do we bring the data to the data lake first and then process it? Why the 2-step process?

  • Modularity: bringing data correctly and efficiently to the lake is a complex goal in itself. We want to decouple ingestion from processing to improve manageability.

  • Load balance: the source system was designed for a different business purpose and has a defined capacity. If you connect a Spark workload to such a system, you have to re-plan the source system's capacity.

  • Security: you would have to redesign the security aspects of the data source if you wanted to connect the Spark workload directly.

  • Flexibility: Spark is an excellent tool for data processing; it is not designed for data ingestion.

Well-designed applications do not connect directly to external data sources.

For Internal Data Sources

Internal sources are distributed storage: HDFS, GCS, or S3. The data is stored in the system in various data file formats. Spark SQL tables and Delta Lake tables have metadata stored outside the data files, so they do not fall into the file-source category.

image

Data sink

A data sink is the final destination of the processed data. Load data from an internal or external source, process it using the Spark APIs, and once processing is complete save the outcome to an external or internal system.

image

Working with a data source is all about reading data; working with a sink is all about writing data.

Spark DataFrameReader API

image

image

Read a CSV file using the DataFrameReader.

image

Output: all the data types are string.

image

Set the inferSchema option: all numeric columns become integers, but the date is still a string.

image

Use a data file format that comes with a schema, or explicitly define the schema, to fix all the data types.

image

Read JSON

A JSON file always infers the schema out of the box.

image

Integers come out as bigint and the date is still a string.

image

To fix the schema issues when reading JSON or CSV, you have 2 options.

Read Parquet (Recommended and default file format in Apache Spark)

A Parquet data file carries the correct schema, so you don't need to infer it during the read.

image

It loads with the same schema as the input file.
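A minimal sketch, assuming the spark session and an illustrative Parquet path:

```python
# Sketch: Parquet carries its own schema, so no schema options are needed
# (assumes an active `spark` session; the path is illustrative).
flight_parquet_df = spark.read.format("parquet").load("data/flight-time.parquet")
flight_parquet_df.printSchema()   # column types come straight from the file's embedded schema
```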

Data Frame Schema

Schema inference does not work reliably for JSON and CSV files.

image

You can define the schema with a DDL string or programmatically.

image

Programmatic style: no inferSchema option needed; you declare DateType, StringType, IntegerType, and so on, and when the data does not match the declared type it fails at run time.

image

DDL format to define the schema:

image
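A minimal sketch of both styles, assuming the spark session and illustrative flight-data column names:

```python
# Sketch: two ways to define a DataFrame schema explicitly
# (assumes an active `spark` session; column names, formats, and path are illustrative).
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# 1. Programmatic (StructType) style
flight_schema_struct = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType()),
    StructField("ORIGIN", StringType()),
    StructField("CANCELLED", IntegerType()),
])

# 2. DDL string style
flight_schema_ddl = "FL_DATE DATE, OP_CARRIER STRING, ORIGIN STRING, CANCELLED INT"

flight_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("dateFormat", "M/d/y")       # so the date column parses into DateType
    .schema(flight_schema_struct)        # or .schema(flight_schema_ddl)
    .load("data/flight-time.csv")
)
flight_df.printSchema()
```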

Spark DataFrameWriter API

The DataFrameWriter can write to internal or external sinks.

image

image

A DataFrame is partitioned; when you write it, each partition creates a file. You can repartition before writing to the sink using repartition, use partitionBy(col1, col2) to partition by key (this is efficient), or use bucketBy(n, col1, col2), which is only available for Spark-managed tables.

image

sortBy is used together with bucketBy. The max-records-per-file option limits the records per file and protects you from creating huge files.
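A minimal sketch of the writer, assuming the flight_df from the schema sketch and illustrative paths, columns, and limits:

```python
# Sketch: writing a DataFrame with partitionBy and maxRecordsPerFile
# (assumes the `flight_df` from the schema sketch; path, columns, and limit are illustrative).
(
    flight_df.write
    .format("json")
    .mode("overwrite")                        # overwrite | append | errorIfExists | ignore
    .option("path", "output/flight-json/")    # output directory
    .partitionBy("OP_CARRIER", "ORIGIN")      # one sub-directory per carrier, then per origin
    .option("maxRecordsPerFile", 10000)       # cap the size of each output file
    .save()
)
```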

image

If you want to work with Avro, you have to add the spark-avro JAR to the Spark cluster.

image

This creates one or more Avro files in the destination, depending on the number of DataFrame partitions.

image

We have 3 partitions, but the record count in the other partitions is 0.

image

image

Now let's forcefully repartition to 5.

image

Partitioning the data gives 2 benefits; one is partition elimination for certain read operations.

Partitioning the data blindly into equal chunks does not give the elimination benefit, so use the partitionBy(col) method.

image

image

So now you have a directory per carrier, and within each carrier you get origin sub-directories.

image

image

image

This way your data reader can quickly go straight to a particular carrier and its origin.

image

image

Spark Databases and Tables

Spark is also a database engine: you can create databases, tables, and views. A table has table data and table metadata. By default the data is stored as Parquet files.

Catalog Metastore

Spark has an in-memory catalog that lives only for the Spark session. The metastore holds information about the table and its schema: the table name, column names, partitions, and the physical location where the data resides. The in-memory catalog is destroyed when the session ends, so Spark reuses the Apache Hive metastore for persistence.

image

Spark Tables

  • Managed table - Spark manages both the metadata and the data. It creates the metadata and stores the data in the Spark SQL warehouse directory. You can't change the location at runtime.

image

  • Unmanaged (external) table - designed for temporarily mapping existing data so it can be used as a Spark table. For an external table, Spark creates only the catalog metadata and stores it in the metastore. When you create an unmanaged table you have to define the data directory location for the table. A managed table must be stored inside the internal store, but with an unmanaged table you have the flexibility to keep the data in an external location.

image

Suppose you have data in an external directory location and you want to use Spark SQL on that data; the Spark SQL engine does not know about this data.

So you create an unmanaged table and map the external data to Spark metadata; now you can run Spark SQL on the external data.

If you drop an unmanaged table, only the metadata is removed. Dropping a managed table deletes both the data and the metadata.

Managed tables are preferred; they support bucketing and sorting on internal tables.

image

Working with Spark SQL Tables

Creating a managed table needs a persistent metastore; Spark depends on the Hive metastore, so you need Apache Hive support.

image

Creating a managed table saves the data in Parquet format internally.

Spark is a database, and managed tables can be used by SQL-compliant tools via SQL expressions over JDBC/ODBC connectors and 3rd-party tools such as Talend, Tableau, and Power BI.

However, plain data files like Parquet, Avro, CSV, and JSON are not accessible through the JDBC/ODBC interface.

image

saveAsTable creates the files as a managed table inside the current (default) database.

image

Let's save the flights Avro file as a table:
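A minimal sketch, assuming illustrative database, table, and path names:

```python
# Sketch: saving a DataFrame as a managed Spark SQL table
# (database/table names and the Avro path are illustrative; reading Avro needs the spark-avro package).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SparkSQLTableDemo")
    .master("local[3]")
    .enableHiveSupport()             # use the persistent Hive metastore
    .getOrCreate()
)

flight_df = spark.read.format("avro").load("data/flight-time.avro")

spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")

flight_df.write.mode("overwrite").saveAsTable("flight_data_tbl")   # managed table, Parquet by default
print(spark.catalog.listTables("AIRLINE_DB"))
```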

image

The Spark data warehouse directory is created with the database and the data table; inside the table directory you have data files in Snappy-compressed Parquet format.

image

Let's run the project.

You have a persistent metastore_db; it is a common location for all Spark applications and can be changed via a config file (your admin does this).

image

Now let's create a partitioned table, partitioned by the origin and carrier columns. I have 200 different origins, so I get 200 partitions.

image

If you have too many unique values in a column, you should not partition by that column. Instead, use bucketBy.

bucketBy restricts the number of partitions.

You see 5 data files; they are also called buckets.

image

Spark created 5 buckets (files) and used hashing on the key to route the data to the corresponding files.

image

Each unique key combination produces the same hash value, so those records land in the same file; these buckets improve join operations.

image

If the records are sorted, you can use them for many other optimizations, so bucketBy has a sortBy companion.
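A minimal sketch, assuming the flight_df from the earlier sketch, an illustrative bucket count, and an illustrative table name:

```python
# Sketch: bucketing (and sorting) a managed table into a fixed number of files
# (assumes the `flight_df` from the earlier sketch; bucket count, columns, and table name are illustrative).
(
    flight_df.write
    .format("parquet")
    .mode("overwrite")
    .bucketBy(5, "OP_CARRIER", "ORIGIN")   # hash the key columns into 5 buckets/files
    .sortBy("OP_CARRIER", "ORIGIN")        # keep each bucket sorted for faster joins
    .saveAsTable("flight_data_bucketed")   # bucketBy works only with saveAsTable (managed storage)
)
```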

image

image

Chapter 7 : Spark Dataframe and Dataset Transformations

Introduction to Transformations

image

What is transformations?

DataFrame: the programmatic interface to your data; use it in Spark programs. Table: the SQL interface to your data; use it with Spark SQL.

image

What is Transformations?

image

Working with Data Frame Row

1. Creating new Row

Unit testing and development use the first 2 scenarios.

image

Create a new cluster in your Databricks account, create a new notebook using Python, and create a function that takes a DataFrame and returns a new DataFrame.

image

image

image
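A minimal sketch of such a function plus a DataFrame built from Row objects; the schema, values, and function name are illustrative:

```python
# Sketch: creating a DataFrame from Row objects and transforming it with a helper function
# (assumes an active `spark` session; the schema, values, and function name are illustrative).
from pyspark.sql import Row
from pyspark.sql.functions import col, to_date

def to_date_df(df, fmt, field):
    """Return a new DataFrame with `field` parsed from string to date using `fmt`."""
    return df.withColumn(field, to_date(col(field), fmt))

my_schema = "ID STRING, EventDate STRING"
my_rows = [Row("101", "04/05/2020"), Row("102", "4/5/2020"), Row("103", "04/5/2020")]
my_df = spark.createDataFrame(my_rows, my_schema)

new_df = to_date_df(my_df, "M/d/y", "EventDate")
new_df.printSchema()   # EventDate is now a date column
```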

2. Collecting Rows

For testing you need to collect the rows back to the driver.
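Continuing the previous sketch, collecting rows for assertions:

```python
# Sketch: collecting rows to the driver for unit-test assertions (continues the previous sketch).
rows = new_df.collect()                    # returns a list of Row objects
for row in rows:
    assert row["EventDate"].year == 2020   # illustrative expectation
```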

3. Unstructured Rows

If you do not have a proper schema, take each row and build the columnar structure yourself.

image

Example: Apache web server log files. There is a pattern in them, but it's not a semi-structured file; it's a completely unstructured dump.

So you have only rows, not columns. You have to parse the data file and extract the columns.

Each DataFrame row holds a single string value, so you can't use aggregation and grouping yet.

image

You can use a regex, since the lines follow the Apache log format with 11 elements per string.
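A minimal sketch using regexp_extract, assuming the spark session, an illustrative log path, and the common Apache log regex:

```python
# Sketch: extracting columns from unstructured Apache log lines with regexp_extract
# (assumes an active `spark` session; the path, regex, and field positions are illustrative).
from pyspark.sql.functions import col, regexp_extract

log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S*)" "([^"]*)"'

file_df = spark.read.text("data/apache_logs.txt")   # one string column named "value"
logs_df = file_df.select(
    regexp_extract(col("value"), log_reg, 1).alias("ip"),
    regexp_extract(col("value"), log_reg, 4).alias("date"),
    regexp_extract(col("value"), log_reg, 6).alias("request"),
    regexp_extract(col("value"), log_reg, 8).alias("status"),
)
logs_df.show(5, truncate=False)
```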

image

I get the IP, the date, and so on.

image

You can then do some analysis using column transformations.

image

image

Data Frame Columns

  • column String
  • column Object

image

How to create col expressions?

1. String or SQL expressions 2. Column object expressions

Combine several columns to create a new column. The select method accepts column object expressions.

Using column objects:

image
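A minimal sketch of the same derived column both ways, assuming an illustrative people_df with first_name, last_name, and age columns:

```python
# Sketch: the same derived column written as a SQL string expression and as column objects
# (the `people_df` DataFrame and its columns are illustrative).
from pyspark.sql.functions import expr, col, concat_ws

# 1. String / SQL expression
df1 = people_df.select(expr("concat(first_name, ' ', last_name) AS full_name"), "age")

# 2. Column object expression
df2 = people_df.select(
    concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
    col("age"),
)
```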

Creating UDF (user defined function)

Example: standardize the Gender column to M/F.

image

  1. Create a custom function, parse_gender.

image

  2. Register the function as a UDF.

image

  3. Use the reference returned by the registration in your expression; withColumn applies the transformation to one column only.

To use the UDF in a SQL expression, you need to register it as a Spark SQL function.
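A minimal sketch of both registration styles, assuming the survey_df from the earlier sketch and an illustrative parse_gender implementation:

```python
# Sketch: registering and using a UDF both ways
# (assumes the `survey_df` from the earlier sketch; the parsing rules are illustrative).
import re
from pyspark.sql.functions import col, expr, udf
from pyspark.sql.types import StringType

def parse_gender(gender):
    """Normalize free-form gender strings to 'Male', 'Female', or 'Unknown'."""
    if re.search(r"^f$|f.m|w.m", gender.lower()):
        return "Female"
    if re.search(r"^m$|ma|m.l", gender.lower()):
        return "Male"
    return "Unknown"

# 1. Column-object UDF: not registered in the catalog
parse_gender_udf = udf(parse_gender, StringType())
df1 = survey_df.withColumn("Gender", parse_gender_udf(col("Gender")))

# 2. SQL UDF: registered in the catalog, usable inside SQL expressions
spark.udf.register("parse_gender_sql", parse_gender, StringType())
df2 = survey_df.withColumn("Gender", expr("parse_gender_sql(Gender)"))
```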

image

The SQL UDF goes into the catalog; the previous one does not.

image

For the SQL UDF, you can see that the catalog has an entry.

image

Misc Transformations

image

Chapter 8 : Aggregations in Apache Spark

image

image

Simple Aggregation

The list of invoice numbers contains repeats.

image

A simple aggregation summarizes the complete DataFrame and returns a single row.

image

A grouping aggregation groups the rows, e.g. by country, and gives the total sales per group.

Spark SQL can also be used for grouping aggregations.

image

Grouping Aggregate using agg method

Convert the date string to a date field and use the built-in function to create the week-of-year column.
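A minimal sketch of a simple and a grouping aggregation, assuming an illustrative retail invoice_df with InvoiceNo, InvoiceDate, Quantity, UnitPrice, and Country columns:

```python
# Sketch: simple and grouping aggregations
# (assumes an `invoice_df` DataFrame; column names and the date format are illustrative).
from pyspark.sql import functions as f

# Simple aggregation: one summary row for the whole DataFrame
invoice_df.select(
    f.count("*").alias("record_count"),
    f.countDistinct("InvoiceNo").alias("unique_invoices"),
    f.sum("Quantity").alias("total_quantity"),
).show()

# Grouping aggregation with agg(): weekly sales per country
summary_df = (
    invoice_df
    .withColumn("WeekNumber", f.weekofyear(f.to_date("InvoiceDate", "dd-MM-yyyy H.mm")))
    .groupBy("Country", "WeekNumber")
    .agg(
        f.countDistinct("InvoiceNo").alias("NumInvoices"),
        f.sum(f.expr("Quantity * UnitPrice")).alias("InvoiceValue"),
    )
)
summary_df.show()
```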

Windowing Aggregate

Compute the week-by-week running total for each country.

image

The running total restarts for each country. We break the DataFrame by country, order each partition by the week number, and finally compute the total over a sliding window of records.

image

Sliding window of records: take the total, then extend the window to 2 records and take the sum, then extend it to 3 records and take the sum again; this continues for the entire partition, and for every partition. The window starts at the 1st record and includes everything up to the current record.

image

unboundedPreceding takes all rows from the start of the partition up to the current row.
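A minimal sketch of the windowing aggregate, assuming the summary_df from the previous sketch:

```python
# Sketch: a running total per country over a window
# (assumes the `summary_df` from the previous sketch; column names are illustrative).
from pyspark.sql import Window
from pyspark.sql import functions as f

running_total_window = (
    Window.partitionBy("Country")
    .orderBy("WeekNumber")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

result_df = summary_df.withColumn("RunningTotal", f.sum("InvoiceValue").over(running_total_window))
result_df.show()
```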

image

Chapter 9 : Spark Dataframe Joins

Dataframe inner join

Start with the left DataFrame, pass in the right DataFrame, then pass the join expression; the inner join is the default join type.

image

Spark takes 1 row from the left and evaluates it against every row of the right DataFrame to find matches; it does this for all rows.

image

The Spark engine assigns an internal column ID to each column name, so after a join you can end up with duplicate column names.

image

So renaming the column before joining is good practice.

image

Rename qty to reorder_qty temporarily using the withColumnRenamed method, then do the join, and drop that temporary column after the join.

image

We have another prod_id that is a duplicate, so selecting prod_id after the join will error out as ambiguous. So let's drop the prod_id coming from the product DataFrame after the join.
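A minimal sketch, assuming illustrative order_df and product_df DataFrames with prod_id and qty columns:

```python
# Sketch: an inner join with a rename-then-drop to avoid ambiguous columns
# (the `order_df` and `product_df` DataFrames and their columns are illustrative).
join_expr = order_df["prod_id"] == product_df["prod_id"]

product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

joined_df = (
    order_df.join(product_renamed_df, join_expr, "inner")
    .drop(product_renamed_df["prod_id"])   # drop the duplicate key coming from the right side
    .drop("reorder_qty")                   # drop the temporary column after the join
)
joined_df.show()
```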

image

Dataframe outer join

Product id 9 was missing from the output since none of the records on the right side had id 9. An outer join helps you include all the records from the left side.

image

Sort by order id; where the right side has no matching data for a left-side record, Spark puts nulls.

image

How do we remove the nulls from the report? Show the product id if no name is found, and if the list price is missing show the unit price instead.

The coalesce function takes the first non-null value from a list of columns: take prod_name, and if that is null show prod_id.
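A minimal sketch of the outer join with coalesce, continuing the illustrative order_df and product_df:

```python
# Sketch: a left outer join that fills the gaps with coalesce
# (the DataFrames and column names are illustrative, continuing the previous sketch).
from pyspark.sql.functions import coalesce, col

join_expr = order_df["prod_id"] == product_df["prod_id"]

report_df = (
    order_df.join(product_df, join_expr, "left")        # keep every order row
    .drop(product_df["prod_id"])
    .select(
        "order_id",
        coalesce(col("prod_name"), col("prod_id")).alias("prod_name"),      # fall back to the id
        coalesce(col("list_price"), col("unit_price")).alias("list_price"),
    )
    .sort("order_id")
)
report_df.show()
```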

image

Join Internals and Shuffle

The shuffle sort merge join works like the Hadoop MapReduce shuffle/sort.

Suppose you have 2 DataFrames, each with 3 partitions.

image

You want to join them on a key.

You have 3 executors, and each executor holds 2 partitions.

image

Can you perform the join directly? No.

You can't perform the join because rows with the same key live on other executors. You can't join unless all rows with the same key are on the same executor.

image

The join is done in 2 stages.

First stage: each executor maps its records by the join key and writes them to the map exchange; the Spark framework picks them up and sends them to the reduce exchange. You can choose how many shuffle partitions to create; I have 3 nodes, so I chose 3 partitions. Keys 1-10 go to the 1st reducer, keys 11-20 go to the 2nd reducer, and so on. This transfer of data from the map exchange to the corresponding reduce-exchange partition is called shuffle/sort; it can choke your cluster network and slow down your join, so tuning a join operation is mostly about optimizing the shuffle.

Second stage: after the shuffle/sort, each executor joins the records with matching keys.

image

Check the Spark UI.

Reading the files takes 2 jobs and the join takes 1 job.

The shuffle write happens in 3 parallel tasks, and the shuffle read also happens in 3 parallel tasks.

image

The only way to eliminate the shuffle entirely is to eliminate the join.

Optimizing Joins

Understand your data and then optimize your process.

A large DataFrame cannot fit in the memory of a single executor; a small DataFrame can.

image

Common mistakes while joining:

Filter out unnecessary data. The shuffle/sort operation sends all of your data from the map phase to the reduce phase, which is costly, so filtering your data first cuts down the amount being shuffled.

image

To filter well, you need to know your dataset. Suppose you have global online sales covering every city, but you only need the New York sales for the join. First filter the other cities out of the global online sales to reduce its size before the join. You could even keep a tiny country-city mapping table, join it with the global sales, and get rid of the other countries' data.

image

Look for every opportunity to reduce the size of the DataFrames before joining. A common mistake is to join first and clean up afterwards, which creates huge performance issues. Aggregate before joining: smaller DataFrames mean a faster join and a smaller shuffle/sort.

What is the maximum possible parallelism? The number of executors.

image

If you configure 400 shuffle partitions but have only 200 unique keys, you get only 200 non-empty shuffle partitions. Each node processes one partition at a time.

image

If you have too few unique keys, increase the key cardinality.

A larger partition takes a long time; smaller partitions join quickly, but the entire join will not finish until the largest one is done. So increase the partition key cardinality.

image

Large-to-large joins always go through a shuffle sort merge join; a large-to-small join can use a broadcast join.

image

Large to Small df join (Broadcast join)

The sales DataFrame is huge: millions of records in 100 partitions across 100 executors. The products DataFrame is 200 records (about 2 MB) stored on 1 executor in 1 partition.

image

With 200 unique keys, 200 shuffle partitions would be created.

image

You would send millions of records over the network to the shuffle partitions.

image

Instead, dispatch the product table over the network to the sales partitions: broadcast the product DataFrame to the executors holding the sales DataFrame.

image

Now you have removed the shuffle/sort from the join.

Spark will automatically use a broadcast for a small-to-large merge, but you can force it if you know your DataFrames. Our example created a shuffle join.

image

Now use broadcast: the right-side table is small, so broadcast it to the left table and avoid the shuffle join.
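A minimal sketch of forcing the broadcast, assuming illustrative sales_df and product_df DataFrames:

```python
# Sketch: forcing a broadcast join for a large-to-small join
# (the `sales_df` and `product_df` DataFrames and the join key are illustrative).
from pyspark.sql.functions import broadcast

join_expr = sales_df["prod_id"] == product_df["prod_id"]

# Hint Spark to ship the small product_df to every executor instead of shuffling sales_df
joined_df = sales_df.join(broadcast(product_df), join_expr, "inner")
joined_df.explain()   # the plan should show BroadcastHashJoin instead of SortMergeJoin
```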

Check that the shuffle join is removed.

The shuffle for a large-to-large join cannot be avoided at query time, but you can use bucketing.

Joining Bucketing

You bucket your data before joining and hence avoid the shuffle in the sort merge join. Say I have code doing a shuffle join where both DataFrames are very large and can't fit in one executor, so a broadcast can't be used. Use Spark bucketing and bucket both DataFrames: bucketing also does a shuffle/sort, but only once; afterwards, no matter how many joins you do on those DataFrames, you won't see another shuffle/sort.

How many buckets do you want to create? The number of buckets (and hence partitions) is a critical decision; for example, with 100 GB of data and 10 GB of memory per executor, 10 buckets makes sense. Here I created 3 buckets keyed on id.

image

My database is created with 2 tables of 3 buckets each; these are ready for join operations.

image

image

No shuffle: just a plain sort merge join.
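A minimal sketch of pre-bucketing both sides, assuming illustrative DataFrames df1 and df2, a database name, and a bucket count of 3:

```python
# Sketch: pre-bucketing two large DataFrames so later joins skip the shuffle
# (DataFrames `df1`/`df2`, database/table names, and the bucket count are illustrative).
spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")

df1.coalesce(1).write.bucketBy(3, "id").mode("overwrite").saveAsTable("MY_DB.flight_data_1")
df2.coalesce(1).write.bucketBy(3, "id").mode("overwrite").saveAsTable("MY_DB.flight_data_2")

t1 = spark.read.table("MY_DB.flight_data_1")
t2 = spark.read.table("MY_DB.flight_data_2")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)   # make sure broadcast doesn't kick in
joined_df = t1.join(t2, t1["id"] == t2["id"], "inner")
joined_df.explain()   # SortMergeJoin without an Exchange (shuffle) step
```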

image

Appendix

Problem Explanation: In a distributed computing environment with Apache Spark, you have 500 executors and you've set up 400 shuffle partitions. However, you're only dealing with 200 unique keys. This setup indicates a low key cardinality which might result in inefficient utilization of your cluster resources because many executors might remain idle, especially during shuffle operations where tasks are divided based on the number of keys.

Increasing Key Cardinality: Key cardinality refers to the number of unique keys that data is partitioned or grouped by in distributed computing tasks. Higher cardinality can help utilize cluster resources more effectively as it increases parallelism during data shuffle and processing stages.

Concrete Example: If you're processing data where the key is "city" and only have 200 unique cities, you can increase the cardinality by combining "city" with another column like "year" (assuming the data spans multiple years). This increases the number of unique keys (e.g., "New York_2021", "New York_2022"), allowing more executors to be actively used because there are more groups for Spark to process in parallel.

Solution Approach:

  1. Identify Combinable Fields: Look for columns in your dataset that can logically be combined with your current key to increase uniqueness.
  2. Modify Key Generation: Adjust your data transformation to use these combined keys. For example, instead of just city, use city_year as the key.
  3. Repartition: Increase the number of partitions to a number closer to or matching the number of executors to distribute processing load more evenly. Use Spark’s repartition function like df.repartition("city_year") to ensure the data is distributed based on the new key.
  4. Monitor and Optimize: Observe the performance and continue to adjust the cardinality and partition settings based on actual resource utilization and processing times.

By increasing the key cardinality, you maximize the use of your available executors, which can lead to improved performance by ensuring work is evenly distributed across your cluster.
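A minimal sketch of steps 2 and 3 above, assuming illustrative city and year columns:

```python
# Sketch: increasing key cardinality by combining "city" with "year" and repartitioning on it
# (the DataFrame `df` and its columns are illustrative).
from pyspark.sql.functions import concat_ws, col

keyed_df = df.withColumn("city_year", concat_ws("_", col("city"), col("year")))
repartitioned_df = keyed_df.repartition("city_year")   # distribute by the higher-cardinality key
print(repartitioned_df.rdd.getNumPartitions())
```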
