Skip to content

Instantly share code, notes, and snippets.

@studiawan
Created May 27, 2020 02:42
Show Gist options
  • Save studiawan/c9995b5620f57ed9617fea6fd67ef16a to your computer and use it in GitHub Desktop.
Parallel log parser with PyParsing and multiprocessing. Reads the log file in chunks.
import csv
import multiprocessing
import sys
from itertools import zip_longest

from pyparsing import (Word, alphas, Suppress, Combine, string, nums,
                       Optional, Regex, ParseException)
class ParallelLogParser(object):
    """Parse auth.log-style syslog lines in parallel and write them to CSV.

    The log file is read in fixed-size chunks; each chunk is handed to a
    multiprocessing pool (the instance itself is the worker callable) and
    the extracted fields are appended to ``<log_file>.csv``.
    """

    def __init__(self, log_file):
        # Path of the log file to parse; the CSV output path is derived from it.
        self.log_file = log_file
        self.authlog_grammar = self.__get_authlog_grammar()

    @staticmethod
    def __get_authlog_grammar():
        """Build and return the pyparsing grammar for one auth.log line."""
        ints = Word(nums)

        # timestamp, e.g. "May 27 02:42:00"
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname, service name (optionally followed by "[pid]"), free-form message
        hostname_or_ip = Word(alphas + nums + "_" + "-" + ".")
        appname = Word(alphas + "/" + "-" + "_" + ".") + \
            Optional(Suppress("[") + ints + Suppress("]")) + Suppress(":")
        message = Regex(".*")

        # full auth log grammar
        authlog_grammar = timestamp.setResultsName('timestamp') + \
            hostname_or_ip.setResultsName('hostname') + \
            appname.setResultsName('application') + \
            message.setResultsName('message')
        return authlog_grammar

    def __get_fields(self, log_line):
        """Parse one log line and return its fields as a dict.

        Raises pyparsing.ParseException when the line does not match the grammar.
        """
        parsed = self.authlog_grammar.parseString(log_line)
        parsed_log = dict()
        parsed_log['timestamp'] = ' '.join(parsed.timestamp.asList())
        parsed_log['hostname'] = parsed.hostname
        parsed_log['application'] = ' '.join(parsed.application.asList())
        parsed_log['message'] = parsed.message
        return parsed_log

    def __call__(self, log_line):
        """Worker entry point for ``pool.map``.

        Returns the parsed-field dict, or None for chunk padding and for
        lines that do not match the grammar (a single malformed line must
        not abort the whole run).
        """
        if log_line is None:
            return None
        try:
            return self.__get_fields(log_line)
        except ParseException:
            # Malformed line: skip it, matching the existing None filter.
            return None

    @staticmethod
    def __grouper(n, iterable, padvalue=None):
        """Yield n-sized tuples from iterable; the last one is padded with padvalue."""
        return zip_longest(*[iter(iterable)] * n, fillvalue=padvalue)

    def parse_authlog_chunk(self, chunk_size=1000):
        """Parse the whole log file in chunks and write '<log_file>.csv'.

        chunk_size: number of lines handed to the pool per map() call.
        """
        # newline='' is required by the csv module to avoid blank rows on Windows;
        # context managers guarantee both files are closed even on error.
        with open(self.log_file, 'r') as f_log, \
                open(self.log_file + '.csv', 'w', newline='') as f_csv:
            writer = csv.writer(f_csv)
            writer.writerow(['timestamp', 'hostname', 'application', 'message'])

            # one worker per CPU core
            pool = multiprocessing.Pool(multiprocessing.cpu_count())
            try:
                for chunk in self.__grouper(chunk_size, f_log):
                    for result in pool.map(self, chunk):
                        # None entries are chunk padding or unparseable lines.
                        if result is not None:
                            writer.writerow([result['timestamp'],
                                             result['hostname'],
                                             result['application'],
                                             result['message']])
            finally:
                pool.close()
                pool.join()
if __name__ == '__main__':
    # Expect exactly one argument: the log file to parse.
    if len(sys.argv) == 2:
        file_name = sys.argv[1]
        parser = ParallelLogParser(file_name)
        parser.parse_authlog_chunk()
    else:
        # Usage errors belong on stderr, not stdout.
        print('Please type a correct log file name.', file=sys.stderr)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment