Kovid Rathee (kovid-r)

kovid-r / pyspark_cheatsheet_read_all.py
Last active October 11, 2022 04:49
Reading Different File Formats PySpark Cheatsheet
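# The snippets in these gists assume an active SparkSession named spark.
# A minimal setup sketch - the app name here is an arbitrary assumption:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-cheatsheet').getOrCreate()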
# Reading a CSV file - all of these methods work the same way for the other file formats
df = spark.read.csv(csv_file_path)
# The options accept booleans or strings, and the format name is case-insensitive
df = spark.read.format('csv').options(header=True, inferSchema=True).load(csv_file_path)
df = spark.read.format('csv').options(header='True', inferSchema='True').load(csv_file_path)
df = spark.read.format('CSV').options(header='true', inferSchema='true').load(csv_file_path)
df = spark.read.csv(csv_file_path, header=True)
df = spark.read.csv(csv_file_path, header='true')
# Reading a json file
df = spark.read.json(json_file_path)
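# The same pattern covers the other formats - a sketch, assuming
# parquet_file_path and orc_file_path point to valid files
df = spark.read.parquet(parquet_file_path)
df = spark.read.format('orc').load(orc_file_path)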
kovid-r / pyspark_cheatsheet_create_df.py
Last active October 11, 2022 04:49
Creating DataFrames using the Row construct
from pyspark.sql import Row
# populate two rows with sample values
f1 = Row(original_title='Eroica', budget='13393950', year=1992)
f2 = Row(original_title='Night World', budget='1255930', year=1998)
# store the two rows in a list and pass it to Spark
films = [f1, f2]
df = spark.createDataFrame(films)
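# Quick sanity checks on the result - not part of the original gist
df.printSchema()
df.show()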
kovid-r / pyspark_cheatsheet_create_new_columnsdf.py
Last active October 11, 2022 04:49
Create new columns in an existing DataFrame PySpark Cheatsheet
# Imports used below
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
# Create a column with the default value 'xyz'
df = df.withColumn('new_column', F.lit('xyz'))
# Create a column with null as the default value
df = df.withColumn('new_column', F.lit(None).cast(StringType()))
# Create a column derived from an existing column
df = df.withColumn('new_column', 1.4 * F.col('existing_column'))
# Another example using the MovieLens database
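# A hypothetical continuation of that example - the MovieLens-style column name
# avg_ratings below is an illustrative assumption, not taken from the original gist
df = df.withColumn('rating_out_of_100', F.col('avg_ratings') * 10)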
kovid-r / pyspark_cheatsheet_change_column_names.py
Created June 11, 2020 06:58
Different methods for changing column names PySpark Cheatsheet
# Changing a column name with withColumnRenamed
df = df.withColumnRenamed('existing_column_name', 'new_column_name')
# Changing column names with selectExpr (you have to list every column you want to keep)
df = df.selectExpr("first_column AS first_1", "second_column AS second_1")
# Changing column names with the col and alias functions
from pyspark.sql.functions import col
df = df.select(col("first_column").alias("first_1"), col("second_column").alias("second_1"))
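# Renaming all columns at once with toDF - a minimal sketch;
# supply exactly one new name per existing column, in order
df = df.toDF('first_1', 'second_1')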
kovid-r / pyspark_cheatsheet_write_parquet.py
Created June 11, 2020 10:21
Writing Files PySpark Cheatsheet
# Write to disk in parquet format partitioned by year - overwrite any existing data
df.write.partitionBy('year').format('parquet').mode('overwrite').save(parquet_file_path)
# Write to disk in parquet format partitioned by year - append to existing data
df.write.partitionBy('year').format('parquet').mode('append').save(parquet_file_path)
# Write the DataFrame as a bucketed, sorted Hive table
df.write.bucketBy(10, "year").sortBy("avg_ratings").saveAsTable("films_bucketed")
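# Reading the partitioned output back - a sketch, not in the original gist;
# filtering on the partition column lets Spark prune partitions at read time
df = spark.read.parquet(parquet_file_path).filter('year = 1992')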
kovid-r / pyspark_cheatsheet_remove_columns.py
Created June 11, 2020 10:27
Drop columns from a DataFrame PySpark Cheatsheet
# Remove a column from a DataFrame (drop returns a new DataFrame)
df = df.drop('this_column')
# Remove multiple columns in one go
drop_columns = ['this_column', 'that_column']
df = df.select([c for c in df.columns if c not in drop_columns])
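# drop() also accepts several column names directly - an equivalent sketch using the same list
df = df.drop(*drop_columns)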
kovid-r / pyspark_cheatsheet_between.py
Last active October 11, 2022 04:49
Filter Between PySpark Cheatsheet
# Filter movies with avg_ratings > 7.5 and < 8.2 (assumes pyspark.sql.functions imported as F)
df.filter((F.col('avg_ratings') > 7.5) & (F.col('avg_ratings') < 8.2)).show()
# Another way to do this - note that between() includes both bounds
df.filter(df.avg_ratings.between(7.5, 8.2)).show()
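# The same filter written as a SQL expression string - a minimal alternative sketch
df.filter('avg_ratings > 7.5 AND avg_ratings < 8.2').show()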
kovid-r / pyspark_cheatsheet_read_using_schema.py
Last active October 11, 2022 04:49
RDD to DataFrame using schema PySpark Cheatsheet
from pyspark.sql.types import StringType, StructField, StructType, IntegerType
# Read the file as an RDD of lines via the SparkContext, then split
# and cast each line so the fields match the schema below
rdd = spark.sparkContext.textFile(csv_file_path) \
    .map(lambda line: line.split(',')) \
    .map(lambda p: (p[0], p[1], int(p[2])))
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
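# Alternatively, skip the RDD and have the CSV reader apply the schema directly -
# a sketch reusing the schema defined above
df = spark.read.csv(csv_file_path, schema=schema)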
from pyspark.sql import functions as F
# When a new column is supposed to have nulls
df = df.withColumn('new_col_1', F.lit(None).cast(StringType()))
# When a new column is supposed to have 0 as the default value
df = df.withColumn('new_col_2', F.lit(0))
# When a new column is derived from two (or more) existing columns
df = df.withColumn('new_col_3', df.some_column / df.some_other_column)