This document outlines the structured content of my learning journey through Apache Spark, covering various topics from installation to advanced data processing techniques.
What is Spark?
Spark is a distributed computing platform: we write programs and execute them on a Spark cluster.
Spark Processing Model
When you run spark-submit, an application is created. The Spark engine asks the underlying cluster manager for a container to start the driver process. The driver then asks for more containers for the execution processes and starts the executors.
A1 and A2 are two independent applications; each has its own driver and executors.
Spark Local and Execution Modes
How does Spark run on a local machine?
Locally, Spark runs as a multi-threaded application. With a single thread you have only the driver and no executors, so the driver does everything. With 3 threads you get one driver thread and 2 executor threads.
Client mode = driver runs on the client machine. If you quit or log off the client machine, the executors die in the absence of the driver, so this mode is not good for long-running jobs.
Cluster mode = driver runs on the cluster; good for long-running jobs.
Execution model - When to use what?
Cluster Manager
local = IDE or local machine
YARN = on a real cluster, either on-premise or in the cloud.
Spark cluster on a GCP Dataproc cluster
Google Dataproc is an on-demand YARN cluster that comes with Spark pre-installed.
YARN collects the log files from all the nodes.
Spark is built to work with Log4j; the standard Python logger is not integrated with it.
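A commonly used workaround (my own sketch, not part of the original notes) is to reuse Spark's Log4j logger through the py4j JVM gateway so that application messages land in the same logs YARN aggregates; note that `_jvm` is not a public API, and the app name is a placeholder:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LoggingDemo").getOrCreate()

# Reach into the JVM and reuse Spark's own Log4j (relies on the private py4j gateway).
log4j = spark.sparkContext._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger("LoggingDemo")

logger.info("Application started")     # written to the driver's Log4j log, aggregated by YARN
logger.warn("Something worth checking")
```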
Spark Session
Each program creates a driver, and the driver creates executors to run the program.
Creating a SparkSession object is the first thing you do in Spark. Think of the SparkSession as your driver.
In the Spark shell, a SparkSession is already created and available as the variable spark.
In your own program you have to create the SparkSession. It is a singleton object: there should be one per application.
Spark is highly configurable (appName, master, config for other settings).
Let's create a SparkSession and finally stop the session.
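A minimal sketch of that pattern (the app name is a placeholder, and local[3] assumes the local multi-threaded mode discussed above):

```python
from pyspark.sql import SparkSession

# Build the singleton SparkSession: app name, master, and any extra config.
spark = (SparkSession.builder
         .appName("HelloSpark")
         .master("local[3]")                       # 1 driver thread + 2 executor threads
         .config("spark.sql.shuffle.partitions", "2")
         .getOrCreate())

# ... read, transform and write data here ...

spark.stop()                                       # release the driver and executors
```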
Spark Configuration
All configs can be overridden by application code.
When to use which config: leave environment variables and spark-defaults.conf to the cluster admin; as a developer, use the spark-submit command line or application code.
Spark Data Frame
A Spark DataFrame is a distributed data structure.
The DataFrameReader can read many file types; it is an abstraction over the data file formats.
The DataFrameReader needs to know the schema of a CSV file so it reads it correctly; use the option method of the reader to tell it.
A Spark DataFrame has a fixed schema and, like a database table, is two-dimensional.
You either specify the data types of the CSV columns so they are read correctly, or you use the inferSchema option to let Spark make an intelligent guess.
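A small sketch of reading a CSV with those reader options, assuming the SparkSession from above; the file name and columns are hypothetical:

```python
# header tells Spark the first line holds column names,
# inferSchema asks it to guess each column's data type.
flight_df = (spark.read
             .format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .load("data/flight-time.csv"))

flight_df.printSchema()
```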
Spark Data Frame Partitions
A Spark DataFrame is a distributed structure that enables distributed data processing.
Your data file is stored as partitions across the storage nodes.
Each DataFrame is an in-memory representation of the data partitions stored in distributed storage (HDFS, GCS, or S3).
We ask the driver to read the data file; the driver reaches out to the cluster manager (YARN) and the storage manager (HDFS) to learn about the data partitions and creates a logical in-memory structure as the DataFrame, but the data is not loaded immediately.
The Spark session can be configured to create n executors with the desired memory and CPU configuration. Say you configured 5 executors with 10 GB memory and 5 CPU cores each. The driver reaches out to the cluster manager to ask for containers. Once the containers are allocated, the driver starts the executors; each executor is a JVM process with 5 CPU cores and 10 GB memory.
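A hedged sketch of asking for that executor footprint from application code; the exact sizing is an example, and on a real cluster these values are often passed on the spark-submit command line instead:

```python
from pyspark.sql import SparkSession

# Ask the cluster manager for 5 executors, each a JVM with 5 cores and 10 GB memory.
spark = (SparkSession.builder
         .appName("ExecutorSizingDemo")
         .config("spark.executor.instances", "5")
         .config("spark.executor.cores", "5")
         .config("spark.executor.memory", "10g")
         .getOrCreate())
```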
Now the driver is ready to distribute work to these executors. The driver assigns some of the DataFrame partitions to each executor core.
The executor cores then load their own data partitions to work on.
Spark also tries to minimize network bandwidth: while assigning partitions to executors, it assigns each partition to the closest executor.
Spark Transformation and Actions
A DataFrame is internally a distributed data structure composed of a bunch of partitions. While developing, you work with the DataFrame as a 2D table. A Spark DataFrame is an immutable data structure.
Spark Transformation
You give instructions to the driver to make changes to a DataFrame; these are called transformations (filter, where), and the driver works with the executors to carry them out.
We are creating a graph of operations using transformations.
So we create a DAG of activities of 2 types: transformations and actions.
A transformation like where works on the input DataFrame and creates a new output DataFrame; the original input DataFrame remains unchanged.
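A small sketch of that immutability (the file and column names are made up):

```python
# where() is a transformation: it returns a NEW DataFrame; survey_df itself never changes.
survey_df = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("data/survey.csv"))
filtered_df = survey_df.where("Age > 40")

print(survey_df is filtered_df)   # False: two distinct, immutable DataFrames
```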
Narrow vs Wide Dependencies Transformations
Narrow transformations: a transformation that can be performed on a single partition, like where; it does not depend on other partitions to produce a correct result.
Combining each partition's result still gives a valid overall result.
Wide transformations: a transformation that requires data from other partitions to produce correct results, such as groupBy, orderBy, join, distinct.
groupBy breaks across partitions; it needs all the partitions to work on. We do not get the correct result by simply combining each partition's groupBy output.
How to fix it? We can repartition the grouped data: combine all the partitions and create new partitions such that all the data of the same group lands in a single partition. This is called the shuffle/sort operation; Spark does the shuffle/sort internally, you do not need to do it yourself.
Lazy Evaluations:
We use the builder pattern to create the DAG of transformations, which is terminated and triggered by an action; the driver then creates an execution plan that is executed by the executors.
Until an action is called, the DAG is not executed.
Transformations are lazy; actions execute immediately.
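A sketch of that laziness, reusing the hypothetical survey file from above:

```python
# Transformations only build the DAG - nothing executes here.
survey_df = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("data/survey.csv"))
filtered_df = survey_df.where("Age < 40").select("Age", "Gender", "Country")

# The action triggers execution: only now does the driver create the plan and run it.
filtered_df.show()
```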
Spark jobs, stages and tasks
Read the data file and infer the schema (result: a DataFrame), apply a chain of operations (result: a DataFrame), then show the result.
Spark takes your code, creates low-level Spark code, and prepares the execution plan.
The show method is a utility method to display the data.
When you load a CSV file you get a single partition, so you repartition it into 2; repartition is a transformation, so you get 2 partitions, and the following operations execute on those 2 partitions. We forced the repartitioning.
The groupBy/count step does a shuffle/sort, and we don't know how many partitions are created after the shuffle/sort. You can use the Spark application config (spark.sql.shuffle.partitions) to control the number of shuffle/sort partitions.
So the whole chain of transformations produces only 2 partitions.
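A sketch of the pipeline those notes describe; the file, the columns, and the value 2 for the shuffle partition config are all assumptions:

```python
spark.conf.set("spark.sql.shuffle.partitions", "2")   # control the post-shuffle partition count

survey_df = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("data/survey.csv"))

count_df = (survey_df
            .repartition(2)                 # force 2 partitions
            .where("Age < 40")              # narrow transformation
            .groupBy("Country")             # wide transformation -> shuffle/sort
            .count())

count_df.show()                             # action: triggers jobs, stages and tasks
```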
The Spark UI is available only during the life of the application; use it to see the plan.
The Spark UI shows jobs, and each job is broken down into stages. We have 5 stages here.
Each stage is broken down into tasks. Stages 3 and 4 have 2 tasks each; the rest have 1 task.
The Spark application generates these internal jobs, stages and tasks.
Spark Execution Plan
Just loading the CSV data file into a DataFrame is itself an action.
Each Spark action is converted into a Spark job; this job is caused by the csv action.
Here, every job has 1 stage and each stage has 1 task.
Each stage has a DAG of internal operations. The sequence shown in the DAG is the compiled code generated by Spark: Spark is a compiler that compiles high-level code into low-level code.
spark.read: reading the physical file is an action that creates one more job.
Now we have 2 jobs, 2 stages and 2 tasks in total.
Reading the file and inferring the schema are 2 internal jobs.
The job is broken into stages separated by shuffle operations.
3 stages:
Stage 1 works on 1 partition, creates 2 partitions and writes them to an internal buffer called the exchange.
Stage 2 runs 2 tasks, since there are 2 partitions, finishes the groupBy and writes to the exchange.
Stage 3 reads from the exchange and works on the 2 partitions as tasks.
Finally, collect runs in the driver to gather the final results.
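Without the Spark UI, you can also print the compiled plans with the explain method; a minimal sketch, assuming the count_df built in the earlier sketch:

```python
# Prints the parsed and analyzed logical plans, the optimized plan and the physical plan.
count_df.explain(True)
```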
RDD APIs: these sit at the core; you can use them to develop applications, but they are the most challenging to learn and use. The Spark community recommends not using RDDs directly.
Catalyst Optimizer: your code passes through Catalyst, which decides how to create the execution plan.
Spark SQL, DataFrame API and Dataset API: use Spark SQL wherever you can. A sophisticated data pipeline uses the DataFrame API. The Dataset APIs are the language-native APIs in Scala and Java; they use strongly typed objects and cannot be used in Python.
RDDs are raw: you have to handle the file type, compression, and a lot of other work yourself. The DataFrame API offers SQL-like operations on top of RDDs, which makes things easy.
Spark SQL over Dataframe
Spark SQL Engine
The Spark SQL engine is the compiler that optimizes your code and generates the code to execute.
If you use the Spark SQL / DataFrame API, your code goes through:
Analysis: errors such as unresolved column names surface during this phase.
Logical optimization, such as predicate pushdown, boolean expression simplification, and constant folding.
Physical planning: the plan becomes RDD operations.
Code generation: efficient Java bytecode is generated to run on each executor.
Summary
Use Spark SQL, but for data-crunching problems use the DataFrame API, and do not use RDDs. The Spark SQL engine automatically lets Catalyst do the optimization.
Data sources required for Spark can be of 2 types: 1. internal and 2. external data sources.
All external data sources must be read into a DataFrame to be processed in Spark.
So you have 2 approaches: 1/ bring the external data to the data lake, or 2/ use the Spark data source API to connect directly to the external system.
1/ Move data from the external data sources to your data lake and store it in distributed storage, using data integration tools like Kafka Connect, Informatica, AWS Glue, etc.
Batch operations: option 1, bring data to the data lake and then process it in Spark.
Data stream operations: option 2, connect Spark data processing directly to the external data source.
Batch Processing Requirement for external data sources
Use a data integration tool to bring the data to distributed storage and then start processing it.
In batch processing, why do we bring the data to the data lake first and then process it? Why the 2-step process?
Modularity: bringing data correctly and efficiently to the lake is a complex goal in its own right. We want to decouple ingestion from processing to improve manageability.
Load balance: the source system was designed for a different business purpose and has a defined capacity. If you connect Spark workloads to these systems, you have to replan the source system's capacity.
Security: you would have to redesign the security aspects of the data source if you wanted to connect Spark workloads directly.
Flexibility: Spark is an excellent tool for data processing, but it is not designed for data ingestion.
Well-designed applications do not connect directly to the source systems.
For Internal Data Sources
Internal sources are distributed storage, which could be HDFS, GCS, or S3. Data is stored in these systems in various data file formats. Spark SQL tables and Delta Lake have metadata stored outside the data file, so they are not in the file-source category.
Data sink
The data sink is the final destination of the processed data. Load data from an internal or external source, handle it using the Spark APIs, and once processing is complete, save the outcome to an external or internal system.
Working with a data source is all about reading data; working with a sink is all about writing data.
Spark DataFrameReader API
Read a CSV file using dataframe reader
Output: all data types come out as string.
Set the option to infer the schema: all numeric columns become integers, but the date is still a string.
Use a data file format that comes with a schema, or explicitly define the schema, to fix all the data types.
Read JSON
A JSON file always gets its schema inferred out of the box.
Integers come out as bigint and the date is still a string.
To fix the schema issues when reading JSON or CSV, you have 2 options.
Read Parquet (Recommended and default file format in Apache Spark)
A Parquet data file carries the correct schema, so during the read you don't need to infer it.
It loads with the same schema as the input file.
Data Frame Schema
Schema inference does not work reliably for JSON and CSV files.
Schema definition with DDL or programmatically
Programmatic style: no need for the infer option; use DateType, StringType, IntegerType, and when a value does not match its type it fails at run time.
DDL format to define the schema.
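A sketch of both styles on a hypothetical flight-time CSV; the column names and the date format are assumptions:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Programmatic schema definition
flight_schema_struct = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType()),
    StructField("ORIGIN", StringType()),
    StructField("CANCELLED", IntegerType()),
])

# Equivalent DDL-string schema definition
flight_schema_ddl = "FL_DATE DATE, OP_CARRIER STRING, ORIGIN STRING, CANCELLED INT"

flight_df = (spark.read
             .format("csv")
             .option("header", "true")
             .option("dateFormat", "M/d/y")       # how the date strings are written
             .schema(flight_schema_struct)        # or .schema(flight_schema_ddl)
             .load("data/flight-time.csv"))
```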
Spark DataFrameWriter API
The DataFrameWriter can write to internal or external sinks.
A DataFrame is partitioned, and when you write, each partition creates a file. You can re-partition before writing to the sink using repartition, or use partitionBy(col1, col2) on a key, which is efficient, or bucketBy(n, col1, col2), which is only available for Spark-managed tables.
sortBy is used together with bucketBy.
maxRecordsPerFile limits the records per file and protects you from creating huge files.
If you want to work with Avro, you have to add the Avro JAR to the Spark cluster.
The write creates one or more Avro files in the destination, depending on the DataFrame partitions.
We have 3 partitions, but the record count in the other partitions is 0.
Now let's force a repartition to 5.
Partitioning the data gives benefits such as partition elimination for certain read operations.
Repartitioning into equal chunks does not give the elimination benefit, so use the partitionBy(col) method instead.
So now you get a directory per carrier, and for each carrier you get the origin as a sub-directory.
This way your data reader can search quickly: it can go directly to a particular carrier and its origin.
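A sketch of the writer pattern described above; the format, paths, column names, and the record limit are all assumptions:

```python
# Write one directory per carrier, with origin sub-directories,
# and cap each output file at 10,000 records.
(flight_df.write
    .format("json")
    .mode("overwrite")
    .partitionBy("OP_CARRIER", "ORIGIN")
    .option("maxRecordsPerFile", 10000)
    .save("output/flight-time/"))
```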
Spark Databases and Tables
Spark is also a database engine: you can create databases, tables and views. A table has table data and table metadata. By default the data is stored as Parquet files.
Catalog Metastore
Spark has an in-memory catalog persisted per Spark session. The metastore holds information about a table and its schema: schema, table name, column names, partitions, and the physical location where the data resides. Because the Spark catalog is destroyed when the session ends, Spark decided to reuse the Apache Hive metastore.
Spark Tables
Managed table - Spark manages both the metadata and the data. It creates the metadata and stores the data in the Spark SQL warehouse directory. You can't change the location at runtime.
Unmanaged (external) table - designed for temporarily mapping existing data and using it as a Spark table. For an external table, Spark still creates the catalog metadata and stores it in the metastore, but when you create an unmanaged table you have to define the data directory location for the table. Managed tables must be stored inside the internal store, while unmanaged tables give you the flexibility to keep the data at an external location.
Suppose you have data at an external directory location and you want to run Spark SQL on it; the Spark SQL engine does not know about this data.
So you create an unmanaged table and map the external data to Spark metadata; now you can run Spark SQL on the external data.
If you drop an unmanaged table, only the metadata is removed. Dropping a managed table deletes both the data and the metadata.
Managed tables are preferred: they give you bucketing and sorting on internal tables.
Working with Spark SQL Tables
Creating a managed table needs a persistent metastore; Spark depends on the Hive metastore, so you need Apache Hive support.
Create a managed table and save the data internally in Parquet format.
Spark is a database, and managed tables can be used by SQL-compliant tools: SQL expressions over JDBC/ODBC connectors and third-party tools such as Talend, Tableau, and Power BI.
However, plain data files like Parquet, Avro, CSV and JSON are not accessible through the JDBC/ODBC interface.
saveAsTable creates the files as a managed table inside the default database.
Let's save the flights Avro file as a table.
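A sketch of that flow; the database and table names are hypothetical, enableHiveSupport assumes the Hive libraries are available, and reading Avro assumes the Avro package mentioned earlier is on the cluster:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("SparkSQLTableDemo")
         .enableHiveSupport()               # persistent metastore for managed tables
         .getOrCreate())

flight_df = spark.read.format("avro").load("data/flight-time.avro")

spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")

# Managed table: data lands in spark.sql.warehouse.dir as snappy Parquet by default.
flight_df.write.mode("overwrite").saveAsTable("flight_data_tbl")
```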
A Spark data warehouse directory is created with the database and the data table; inside the table directory you have data files in snappy Parquet format.
Lets run the project
You also get a persistent metastore_db; it is a common location for all Spark applications and can be changed through a config file (your admin does this).
Now let's create a partitioned table, partitioned by the columns origin and carrier. If I have 200 different origins, I get 200 partitions.
If a column has too many unique values, you should not partition by that column. Instead, use bucketBy.
bucketBy restricts the number of partitions.
You see 5 data files; they are also called buckets.
Spark created 5 buckets (files) and uses hashing to move each record to the corresponding file.
Each unique key combination produces the same hash value, so those records land in the same file; these buckets improve join operations.
If the records inside the buckets are sorted, they can be used for many other operations, so bucketBy has a sortBy companion.
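A sketch of bucketing at write time; the bucket count, key columns and table name are assumptions:

```python
# Bucket the managed table into 5 files on the join key and keep each bucket sorted.
(flight_df.write
    .mode("overwrite")
    .bucketBy(5, "OP_CARRIER", "ORIGIN")
    .sortBy("OP_CARRIER", "ORIGIN")
    .saveAsTable("flight_data_bucketed_tbl"))
```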
Simple aggregations summarize the complete DataFrame and return a single row.
Grouping aggregations group the rows, for example by country, and give the total sales per group.
Spark SQL can also be used for grouping aggregations.
Grouping aggregates using the agg method
Convert the date string to a date field and use the built-in function to get the week of the year.
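A sketch of a grouping aggregate with agg and weekofyear; the invoice-style DataFrame, column names and date format are assumptions:

```python
from pyspark.sql import functions as f

summary_df = (invoice_df
    .withColumn("InvoiceDate", f.to_date("InvoiceDate", "dd-MM-yyyy H.mm"))
    .withColumn("WeekNumber", f.weekofyear("InvoiceDate"))
    .groupBy("Country", "WeekNumber")
    .agg(f.sum("Quantity").alias("TotalQuantity"),
         f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue")))

summary_df.show()
```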
Windowing Aggregate
Compute a week-by-week running total for each country.
The running total restarts for each country.
We break the DataFrame into partitions by country.
We order each partition by the week number.
Lastly, we compute the total using a sliding window of records.
Sliding window of records: take the total of the first record, extend the window to 2 records and take the sum, then extend the window to 3 records and take the sum again; this continues through the entire partition and is repeated for every partition. The window starts at the 1st record and includes everything up to the current record.
unboundedPreceding means the window includes all rows from the start of the partition up to the current row.
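A sketch of that running-total window, reusing the hypothetical weekly summary from above:

```python
from pyspark.sql import Window
from pyspark.sql import functions as f

# Partition by country, order by week, window = first row up to the current row.
running_total_window = (Window.partitionBy("Country")
                        .orderBy("WeekNumber")
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

result_df = summary_df.withColumn(
    "RunningTotal", f.sum("InvoiceValue").over(running_total_window))
```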
Start with the left DataFrame, pass it the right DataFrame, then pass the join expression; inner join is the default join type.
Spark takes one row from the left and evaluates it against all rows of the right DataFrame to find matches, and it does this for every row.
The Spark engine creates an internal column id for each column name, so after the join you can see duplicate column names.
So renaming the column before joining is good practice.
Rename qty to reorder_qty temporarily using the withColumnRenamed method, then do the join, and drop that temporary column after the join.
We have another prod_id which is duplicated, so if you select prod_id after the join it will error out. So let's drop the product id after the join, i.e. remove prod_id coming from the product DataFrame.
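A sketch of that rename-then-join pattern; the orders/products DataFrames and their columns are hypothetical:

```python
# Rename the ambiguous column on the right side, join, then drop the duplicates.
product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

join_expr = order_df["prod_id"] == product_renamed_df["prod_id"]

order_detail_df = (order_df
    .join(product_renamed_df, join_expr, "inner")
    .drop(product_renamed_df["prod_id"])   # keep only the left-side prod_id
    .drop("reorder_qty"))
```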
Dataframe outer join
Product id 9 was missing from the output since none of the records on the right side had 9. An outer join helps you include all the records from the left side and then show them.
Sort by order id; where the right side has no data for a left-side row, Spark puts nulls in those columns.
How do we remove the nulls from the report? Show the product id instead when no name is found, and when the list price is missing, show the unit price.
The coalesce function takes the first non-null value from a list of columns: take prod_name, and if that is null, show the prod_id.
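A sketch of the outer join with coalesce, continuing the same hypothetical orders/products example:

```python
from pyspark.sql import functions as f

report_df = (order_df
    .join(product_renamed_df, join_expr, "left")         # keep all left-side records
    .drop(product_renamed_df["prod_id"])
    .withColumn("prod_name", f.coalesce("prod_name", "prod_id"))
    .withColumn("list_price", f.coalesce("list_price", "unit_price"))
    .sort("order_id"))
```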
Join Internals and Shuffle
The shuffle sort merge join is analogous to the Hadoop MapReduce shuffle/sort.
Suppose you have 2 DataFrames and each has 3 partitions,
and you want to join them using a key.
You have 3 executors; each executor holds 2 partitions.
Can you perform the join as-is? No.
You can't perform the join because records with the same key are present on other executors. You can't join unless all records with the same key sit on the same executor.
The join is done in 2 stages.
First stage: each executor maps its records by the join key and sends them to the map exchange, to be picked up by the Spark framework.
Spark then sends them to the reduce exchanges. You can choose how many shuffle partitions to create; I have 3 nodes, so I used 3 partitions. Keys 1-10 go to the 1st reducer, keys 11-20 to the 2nd reducer, and so on. This transfer of data from the map exchange to the corresponding reduce-exchange partition is the shuffle/sort; it can choke your cluster network and slow down your join, so tuning a join operation is mostly about optimizing the shuffle.
Second stage: after the shuffle/sort, each executor joins the matching keys it holds.
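A sketch of controlling the shuffle partitions for such a join; the value 3, the DataFrame names and the key column are assumptions:

```python
# Match the shuffle partition count to the parallelism you actually have.
spark.conf.set("spark.sql.shuffle.partitions", "3")

join_df = flight_time_df1.join(flight_time_df2, "id", "inner")
join_df.count()   # action: triggers the shuffle sort merge join
```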
Check the Spark UI.
Reading the two files takes 2 jobs, and the join takes 1 more job.
The shuffle write happens in 3 parallel tasks, and the shuffle read also happens in 3 parallel tasks.
The only way to eliminate the shuffle entirely would be to eliminate the join itself.
Optimizing Joins
Understand your data and then optimize your process.
A large DataFrame cannot fit in the memory of a single executor.
A small DataFrame can fit in the memory of a single executor.
Common mistakes while joining
Filter out unnecessary data.
We know the shuffle/sort operation sends all of your data from the map phase to the reduce phase, a costly operation, so filtering your data cuts down the amount of data shuffled.
To filter well you need to know your data set. Suppose you have global online sales covering all cities and you only want to join the New York sales: first filter out the other cities from the global sales to reduce the size before doing the join. You can keep a tiny country-city mapping table, join it with the global sales, and get rid of the other countries' data.
Look for every possible opportunity to reduce the size of the DataFrames before joining, as shown below. A common mistake is to join first and clean up afterwards, which creates huge performance issues. Aggregate before joining: a smaller DataFrame means a faster join and a smaller shuffle/sort.
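A sketch of filtering and aggregating before the join; the city/sales DataFrames and columns are made up:

```python
from pyspark.sql import functions as f

# Shrink the big DataFrame BEFORE the join: filter, then pre-aggregate.
ny_sales_df = (global_sales_df
    .where(f.col("city") == "New York")
    .groupBy("product_id")
    .agg(f.sum("amount").alias("total_amount")))

result_df = ny_sales_df.join(product_df, "product_id")
```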
What is the maximum possible parallelism? The number of executors.
If you have 400 shuffle partitions but only 200 unique keys, you effectively get only 200 shuffle partitions. Each node processes one partition.
If you have too few unique keys, increase the key cardinality.
A large partition takes a long time while small partitions join quickly, but the entire join does not finish until the largest one is done.
So increase the partition key cardinality.
Large-to-large joins always go through the shuffle sort merge join; a large-to-small join can use the broadcast join.
Large to small DataFrame join (broadcast join)
The sales DataFrame is huge: millions of records, 100 partitions, 100 executors.
The products DataFrame is 200 records and 2 MB, stored on 1 executor with 1 partition.
With 200 unique keys, 200 shuffle partitions would be created,
and you would send millions of records over the network to those shuffle partitions.
Instead, you dispatch the product table over the network to the sales partitions: you broadcast the product DataFrame to the sales DataFrame.
Now you have removed the shuffle/sort from the join.
Spark automatically uses a broadcast join when merging a small DataFrame with a large one, but you can force it if you know your DataFrames.
Our example created a shuffle join.
Now use broadcast: the right-side table is small, so broadcast it to the left table and avoid the shuffle join.
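A sketch of forcing the broadcast; the DataFrame names and key column are placeholders:

```python
from pyspark.sql.functions import broadcast

# Hint Spark to ship the small products DataFrame to every sales partition.
join_df = sales_df.join(broadcast(products_df), "prod_id")
```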
Check that the shuffle join has been removed.
A shuffle join for a large-to-large join cannot be avoided outright, but you can use bucketing.
Joining with bucketing
You bucket your data before joining and hence avoid the shuffle in the sort merge join.
I have code that does a shuffle join; both DataFrames are very large and can't fit in one executor, so broadcast cannot be used.
Use Spark bucketing and bucket both of your DataFrames. This also does a shuffle/sort, but only once; after that, however many joins you do on these DataFrames, you won't see a shuffle/sort.
How many buckets should you create? The number of buckets and partitions is a critical decision: suppose you have 100 GB of data and 10 GB per executor, then you would create 10 buckets. In this example I created 3 buckets on the id key.
My database is created with 2 tables, each with 3 buckets; these are ready for the join operation.
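A sketch of joining the two pre-bucketed tables; the database/table names are assumptions, and disabling the broadcast threshold just makes the sort merge path visible for the demo:

```python
# Turn off automatic broadcast so the sort merge path is visible in the plan.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

df1 = spark.read.table("MY_DB.flight_data1_bucketed")
df2 = spark.read.table("MY_DB.flight_data2_bucketed")

join_df = df1.join(df2, "id", "inner")
join_df.collect()   # no exchange (shuffle) appears before the SortMergeJoin
```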
No shuffle, only a plain sort merge join.
Appendix
Problem Explanation:
In a distributed computing environment with Apache Spark, you have 500 executors and you've set up 400 shuffle partitions. However, you're only dealing with 200 unique keys. This setup indicates a low key cardinality which might result in inefficient utilization of your cluster resources because many executors might remain idle, especially during shuffle operations where tasks are divided based on the number of keys.
Increasing Key Cardinality:
Key cardinality refers to the number of unique keys that data is partitioned or grouped by in distributed computing tasks. Higher cardinality can help utilize cluster resources more effectively as it increases parallelism during data shuffle and processing stages.
Concrete Example:
If you're processing data where the key is "city" and only have 200 unique cities, you can increase the cardinality by combining "city" with another column like "year" (assuming the data spans multiple years). This increases the number of unique keys (e.g., "New York_2021", "New York_2022"), allowing more executors to be actively used because there are more groups for Spark to process in parallel.
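A sketch of building such a composite key; the city/year columns come from the example above, and concat_ws is one way to combine them:

```python
from pyspark.sql import functions as f

# Combine low-cardinality "city" with "year" to create more unique keys.
keyed_df = sales_df.withColumn(
    "city_year", f.concat_ws("_", f.col("city"), f.col("year")))

repartitioned_df = keyed_df.repartition(f.col("city_year"))
```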
Solution Approach:
Identify Combinable Fields: Look for columns in your dataset that can logically be combined with your current key to increase uniqueness.
Modify Key Generation: Adjust your data transformation to use these combined keys. For example, instead of just city, use city_year as the key.
Repartition: Increase the number of partitions to a number closer to or matching the number of executors to distribute processing load more evenly. Use Spark’s repartition function like df.repartition("city_year") to ensure the data is distributed based on the new key.
Monitor and Optimize: Observe the performance and continue to adjust the cardinality and partition settings based on actual resource utilization and processing times.
By increasing the key cardinality, you maximize the use of your available executors, which can lead to improved performance by ensuring work is evenly distributed across your cluster.