Skip to content

Instantly share code, notes, and snippets.

@studiawan
Created May 27, 2020 02:41
Show Gist options
  • Save studiawan/60910b0a6e6ac926a93044b4ff48e533 to your computer and use it in GitHub Desktop.
Save studiawan/60910b0a6e6ac926a93044b4ff48e533 to your computer and use it in GitHub Desktop.
Parallel log parser with PyParsing and multiprocessing
import sys
import multiprocessing
import csv
from pyparsing import Word, alphas, Suppress, Combine, string, nums, Optional, Regex
class ParallelLogParser(object):
    """Parse a syslog-style auth log in parallel and export the result as CSV.

    Every non-blank line of the log file is split by a PyParsing grammar into
    four fields: timestamp, hostname, application and message. The lines are
    spread over a ``multiprocessing.Pool``; the instance itself is the
    callable passed to ``Pool.map`` (see ``__call__``).
    """

    def __init__(self, log_file):
        """Remember the log path and build the line grammar.

        Args:
            log_file: Path of the log file to parse. The CSV output is written
                to ``log_file + '.csv'``.
        """
        self.log_file = log_file
        self.authlog_grammar = self.__get_authlog_grammar()

    @staticmethod
    def __get_authlog_grammar():
        """Build the PyParsing grammar for a single log line.

        Returns:
            A PyParsing expression with the result names ``timestamp``,
            ``hostname``, ``application`` and ``message``.
        """
        ints = Word(nums)

        # timestamp: three-letter month (one uppercase letter followed by
        # lowercase letters), day number, and hh:mm:ss merged into one token
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname, service name, message
        hostname_or_ip = Word(alphas + nums + "_" + "-" + ".")
        # application name, an optional "[number]" (brackets dropped) and the
        # trailing colon (dropped)
        appname = (
            Word(alphas + "/" + "-" + "_" + ".")
            + Optional(Suppress("[") + ints + Suppress("]"))
            + Suppress(":")
        )
        # everything that is left on the line
        message = Regex(".*")

        # auth log grammar
        authlog_grammar = (
            timestamp.setResultsName('timestamp')
            + hostname_or_ip.setResultsName('hostname')
            + appname.setResultsName('application')
            + message.setResultsName('message')
        )
        return authlog_grammar

    def __get_fields(self, log_line):
        """Parse one log line into a dict of its four fields.

        Args:
            log_line: A single line of the log file.

        Returns:
            A dict with the keys ``timestamp``, ``hostname``, ``application``
            and ``message``. Multi-token fields (timestamp, application) are
            joined with single spaces.

        Raises:
            Whatever ``parseString`` raises when the line does not match the
            grammar.
        """
        parsed = self.authlog_grammar.parseString(log_line)
        return {
            'timestamp': ' '.join(parsed.timestamp.asList()),
            'hostname': parsed.hostname,
            'application': ' '.join(parsed.application.asList()),
            'message': parsed.message,
        }

    def __call__(self, log_line):
        """Parse one log line; this makes the instance usable with ``Pool.map``.

        Args:
            log_line: A single line of the log file.

        Returns:
            The dict produced by ``__get_fields`` for ``log_line``.
        """
        return self.__get_fields(log_line)

    def __save_csv(self, parsed_logs):
        """Write the parsed entries to ``self.log_file + '.csv'``.

        Args:
            parsed_logs: Iterable of dicts as returned by ``__call__``.
        """
        columns = ['timestamp', 'hostname', 'application', 'message']

        # newline='' lets the csv module control line endings itself;
        # the with-block closes the file even if writing a row fails.
        with open(self.log_file + '.csv', 'wt', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            writer.writerows(
                [entry[column] for column in columns] for entry in parsed_logs
            )

    def parse_authlog(self):
        """Read the log file, parse its lines in parallel and save them as CSV.

        Prints a message and exits the process with status 1 when the log
        file does not exist.
        """
        # read log file
        try:
            with open(self.log_file, 'r') as f:
                # Blank lines can never match the grammar; dropping them here
                # keeps one stray empty line from aborting the whole run.
                log_lines = [line for line in f if line.strip()]
        except FileNotFoundError:
            print('File not found.')
            sys.exit(1)

        # run parser with multiprocessing; the with-block shuts the workers
        # down even when parsing a line raises
        total_cpu = multiprocessing.cpu_count()
        with multiprocessing.Pool(processes=total_cpu) as pool:
            parsed_logs = pool.map(self, log_lines)

        self.__save_csv(parsed_logs)
if __name__ == '__main__':
    # The script takes exactly one argument: the path of the log file.
    if len(sys.argv) != 2:
        print('Please type a correct log file name.')
        sys.exit(1)

    # Parse the given log file and write the CSV file alongside it.
    log_parser = ParallelLogParser(sys.argv[1])
    log_parser.parse_authlog()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment