Last active
August 29, 2015 14:05
-
-
Save dongguosheng/d9f4926ef972062ddfdf to your computer and use it in GitHub Desktop.
Hadoop Streaming计算tfidf (computing TF-IDF with Hadoop Streaming)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
import sys | |
def read_input(file, separator='\t'):
    '''
    Read the tf mapper-reducer output and yield [key, value] per line.

    Each line presumably looks like
        queries<TAB>classes+word_1:val_1,word_2:val_2,...
    and is split at the FIRST separator only, matching the other readers in
    this pipeline; the value keeps its trailing newline (callers rstrip it).
    A line without a separator yields a one-element list.
    '''
    for line in file:
        yield line.split(separator, 1)
def str_to_dict(term_val_pairs):
    '''
    Parse a "word:val,word:val,..." string into a word -> summed-count dict.

    The count is taken after the LAST ':' so that terms which themselves
    contain ':' (e.g. "10:30") are kept intact instead of crashing the
    mapper. Items without a ':' (including the single empty item produced
    by an empty string) and items whose count is not an integer are skipped.
    Counts of repeated terms are summed.
    '''
    term_val_dict = {}
    for term_val in term_val_pairs.split(','):
        term, sep, val = term_val.rpartition(':')
        if not sep:
            # No ':' at all: empty or malformed item.
            continue
        try:
            count = int(val)
        except ValueError:
            # Non-numeric count: skip this item, as before.
            continue
        term_val_dict[term] = term_val_dict.get(term, 0) + count
    return term_val_dict
def main(separator='\t'):
    '''
    Document-frequency mapper.

    For every record read from stdin, emits "term<separator>1" once per
    distinct term, so that a summing reducer yields each term's document
    frequency. Records without a key/value pair are skipped (with a note on
    stderr) instead of crashing the task.
    '''
    for qt_record in read_input(sys.stdin):
        if len(qt_record) != 2:
            sys.stderr.write('skipping malformed record\n')
            continue
        _, term_val_pairs = qt_record
        # Value layout is "classes+term:count,...". Split on the FIRST '+'
        # only: terms such as "c++" contain '+', and split('+')[-1] would
        # silently drop every term before the last '+'.
        term_vals = term_val_pairs.rstrip().split('+', 1)[-1]
        for term in str_to_dict(term_vals):
            print('%s%s%d' % (term, separator, 1))


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from itertools import groupby | |
from operator import itemgetter | |
import sys | |
def read_mapper_output(file, separator='\t'):
    '''
    Yield every line of ``file`` as [key, rest]: trailing whitespace is
    removed, then the line is cut at the first ``separator`` ([key] alone
    when the separator is absent).
    '''
    for raw in file:
        stripped = raw.rstrip()
        yield stripped.split(separator, 1)
def main(separator='\t', min_count=None):
    '''
    Word-count reducer with a minimum-count filter.

    Sums the counts of consecutive equal keys read from stdin (groupby only
    merges consecutive keys, so input must be sorted by key) and prints
    "word<separator>total" for every key whose total is >= min_count.

    :param separator: separator between key and count.
    :param min_count: threshold; defaults to int(sys.argv[1]).
    '''
    # Parse the threshold once, outside the per-group try: a missing or
    # non-numeric argument must fail loudly rather than silently discarding
    # every group (the old code caught its ValueError per group).
    if min_count is None:
        min_count = int(sys.argv[1])
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
        except ValueError:
            # Non-numeric count or a line without a separator: drop the key.
            sys.stderr.write('skipping key with invalid count: %s\n' % current_word)
            continue
        if total_count >= min_count:
            print('%s%s%d' % (current_word, separator, total_count))


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""A more advanced Mapper, using Python iterators and generators.""" | |
import sys | |
def _parse_classes(field):
    '''
    Return the class labels in a record's class field.

    Labels are separated by '|' with optional surrounding spaces; empty
    labels are dropped. A field mentioning UNKNOWN or TEST yields no labels.
    '''
    if 'UNKNOWN' in field or 'TEST' in field:
        return []
    return [label.strip() for label in field.split('|') if label.strip()]


def read_session(input):
    '''
    Read sessions and yield one (classes, queries, contents) tuple per session.

    Each input line is a session made of comma-separated records
    "class<TAB>query" or "class<TAB>query<TAB>content" (content '-' = none).

    classes  -- sorted distinct labels, or ['CLASS=TEST'] if none is usable.
    queries  -- sorted distinct queries of the session.
    contents -- every content, then every query (duplicates kept, input
                order); this is the text the n-grams are counted over.

    Records with fewer than two fields are skipped (they used to raise
    IndexError), and a session with no well-formed record is not yielded.
    '''
    for session in input:
        queries = []
        contents = []
        classes = []
        for record in session.rstrip().split(','):
            items = record.rstrip().split('\t')
            if len(items) < 2:
                continue
            queries.append(items[1])
            if len(items) > 2 and items[2] != '-':
                contents.append(items[2])
            classes.extend(_parse_classes(items[0]))
        if not queries:
            # Blank or entirely malformed session line.
            continue
        classes = sorted(set(classes)) or ['CLASS=TEST']
        # Queries are counted as text too -- added BEFORE de-duplication.
        contents.extend(queries)
        yield (classes, sorted(set(queries)), contents)
def get_term_count_dict(segment_list, n):
    '''
    Count every 1- to n-gram (words joined by a single space) in all segments.

    :param segment_list: iterable of text segments, tokenized on whitespace.
    :param n: longest n-gram length counted.
    :return: dict mapping n-gram string -> number of occurrences.
    '''
    counts = {}
    for segment in segment_list:
        tokens = segment.split()
        for size in range(1, n + 1):
            for start in range(0, len(tokens) - size + 1):
                gram = ' '.join(tokens[start:start + size])
                counts[gram] = counts.get(gram, 0) + 1
    return counts
def gen_format_values(term_count_dict, separator=','):
    '''
    Serialize a term -> count dict as "term:count<separator>term:count...".

    Pairs follow the dict's iteration order; an empty dict gives ''.
    '''
    return separator.join(
        ':'.join((word, str(count))) for word, count in term_count_dict.items()
    )
def main(separator='\t', n=None):
    '''
    tf mapper: emit one term-count record per session read from stdin.

    Counts all 1..n-grams over each session's contents and queries and
    prints "query,query,...<separator>class|class+term:count,term:count,...".

    :param separator: separator between the key and the value.
    :param n: longest n-gram counted; defaults to int(sys.argv[1]).
    '''
    # Parse the n-gram order once instead of once per session.
    if n is None:
        n = int(sys.argv[1])
    for classes, queries, contents in read_session(sys.stdin):
        value_str = gen_format_values(get_term_count_dict(contents, n))
        print('%s%s%s' % (','.join(queries), separator, '|'.join(classes) + '+' + value_str))


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""A more advanced Reducer, using Python iterators and generators.""" | |
from itertools import groupby | |
from operator import itemgetter | |
import sys | |
def read_mapper_output(input, separator='\t'):
    '''
    Yield every line of ``input`` as [key, rest]: trailing whitespace is
    removed, then the line is cut at the first ``separator`` ([key] alone
    when the separator is absent).
    '''
    for raw in input:
        stripped = raw.rstrip()
        yield stripped.split(separator, 1)
def gen_format_values(term_count_dict, separator=','):
    '''
    Serialize a term -> count dict as "term:count<separator>term:count...".

    Pairs follow the dict's iteration order; an empty dict gives ''.
    '''
    return separator.join(
        ':'.join((word, str(count))) for word, count in term_count_dict.items()
    )
def resume_dict(value_str):
    '''
    Parse a "term:count,term:count,..." string back into a term -> count dict.

    Inverse of gen_format_values. The count is taken after the LAST ':' so
    terms containing ':' parse correctly. Empty or malformed items (no ':'
    or a non-integer count) are skipped, so an empty string yields {}
    instead of raising IndexError. A repeated term keeps its last count.
    '''
    rs_dict = {}
    for item in value_str.rstrip().split(','):
        term, sep, count = item.rpartition(':')
        if not sep:
            continue
        try:
            rs_dict[term] = int(count)
        except ValueError:
            # Skip just this item rather than losing the whole record.
            continue
    return rs_dict
def merge_dict(term_count_dict, temp_dict):
    '''
    Add the counts of ``temp_dict`` into ``term_count_dict`` in place.

    Terms new to ``term_count_dict`` take temp_dict's value; existing terms
    are incremented. Returns the (mutated) ``term_count_dict``.
    '''
    for term, count in temp_dict.items():
        if term not in term_count_dict:
            term_count_dict[term] = count
            continue
        term_count_dict[term] += count
    return term_count_dict
def main(separator='\t'):
    '''
    tf reducer: merge all records that share the same query key.

    Input lines (sorted by key; groupby only merges consecutive keys) look
    like "queries<separator>classes+term:count,...". For each key the term
    counts are summed and the class labels unioned, and one line
    "queries<separator>class|class+term:count,..." is printed. Class labels
    are sorted so the output is deterministic, like the mapper's.
    '''
    data = read_mapper_output(sys.stdin, separator=separator)
    for doc_key, group in groupby(data, itemgetter(0)):
        try:
            term_count_dict = {}
            classes = set()
            for _, value in group:
                # Split on the FIRST '+' only: terms such as "c++" contain
                # '+', and split('+')[-1] would drop the terms before it.
                value_list = value.rstrip().split('+', 1)
                if len(value_list) < 2:
                    continue
                term_count_dict = merge_dict(term_count_dict, resume_dict(value_list[1]))
                classes.update(value_list[0].split('|'))
            print('%s%s%s' % (doc_key, separator,
                              '|'.join(sorted(classes)) + '+' + gen_format_values(term_count_dict)))
        except (ValueError, IndexError):
            # Unparseable record (bad count, missing separator): drop this key
            # but keep the job running, and leave a trace in the task log.
            sys.stderr.write('skipping key with invalid record: %s\n' % doc_key)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""A more advanced Mapper, using Python iterators and generators.""" | |
import sys | |
import math | |
from operator import itemgetter | |
def read_doc(input, separator='\t'):
    '''
    Yield the fields of each input line as a list.

    Trailing whitespace (including the newline) is stripped and the line is
    split on every ``separator``. Lines presumably come from the tf reducer,
    "queries<TAB>classes+term:count,...", so field 0 is the query key and
    field 1 the value -- TODO confirm against the job wiring.
    '''
    for line in input:
        yield line.rstrip().split(separator)
def resume_dict(value_str):
    '''
    Parse a "term:count,term:count,..." string into a term -> count dict.

    The count is taken after the LAST ':' so terms containing ':' parse
    correctly (previously they raised an uncaught ValueError that killed the
    mapper). Items without a ':' or with a non-integer count are reported on
    stderr and skipped. A repeated term keeps its last count.
    '''
    rs_dict = {}
    for item in value_str.rstrip().split(','):
        term, sep, count = item.rpartition(':')
        if not sep:
            sys.stderr.write(item)
            sys.stderr.write(' IndexError.\n')
            continue
        try:
            rs_dict[term] = int(count)
        except ValueError:
            sys.stderr.write(item)
            sys.stderr.write(' ValueError.\n')
    return rs_dict
def cal_tfidf(tf_dict, idf=None, index=None):
    '''
    Weight raw term counts by idf and key them by feature index.

    :param tf_dict: term -> raw count within one document.
    :param idf: term -> idf weight; defaults to the module-level idf_dict.
    :param index: term -> feature index; defaults to the module-level index_dict.
    :return: list of (feature index, tf * idf) pairs sorted by index. Terms
             absent from the idf table are dropped.
    '''
    # Resolve defaults at call time: the global tables are built after this
    # function is defined.
    if idf is None:
        idf = idf_dict
    if index is None:
        index = index_dict
    weights = {}
    for word, tf in tf_dict.items():
        if word in idf:
            # Raw tf, no document-length normalisation.
            weights[index[word]] = idf[word] * tf
    return sorted(weights.items(), key=itemgetter(0))
def gen_format_values(tfidf_list, separator=' '):
    '''
    Serialize (feature index, weight) pairs as
    "index:weight<separator>index:weight...", in the given order.
    '''
    return separator.join(
        ':'.join((str(idx), str(weight))) for idx, weight in tfidf_list
    )
# Global idf table and feature index, built once when the mapper is loaded.
# class_dict = {'VIDEO' : 1, 'NOVEL' : 2, 'GAME' : 3, 'TRAVEL' : 4, 'LOTTERY' : 5, 'ZIPCODE' : 6, 'OTHER' : 7, 'TEST' : 8}


def _load_idf(path, total_doc):
    '''
    Build the idf and feature-index tables from a "term<TAB>df" file.

    idf(term) = log(total_doc / (df + 1)); feature indices are assigned
    1, 2, 3, ... in file order. Lines with fewer than two fields (e.g. blank
    lines) are skipped instead of raising IndexError at import time.

    :return: (idf table, index table, next unused index).
    '''
    idf = {}
    term_index = {}
    next_index = 1
    with open(path) as f:
        for line in f:
            fields = line.rstrip().split('\t')
            if len(fields) < 2:
                continue
            term = fields[0]
            idf[term] = math.log(total_doc / (float(fields[1]) + 1))
            term_index[term] = next_index
            next_index += 1
    return idf, term_index, next_index


# Number of documents in the corpus (hard-coded; it should match the data
# the df file was computed from).
total_doc = 2634607.0
idf_dict, index_dict, index = _load_idf('df_filtered.txt', total_doc)
def main(separator='\t'):
    '''
    tf-idf mapper: turn each tf record on stdin into a weighted feature vector.

    Input:  "queries<TAB>classes+term:count,..."
    Output: "queries<separator>classes+index:weight index:weight ..."
    Records without a value field are skipped with a note on stderr.
    '''
    for doc in read_doc(sys.stdin):
        if len(doc) < 2:
            sys.stderr.write('skipping malformed record\n')
            continue
        # Split on the FIRST '+' only: terms such as "c++" contain '+', and
        # split('+')[-1] would drop every term before the last '+'.
        parts = doc[1].rstrip().split('+', 1)
        tf_dict = resume_dict(parts[-1])
        print('%s%s%s' % (doc[0], separator, parts[0] + '+' + gen_format_values(cal_tfidf(tf_dict))))


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
import sys | |
def read_input(file):
    '''
    Yield the lines of ``file`` unchanged, one document per line.
    '''
    for doc_line in file:
        yield doc_line
def main(separator='\t'):
    '''
    Document-count mapper.

    Counts the lines on stdin (one document per line) and emits a single
    partial count "Total count<separator>N" instead of N "... 1" records.
    A summing reducer (such as the word-count reducer) therefore yields the
    same total with far less shuffle traffic. Nothing is emitted for empty
    input, as before.
    '''
    total = 0
    for _ in read_input(sys.stdin):
        total += 1
    if total:
        print('%s%s%d' % ('Total count', separator, total))


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from itertools import groupby | |
from operator import itemgetter | |
import sys | |
def read_mapper_output(file, separator='\t'):
    '''
    Yield every line of ``file`` as [key, rest]: trailing whitespace is
    removed, then the line is cut at the first ``separator`` ([key] alone
    when the separator is absent).
    '''
    for raw in file:
        stripped = raw.rstrip()
        yield stripped.split(separator, 1)
def main(separator='\t'):
    '''
    Word-count reducer: sum the counts of each key.

    Input lines are "key<separator>count", sorted by key (groupby only
    merges consecutive equal keys). Prints "key<separator>total" per key.
    Keys with a non-numeric count or a line missing the separator are
    dropped and reported on stderr rather than silently ignored.
    '''
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
        except ValueError:
            sys.stderr.write('skipping key with invalid count: %s\n' % current_word)
            continue
        print('%s%s%d' % (current_word, separator, total_count))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment