Skip to content

Instantly share code, notes, and snippets.

@royguo
Created November 3, 2012 02:08
Show Gist options
  • Save royguo/4005514 to your computer and use it in GitHub Desktop.
mrjob
#!/usr/bin/env python
# Preprocess the MovieLens 100k data set: join rating rows (u.data) with
# movie titles (u.item) and write a pipe-delimited ratings.csv of
# user_id|title|rating for the mrjob steps that follow.


def load_rows(path, sep):
    """Read *path* and return a list of rows, each line split on *sep*.

    Note: the trailing newline is kept on each line's last field, exactly
    as the original inline loops behaved; downstream code only uses the
    leading fields, so this is harmless.
    """
    with open(path) as f:
        return [line.split(sep) for line in f]


if __name__ == '__main__':
    # u.data columns: user_id \t item_id \t rating \t timestamp
    user_items = load_rows('u.data', '\t')
    # u.item columns: item_id | title | release date | ...
    items = load_rows('u.item', '|')
    print('user_items[0] = %s' % user_items[0])
    print('items[0] = %s' % items[0])

    # item_id -> title lookup table (ids stay strings; the files are text).
    items_hash = dict((i[0], i[1]) for i in items)
    print('items_hash[1] = %s' % items_hash['1'])

    # Replace each rating row's item_id with its human-readable title.
    for ui in user_items:
        ui[1] = items_hash[ui[1]]
    print('user_items[0] = %s' % user_items[0])

    with open('ratings.csv', 'w') as f:
        for ui in user_items:
            f.write(ui[0] + '|' + ui[1] + '|' + ui[2] + '\n')
from mrjob.job import MRJob
import re

# Tokens are runs of word characters, apostrophes included ("don't").
WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):
    """Classic word-frequency job: emits one (word, total) pair per word."""

    def mapper(self, _, line):
        # One (word, 1) pair per token, case-folded so "The" == "the".
        for token in WORD_RE.findall(line):
            yield token.lower(), 1

    def combiner(self, word, counts):
        # Map-side partial sum: shrinks the data shuffled to reducers.
        yield word, sum(counts)

    def reducer(self, word, counts):
        # Final sum over all partial counts for this word.
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: Generations (1994)"] [0.31762191045234545, 93]
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: The Motion Picture (1979)"] [0.4632318663542742, 96]
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: The Wrath of Khan (1982)"] [0.44969297939248015, 148]
["Star Trek VI: The Undiscovered Country (1991)", "Star Wars (1977)"] [0.08625580124837125, 151]
["Star Trek VI: The Undiscovered Country (1991)", "Stargate (1994)"] [0.30431878197511564, 94]
["Star Trek VI: The Undiscovered Country (1991)", "Stars Fell on Henrietta, The (1995)"] [1.0, 2]
["Star Trek VI: The Undiscovered Country (1991)", "Starship Troopers (1997)"] [0.14969005091372395, 59]
["Star Trek VI: The Undiscovered Country (1991)", "Steal Big, Steal Little (1995)"] [0.74535599249993, 5]
["Star Trek VI: The Undiscovered Country (1991)", "Stealing Beauty (1996)"] [-0.4879500364742666, 10]
["Star Trek VI: The Undiscovered Country (1991)", "Steel (1997)"] [1.0, 2]
["Star Trek VI: The Undiscovered Country (1991)", "Stephen King's The Langoliers (1995)"] [-0.11470786693528087, 16]
#!/usr/bin/env python
# coding=utf-8
from mrjob.job import MRJob


class Step1(MRJob):
    """Step 1: gather every rating belonging to a single user.

    Output format: user_id, (item_count, rating_sum, [(item_id, rating), ...])
    """

    def group_by_user_rating(self, key, line):
        """Mapper: re-key each 'user|item|rating' line by user id.

        Example output:
            17  70,3
            35  21,1
            49  19,2
            49  21,1
        """
        user_id, item_id, rating = line.split('|')
        yield user_id, (item_id, float(rating))

    def count_ratings_users_freq(self, user_id, values):
        """Reducer: fold one user's ratings into (count, sum, pairs).

        Example output:
            49  (3,7,[19,2 21,1 70,4])
        """
        pairs = []
        rating_total = 0
        for item_id, rating in values:
            rating_total += rating
            pairs.append((item_id, rating))
        yield user_id, (len(pairs), rating_total, pairs)

    def steps(self):
        return [self.mr(mapper=self.group_by_user_rating,
                        reducer=self.count_ratings_users_freq), ]


if __name__ == '__main__':
    Step1.run()
#!/usr/bin/env python
# coding=utf-8
import ast
from itertools import combinations
from math import sqrt

from mrjob.job import MRJob


class Step2(MRJob):
    """Step 2: Pearson similarity for every item pair with a common rater."""

    def pairwise_items(self, user_id, values):
        """Mapper: explode one user's rating list into all item pairs.

        Drops user_id and emits (item_1, item_2) -> (rating_1, rating_2)
        for every 2-combination of the items this user rated.

        NOTE: this mapper is the job's performance bottleneck — the
        combinations() expansion emits O(k^2) records per user and every
        one of them is spilled to disk.  A combiner doing the same
        aggregation as the reducer (possibly backed by a C library)
        would cut the IO substantially.
        """
        # Step 1 ran as a separate job and its output was dumped to
        # result1.csv, so each input line arrives as a "key\tvalue"
        # string and the value must be parsed back into Python objects.
        # ast.literal_eval only accepts literals — unlike eval(), it
        # cannot execute arbitrary code smuggled into the input file.
        # If step 1 and step 2 ran inside one job (see steps()), this
        # parsing line could be dropped entirely.
        parsed = ast.literal_eval(values.split('\t')[1])
        item_count, item_sum, ratings = parsed
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), (item1[1], item2[1])

    def calculate_similarity(self, pair_key, lines):
        """Reducer: Pearson correlation for one (Movie A, Movie B) key.

        Each incoming value is one co-rater's (A rating, B rating); keys
        are equal exactly when the same user rated both movies, so the
        accumulated values form the two rating vectors whose correlation
        is emitted, together with the co-rater count n.
        """
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
        item_pair, co_ratings = pair_key, lines
        item_xname, item_yname = item_pair
        for item_x, item_y in co_ratings:
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            n += 1
        similarity = self.normalized_correlation(n, sum_xy, sum_x, sum_y,
                                                 sum_xx, sum_yy)
        yield (item_xname, item_yname), (similarity, n)

    def steps(self):
        return [self.mr(mapper=self.pairwise_items,
                        reducer=self.calculate_similarity), ]

    def normalized_correlation(self, n, sum_xy, sum_x, sum_y, sum_xx, sum_yy):
        """Pearson correlation computed from running sums.

        Returns 0.0 when either vector has zero variance (e.g. one movie
        received the identical rating from every co-rater), which would
        otherwise raise ZeroDivisionError.
        """
        numerator = (n * sum_xy - sum_x * sum_y)
        denominator = (sqrt(n * sum_xx - sum_x * sum_x)
                       * sqrt(n * sum_yy - sum_y * sum_y))
        if denominator == 0:
            return 0.0
        return numerator / denominator


if __name__ == '__main__':
    Step2.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment