Skip to content

Instantly share code, notes, and snippets.

@rkempter
Created February 20, 2015 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkempter/785f4637eba82aac7d03 to your computer and use it in GitHub Desktop.
Generate top N lists for each user
from pyspark import SparkContext
from heapq import heappush, heappop
import re
logFilePath = '/Users/rkempter/Downloads/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv'
def get_top_N_artists(user_artists, top_N=10):
    """Format a user's top-N artists as ``"user_id: a1,a2,...\n"``.

    Parameters
    ----------
    user_artists : tuple
        ``(user_id, iterable_of_(artist_id, count))`` as produced by
        ``groupByKey`` on ``((user_id, artist_id), count)`` records.
    top_N : int
        Maximum number of artists to report (default 10).

    Returns
    -------
    str
        ``"user_id: artist1,artist2,...\n"`` with artists ordered by
        descending play count (ties broken by ascending artist id).
    """
    key, values = user_artists
    # Min-heap on (-count, artist_id): popping yields highest count first,
    # ties resolved by artist_id ascending.
    heap = []
    for artist_id, count in values:
        heappush(heap, (-count, artist_id))
    top_artists = []
    # Guard against users with fewer than top_N distinct artists
    # (the original popped exactly top_N times and raised IndexError).
    for _ in range(min(top_N, len(heap))):
        _, artist_id = heappop(heap)
        # Join artist ids (strings), not the raw heap tuples — joining
        # tuples was a TypeError in the original.
        top_artists.append(artist_id)
    return "%s: %s\n" % (key, ",".join(top_artists))
def get_user_artist_tuple(row):
    """Turn one raw TSV log line into a countable ((user_id, artist_id), 1) pair.

    The line layout is: user_id, timestamp, artist_id, artist_name,
    track_id, track_name — only fields 0 and 2 are used here.
    """
    fields = row.split('\t')
    return ((fields[0], fields[2]), 1)
# Driver: local Spark context with 4 worker threads.
spark_context = SparkContext("local[4]", "Get users top 10 artists")
# One RDD element per raw TSV log line.
logFile = spark_context.textFile(logFilePath)
# Generate counts for each (user_id, artist_id) - pair.
# The filter drops records whose artist_id field is empty/falsy.
counts = logFile.map(get_user_artist_tuple).filter(lambda row: row[0][1]).reduceByKey(lambda a,b: a + b)
# Generate (user_id, (artist_id, count)) -> group by key, apply get_top_N_artists for each group.
# NOTE(review): the lambda parameter `tuple` shadows the builtin — works, but rename on next edit.
# NOTE(review): `user_counts` is never collected or saved here; presumably a
# collect()/saveAsTextFile() follows elsewhere — nothing executes lazily until then.
user_counts = counts.map(lambda tuple: (tuple[0][0], (tuple[0][1], tuple[1]))).groupByKey().map(get_top_N_artists)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment