Created
November 3, 2012 02:08
-
-
Save royguo/4005514 to your computer and use it in GitHub Desktop.
mrjob
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/python/env python | |
if __name__ == '__main__': | |
user_items = [] | |
items = [] | |
with open('u.data') as f: | |
for line in f: | |
user_items.append(line.split('\t')) | |
with open('u.item') as f: | |
for line in f: | |
items.append(line.split('|')) | |
print 'user_items[0] = ', user_items[0] | |
print 'items[0] = ', items[0] | |
items_hash = {} | |
for i in items: | |
items_hash[i[0]] = i[1] | |
print 'items_hash[1] = ', items_hash['1'] | |
for ui in user_items: | |
ui[1] = items_hash[ui[1]] | |
print 'user_items[0] = ', user_items[0] | |
with open('ratings.csv','w') as f: | |
for ui in user_items: | |
f.write(ui[0] + '|' + ui[1] + '|' + ui[2] + '\n') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mrjob.job import MRJob | |
import re | |
WORD_RE = re.compile(r"[\w']+") | |
class MRWordFreqCount(MRJob):
    """Case-insensitive word-frequency count over the input lines."""

    def mapper(self, _, line):
        """Emit (lowercased_token, 1) for every token found on the line."""
        tokens = WORD_RE.findall(line)
        for token in tokens:
            yield token.lower(), 1

    def combiner(self, word, counts):
        """Pre-aggregate counts on the mapper side to cut shuffle volume."""
        yield word, sum(counts)

    def reducer(self, word, counts):
        """Emit the final (word, total_count) pair."""
        yield word, sum(counts)
if __name__ == '__main__':
    # Hand control to the mrjob runner (parses argv, executes the steps).
    MRWordFreqCount.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: Generations (1994)"] [0.31762191045234545, 93] | |
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: The Motion Picture (1979)"] [0.4632318663542742, 96] | |
["Star Trek VI: The Undiscovered Country (1991)", "Star Trek: The Wrath of Khan (1982)"] [0.44969297939248015, 148] | |
["Star Trek VI: The Undiscovered Country (1991)", "Star Wars (1977)"] [0.08625580124837125, 151] | |
["Star Trek VI: The Undiscovered Country (1991)", "Stargate (1994)"] [0.30431878197511564, 94] | |
["Star Trek VI: The Undiscovered Country (1991)", "Stars Fell on Henrietta, The (1995)"] [1.0, 2] | |
["Star Trek VI: The Undiscovered Country (1991)", "Starship Troopers (1997)"] [0.14969005091372395, 59] | |
["Star Trek VI: The Undiscovered Country (1991)", "Steal Big, Steal Little (1995)"] [0.74535599249993, 5] | |
["Star Trek VI: The Undiscovered Country (1991)", "Stealing Beauty (1996)"] [-0.4879500364742666, 10] | |
["Star Trek VI: The Undiscovered Country (1991)", "Steel (1997)"] [1.0, 2] | |
["Star Trek VI: The Undiscovered Country (1991)", "Stephen King's The Langoliers (1995)"] [-0.11470786693528087, 16] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
from mrjob.job import MRJob | |
class Step1(MRJob):
    """Step 1: aggregate all ratings belonging to a single user.

    Output format: user_id, (item_count, rating_sum, [(item_id, rating), ...])
    """

    def group_by_user_rating(self, key, line):
        """Mapper: parse 'user|item|rating' lines and key them by user.

        Sample output:
            17  70,3
            35  21,1
            49  19,2
            49  21,1
        """
        user_id, item_id, rating = line.split('|')
        yield user_id, (item_id, float(rating))

    def count_ratings_users_freq(self, user_id, values):
        """Reducer: collapse one user's ratings into a single record.

        Sample output:
            49  (3,7,[19,2 21,1 70,4])
        """
        rated = [(item_id, rating) for item_id, rating in values]
        rating_total = sum(rating for _, rating in rated)
        yield user_id, (len(rated), rating_total, rated)

    def steps(self):
        """One map/reduce step wiring the mapper and reducer above."""
        return [self.mr(mapper=self.group_by_user_rating,
                        reducer=self.count_ratings_users_freq)]
if __name__ == '__main__':
    # Hand control to the mrjob runner (parses argv, executes the steps).
    Step1.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#! coding=utf-8 | |
from mrjob.job import MRJob | |
from itertools import combinations | |
from math import sqrt | |
class Step2(MRJob):
    """Step 2: compute the Pearson correlation for every pair of
    co-rated movies, using Step 1's output as input.
    """

    def pairwise_items(self, user_id, values):
        '''Mapper: consume Step 1's output, discard the user_id, and emit
        every pair of items that user co-rated:
            (item_1, item_2) -> (rating_1, rating_2)

        combinations(iterable, 2) yields every unordered pair from the
        user's rating list.  This mapper is the performance bottleneck of
        the whole job: the combinatorial fan-out produces many small
        records that are flushed to disk one by one, so IO is very
        frequent.  A Combiner doing the same aggregation as the Reducer
        (possibly implemented in a C library called from Python) could
        write pre-aggregated data instead.
        '''
        # Because step 1 ran as a separate job and dumped its output to
        # result1.csv, the value arrives as text and must be parsed here.
        # If step1 and step2 run inside the same job, this line can be
        # removed (see the steps() docs on chaining steps).
        # SECURITY NOTE(review): eval() executes arbitrary code from the
        # input file and is only safe because the pipeline's own output
        # is trusted; prefer ast.literal_eval for untrusted input.
        values = eval(values.split('\t')[1])
        item_count, item_sum, ratings = values
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), (item1[1], item2[1])

    def calculate_similarity(self, pair_key, lines):
        '''Reducer: key is (movie A, movie B); each value is one user's
        (rating_A, rating_B) pair.  Every incoming value comes from a
        user who rated both A and B, so all those ratings form two
        vectors whose Pearson coefficient is the pair's similarity.

        Emits (A_name, B_name) -> (similarity, co_rater_count).
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
        item_pair, co_ratings = pair_key, lines
        item_xname, item_yname = item_pair
        # Accumulate the sufficient statistics for Pearson correlation.
        for item_x, item_y in co_ratings:
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            n += 1
        similarity = self.normalized_correlation(n, sum_xy, sum_x, sum_y,
                                                 sum_xx, sum_yy)
        yield (item_xname, item_yname), (similarity, n)

    def steps(self):
        """One map/reduce step: pair generation, then correlation."""
        return [self.mr(mapper=self.pairwise_items,
                        reducer=self.calculate_similarity),]

    def normalized_correlation(self, n, sum_xy, sum_x, sum_y, sum_xx, sum_yy):
        """Pearson correlation computed from pre-aggregated sums.

        Returns 0.0 when either rating vector has zero variance (all
        co-raters gave identical scores), where the correlation is
        mathematically undefined.  Previously this case raised
        ZeroDivisionError and killed the reducer.
        """
        numerator = (n * sum_xy - sum_x * sum_y)
        denominator = sqrt(n * sum_xx - sum_x * sum_x) * sqrt(n * sum_yy - sum_y * sum_y)
        # Bug fix: guard the zero-variance case instead of dividing by zero.
        if denominator == 0:
            return 0.0
        return numerator / denominator
if __name__ == '__main__':
    # Hand control to the mrjob runner (parses argv, executes the steps).
    Step2.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment