Created
July 11, 2014 08:23
-
-
Save sunzy/95952870f99b872a81fe to your computer and use it in GitHub Desktop.
dpark日志清洗
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob
import json

import dpark
from dpark import optParser

from Common import CoConn
from Common import DbConn
# -d/--date: file-name fragment (passed to de_method as `hour`) used to pick
# which log files the job reads.
optParser.add_option('-d', '--date', dest='date')

# Collection of valid channel ids, fetched once when the module is imported.
# pmt_mapper tags events whose channel is not in it as 'badlog'.
all_channel = DbConn().get_all_channelId()
class de_class(object):
    """Split raw nginx request logs into per-event output files with dpark.

    Currently only 'start' events are written out (see ``de_method``).
    """

    @staticmethod
    def pmt_mapper(lines, time_field=0, body_field=-1):
        """Parse one partition of raw log lines into ``(tag, text)`` pairs.

        Each line is split on tab characters. Field ``time_field`` is used as
        the record timestamp, and field ``body_field`` is decoded as JSON that
        must contain an ``events`` sequence and a ``client`` mapping. The
        ``client`` mapping is merged into every event.

        NOTE(review): the original body referenced the undefined names
        ``ng_time`` and ``b_date``, so every line ended up tagged 'error'.
        The defaults assume the timestamp is the first tab-separated field
        and the JSON payload is the last one. Confirm this against the nginx
        ``log_format``.

        :param lines: iterable of raw log lines (one ``glom()`` partition).
        :param time_field: index of the timestamp among the tab-split fields.
        :param body_field: index of the JSON payload among the tab-split fields.
        :yields: ``('start', '<time>TAB<appid>TAB<channel>NEWLINE')`` for each
            start event on a known channel; ``('badlog', line)`` for each
            event that is malformed or whose channel is not in
            ``all_channel``; ``('error', line)`` when the line itself cannot
            be split/decoded.
        """
        row_fmt = '%s\t%s\t%s\n'  # hoisted: identical to '%s\t' * 2 + '%s\n'
        for line in lines:
            try:
                fields = line.split('\t')
                ng_time = fields[time_field]
                r_dict = json.loads(fields[body_field])
                events = list(r_dict['events'])
            except Exception:
                # Catch Exception, not everything: a bare except would also
                # trap GeneratorExit/KeyboardInterrupt.
                yield 'error', line
                continue
            for item in events:
                try:
                    item.update(r_dict['client'])
                    channel = item['channel']
                    if channel not in all_channel:
                        # Unknown channel: report once and do NOT also emit
                        # the event as a valid 'start' record.
                        tag, text = 'badlog', line
                    elif item['currevent'] == 'start':
                        tag, text = 'start', row_fmt % (ng_time, item['appid'], channel)
                    else:
                        continue  # other event types are not exported
                except Exception:
                    tag, text = 'badlog', line
                # Yield outside the try so closing the generator is never
                # mistaken for a bad event.
                yield tag, text

    @staticmethod
    def de_method(hour):
        """Run the split job for the log files matching ``hour``.

        Globs ``<nginx_path><app_id>/*/*<hour>*`` and maps every line through
        ``pmt_mapper``. The 'start' records are written to
        ``<qiege_path>start_<hour>`` with ``overwrite=True``. Nothing is
        written when no input file matches or when no 'start' record is
        found.

        :param hour: file-name fragment taken from the ``-d/--date`` option.
        """
        key = CoConn().get_app_id()
        path = CoConn().get_nginx_path()
        files = glob.glob(path + '%s/*/*%s*' % (key, hour))
        if not files:
            # No input for this hour; don't hand dpark an empty file list.
            return
        # 4 << 20 bytes = 4 MiB input splits.
        mapper = dpark.textFile(files, splitSize=4 << 20).glom().flatMap(de_class.pmt_mapper)
        mapper = mapper.groupByKey(numSplits=8)
        # Presumably returns the grouped 'start' records on the driver
        # (falsy when there are none) -- confirm against dpark's lookup().
        mapper1 = mapper.lookup('start')
        r_path = CoConn().get_qiege_path()
        if mapper1:
            rdd1 = dpark.parallelize(mapper1, numSlices=100)
            rdd1.saveAsTextFile('%sstart_%s' % (r_path, hour), overwrite=True)
if __name__ == '__main__':
    # Script entry point: parse the command line and run the split job.
    options, _args = optParser.parse_args()
    hour = options.date
    if not hour:
        # Without -d/--date the input glob would silently look for '*None*'.
        optParser.error('the -d/--date option is required')
    de_class.de_method(hour)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment