# Reading a CSV file - all of these methods work the same way across the different formats
df = spark.read.csv(csv_file_path)
df = spark.read.format('csv').options(header=True, inferSchema=True).load(csv_file_path)
df = spark.read.format('csv').options(header='True', inferSchema='True').load(csv_file_path)
df = spark.read.format('CSV').options(header='true', inferSchema='true').load(csv_file_path)
df = spark.read.csv(csv_file_path, header=True)
df = spark.read.csv(csv_file_path, header='true')
# Reading a JSON file
df = spark.read.json(json_file_path)
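# A minimal sketch (an addition, not from the original snippet): the same reader
# handles non-default delimiters too; the semicolon separator here is an
# assumption chosen purely for illustration.
df = spark.read.csv(csv_file_path, header=True, inferSchema=True, sep=';')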
from pyspark.sql import Row
# Populate two rows with sample values
f1 = Row(original_title='Eroica', budget='13393950', year=1992)
f2 = Row(original_title='Night World', budget='1255930', year=1998)
# Store the two rows in a list and pass it to Spark
films = [f1, f2]
df = spark.createDataFrame(films)
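# A quick check (assumes the DataFrame built above): createDataFrame infers the
# column names and types from the Row objects.
df.printSchema()
df.show()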
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Create a column with the default value 'xyz'
df = df.withColumn('new_column', F.lit('xyz'))
# Create a column with null as the default value
df = df.withColumn('new_column', F.lit(None).cast(StringType()))
# Create a column using an existing column
df = df.withColumn('new_column', 1.4 * F.col('existing_column'))
# Another example using the MovieLens database
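# The original MovieLens code was not included here, so this is a hypothetical
# sketch only: it assumes a MovieLens-style ratings DataFrame with a 'rating'
# column on a 0-5 scale and derives a percentage column from it.
ratings_df = ratings_df.withColumn('rating_pct', F.col('rating') / 5 * 100)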
# Renaming a column with withColumnRenamed
df = df.withColumnRenamed('existing_column_name', 'new_column_name')
# Renaming columns with selectExpr (you'll have to select all the columns here)
df = df.selectExpr("existing_column_name AS existing_1", "new_column_name AS new_1")
# Renaming columns with the Spark SQL functions col and alias
from pyspark.sql.functions import col
df = df.select(col("existing_column_name").alias("existing_1"), col("new_column_name").alias("new_1"))
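# A minimal sketch (an addition, not from the original snippet): toDF renames
# every column at once, which is handy for bulk renames such as lowercasing.
df = df.toDF(*[c.lower() for c in df.columns])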
# Write to disk in parquet format, partitioned by year - overwrite any existing data
df.write.partitionBy('year').format('parquet').mode('overwrite').save(parquet_file_path)
# Write to disk in parquet format, partitioned by year - append to existing data
df.write.partitionBy('year').format('parquet').mode('append').save(parquet_file_path)
# Write the DataFrame as a bucketed, sorted Hive table
df.write.bucketBy(10, "year").sortBy("avg_ratings").saveAsTable("films_bucketed")
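# A companion sketch (assumes the parquet data written above exists): partitioned
# parquet reads back as a single DataFrame, and a filter on the partition column
# only scans the matching year directories.
df = spark.read.parquet(parquet_file_path)
df.filter(F.col('year') == 1992).show()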
# Remove a column from a DataFrame (drop returns a new DataFrame)
df = df.drop('this_column')
# Remove multiple columns in one go
drop_columns = ['this_column', 'that_column']
df = df.select([c for c in df.columns if c not in drop_columns])
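# Worth noting (an addition to the original snippet): in newer Spark versions,
# drop also accepts several column names directly.
df = df.drop('this_column', 'that_column')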
# Filter movies with avg_ratings > 7.5 and < 8.2
df.filter((F.col('avg_ratings') > 7.5) & (F.col('avg_ratings') < 8.2)).show()
# Another way to do this (note: between is inclusive of both bounds)
df.filter(df.avg_ratings.between(7.5, 8.2)).show()
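# A minimal sketch (an addition, assuming a 'year' column exists): isin and the
# ~ operator cover membership and negated filters in the same style.
df.filter(F.col('year').isin(1992, 1998)).show()
df.filter(~F.col('year').isin(1992, 1998)).show()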
# textFile lives on the SparkContext, not the SparkSession
rdd = spark.sparkContext.textFile(csv_file_path)
from pyspark.sql.types import StringType, StructField, StructType, IntegerType
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# Parse each line into typed fields before applying the schema
parsed = rdd.map(lambda line: line.split(',')).map(lambda p: (p[0], p[1], int(p[2])))
df = spark.createDataFrame(parsed, schema)
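# A simpler alternative sketch (not in the original snippet): the CSV reader
# accepts the same schema directly, skipping the manual RDD parsing.
df = spark.read.csv(csv_file_path, schema=schema)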
# When a new column is supposed to hold nulls
df = df.withColumn('new_col_1', F.lit(None).cast(StringType()))
# When a new column is supposed to have 0 as the default value
df = df.withColumn('new_col_2', F.lit(0))
# When a new column is derived from two (or more) existing columns
# (no F.lit needed here - the expression is already a Column)
df = df.withColumn('new_col_3', df.some_column / df.some_other_column)
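# A defensive variant (a sketch added here, not from the original snippet):
# F.when guards the derived column against division by zero.
df = df.withColumn(
    'new_col_3',
    F.when(df.some_other_column != 0, df.some_column / df.some_other_column)
     .otherwise(F.lit(None))
)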