Skip to content

Instantly share code, notes, and snippets.

@alfredrichards
Created August 22, 2019 15:53
Show Gist options
  • Save alfredrichards/9fd3b605b19a1b897871babe43b981ed to your computer and use it in GitHub Desktop.
Save alfredrichards/9fd3b605b19a1b897871babe43b981ed to your computer and use it in GitHub Desktop.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower
import sys
def _read_csv(spark, path):
    """Load a CSV file that has a header row, letting Spark infer column types."""
    return (
        spark.read
        .option('header', True)
        .option('inferSchema', True)
        .format('csv')
        .load(path)
    )


def main(argv):
    """Rank movies by average rating and write the ranking as CSV.

    Reads a movies CSV and a ratings CSV, computes for every ``movieId`` the
    number of ratings (``no_of_users_rated``) and the average rating rounded
    to two decimals (``rating``), joins those aggregates onto the movies
    table, and writes the rows sorted by rating, then by rating count (both
    descending) to the output folder.

    Args:
        argv: Command-line style argument list:
            ``[program, movies_csv_path, ratings_csv_path, output_folder]``.
            Extra trailing items are ignored.

    Raises:
        SystemExit: If fewer than three paths are supplied. The check runs
            before a Spark session is created so a bad invocation fails fast.
    """
    if len(argv) < 4:
        program = argv[0] if argv else 'spark-movies-app'
        raise SystemExit(
            'usage: {} <movies_csv> <ratings_csv> <output_folder>'.format(program)
        )
    movies_path, ratings_path, output_folder = argv[1:4]

    spark = SparkSession.builder\
        .appName('spark-movies-app')\
        .getOrCreate()

    movies_df = _read_csv(spark, movies_path)

    # createOrReplaceTempView() returns None, so keep the DataFrame in its own
    # variable and register the view as a separate statement.
    ratings_df = _read_csv(spark, ratings_path)
    ratings_df.createOrReplaceTempView('ratings')

    avg_ratings = spark.sql('''
select movieId, count(userId) as no_of_users_rated, ROUND(AVG(rating), 2) as rating from ratings group by movieId
''')

    # The join key is the first column of each frame: for avg_ratings that is
    # movieId (see the query above); the movies CSV is assumed to carry its
    # movie id in the first column as well -- TODO confirm against the dataset.
    movie_ratings = movies_df\
        .join(avg_ratings, movies_df[0] == avg_ratings[0])\
        .drop(avg_ratings.movieId)

    movie_ratings\
        .sort(col('rating').desc(), col('no_of_users_rated').desc())\
        .write.csv(output_folder)
# Script entry point: forward the raw command-line arguments to main() only
# when this file is executed directly, not when it is imported.
if __name__ == '__main__':
    main(sys.argv)
@bala93kumar
Copy link

So informative, thank you Alfred!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment