-
-
Save jadianes/85c31c72dc96b036372e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Blueprint | |
main = Blueprint('main', __name__) | |
import json | |
from engine import RecommendationEngine | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
from flask import Flask, request | |
@main.route("/<int:user_id>/ratings/top/<int:count>", methods=["GET"]) | |
def top_ratings(user_id, count): | |
logger.debug("User %s TOP ratings requested", user_id) | |
top_ratings = recommendation_engine.get_top_ratings(user_id,count) | |
return json.dumps(top_ratings) | |
@main.route("/<int:user_id>/ratings/<int:movie_id>", methods=["GET"]) | |
def movie_ratings(user_id, movie_id): | |
logger.debug("User %s rating requested for movie %s", user_id, movie_id) | |
ratings = recommendation_engine.get_ratings_for_movie_ids(user_id, [movie_id]) | |
return json.dumps(ratings) | |
@main.route("/<int:user_id>/ratings", methods = ["POST"]) | |
def add_ratings(user_id): | |
# get the ratings from the Flask POST request object | |
ratings_list = request.form.keys()[0].strip().split("\n") | |
ratings_list = map(lambda x: x.split(","), ratings_list) | |
# create a list with the format required by the negine (user_id, movie_id, rating) | |
ratings = map(lambda x: (user_id, int(x[0]), float(x[1])), ratings_list) | |
# add them to the model using then engine API | |
recommendation_engine.add_ratings(ratings) | |
return json.dumps(ratings) | |
def create_app(spark_context, dataset_path): | |
global recommendation_engine | |
recommendation_engine = RecommendationEngine(spark_context, dataset_path) | |
app = Flask(__name__) | |
app.register_blueprint(main) | |
return app |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pyspark.mllib.recommendation import ALS | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def get_counts_and_averages(ID_and_ratings_tuple): | |
"""Given a tuple (movieID, ratings_iterable) | |
returns (movieID, (ratings_count, ratings_avg)) | |
""" | |
nratings = len(ID_and_ratings_tuple[1]) | |
return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings) | |
class RecommendationEngine: | |
"""A movie recommendation engine | |
""" | |
def __count_and_average_ratings(self): | |
"""Updates the movies ratings counts from | |
the current data self.ratings_RDD | |
""" | |
logger.info("Counting movie ratings...") | |
movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey() | |
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages) | |
self.movies_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0])) | |
def __train_model(self): | |
"""Train the ALS model with the current dataset | |
""" | |
logger.info("Training the ALS model...") | |
self.model = ALS.train(self.ratings_RDD, self.rank, seed=self.seed, | |
iterations=self.iterations, lambda_=self.regularization_parameter) | |
logger.info("ALS model built!") | |
def __predict_ratings(self, user_and_movie_RDD): | |
"""Gets predictions for a given (userID, movieID) formatted RDD | |
Returns: an RDD with format (movieTitle, movieRating, numRatings) | |
""" | |
predicted_RDD = self.model.predictAll(user_and_movie_RDD) | |
predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating)) | |
predicted_rating_title_and_count_RDD = \ | |
predicted_rating_RDD.join(self.movies_titles_RDD).join(self.movies_rating_counts_RDD) | |
predicted_rating_title_and_count_RDD = \ | |
predicted_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1])) | |
return predicted_rating_title_and_count_RDD | |
def add_ratings(self, ratings): | |
"""Add additional movie ratings in the format (user_id, movie_id, rating) | |
""" | |
# Convert ratings to an RDD | |
new_ratings_RDD = self.sc.parallelize(ratings) | |
# Add new ratings to the existing ones | |
self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD) | |
# Re-compute movie ratings count | |
self.__count_and_average_ratings() | |
# Re-train the ALS model with the new ratings | |
self.__train_model() | |
return ratings | |
def get_ratings_for_movie_ids(self, user_id, movie_ids): | |
"""Given a user_id and a list of movie_ids, predict ratings for them | |
""" | |
requested_movies_RDD = self.sc.parallelize(movie_ids).map(lambda x: (user_id, x)) | |
# Get predicted ratings | |
ratings = self.__predict_ratings(requested_movies_RDD).collect() | |
return ratings | |
def get_top_ratings(self, user_id, movies_count): | |
"""Recommends up to movies_count top unrated movies to user_id | |
""" | |
# Get pairs of (userID, movieID) for user_id unrated movies | |
user_unrated_movies_RDD = self.movies_RDD.filter(lambda rating: not rating[1]==user_id).map(lambda x: (user_id, x[0])) | |
# Get predicted ratings | |
ratings = self.__predict_ratings(user_unrated_movies_RDD).filter(lambda r: r[2]>=25).takeOrdered(movies_count, key=lambda x: -x[1]) | |
return ratings | |
def __init__(self, sc, dataset_path): | |
"""Init the recommendation engine given a Spark context and a dataset path | |
""" | |
logger.info("Starting up the Recommendation Engine: ") | |
self.sc = sc | |
# Load ratings data for later use | |
logger.info("Loading Ratings data...") | |
ratings_file_path = os.path.join(dataset_path, 'ratings.csv') | |
ratings_raw_RDD = self.sc.textFile(ratings_file_path) | |
ratings_raw_data_header = ratings_raw_RDD.take(1)[0] | |
self.ratings_RDD = ratings_raw_RDD.filter(lambda line: line!=ratings_raw_data_header)\ | |
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache() | |
# Load movies data for later use | |
logger.info("Loading Movies data...") | |
movies_file_path = os.path.join(dataset_path, 'movies.csv') | |
movies_raw_RDD = self.sc.textFile(movies_file_path) | |
movies_raw_data_header = movies_raw_RDD.take(1)[0] | |
self.movies_RDD = movies_raw_RDD.filter(lambda line: line!=movies_raw_data_header)\ | |
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache() | |
self.movies_titles_RDD = self.movies_RDD.map(lambda x: (int(x[0]),x[1])).cache() | |
# Pre-calculate movies ratings counts | |
self.__count_and_average_ratings() | |
# Train the model | |
self.rank = 8 | |
self.seed = 5L | |
self.iterations = 10 | |
self.regularization_parameter = 0.1 | |
self.__train_model() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time, sys, cherrypy, os | |
from paste.translogger import TransLogger | |
from app import create_app | |
from pyspark import SparkContext, SparkConf | |
def init_spark_context(): | |
# load spark context | |
conf = SparkConf().setAppName("movie_recommendation-server") | |
# IMPORTANT: pass aditional Python modules to each worker | |
sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py']) | |
return sc | |
def run_server(app): | |
# Enable WSGI access logging via Paste | |
app_logged = TransLogger(app) | |
# Mount the WSGI callable object (app) on the root directory | |
cherrypy.tree.graft(app_logged, '/') | |
# Set the configuration of the web server | |
cherrypy.config.update({ | |
'engine.autoreload.on': True, | |
'log.screen': True, | |
'server.socket_port': 5432, | |
'server.socket_host': '0.0.0.0' | |
}) | |
# Start the CherryPy WSGI web server | |
cherrypy.engine.start() | |
cherrypy.engine.block() | |
if __name__ == "__main__": | |
# Init spark context and load libraries | |
sc = init_spark_context() | |
dataset_path = os.path.join('datasets', 'ml-latest') | |
app = create_app(sc, dataset_path) | |
# start web server | |
run_server(app) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
~/spark-1.3.1-bin-hadoop2.6/bin/spark-submit --master spark://169.254.206.2:7077 --total-executor-cores 14 --executor-memory 6g server.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
260,9 | |
1,8 | |
16,7 | |
25,8 | |
32,9 | |
335,4 | |
379,3 | |
296,7 | |
858,10 | |
50,8 |
Hi Ulrich,
I guess the problem you are facing is, for the desired route, the method is POST, where as you might be attempting it by GET (using your browser), hence try using a REST client, ie: Postman, or cUrl for terminal.
Hi Jose,
Line #80 in engine.py should be:
user_unrated_movies_RDD = self.ratings_RDD.filter(lambda rating: not rating[1]==user_id).map(lambda x: (user_id, x[0]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Jose,
Thanks for the great tutorial.
I have managed to get it all running no problem apart from the add request handler.
For instance if I want to add some ratings for user 1 I enter http://0.0.0.0:5432/1/ratings
but I get Method Not Allowed, can you explain what am I doing wrong please.
Thank you
Ulrich