Skip to content

Instantly share code, notes, and snippets.

@sunzy
Created July 11, 2014 08:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sunzy/95952870f99b872a81fe to your computer and use it in GitHub Desktop.
Save sunzy/95952870f99b872a81fe to your computer and use it in GitHub Desktop.
dpark日志清洗
import glob
import json

import dpark
from dpark import optParser

from Common import CoConn
from Common import DbConn
# Register the -d/--date switch on dpark's shared option parser; the parsed
# value is read back as ``options.date`` in the __main__ section.
optParser.add_option('-d', '--date', dest='date')

# Channel ids fetched once at import time; pmt_mapper tests each event's
# 'channel' for membership in this collection.
all_channel = DbConn().get_all_channelId()
class de_class(object):
    """Splitting stage: classify raw log lines and save the 'start' records."""

    @staticmethod
    def pmt_mapper(lines):
        """Parse log lines and yield ``(tag, payload)`` pairs.

        For each line the JSON payload is decoded and every entry of its
        ``'events'`` list is merged with the shared ``'client'`` dict.

        Yields:
            ('start', word):  for an event whose ``'currevent'`` is
                ``'start'``; ``word`` is ``"<time>\\t<appid>\\t<channel>\\n"``.
            ('badlog', line): when the event's channel is not in
                ``all_channel`` or the event cannot be processed.
            ('error', line):  when the line as a whole cannot be parsed.

        :param lines: iterable of raw log lines (tab-separated fields).
        """
        start_fmt = '%s\t%s\t%s\n'
        for line in lines:
            try:
                ba = line.split('\t')
                # NOTE(review): ``b_date`` is never assigned in this file, so
                # this raises NameError and every line ends up tagged 'error'.
                # It is presumably one of the tab-separated fields in ``ba``
                # (the JSON body) -- confirm the field index and assign it.
                r_dict = json.loads(b_date)
                for item in r_dict['events']:
                    try:
                        # Enrich the event in place with the client fields.
                        item.update(r_dict['client'])
                        if item['channel'] not in all_channel:
                            # NOTE(review): no ``continue`` here, so an event
                            # from an unknown channel can be emitted both as
                            # 'badlog' and as 'start' -- confirm intent.
                            yield 'badlog', line
                        if item['currevent'] == 'start':
                            # NOTE(review): ``ng_time`` is never assigned
                            # either; presumably the timestamp field of
                            # ``ba`` -- confirm and assign it.
                            word = start_fmt % (
                                ng_time, item['appid'], item['channel'])
                            yield 'start', word
                    except Exception:
                        # ``Exception`` rather than a bare ``except`` so that
                        # GeneratorExit (raised at a ``yield`` when the
                        # generator is closed) and KeyboardInterrupt are not
                        # swallowed and answered with another ``yield``.
                        yield 'badlog', line
            except Exception:
                yield 'error', line

    @staticmethod
    def de_method(hour):
        """Run the job for one time bucket and save its 'start' records.

        Reads every file matching ``<nginx_path><app_id>/*/*<hour>*``, tags
        the lines with :meth:`pmt_mapper`, and writes the payloads tagged
        ``'start'`` to ``<qiege_path>start_<hour>``.  The 'badlog' and
        'error' records are computed but not saved here.

        :param hour: substring identifying the log files to process; also
            used as the suffix of the output path.
        """
        key = CoConn().get_app_id()
        path = CoConn().get_nginx_path()
        pattern = path + '%s/*/*%s*' % (key, hour)
        # glom() hands pmt_mapper a whole split at a time, which is why the
        # mapper takes an iterable of lines rather than a single line.
        tagged = dpark.textFile(
            glob.glob(pattern), splitSize=4 << 20
        ).glom().flatMap(de_class.pmt_mapper)
        grouped = tagged.groupByKey(numSplits=8)
        start_records = grouped.lookup('start')
        r_path = CoConn().get_qiege_path()
        if start_records:
            rdd1 = dpark.parallelize(start_records, numSlices=100)
            rdd1.saveAsTextFile('%sstart_%s' % (r_path, hour), overwrite=True)
if __name__ == '__main__':
    # Script entry point: read the -d/--date option and run the job for it.
    options, args = optParser.parse_args()
    hour = options.date
    if not hour:
        # Without -d the value is None, which would silently turn the file
        # glob into '*None*'; fail fast with a usage message instead.
        optParser.error('-d/--date is required')
    de_class.de_method(hour)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment