Skip to content

Instantly share code, notes, and snippets.

@geky
Created October 30, 2022 01:17
Show Gist options
  • Save geky/b27cd2fa75253686c981d1003a74090f to your computer and use it in GitHub Desktop.
Save geky/b27cd2fa75253686c981d1003a74090f to your computer and use it in GitHub Desktop.
Filter/count/find culprits for emails in .mbox format
#!/usr/bin/env python3
#
# Filter/count/find culprits for emails in .mbox format
#
# To find number of emails from an address:
# ./mboxfilter.py mail-2022.mbox --from=@github.com
#
# To separate these out into their own .mbox:
# ./mboxfilter.py mail-2022.mbox --from=@github.com -omail-github-2022.mbox
#
# To find the culprits who sent you the most mail:
# ./mboxfilter.py mail-2022.mbox --culprits -N100
#
import collections as co
import fnmatch
import mailbox
import math as ma
import re
import time
def main(mbox_path, *,
        output=None,
        sample=False,
        filters=(),
        culprits=False,
        culprit_header='From',
        culprit_quantity=False,
        culprit_count=20):
    """Scan an mbox file, apply filters, and print statistics on the matches.

    Args:
        mbox_path: Path of the mbox file to read. It must already exist.
        output: Optional output path. In normal mode every matching message
            is added to an mbox at this path; in sample mode the first
            matching message is written there as plain text instead.
        sample: If true, stop at the first matching message and either
            print it or write it to `output`.
        filters: Iterable of predicates called as `f(message, size)`; a
            message matches only when every predicate returns true.
        culprits: If true, tally matches per `culprit_header` value and
            print the largest tallies.
        culprit_header: Header whose value identifies a culprit.
        culprit_quantity: Tally message counts instead of message bytes.
        culprit_count: Number of top culprits to print.

    Raises:
        mailbox.NoSuchMailboxError: If `mbox_path` does not exist.
    """
    start = time.time()
    t_count = 0
    t_size = 0
    t_min = 0
    t_max = 0

    if culprits:
        print('finding culprits')
    # Per-culprit tally: message count or total bytes, see culprit_quantity.
    t_culprits = co.defaultdict(int)

    # create=False: reading a mistyped path must fail loudly rather than
    # silently create an empty mailbox and report zero matches.
    mbox = mailbox.mbox(mbox_path, create=False)
    obox = None
    try:
        if output and not sample:
            obox = mailbox.mbox(output, create=True)
            print('writing to %s' % output)

        print('populating...')
        # iteritems() yields messages one at a time instead of building a
        # list of every message up front like items() does.
        for i, (k, m) in enumerate(mbox.iteritems()):
            if i == 0:
                print('filtering...')

            z = len(bytes(m))
            if not all(f(m, z) for f in filters):
                continue

            t_count += 1
            t_size += z
            # t_min == 0 means "no match seen yet"
            t_min = min(t_min, z) if t_min else z
            t_max = max(t_max, z)

            if culprits:
                t_culprits[str(m[culprit_header])] += (
                    1 if culprit_quantity else z)

            if sample:
                if output:
                    # Write through the opened file object; `output` itself
                    # is only the path string.
                    with open(output, 'w') as sample_file:
                        sample_file.write(m.as_string(unixfrom=True))
                    print('wrote email %r to %s' % (k, output))
                else:
                    print('found email %r:' % k)
                    print(m.as_string(unixfrom=True))
                break

            if obox is not None:
                obox.add(m)
    finally:
        # Always flush/close both mailboxes, even if a filter raised.
        try:
            if obox is not None:
                obox.close()
        finally:
            mbox.close()

    if culprits:
        print('culprits:')
        # Ascending sort, so the biggest culprit is printed last.
        ranked = sorted(t_culprits.items(), key=lambda kv: kv[1])
        for k, v in ranked[-culprit_count:]:
            print('%11d %s' % (v, k))

    stop = time.time()
    print('found %d, %d bytes, avg %d bytes, min %d bytes, max %d bytes' % (
        t_count,
        t_size,
        ma.ceil(t_size/t_count) if t_count else 0,
        t_min,
        t_max))
    print('in %.2fs' % (stop-start))
def parse_size(s):
    """Build a size filter from a string such as '>=1024' or '<<0x100'.

    The optional prefix is one of >=, <=, >>, <<, == (>> and << are the
    strict comparisons) and defaults to >=. The rest is parsed as an
    integer with base auto-detection. Returns a predicate taking
    (message, size) that compares only the size.
    """
    parsed = re.match('^(?P<op>>=|<=|>>|<<|==)?(?P<v>.*)$', s)
    op = parsed.group('op')
    if not op:
        op = '>='
    v = int(parsed.group('v'), 0)

    # One comparison per operator the regex above can produce.
    comparisons = {
        '>=': lambda size: size >= v,
        '<=': lambda size: size <= v,
        '>>': lambda size: size > v,
        '<<': lambda size: size < v,
        '==': lambda size: size == v,
    }
    compare = comparisons[op]

    print('filtering size %s %r' % (op, v))
    return lambda m, z: compare(z)
def parse_header(s):
    """Build a header filter from a '[hop]header=[op]pattern' string.

    `hop` selects how the header name is matched and `op` how the header
    value is matched. Both are one of:
        >=  substring
        ==  exact (for the name: a direct lookup of that header)
        *=  fnmatch-style glob
        ~=  regular expression search
    `hop` defaults to == and `op` defaults to >=.

    Returns a predicate taking (message, size) that is true when a selected
    header has a matching value. With hop ==, a missing header is treated
    as an empty string.

    Raises:
        ValueError: If `s` is not of the expected form or contains an
            invalid regular expression.
    """
    parsed = re.match(
        r'^(?P<hop>>=|==|\*=|~=)?(?P<hv>[^=]*)='
        r'(?P<op>>=|==|\*=|~=)?(?P<v>[^=]*)$', s)
    if parsed is None:
        # ValueError (unlike AttributeError on None) lets argparse report
        # a normal usage error when this is used as a `type=` converter.
        raise ValueError(
            'invalid header filter %r, expected header=pattern' % s)

    hop = parsed.group('hop') or '=='
    hv = parsed.group('hv')
    op = parsed.group('op') or '>='
    v = parsed.group('v')

    def compile_pattern(pattern):
        # Compile once up front so bad patterns fail at parse time.
        try:
            return re.compile(pattern)
        except re.error as e:
            raise ValueError(
                'invalid regex %r: %s' % (pattern, e)) from e

    # How a header value is matched against the pattern.
    if op == '>=':
        match_value = lambda h: v in h
    elif op == '==':
        match_value = lambda h: h == v
    elif op == '*=':
        match_value = lambda h: fnmatch.fnmatch(h, v)
    else:  # '~='
        value_re = compile_pattern(v)
        match_value = lambda h: bool(value_re.search(h))

    # How header names are selected.
    if hop == '==':
        f = lambda m, z: match_value(str(m.get(hv, '')))
    else:
        if hop == '>=':
            match_name = lambda k: hv in k
        elif hop == '*=':
            match_name = lambda k: fnmatch.fnmatch(k, hv)
        else:  # '~='
            name_re = compile_pattern(hv)
            match_name = lambda k: bool(name_re.search(k))
        f = lambda m, z: any(
            match_value(str(h)) for k, h in m.items() if match_name(k))

    print('filtering %s %r %s %r' % (hop, hv, op, v))
    return f
if __name__ == "__main__":
    import sys
    import argparse

    class AppendFilter(argparse.Action):
        """Append each parsed filter predicate to a shared `filters` list."""
        def __call__(self, parser, namespace, value, option):
            if getattr(namespace, 'filters', None) is None:
                namespace.filters = []
            namespace.filters.append(value)

    cli = argparse.ArgumentParser(description="filter mbox by various criteria")
    cli.add_argument('mbox_path', help="Input mbox file")
    cli.add_argument('-o', '--output', help="Output file")
    cli.add_argument(
        '-s', '--sample',
        action='store_true',
        help="Samples the first match, printing the contents and stopping")

    # Every filter option converts its argument into a predicate and feeds
    # it into the same `filters` list via AppendFilter.
    filter_options = [
        (('-z', '--size'),
            parse_size,
            "Filter by size, can be prefixed with a comparison operation, one of >=,<=,>>,<<,==, defaults to >="),
        (('-f', '--from'),
            lambda v: parse_header('From='+v),
            "Filter by from field, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to >="),
        (('-l', '--label'),
            lambda v: parse_header('X-Gmail-Labels='+v),
            "Filter by gmail label, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to >="),
        (('-k', '--header'),
            parse_header,
            "Filter by arbitrary header, -kheader=pattern, can be prefixed with a comparison operation, one of >=,==,*=,~=, defaults to == for the header and >= for the pattern"),
    ]
    for flags, converter, description in filter_options:
        cli.add_argument(
            *flags,
            type=converter,
            action=AppendFilter,
            help=description)

    cli.add_argument(
        '-C', '--culprits',
        action='store_true',
        help="Find culprits, aka find who created the most emails")
    cli.add_argument(
        '-K', '--culprit-header',
        help="Use this header to find culprits, defaults to 'From'")
    cli.add_argument(
        '-Q', '--culprit-quantity',
        action='store_true',
        help="Find culprits by quantity not size")
    cli.add_argument(
        '-N', '--culprit-count',
        type=lambda x: int(x, 0),
        help="Show the top n results, defaults to 20")

    # Drop unset options so main()'s own defaults apply.
    parsed_options = vars(cli.parse_intermixed_args())
    main_kwargs = {
        name: value
        for name, value in parsed_options.items()
        if value is not None}
    sys.exit(main(**main_kwargs))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment