Skip to content

Instantly share code, notes, and snippets.

@shiplu
Last active May 17, 2019 07:02
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save shiplu/4645072c35a1a744721c12c4d9e06e36 to your computer and use it in GitHub Desktop.
Sort Python log files by date and time. Takes a datetime regex as input.
import argparse
import re
from datetime import datetime
from signal import signal, SIGPIPE, SIG_DFL
from operator import itemgetter
# Restore the operating system's default SIGPIPE handling (Python ignores the
# signal by default). With the default action, writing to a closed pipe, e.g.
# when the output is piped into `head`, ends the process instead of raising
# BrokenPipeError.
signal(SIGPIPE, SIG_DFL)
def test_sorted_lines():
    """Check that records from several logs come out in timestamp order.

    The fixtures cover continuation lines in the middle of a log, at the
    end of a log, and lines that precede the first timestamped line (which
    are expected to be dropped).
    """
    pattern = re.compile('(%s)' % r'^[\d :-]{19},\d+')

    # Continuation lines in the middle of the log.
    first_log = [
        "2019-05-08 08:44:21,159 : 1 line 1",
        "2019-05-08 08:44:22,996 : 1 line 2",
        "line 2.1",
        "line 2.2",
        "2019-05-08 08:44:27,000 : 1 line 3",
    ]
    # Continuation lines at the very end of the log.
    second_log = [
        "2019-05-08 08:44:21,158 : 2 line 1",
        "2019-05-08 08:44:22,995 : 2 line 2",
        "2019-05-08 08:44:26,000 : 2 line 3",
        "line 3.1",
        "line 3.2",
    ]
    # Lines before the first timestamp belong to no record.
    third_log = [
        "line 0.1",
        "line 0.2",
        "line 0.3",
        "2019-05-08 08:45:00,000 : 3 line 2",
        "2019-05-08 08:46:00,000 : 3 line 3",
    ]

    expected = [
        "2019-05-08 08:44:21,158 : 2 line 1",
        "2019-05-08 08:44:21,159 : 1 line 1",
        "2019-05-08 08:44:22,995 : 2 line 2",
        "2019-05-08 08:44:22,996 : 1 line 2\nline 2.1\nline 2.2",
        "2019-05-08 08:44:26,000 : 2 line 3\nline 3.1\nline 3.2",
        "2019-05-08 08:44:27,000 : 1 line 3",
        "2019-05-08 08:45:00,000 : 3 line 2",
        "2019-05-08 08:46:00,000 : 3 line 3",
    ]

    merged = list(sorted_lines([second_log, first_log, third_log], pattern))
    assert merged == expected
def get_datetime(asctime):
    """Parse a ``YYYY-MM-DD HH:MM:SS,fraction`` string into a ``datetime``.

    Raises:
        ValueError: If ``asctime`` does not match that layout.
    """
    asctime_format = "%Y-%m-%d %H:%M:%S,%f"
    parsed = datetime.strptime(asctime, asctime_format)
    return parsed
def parse_args(argv=None):
    """Parse the command-line options of the log sorter.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the
            original behaviour; passing a list lets the parser be driven
            programmatically.

    Returns:
        The ``argparse.Namespace`` with two attributes: ``pattern``, the
        regular-expression source used to locate the timestamp, and
        ``log``, a non-empty list of files opened for reading by
        ``argparse.FileType`` (``-`` selects standard input).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-p', '--pattern',
        help='datetime pattern',
        default=r'^[\d :-]{19},\d+',
    )
    # nargs='+' requires at least one log file on the command line.
    parser.add_argument('log', nargs='+', type=argparse.FileType(), help="log files")
    return parser.parse_args(argv)
def rows(f, datetime_pattern):
    """Group the lines of one log into timestamped records.

    A line on which ``datetime_pattern`` matches starts a new record. Each
    following line without a match is treated as a continuation of that
    record (for example a traceback) and is appended to it, separated by
    newlines. Lines that appear before the first matching line have no
    record to belong to and are dropped.

    Args:
        f: Iterable of text lines, e.g. an open file or a list of strings.
            Trailing whitespace, including the newline, is stripped from
            every line.
        datetime_pattern: Compiled regular expression that locates the
            timestamp. If it defines capture groups, the text of group 1
            is used; otherwise the whole match is used.

    Yields:
        Two-item lists ``[timestamp_text, record_text]`` in input order.
    """
    # Fall back to the whole match when the pattern has no capture group, so
    # callers are not forced to wrap their pattern in parentheses.
    key_group = 1 if datetime_pattern.groups else 0

    timestamp = None
    record_lines = []  # lines of the record currently being assembled
    for raw_line in f:
        line = raw_line.rstrip()
        match = datetime_pattern.search(line)
        if match:
            # A new timestamp closes the record collected so far.
            if record_lines:
                yield [timestamp, "\n".join(record_lines)]
            timestamp = match.group(key_group)
            record_lines = [line]
        elif record_lines:
            # Continuation line; ignored while no record has been opened yet.
            record_lines.append(line)

    # Flush the final record once the input is exhausted.
    if record_lines:
        yield [timestamp, "\n".join(record_lines)]
def print_formatted(start, end):
    """Print the end time and the seconds elapsed since ``start``.

    Args:
        start: Mapping whose ``'time'`` entry marks the beginning.
        end: Mapping whose ``'time'`` entry marks the end; it must support
            ``strftime`` and subtraction of the start value (e.g. a
            ``datetime``).

    The output is a single line: the end time with microseconds, a tab,
    and the elapsed seconds with three decimals.
    """
    finished_at = end['time']
    elapsed_seconds = (finished_at - start['time']).total_seconds()
    stamp = finished_at.strftime("%Y-%m-%d %H:%M:%S.%f")
    print("%s\t%0.03f" % (stamp, elapsed_seconds))
def indexed_rows(files, pattern):
    """Yield the records of every log in ``files``, one log after another.

    Args:
        files: Iterable of line sources (open files or lists of strings).
        pattern: Compiled timestamp regex, passed through to ``rows``.

    Yields:
        The ``[timestamp_text, record_text]`` items produced by ``rows``,
        in the order the logs are given.
    """
    for log_file in files:
        yield from rows(log_file, pattern)
def sorted_lines(files, pattern):
    """Yield the record text of all logs, ordered by timestamp text.

    Every record is read into memory before the first one is yielded. The
    ordering compares the matched timestamp strings, not parsed dates, and
    the sort is stable, so records with equal timestamps keep input order.

    Args:
        files: Iterable of line sources (open files or lists of strings).
        pattern: Compiled timestamp regex, passed through to
            ``indexed_rows``.
    """
    records = list(indexed_rows(files, pattern))
    records.sort(key=itemgetter(0))
    for record in records:
        yield record[1]
def main():
    """Merge the log files named on the command line and print them sorted.

    Reads the options with ``parse_args``, compiles the timestamp pattern,
    and prints every record returned by ``sorted_lines``. The log files
    opened by argparse are closed before returning, even if sorting or
    printing fails.
    """
    import sys  # local import: only needed for the stdin check below

    args = parse_args()
    try:
        # Wrap the user's pattern in a group; the row parser reads the
        # timestamp from capture group 1.
        datetime_pattern = re.compile('(%s)' % args.pattern)
        for line in sorted_lines(args.log, datetime_pattern):
            print(line)
    finally:
        # argparse.FileType opened these files and nothing else closes them.
        for log_file in args.log:
            # "-" maps to the process-wide stdin, which is not ours to close.
            if log_file is not sys.stdin:
                log_file.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment