Skip to content

Instantly share code, notes, and snippets.

@muxueqz
Created February 10, 2013 07:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save muxueqz/4748764 to your computer and use it in GitHub Desktop.
Save muxueqz/4748764 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
#-*- coding: utf-8 -*-
"""
Count PV and UV for each UID, and for each WORD within each UID.
"""
import dpark
# This input file is available at https://gist.github.com/4748763
file_path = 'test_word.log'

# Column positions inside each tab-separated record.
WORD, UID, UV = 0, 1, 2

# Read the log as tab-delimited rows, using 32 MiB (32 << 20 bytes) splits.
words = dpark.csvFile(
    file_path,
    splitSize=32 * 1024 * 1024,
    dialect='excel-tab',
)
def xmap(line):
    """Expand one log record into two ``(key, value)`` pairs.

    The first pair is keyed by ``(UID, WORD)`` and the second by the bare
    ``UID``.  Each value is a ``(uv_set, pv_count)`` tuple seeded with the
    record's UV field and a count of 1.

    :param line: one parsed record, indexable by the module-level column
        constants ``WORD``, ``UID`` and ``UV``.
    :returns: a list of two ``(key, (set, int))`` pairs.
    """
    uid = line[UID]
    word = line[WORD]
    uv = line[UV]
    # Every emitted pair gets its OWN set.  Previously both keys shared a
    # single set object, so an in-place merge performed while reducing the
    # (UID, WORD) key leaked into the UID key's UV data (and vice versa).
    return [
        ((uid, word), ({uv}, 1)),
        # The per-user key is the plain UID value, not a 1-tuple.
        (uid, ({uv}, 1)),
    ]
# Fan every record out into its (UID, WORD) pair and its UID pair.
user_words = words.flatMap(xmap)
def xreduce(line, value):
    """Merge two ``(uv_set, pv_count)`` aggregates belonging to one key.

    :param line: the aggregate accumulated so far; element 0 is a set and
        element 1 is a count.
    :param value: the next aggregate to fold in, with the same layout.
    :returns: a new two-element list ``[uv_union, pv_total]``.

    Neither argument is modified.  The union is computed with ``|`` rather
    than the in-place ``|=``: copying only the outer container still leaves
    the inner set shared with the caller, so ``|=`` would silently alter a
    set that may also be referenced by the value of a different key.
    """
    merged_uv = line[0] | value[0]
    total_pv = line[1] + value[1]
    return [merged_uv, total_pv]
# Dump every un-reduced (key, value) pair, one per line.
for record in user_words.collect():
    print(record)

# Collapse the pairs per key with xreduce, then print the final result list.
user_words = user_words.reduceByKey(xreduce)
print(user_words.collect())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment