Skip to content

Instantly share code, notes, and snippets.

@copperlight
Last active October 22, 2020 04:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save copperlight/eac3e2632acf600758e981a133542c8d to your computer and use it in GitHub Desktop.
Save copperlight/eac3e2632acf600758e981a133542c8d to your computer and use it in GitHub Desktop.
import bz2
import fnmatch
import gzip
import os
import re
from typing import Callable, Generator, List, Optional, Pattern, Tuple
"""
Generators for building log processing pipelines.
Code borrowed and combined from Python Generator Hacking, with some modifications:
https://www.dabeaz.com/usenix2009/generators/index.html
Given a set of log files which may be compressed, stream all lines, filter them by pattern
match and transform them into dictionaries, maybe remapping some of the field values.
Example log line:
2020-10-04T19:11:45.945 DEBUG [RxComputationScheduler-1] com.amazonaws.latency:
ServiceName=[Amazon S3], StatusCode=[200],
Example usage:
import re
from logprocessing import *
log_format = r'.*com.amazonaws.latency: ' \
r'ServiceName=\[([^\]]*)\], ' \
r'StatusCode=\[([^\]]*)\], '
log_pattern = re.compile(log_format)
columns = (
'ServiceName',
'StatusCode'
)
remaps = [
('StatusCode', int),
]
lines = lines_from_dir('*.log', '.')
filtered = gen_grep('com.amazonaws.latency', lines)
events = lines_to_events(filtered, log_pattern, columns, remaps)
for e in events:
# do something with the sequence of log events
pass
"""
def gen_files(pattern: str, top: str) -> Generator:
    """Walk `top` recursively, yielding the full path of every file whose
    basename matches the shell-style glob `pattern`."""
    for dirpath, _dirnames, basenames in os.walk(top):
        matched = fnmatch.filter(basenames, pattern)
        for basename in matched:
            yield os.path.join(dirpath, basename)
def gen_streams(filenames: Generator) -> Generator:
    """Given a sequence of filenames, generate a sequence of text-mode streams.

    Fixes two defects in the original:
    - Compressed files (.gz, .bz2) are now opened in text mode ('rt'), so every
      stream yields str lines just like plain files do.  Previously they were
      opened in binary mode, yielding bytes that broke downstream str-pattern
      matching (e.g. gen_grep).
    - Each stream is closed when the consumer advances to the next filename
      (or when this generator itself is closed), so file handles no longer leak.
    """
    for name in filenames:
        if name.endswith('.gz'):
            stream = gzip.open(name, 'rt')
        elif name.endswith('.bz2'):
            stream = bz2.open(name, 'rt')
        else:
            stream = open(name)
        try:
            yield stream
        finally:
            # Runs when the generator resumes for the next name or is closed.
            stream.close()
def gen_cat(sources: Generator) -> Generator:
    """Flatten a sequence of iterables into one continuous sequence of items."""
    for source in sources:
        yield from source
def lines_from_dir(pattern: str, dirname: str) -> Generator:
    """Stream every line from every file under `dirname` whose name matches
    the glob `pattern`, decompressing .gz/.bz2 files transparently."""
    return gen_cat(gen_streams(gen_files(pattern, dirname)))
def gen_grep(pattern: str, lines: Generator) -> Generator:
    """Yield only those lines containing a match for the regex `pattern`."""
    # Compile once up front instead of rebinding the parameter name.
    regex = re.compile(pattern)
    for candidate in lines:
        if regex.search(candidate) is not None:
            yield candidate
def field_remap(
    events: Generator,
    remaps: List[Tuple[str, Callable]]
) -> Generator:
    """Apply each (field-name, transform) pair in `remaps` to every event
    dictionary, mutating the dict in place before yielding it."""
    for event in events:
        for field_name, transform in remaps:
            event[field_name] = transform(event[field_name])
        yield event
def lines_to_events(
    lines: Generator,
    log_pattern: Pattern,
    columns: Tuple,
    remaps: Optional[List[Tuple[str, Callable]]] = None
) -> Generator:
    """Turn raw log lines into event dictionaries.

    Each line is matched against `log_pattern` (anchored at the start via
    .match); non-matching lines are dropped.  Capture groups are zipped with
    `columns` to build a dict, and any `remaps` are applied to its values.
    """
    matches = map(log_pattern.match, lines)
    events = (dict(zip(columns, m.groups())) for m in matches if m)
    return field_remap(events, remaps if remaps else [])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment