This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Finding info of Ace Ventura films: case-insensitive substring match on title.
# F.lower() normalises the column first, so "Ace", "ACE" and "ace" all match.
df.where(F.lower(F.col('title')).like("%ace%")).show()
# Another way to do this: pass a SQL expression string straight to where().
# NOTE(review): this variant does NOT lower-case the title, so unlike the one
# above it only matches a lower-case "ace" — confirm which behaviour is wanted.
df.where("title like '%ace%'").show()
# Using where clause in sequence — chained where() calls are ANDed together
df.where(df.year != '1998').where(df.avg_ratings >= 6.0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Rebuild the calendar/date dimension table from scratch.
DROP TABLE if exists d_date; | |
-- d_date: one row per calendar day (date dimension for a star schema).
CREATE TABLE d_date | |
( | |
date_dim_id INT NOT NULL, | |
date_actual DATE NOT NULL, | |
epoch BIGINT NOT NULL, | |
day_suffix VARCHAR(4) NOT NULL, | |
day_name VARCHAR(9) NOT NULL, | |
day_of_week INT NOT NULL, |
-- NOTE(review): the column list appears truncated here — the remaining columns
-- and the closing ")" are outside this view of the file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Create a SparkSession named "moviedb" with Hive support enabled, so tables
# can be saved to and read from the Hive metastore. getOrCreate() reuses an
# existing session if one is already running in this JVM.
# https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
spark = SparkSession.builder.appName("moviedb").enableHiveSupport().getOrCreate()
# create a SparkContext instance which allows the Spark Application to access
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# set the file_path variable in the beginning of the file
# or if your Spark application interacts with other applications, parameterize it
file_path = '/Users/kovid-r/datasets/moviedb/movies_metadata.csv'

# method 1 for reading a CSV file: convenience reader, first row used as header
df = spark.read.csv(file_path, header=True)
# method 2 for reading a CSV file: generic reader with an explicit format name.
# Fixed: the format must be the string 'csv' — the original passed `csv_plugin`,
# an undefined name that would raise NameError at runtime.
df = spark.read.format('csv').options(header='true', inferSchema='true').load(file_path)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reading a csv file - all of these methods work the same for all the different formats.
# The format name and the option values are case-insensitive, so the four
# format('csv') variants below are equivalent.
df = spark.read.csv(csv_file_path)
df = spark.read.format('csv').options(header=True, inferSchema=True).load(csv_file_path)
df = spark.read.format('csv').options(header='True', inferSchema='True').load(csv_file_path)
df = spark.read.format('CSV').options(header='true', inferSchema='true').load(csv_file_path)
df = spark.read.csv(file_path, header=True)
df = spark.read.csv(file_path, header='true')

# Reading a json file
df = spark.read.json(json_file_path)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import Row

# populate two rows with sample values; Row infers column names from the
# keyword arguments (budget is a string here, year an int)
f1 = Row(original_title='Eroica', budget='13393950', year=1992)
f2 = Row(original_title='Night World', budget='1255930', year=1998)

# store the two rows in a list and pass it to Spark to build a DataFrame
films = [f1, f2]
df = spark.createDataFrame(films)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a column with the default value = 'xyz' (F.lit makes a constant column)
df = df.withColumn('new_column', F.lit('xyz'))
# Create a column with default value as null, explicitly typed as string so the
# schema doesn't end up with a NullType column
df = df.withColumn('new_column', F.lit(None).cast(StringType()))
# Create a column using an existing column (element-wise arithmetic)
df = df.withColumn('new_column', 1.4 * F.col('existing_column'))
# Another example using the MovieLens database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Changing column name with withColumnRenamed feature
df = df.withColumnRenamed('existing_column_name', 'new_column_name')
# Changing column with selectExpr (you'll have to select all the columns here —
# any column not listed is dropped from the result)
df = df.selectExpr("existing_column_name AS existing_1", "new_column_name AS new_1")
# Changing column with sparksql functions - col and alias (same caveat: only the
# selected columns survive)
from pyspark.sql.functions import col
df = df.select(col("existing_column_name").alias("existing_1"), col("new_column_name").alias("new_1"))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Write file to disk in parquet format partitioned by year - overwrite any existing file
df.write.partitionBy('year').format('parquet').mode('overwrite').save(parquet_file_path)
# Write file to disk in parquet format partitioned by year - append to an existing file
df.write.partitionBy('year').format('parquet').mode('append').save(parquet_file_path)
# Write data frame as a Hive table, bucketed into 10 buckets by year and sorted
# by avg_ratings within each bucket (requires Hive support on the session)
df.write.bucketBy(10, "year").sortBy("avg_ratings").saveAsTable("films_bucketed")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Remove a column from a DataFrame (drop returns a NEW DataFrame; assign the
# result if you want to keep it — this line alone discards it)
df.drop('this_column')
# Remove multiple columns in a go
drop_columns = ['this_column', 'that_column']
# Fixed: the comprehension originally tested an undefined name `column`
# (NameError at runtime); also renamed the loop variable from `col` so it no
# longer shadows pyspark.sql.functions.col
df.select([c for c in df.columns if c not in drop_columns])
NewerOlder