Skip to content

Instantly share code, notes, and snippets.

@dongguosheng
Last active August 29, 2015 14:05
Show Gist options
  • Save dongguosheng/d9f4926ef972062ddfdf to your computer and use it in GitHub Desktop.
Save dongguosheng/d9f4926ef972062ddfdf to your computer and use it in GitHub Desktop.
Computing TF-IDF with Hadoop Streaming (mapper/reducer scripts)
#! /usr/bin/python
import sys
def read_input(file):
    """Yield each input line split on every tab.

    Input format: one record per line, fields separated by tabs
    (e.g. 'class:Queries<TAB>word_1:val_1,word_2:val_2,...').
    The trailing newline is NOT stripped and stays attached to the
    last field.
    """
    return (line.split('\t') for line in file)
def str_to_dict(term_val_pairs):
    """Parse a 'word:val,word:val,...' string into a dict of summed counts.

    Repeated words have their integer values accumulated.  Malformed items
    (no ':' separator, or a non-integer value) are skipped silently, which
    matches the original intent of dropping bad counts rather than crashing.

    Returns: dict mapping term -> summed int count.
    """
    term_val_dict = {}
    for term_val in term_val_pairs.split(','):
        try:
            # rsplit(':', 1) guards against ':' appearing inside the term;
            # the original plain split() made such items (and items with no
            # ':' at all) raise an uncaught ValueError on unpacking.
            term, val = term_val.rsplit(':', 1)
            term_val_dict[term] = term_val_dict.get(term, 0) + int(val)
        except ValueError:
            # no separator or non-integer count: drop this item
            pass
    return term_val_dict
def main(separator='\t'):
    """Map step: emit 'term<sep>1' once per distinct term of each record.

    Records arrive as 'key<TAB>classes+word:val,...'; only the part after
    the last '+' is parsed for terms.
    """
    for qt_record in read_input(sys.stdin):
        key, term_val_pairs = qt_record
        pairs_part = term_val_pairs.rstrip().split('+')[-1]
        for term in str_to_dict(pairs_part):
            sys.stdout.write('%s%s%d\n' % (term, separator, 1))
if __name__ == '__main__':
    main()
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
    """Yield [key, rest] pairs: each line is right-stripped, then split on
    the FIRST separator only (the remainder may still contain separators)."""
    for raw_line in file:
        stripped = raw_line.rstrip()
        yield stripped.split(separator, 1)
def main(separator='\t'):
    """Word-count reducer with a minimum-count threshold.

    Sums the counts of consecutive identical keys from stdin and prints
    'word<sep>total' only when the total reaches the threshold supplied
    as the first command-line argument.

    Raises: ValueError if argv[1] is not an integer (fail fast, instead of
    the original behavior of silently printing nothing).
    """
    # Parse the threshold once up front; the original re-parsed sys.argv[1]
    # for every group, and a bad argument's ValueError was swallowed by the
    # same handler that guards non-numeric counts.
    threshold = int(sys.argv[1])
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
        except ValueError:
            # count was not a number: silently discard this group
            continue
        if total_count >= threshold:
            sys.stdout.write("%s%s%d\n" % (current_word, separator, total_count))
if __name__ == "__main__":
    main()
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys
def read_session(input):
    '''
    Read a session block, return a tuple, (classes, queries, contents).

    Each input record is one session; it is split on ',' into pieces, and
    each piece is split on tabs into [class_label, query, content].
    NOTE(review): assumes every piece has at least 3 tab fields unless the
    2-field branch fires -- a piece with fewer than 2 fields would raise
    IndexError; confirm the upstream format guarantees this.
    '''
    for session in input:
        session_query_list = []
        session_content_list = []
        class_list = []
        for line in session.rstrip().split(','):
            items = line.rstrip().split('\t')
            # '-' (or only 2 fields) means "no content": keep just the query
            if len(items) == 2 or items[2] == '-':
                session_query_list.append(items[1])
            else:
                session_query_list.append(items[1])
                session_content_list.append(items[2])
            # drop UNKNOWN/TEST class labels; split multi-label lists
            if 'UNKNOWN' in items[0] or 'TEST' in items[0]:
                pass
            else:
                if '|' in items[0]:
                    # NOTE(review): detection checks for bare '|' but the
                    # split expects ' | ' with spaces -- a label containing
                    # '|' without spaces stays unsplit; confirm intended.
                    class_list.extend(items[0].split(' | '))
                else:
                    class_list.append(items[0])
        # handle classes: a session with no usable label gets a sentinel
        if len(class_list) == 0:
            class_list.append('CLASS=TEST')
        else:
            class_list = list(set(class_list))
            class_list.sort()
        # add queries to the contents so query text also contributes terms
        session_content_list.extend(session_query_list)
        # queries: remove duplicates and sort for a stable join key
        session_query_list = list(set(session_query_list))
        session_query_list.sort()
        yield (class_list, session_query_list, session_content_list)
def get_term_count_dict(segment_list, n):
    """Count every 1-gram up to n-gram across all segments.

    Each segment is whitespace-tokenized; every contiguous run of 1..n
    tokens (joined with single spaces) is tallied.

    Returns: dict mapping n-gram string -> occurrence count.
    """
    counts = {}
    for segment in segment_list:
        tokens = segment.split()
        for size in range(1, n + 1):
            for start in range(len(tokens) - size + 1):
                gram = ' '.join(tokens[start:start + size])
                counts[gram] = counts.get(gram, 0) + 1
    return counts
def gen_format_values(term_count_dict, separator=','):
    """Serialize a {word: count} dict as 'word:count<sep>word:count...'.

    Items are emitted in the dict's iteration order.
    """
    parts = ('%s:%s' % (word, count) for word, count in term_count_dict.items())
    return separator.join(parts)
def main(separator='\t'):
    """TF mapper: emit 'queries<sep>classes+word:count,...' per session.

    argv[1] gives the maximum n-gram length for term counting.  Output keys
    are the sorted, deduplicated query strings joined by ','.
    """
    # Parse n once up front; the original re-parsed sys.argv[1] inside the
    # per-session loop on every iteration.
    n = int(sys.argv[1])
    # input comes from STDIN; output is the input for the TF reducer
    for class_list, query_list, content_list in read_session(sys.stdin):
        value_str = gen_format_values(get_term_count_dict(content_list, n))
        sys.stdout.write('%s%s%s\n' % (
            ','.join(query_list),
            separator,
            '|'.join(class_list) + '+' + value_str,
        ))
if __name__ == "__main__":
    main()
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(input, separator='\t'):
    """Yield each line right-stripped and split at the first separator only."""
    return (line.rstrip().split(separator, 1) for line in input)
def gen_format_values(term_count_dict, separator=','):
    """Render a term-count dict as 'word:count' items joined by separator,
    in the dict's iteration order."""
    return separator.join(word + ':' + str(count)
                          for word, count in term_count_dict.items())
def resume_dict(value_str):
    """Parse 'word:count,word:count,...' back into a {word: int count} dict.

    Fixes over the original:
    - items with no ':' are skipped (previously an uncaught IndexError);
    - ':' inside the word itself is handled by splitting on the LAST ':'
      (the mapper writes word + ':' + count, so the count is always the
      final colon-separated field).
    A non-integer count still raises ValueError, so callers that discard
    bad records on ValueError keep working unchanged.
    """
    rs_dict = {}
    for item in value_str.rstrip().split(','):
        term, sep, count = item.rpartition(':')
        if not sep:
            # malformed item without a separator: drop it
            continue
        rs_dict[term] = int(count)
    return rs_dict
def merge_dict(term_count_dict, temp_dict):
    """Add temp_dict's counts into term_count_dict IN PLACE and return it."""
    for term, count in temp_dict.items():
        term_count_dict[term] = term_count_dict.get(term, 0) + count
    return term_count_dict
def main(separator='\t'):
    """TF reducer: merge term counts and class labels per document key.

    Values look like 'classes+word:count,...'; all values sharing the same
    key are merged and re-emitted in the same 'classes+pairs' format.
    A non-numeric count anywhere in a group silently drops the whole group.
    """
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby yields consecutive records sharing the same first field
    for doc_key, group in groupby(data, itemgetter(0)):
        try:
            merged_counts = {}
            classes = []
            for _, value in group:
                parts = value.rstrip().split('+')
                if len(parts) < 2:
                    # no 'classes+pairs' structure: ignore this value
                    pass
                else:
                    merged_counts = merge_dict(merged_counts, resume_dict(parts[-1]))
                    classes.extend(parts[0].split('|'))
            classes = list(set(classes))
            sys.stdout.write("%s%s%s\n" % (
                doc_key,
                separator,
                '|'.join(classes) + '+' + gen_format_values(merged_counts),
            ))
        except ValueError:
            # a count was not a number: discard this whole group
            pass
if __name__ == "__main__":
    main()
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys
import math
from operator import itemgetter
def read_doc(input):
    """Yield each input line right-stripped and split on every tab."""
    for raw in input:
        yield raw.rstrip().split('\t')
def resume_dict(value_str):
    """Parse 'word:count,word:count,...' into a {word: int count} dict.

    Items missing the count field are logged to stderr and skipped, exactly
    as before.  Fix: the legacy 'except IndexError, e' comma syntax (a
    SyntaxError on Python 3) is replaced with 'except IndexError', and the
    unused exception binding is dropped.  A non-integer count still raises
    ValueError to the caller.
    """
    rs_dict = {}
    for item in value_str.rstrip().split(','):
        term_count = item.split(':')
        try:
            rs_dict[term_count[0]] = int(term_count[1])
        except IndexError:
            # item had no ':' separator: report and continue
            sys.stderr.write(item)
            sys.stderr.write(' IndexError.\n')
    return rs_dict
def cal_tfidf(tf_dict):
    """Convert a {word: tf} dict into (feature_index, tfidf) pairs.

    Uses the module-level idf_dict and index_dict tables; words absent
    from idf_dict are dropped.  Returns the pairs sorted by index.
    (The commented-out length normalization of the original is
    intentionally left out here too.)
    """
    scored = {}
    for word, tf in tf_dict.items():
        if word in idf_dict:
            scored[index_dict[word]] = idf_dict[word] * tf
    return sorted(scored.items(), key=itemgetter(0))
def gen_format_values(tfidf_list, separator=' '):
    """Serialize (index, tfidf) pairs as 'index:tfidf' tokens joined by
    separator (libsvm-like feature string)."""
    tokens = ['%s:%s' % (idx, score) for idx, score in tfidf_list]
    return separator.join(tokens)
# Initialize the module-level document-frequency tables used by cal_tfidf.
# print 'initialize start.'
# class_dict = {'VIDEO' : 1, 'NOVEL' : 2, 'GAME' : 3, 'TRAVEL' : 4, 'LOTTERY' : 5, 'ZIPCODE' : 6, 'OTHER' : 7, 'TEST' : 8}
total_doc = 2634607.0  # hard-coded total number of documents in the corpus
idf_dict = {}  # word -> smoothed inverse document frequency
index_dict = {}  # word -> 1-based feature index (assigned in file order)
index = 1
# 'df_filtered.txt' holds 'word<TAB>document_frequency' lines; presumably
# shipped alongside the script (e.g. via Hadoop distributed cache) -- confirm.
with open('df_filtered.txt') as f:
    for line in f:
        temp_list = line.rstrip().split('\t')
        # +1 in the denominator smooths away division by zero for df == 0
        idf_dict[temp_list[0]] = math.log(total_doc / (float(temp_list[1]) + 1))
        index_dict[temp_list[0]] = index
        index += 1
# print 'initialize complete.'
def main(separator='\t'):
    """TF-IDF mapper: turn each doc's tf pairs into 'index:tfidf' features.

    Input lines are 'key<TAB>classes+word:count,...'; output keeps the key
    and class prefix, replacing the tf pairs with space-separated
    index:tfidf tokens.
    """
    for doc in read_doc(sys.stdin):
        value = doc[1].rstrip()
        tf_dict = resume_dict(value.split('+')[-1])
        class_part = value.split('+')[0]
        sys.stdout.write('%s%s%s\n' % (
            doc[0],
            separator,
            class_part + '+' + gen_format_values(cal_tfidf(tf_dict)),
        ))
if __name__ == "__main__":
    main()
#! /usr/bin/python
import sys
def read_input(file):
    """Yield every input line unchanged (pure pass-through iterator)."""
    return iter(file)
def main(separator='\t'):
    """Emit one ('Total count', 1) record per stdin line; the companion
    reducer sums these into the corpus document total."""
    for _ in read_input(sys.stdin):
        sys.stdout.write('%s%s%d\n' % ('Total count', separator, 1))
if __name__ == '__main__':
    main()
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
    """Split each right-stripped line into [key, rest] at the first separator."""
    for entry in file:
        key_and_rest = entry.rstrip()
        yield key_and_rest.split(separator, 1)
def main(separator='\t'):
    """Word-count reducer: sum the counts of consecutive identical keys
    from stdin and print 'word<sep>total' for every key."""
    pairs = read_mapper_output(sys.stdin, separator=separator)
    for word, records in groupby(pairs, itemgetter(0)):
        try:
            total = sum(int(cnt) for _, cnt in records)
            sys.stdout.write("%s%s%d\n" % (word, separator, total))
        except ValueError:
            # a count was not a number: silently discard this group
            pass
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment