Created
October 30, 2022 01:17
-
-
Save geky/b27cd2fa75253686c981d1003a74090f to your computer and use it in GitHub Desktop.
Filter/count/find culprits for emails in .mbox format
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
#
# Filter/count/find culprits for emails in .mbox format
#
# To find number of emails from an address:
# ./mboxfilter.py mail-2022.mbox --from=@github.com
#
# To separate these out into their own .mbox:
# ./mboxfilter.py mail-2022.mbox --from=@github.com -omail-github-2022.mbox
#
# To find the culprits who sent you the most mail:
# ./mboxfilter.py mail-2022.mbox --culprits -N100
#
import collections as co
import fnmatch
import mailbox
import math as ma
import re
import time
def main(mbox_path, *,
        output=None,
        sample=False,
        filters=None,
        culprits=False,
        culprit_header='From',
        culprit_quantity=False,
        culprit_count=20):
    """Scan an mbox file, apply filters, and print statistics on the matches.

    Args:
        mbox_path: Path of the input mbox file.
        output: Optional output path. Without ``sample`` every matching
            message is added to an mbox at this path; with ``sample`` the
            first matching message is written there as plain text.
        sample: Stop at the first matching message and print it (or write
            it to ``output`` when given).
        filters: Iterable of predicates called as ``f(message, size)``; a
            message is kept only when all of them return true. ``None``
            (the default) means no filtering.
        culprits: Also tally matches per value of ``culprit_header`` and
            print the largest entries.
        culprit_header: Header whose value identifies a culprit.
        culprit_quantity: Tally the number of messages instead of bytes.
        culprit_count: How many of the largest tallies to print.

    Returns:
        None. All results are reported on stdout.
    """
    # Avoid a shared mutable default argument.
    filters = [] if filters is None else filters

    start = time.time()
    t_count = 0
    t_size = 0
    t_min = None  # None until the first match, so a real 0 is not lost
    t_max = 0

    if culprits:
        print('finding culprits')
        t_culprits = co.defaultdict(int)

    # Matches are copied into an output mbox only outside of sample mode.
    copying = bool(output) and not sample

    obox = None
    mbox = None
    try:
        if copying:
            obox = mailbox.mbox(output, create=True)
            print('writing to %s' % output)

        print('populating...')
        mbox = mailbox.mbox(mbox_path)
        for i, (k, m) in enumerate(mbox.items()):
            if i == 0:
                print('filtering...')

            z = len(bytes(m))
            if not all(f(m, z) for f in filters):
                continue

            t_count += 1
            t_size += z
            t_min = z if t_min is None else min(t_min, z)
            t_max = max(t_max, z)

            if culprits:
                t_culprits[str(m[culprit_header])] += (
                    1 if culprit_quantity else z)

            if sample:
                if output is not None:
                    # Bind the opened file; writing to the path string
                    # itself would raise AttributeError.
                    with open(output, 'w') as f:
                        f.write(m.as_string(unixfrom=True))
                    print('wrote email %r to %s' % (k, output))
                else:
                    print('found email %r:' % k)
                    print(m.as_string(unixfrom=True))
                break

            if copying:
                obox.add(m)
    finally:
        # Close (and thereby flush) the mailboxes even if scanning failed.
        if obox is not None:
            obox.close()
        if mbox is not None:
            mbox.close()

    if culprits:
        print('culprits:')
        # Ascending sort, so the biggest culprit is printed last.
        ranked = sorted(t_culprits.items(), key=lambda kv: kv[1])
        for k, v in ranked[-culprit_count:]:
            print('%11d %s' % (v, k))

    stop = time.time()
    print('found %d, %d bytes, avg %d bytes, min %d bytes, max %d bytes' % (
        t_count,
        t_size,
        ma.ceil(t_size/t_count) if t_count else 0,
        t_min if t_min is not None else 0,
        t_max))
    print('in %.2fs' % (stop-start))
def parse_size(s):
    """Turn a size expression into a filter predicate.

    ``s`` is an integer (any base accepted by ``int(x, 0)``) optionally
    prefixed with one of ``>=``, ``<=``, ``>>`` (greater than), ``<<``
    (less than) or ``==``; with no prefix ``>=`` is used. The returned
    callable takes ``(message, size)`` and compares ``size`` with the
    parsed value. The chosen filter is also announced on stdout.
    """
    parsed = re.match('^(?P<op>>=|<=|>>|<<|==)?(?P<v>.*)$', s)
    op = parsed.group('op') or '>='
    v = int(parsed.group('v'), 0)

    # One predicate per operator the regex above can produce.
    comparisons = {
        '>=': lambda m, z: z >= v,
        '<=': lambda m, z: z <= v,
        '>>': lambda m, z: z > v,
        '<<': lambda m, z: z < v,
        '==': lambda m, z: z == v,
    }
    predicate = comparisons[op]

    print('filtering size %s %r' % (op, v))
    return predicate
def parse_header(s):
    """Turn a ``header=pattern`` expression into a filter predicate.

    The expression has the form ``[hop]header=[op]pattern``, where both
    ``hop`` (how header names are selected) and ``op`` (how header values
    are tested) are one of:

        >=  substring containment
        ==  exact match
        *=  fnmatch-style glob
        ~=  regular expression search

    ``hop`` defaults to ``==`` and ``op`` defaults to ``>=``. Neither the
    header part nor the pattern may contain ``=``.

    Args:
        s: The filter expression.

    Returns:
        A callable taking ``(message, size)`` that is true when a selected
        header's value satisfies the pattern. ``message`` must provide
        ``get`` and ``items``.

    Raises:
        ValueError: If ``s`` is not a valid filter expression.
    """
    m = re.match(
        r'^(?P<hop>>=|==|\*=|~=)?(?P<hv>[^=]*)='
        r'(?P<op>>=|==|\*=|~=)?(?P<v>[^=]*)$', s)
    if m is None:
        # ValueError lets argparse report a bad option value cleanly.
        raise ValueError('invalid header filter %r' % s)

    hop = m.group('hop') or '=='
    hv = m.group('hv')
    op = m.group('op') or '>='
    v = m.group('v')

    # How header names are selected; dispatch on hop, not on op.
    if hop == '==':
        # Exact name: a single lookup, with '' for a missing header.
        hf = lambda msg, pred: pred(str(msg.get(hv, '')))
    else:
        if hop == '>=':
            name_ok = lambda k: hv in k
        elif hop == '*=':
            name_ok = lambda k: fnmatch.fnmatch(k, hv)
        else:  # '~='
            name_ok = lambda k: bool(re.search(hv, k))
        hf = lambda msg, pred: any(
            pred(str(h)) for k, h in msg.items() if name_ok(k))

    # How the selected header values are tested.
    if op == '>=':
        value_ok = lambda h: v in h
    elif op == '==':
        value_ok = lambda h: h == v
    elif op == '*=':
        value_ok = lambda h: fnmatch.fnmatch(h, v)
    else:  # '~='
        value_ok = lambda h: bool(re.search(v, h))

    f = lambda msg, z: hf(msg, value_ok)

    print('filtering %s %r %s %r' % (hop, hv, op, v))
    return f
if __name__ == "__main__":
    import sys
    import argparse

    class AppendFilter(argparse.Action):
        """Append each parsed filter predicate to ``namespace.filters``."""
        def __call__(self, parser, namespace, value, option):
            # Create the shared list the first time any filter option is seen.
            if getattr(namespace, 'filters', None) is None:
                namespace.filters = []
            namespace.filters.append(value)

    parser = argparse.ArgumentParser(description="filter mbox by various criteria")

    # Input/output selection.
    parser.add_argument(
        'mbox_path',
        help="Input mbox file")
    parser.add_argument(
        '-o', '--output',
        help="Output file")
    parser.add_argument(
        '-s', '--sample',
        action='store_true',
        help="Samples the first match, printing the contents and stopping")

    # Filters; each option's type= callable builds a predicate.
    parser.add_argument(
        '-z', '--size',
        type=parse_size,
        action=AppendFilter,
        help="Filter by size, can be prefixed with a comparison operation, one of >=,<=,>>,<<,==, defaults to >=")
    parser.add_argument(
        '-f', '--from',
        type=lambda pattern: parse_header('From='+pattern),
        action=AppendFilter,
        help="Filter by from field, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to >=")
    parser.add_argument(
        '-l', '--label',
        type=lambda pattern: parse_header('X-Gmail-Labels='+pattern),
        action=AppendFilter,
        help="Filter by gmail label, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to >=")
    parser.add_argument(
        '-k', '--header',
        type=parse_header,
        action=AppendFilter,
        help="Filter by arbitrary header, -kheader=pattern, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to == for the header and >= for the pattern")

    # Culprit reporting.
    parser.add_argument(
        '-C', '--culprits',
        action='store_true',
        help="Find culprits, aka find who created the most emails")
    parser.add_argument(
        '-K', '--culprit-header',
        help="Use this header to find culprits, defaults to 'From'")
    parser.add_argument(
        '-Q', '--culprit-quantity',
        action='store_true',
        help="Find culprits by quantity not size")
    parser.add_argument(
        '-N', '--culprit-count',
        type=lambda text: int(text, 0),
        help="Show the top n results, defaults to 20")

    # Forward only the options that were actually set, so that main()'s
    # own defaults apply to everything else.
    parsed = parser.parse_intermixed_args()
    kwargs = {
        name: setting
        for name, setting in vars(parsed).items()
        if setting is not None
    }
    sys.exit(main(**kwargs))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment