log-extract script with Spark
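The script expects each input line to be a series of backtick-delimited key=value fields. It keeps only lines matching --filterregex, extracts the values for the requested keys, drops records where any --notblankkeys field is empty, and writes the surviving records tab-separated.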
# coding: utf-8
# command example:
# $ spark-submit spark_log_extract.py \
# --name test \
# --notblankkeys dn,stm,ev_ac,pg_url \
# --filterregex ".*(=ac_pl\`|=ac_dl\`).*" \
# --usegzip \
# /path/to/source \
# /path/to/atom \
# dn,stm,ev_ac,v_title,v_uri,pg_url
import re
from optparse import OptionParser

from pyspark import SparkConf, SparkContext


def process(line, keys, not_blank_keys):
    """Extract the requested keys from a backtick-delimited key=value line.

    Returns a tab-joined record, or None if the line is malformed or a
    required key is blank (None records are filtered out downstream).
    """
    fields = line.split('`')
    output_lst = [''] * len(keys)
    try:
        for field in fields:
            key, val = field.split('=', 1)
            if key in keys:
                output_lst[keys.index(key)] = val
        # Drop the record if any required key ended up empty.
        for not_blank_key in not_blank_keys:
            if not output_lst[keys.index(not_blank_key)]:
                return None
    except ValueError:
        # Raised by split() on a field without '=' or by index() on an
        # unknown key; treat the whole line as malformed and skip it.
        return None
    return '\t'.join(output_lst)


def parse_args():
    usage = 'usage: %prog [options] input_path output_path keys'
    parser = OptionParser(usage=usage)
    parser.add_option(
        '--name', default='SparkLogExtract', help='name of the Spark job')
    parser.add_option(
        '--notblankkeys', default='',
        help='comma-separated keys whose values must not be blank')
    parser.add_option(
        '--filterregex', default='.*',
        help='regex a line must match to be kept')
    parser.add_option(
        '--usegzip', action='store_true', default=False,
        help='gzip-compress the output files')
    options, args = parser.parse_args()
    if len(args) != 3:
        parser.error('incorrect number of arguments')
    return options, args


def main():
    options, args = parse_args()
    input_path, output_path = args[0], args[1]
    keys = [k.strip() for k in args[2].split(',')]
    not_blank_keys = ([k.strip() for k in options.notblankkeys.split(',')]
                      if options.notblankkeys else [])
    conf = SparkConf().setAppName(options.name)
    if options.usegzip:
        # Compression settings are passed through to Hadoop's output
        # format via the spark.hadoop. prefix.
        conf.set(
            'spark.hadoop.mapreduce.output.fileoutputformat.compress', 'true')
        conf.set(
            'spark.hadoop.mapreduce.output.fileoutputformat.compress.codec',
            'org.apache.hadoop.io.compress.GzipCodec')
        conf.set(
            'spark.hadoop.mapreduce.output.fileoutputformat.compress.type',
            'BLOCK')
    sc = SparkContext(conf=conf)
    (sc.textFile(input_path)
       .filter(lambda x: re.match(options.filterregex, x))
       .map(lambda x: process(x, keys, not_blank_keys))
       .filter(lambda x: x)
       .saveAsTextFile(output_path))


if __name__ == '__main__':
    main()
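
For a quick local sanity check of the parsing logic (no cluster needed), process can be called directly. The sample line below is hypothetical; it only assumes the backtick-delimited key=value format the script parses:

# Hypothetical sample line -- field names and values are made up for illustration.
sample = ('dn=site1`stm=1407210600`ev_ac=ac_pl`v_title=Home'
          '`v_uri=/a`pg_url=http://example.com/a')
keys = ['dn', 'stm', 'ev_ac', 'v_title', 'v_uri', 'pg_url']
print(process(sample, keys, ['dn', 'stm', 'ev_ac', 'pg_url']))
# site1   1407210600   ac_pl   Home   /a   http://example.com/a   (tab-separated)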