Skip to content

Instantly share code, notes, and snippets.

@lrstanley
Created June 25, 2016 19:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lrstanley/effdb7c618fb7387a3672937cba27134 to your computer and use it in GitHub Desktop.
Save lrstanley/effdb7c618fb7387a3672937cba27134 to your computer and use it in GitHub Desktop.
Old Exiclean version, in Python. Here for reference.
#!/usr/bin/python
"""
Exiclean -- Exim mail queue cleaner/spam removal script
-----------------------------------------------------------------------------
LICENSE: The MIT License (MIT)
Copyright (c) 2016 Liam Stanley <me@liamstanley.io>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
import re
import sys
import glob
import time
import getopt
from hashlib import md5
from threading import Thread
# The thread-safe queue module is named "queue" on Python 3 and "Queue" on
# Python 2; bind whichever exists to the single name `queue`.
try:
    import queue
except ImportError:
    # only a missing module should trigger the fallback -- a bare except
    # would also hide unrelated failures (e.g. KeyboardInterrupt)
    import Queue as queue
# Script version, printed by Exiclean.main() ("Running version: ...")
VERSION = 0.1

# Command line option table. Each entry describes one option:
#   name        -- attribute name the parsed value is stored under on Exiclean
#   short       -- getopt short option letter; a trailing ':' means it takes a value
#   long        -- getopt long option name; a trailing '=' means it takes a value
#   description -- text shown in the help dialog
#   default     -- (optional) value used when the option is not given
_args = [
    {
        'name': 'help', 'short': 'h', 'long': 'help',
        'description': 'Shows this help dialog'
    },
    {
        'name': 'version', 'short': 'v', 'long': 'version',
        'description': 'Shows the current exiclean version'
    },
    {
        'name': 'nocolors', 'short': 'n', 'long': 'no-colors',
        'description': 'Strips color from all output'
    },
    {
        'name': 'spooldir', 'short': 's:', 'long': 'spool-dir=',
        'description': 'Changes the default spool directory',
        'default': '/var/spool/exim/input/'
    },
    {
        # the read thread count is clamped to 1..10 by Exiclean.init_read(),
        # so the help text advertises the same limit
        'name': 'readcount', 'short': 'r:', 'long': 'rthreads=',
        'default': 2, 'description': 'Number of threads used for reading queue (min: 1, max: 10)'
    },
    {
        'name': 'deletecount', 'short': 'd:', 'long': 'dthreads=',
        'default': 1, 'description': 'Number of threads used for removing items from queue (min: 1, max: 5)'
    },
    {
        'name': 'top', 'long': 'top-common',
        'description': 'Show top common items within the queue'
    },
    {
        'name': 'quiet', 'short': 'q', 'long': 'quiet',
        'description': 'Print out only useful information'
    },
    {
        'name': 'sort_user', 'short': 'u:', 'long': 'user=',
        'description': 'Filter results based on user'
    },
    {
        'name': 'queuemax', 'long': 'queue-max=',
        'description': 'Only pull last X number of items from the queue'
    },
]
# Maps the "<name>" color tags accepted by Exiclean.out() to ANSI escape
# sequences; "cr" resets all attributes.
colors = {
    "black": "\033[30m", "blue": "\033[34m", "green": "\033[32m",
    "cyan": "\033[36m", "red": "\033[31m", "purple": "\033[35m",
    "brown": "\033[33m", "lightgray": "\033[37m", "darkgray": "\033[1;30m",
    # bold green is 1;32 -- 1;30 is the dark gray code listed just above
    "lightblue": "\033[1;34m", "lightgreen": "\033[1;32m", "lightcyan": "\033[1;36m",
    "lightred": "\033[1;31m", "lightpurple": "\033[1;35m", "yellow": "\033[1;33m",
    "white": "\033[1;37m", "cr": "\033[0m",
}
class Exiclean(object):
    """
    Interactive cleaner for an Exim mail spool.

    Header files ("*-H") found under the spool directory are parsed by a
    pool of reader threads into self.db, grouped by to/from/subject/user,
    and the user is then prompted for a group whose messages get removed
    from disk by a pool of delete threads.
    """

    # a single ANSI escape sequence such as "\033[1;36m"
    _ansi_re = re.compile(r'\x1b[^m]*m')
    # options whose value must be an integer; every other value is kept as
    # the string the user typed (a numeric user name must stay a string)
    _int_options = ('readcount', 'deletecount', 'queuemax')

    def __init__(self):
        # message id -> dict with 'id', 'user', 'fn' and the wanted headers
        self.db = {}
        self.readq = queue.Queue()
        self.read_init = False
        self.read_count = 0
        self.deleteq = queue.Queue()
        self.delete_init = False
        self.delete_count = 0
        # headers we want if they exist in the exim header files
        self.headers = ['to', 'from', 'subject']
        # headers we want to use to sort/group by
        self.organizable = ['to', 'from', 'subject', 'user']
        # set some sane screen width/height -- used for self.out()
        self.width = 80
        self.height = 25
        # start parsing sys.argv[]
        self.gen_args()
        # update screen dimensions (self.width, self.height)
        self.update_dimensions()

    def show_help(self):
        """
        Prints out script help dialog -- attempts to simulate argparse or
        optparse. Needs to be manually done to be able to be 2-3 compatible.
        """
        helpdoc = """
        usage: {exe} [-h] [arguments]
        exiclean -- exim mail queue cleaner/spam removal script
        optional arguments:
        {arguments}
        """
        # drop the source indentation from every line of the template
        helpdoc = re.sub(r'^ +', '', helpdoc.lstrip("\n"), flags=re.M)
        rows = []
        for item in _args:
            short = item.get('short', '')
            long_ = item.get('long', '')
            cmds = ""
            if short:
                cmds += "-%s" % short.rstrip(':')
            if short and long_:
                cmds += ", --%s" % long_.rstrip('=')
            elif long_:
                cmds += "--%s" % long_.rstrip('=')
            # a trailing ':' / '=' is getopt's marker for "takes a value"
            if short.endswith(':') or long_.endswith('='):
                cmds += " <args>"
            rows.append([cmds, item.get('description') or "No description"])
        helpdoc = helpdoc.format(exe=sys.argv[0], arguments=self.table_fmt(rows))
        self.out(helpdoc, tag=False)

    def table_fmt(self, data):
        """
        Returns a unix "column" style string. Pass in a list of lists of
        strings, e.g:
            self.table_fmt([['something', 'else'], ['a', 'b']])
        Every cell is left-justified to the widest cell of its column and
        each row is terminated by a newline. An empty list gives "".
        """
        widths = [max(map(len, col)) for col in zip(*data)]
        lines = []
        for row in data:
            cells = (val.ljust(width) for val, width in zip(row, widths))
            lines.append(" ".join(cells).strip() + "\n")
        return "".join(lines)

    def nproc(self):
        """
        Returns number of processors seen by the system (always >= 1, so the
        result is safe to divide by).
        """
        try:
            # imported here so the module's import block stays untouched
            import multiprocessing
            cores = multiprocessing.cpu_count()
        except (ImportError, NotImplementedError):
            cores = 1
        return max(cores, 1)

    def update_dimensions(self):
        """
        Updates self.width & self.height based on terminal dimensions as
        reported by `stty size`. The previous values are kept when the size
        cannot be determined (e.g. stdin is not a terminal).
        """
        try:
            pipe = os.popen('stty size', 'r')
            try:
                rows, cols = pipe.read().split()
            finally:
                pipe.close()
            rows, cols = int(rows), int(cols)
        except (ValueError, OSError):
            return
        # a 0x0 report would make trimmed output useless
        if rows > 0 and cols > 0:
            self.height, self.width = rows, cols

    def _trim_to_width(self, text):
        """
        Ellipsizes text to at most self.width visible characters. ANSI escape
        sequences are copied through whole and do not count towards the
        width; if any were kept, a reset code is appended after the ellipsis.
        """
        if len(self._ansi_re.sub('', text)) <= self.width:
            return text
        budget = max(self.width - 3, 0)  # leave room for "..."
        kept = []
        colored = False
        # splitting on a capturing group keeps each escape sequence as its
        # own token, so it is never counted or cut in half
        for token in re.split('(%s)' % self._ansi_re.pattern, text):
            if budget <= 0:
                break
            if self._ansi_re.match(token):
                kept.append(token)
                colored = True
            else:
                piece = token[:budget]
                kept.append(piece)
                budget -= len(piece)
        trimmed = "".join(kept).rstrip('. ') + "..."
        if colored:
            trimmed += "\033[0m"
        return trimmed

    def out(self, text="", ret=True, tag=True, parse_colors=True, trim=False, info=False):
        """
        Stdout wrapper
        optional args:
            ret -- (True) end the line with a newline; when False the text is
                   prefixed with a carriage return instead, so the next
                   write overwrites it
            tag -- (True) Show [EXICLEAN] prefix tag line
            parse_colors -- (True) Replace <color> tags with ANSI color codes
            trim -- Ellipsis output based on terminal width
            info -- if set and self.quiet is set, output gets dropped
        """
        if info and self.quiet:
            return
        if tag and len(text) > 1:
            text = "[EXICLEAN]: " + text
        if self.nocolors or not parse_colors:
            # strip only known color tags, so other "<word>" text (such as
            # "<args>" in the help dialog) survives
            known = '|'.join(re.escape(name) for name in colors)
            text = re.sub(r'<(?:%s)>' % known, '', text)
        else:
            for color in colors:
                text = text.replace("<%s>" % color, colors[color])
        if trim:
            text = self._trim_to_width(text)
        if ret:
            text += "\n"
        else:
            text = "\r" + text
        sys.stdout.write(text)
        sys.stdout.flush()

    def exit(self, text):
        """ Prints text (if any) as an error and exits with status 1 """
        if text:
            self.out("<red>Error: %s<cr>" % str(text))
        sys.exit(1)

    def gen_args(self):
        """
        Parse sys.argv[] and map to Exiclean() class attributes. E.g:
            -q -> self.quiet
        Every option in _args becomes an attribute holding its default (or
        None). Flags are stored as True, the thread count and queue-max
        options as int, and all other values as the string given.
        Done manually with getopt to be consistent between Python versions.
        """
        for spec in _args:
            setattr(self, spec['name'], spec.get('default'))
        short_opts = ''.join(x['short'] for x in _args if x.get('short'))
        long_opts = [x['long'] for x in _args if x.get('long')]
        try:
            opts, _extra = getopt.getopt(sys.argv[1:], short_opts, long_opts)
        except getopt.GetoptError as err:
            self.exit(err)
        for name, value in opts:
            name = name.lstrip('-')
            spec = None
            for candidate in _args:
                names = (candidate.get('short', '').rstrip(':'),
                         candidate.get('long', '').rstrip('='))
                if name in names:
                    spec = candidate
                    break
            if not spec:
                self.exit("Unknown arg %s" % name)
            takes_value = (spec.get('short', '').endswith(':') or
                           spec.get('long', '').endswith('='))
            if not takes_value:
                value = True
            elif spec['name'] in self._int_options:
                try:
                    value = int(value)
                except ValueError:
                    self.exit("Option %s expects a number, got: %s" % (name, value))
            setattr(self, spec['name'], value)

    def init_read(self):
        """ Initialize read queue threads (no-op when already started) """
        if self.read_init:
            return
        self.readcount = min(max(self.readcount, 1), 10)
        cores = self.nproc()
        self.out("Detecting core count: <cyan>%d<cr>" % cores, info=True)
        if float(self.readcount) / float(cores) >= 2.5:
            self.out("<yellow>WARNING: Selecting higher threads than cores may impose additional load!<cr>", info=True)
        self.out("Starting <cyan>%d<cr> read thread(s) (min: 1, max: 10)" % self.readcount, info=True)

        def _read():
            while True:
                fn = self.readq.get()
                try:
                    if fn is None:
                        return
                    self.read_header_files(fn)
                except Exception as err:
                    # one bad spool file must not kill the worker, or
                    # update_db() would wait on the queue forever
                    sys.stderr.write("\n[EXICLEAN]: skipping %s: %s\n" % (fn, err))
                finally:
                    self.readq.task_done()

        for _ in range(self.readcount):
            t = Thread(target=_read)
            t.daemon = True
            t.start()
        self.read_init = True

    def init_delete(self):
        """ Initialize delete queue threads (no-op when already started) """
        if self.delete_init:
            return
        self.deletecount = min(max(self.deletecount, 1), 5)
        if self.deletecount >= 4:
            self.out("<yellow>WARNING: Selecting 4+ delete threads may cause very high IOWAIT!<cr>", info=True)
        self.out("Starting <cyan>%d<cr> delete thread(s) (min: 1, max: 5)" % self.deletecount, info=True)

        def _delete():
            while True:
                msg_id = self.deleteq.get()
                try:
                    self.delete_header_files(msg_id)
                except Exception as err:
                    # keep the worker alive; delete_sync() waits until every
                    # queued id has been marked done
                    sys.stderr.write("\n[EXICLEAN]: failed to delete %s: %s\n" % (msg_id, err))
                finally:
                    self.deleteq.task_done()

        for _ in range(self.deletecount):
            t = Thread(target=_delete)
            t.daemon = True
            t.start()
        self.delete_init = True

    def read_header_files(self, fn):
        """
        Read one header file from the exim spool dir (self.spooldir) and
        store the message in self.db under its message id.

        Files that cannot be opened, messages without an "-ident" line, and
        (when self.sort_user is set) messages of other users are skipped.
        """
        open_kwargs = {}
        if sys.version_info[0] >= 3:
            # headers may hold bytes that are invalid in the locale encoding;
            # Python 2's open() neither decodes nor accepts this argument
            open_kwargs['errors'] = 'replace'
        try:
            with open(fn, 'r', **open_kwargs) as f:
                text = f.read()
        except (IOError, OSError):
            # e.g. the file vanished between globbing and reading it
            return

        def get_header(search):
            found = re.findall(search, text, flags=re.M)
            if found:
                return found[0].strip()
            return None

        mid = get_header(r'^([a-zA-Z0-9-]{16})-H$')
        if not mid:
            # fall back to the file name ("<message id>-H") so that messages
            # without a parseable id line do not all collide on one key
            base = os.path.basename(fn)
            if base.endswith('-H'):
                mid = base[:-2]
            else:
                mid = base
        ident = get_header(r'^-ident (.*?)$')
        if not ident:
            return
        if self.sort_user and ident.lower() != str(self.sort_user).lower():
            return
        msg = {
            'id': mid,
            'user': ident,
            'fn': fn
        }
        for item in re.findall(r'^[0-9A-Z\*]{3,4} +([^:]+): +(.*?)$', text, flags=re.M):
            header, header_content = item[0].strip().lower(), item[1].strip()
            if header not in self.headers:
                continue
            if header in ['to', 'from']:
                # reduce "Name <addr>" to "addr" BEFORE shortening, so a long
                # display name cannot push the address out of the value
                header_content = re.sub(r'.*<([^>]+)>.*', r'\g<1>', header_content)
            if len(header_content) > 65:
                header_content = header_content[:65] + "..."
            msg[header] = header_content
        for item in self.organizable:
            if item not in msg:
                msg[item] = None
        self.db[mid] = msg
        self.read_count += 1

    def delete_header_files(self, id):
        """
        Delete header (and matching "-D") files from exim spool dir
        (self.spooldir) for the message stored under id, and drop it from
        self.db. Unknown ids are ignored; files that are already gone are
        not an error.
        """
        email = self.db.pop(id, None)
        if not email:
            return
        header_fn = email['fn']
        data_fn = re.sub(r'\-H$', '-D', header_fn)
        try:
            os.remove(header_fn)
        except OSError:
            pass
        self.delete_count += 1
        # guard against removing the same path twice when fn had no -H suffix
        if header_fn != data_fn:
            try:
                os.remove(data_fn)
            except OSError:
                pass

    def delete_sync(self, id_list):
        """
        Dumps id_list into deleteq, then blocks (printing progress) until
        the delete threads have processed every id.
        """
        self.delete_count = 0
        for msg_id in id_list:
            self.deleteq.put(msg_id)

        def status():
            self.out("Deleting items from exim queue: <cyan>%d<cr>" % self.delete_count, ret=False, info=True)

        while self.deleteq.unfinished_tasks:
            status()
            time.sleep(0.3)
        status()
        self.out()
        self.out("Finished deleting items from queue.")

    def hash(self, string):
        """ Returns a md5 hash string -- encode utf-8 for py3 """
        return md5(string.encode('utf-8')).hexdigest()

    def input(self, string):
        """
        Get raw user input -- py2 uses raw_input() -- py3 uses input().
        Returns an empty string when stdin is closed (EOF).
        """
        try:
            reader = raw_input
        except NameError:
            reader = input
        try:
            return reader(string)
        except EOFError:
            return ''

    def question_prompt(self, title, options):
        """
        Initiate question prompt based on a list of lists
        The child list must consist of a len(2) list, with each arg matching:
            0: key to be returned from self.question_prompt()
            1: text to display in question
        Returns the key of the selected option, or None when the user
        cancels or enters anything that is not a listed number.
        """
        self.update_dimensions()
        self.out(title, tag=False)
        for number, option in enumerate(options, 1):
            self.out(" [<cyan>{n}<cr>] {text}".format(n=str(number), text=option[1]), tag=False, trim=True)
        self.out(" [X] Cancel", tag=False)
        answer = self.input("Select option [X]: ").strip()
        if answer.lower() == 'x' or not answer.isdigit():
            return None
        try:
            choice = int(answer)
        except ValueError:
            # isdigit() also accepts characters int() cannot parse
            return None
        if choice < 1 or choice > len(options):
            return None
        return options[choice - 1][0]

    def update_db(self):
        """
        Update self.db with all emails in mail queue: queue every "*-H" file
        below self.spooldir for the read threads, then block (printing
        progress) until all of them have been processed.
        """
        self.db = {}
        self.read_count = 0
        self.spooldir = self.spooldir.rstrip('/')
        if not os.path.isdir(self.spooldir):
            self.exit("Spool directory does not exist or unable to access: %s" % self.spooldir)
        count = 0
        for f in glob.glob(self.spooldir + '/*/*-H'):
            count += 1
            self.readq.put(f)
            self.out("Updating local queue db. Files scanned: <cyan>%d<cr>" % count, ret=False, info=True)
            if self.queuemax and count >= self.queuemax:
                break
        self.out()
        self.out("Reading items in queue. This may take a moment... (%d/%d)" % (self.read_count, count))

        def status():
            self.out("Reading items in queue. This may take a moment... (<cyan>%d<cr>/<cyan>%d<cr>)" % (self.read_count, count), ret=False, info=True)

        while self.readq.unfinished_tasks:
            status()
            time.sleep(0.3)
        status()
        self.out()
        self.out("Finished processing items from queue.")

    def most_common(self):
        """
        Return a list of the top 20 most common self.organizable header
        values in self.db, most frequent first. Each entry is a dict with
        'hash', 'count', 'ids', 'type' (the header name) and 'value'. The
        same message may appear in several entries (once per header).
        """
        self.out("Gathering information from queue db...", info=True)
        groups = {}
        for email_id in self.db:
            email = self.db[email_id]
            for key in self.organizable:
                if not email[key]:
                    # a missing header must not hide the remaining ones
                    continue
                kv = self.hash(key + email[key])
                if kv not in groups:
                    groups[kv] = {'hash': kv, 'count': 1, 'ids': [email['id']], 'type': key, 'value': email[key]}
                else:
                    groups[kv]['count'] += 1
                    groups[kv]['ids'].append(email['id'])
        return sorted(groups.values(), key=lambda k: k['count'], reverse=True)[:20]

    def main(self):
        """
        Main Exiclean() entry point: handles --help/--version, loads the
        queue, then either prints the most common items (--top-common) or
        loops prompting for a group of messages to delete.
        """
        if self.help:
            self.show_help()
            sys.exit(0)
        self.out("Running version: <cyan>%s<cr>" % str(VERSION), info=True)
        if self.version:
            sys.exit(0)
        if self.queuemax:
            if self.sort_user:
                self.exit("--queue-max cannot be used with any sort filters")
            if self.queuemax < 100:
                self.queuemax = 100
            self.out("<cyan>Limiting queue scan to %d emails<cr>" % self.queuemax, info=True)
        self.init_read()
        self.init_delete()
        self.update_db()
        if self.top:
            scan = self.most_common()
            self.out()
            out = [["COUNT", "TYPE", "ITEM"], ["-----", "----", "----"]]
            for item in scan[:10]:
                out.append([str(item['count']), str(item['type']), str(item['value'])])
            self.out(self.table_fmt(out), tag=False)
            sys.exit(0)
        if self.sort_user:
            # every message already belongs to that user, so grouping by
            # user would only produce one useless entry
            self.organizable.remove('user')
        while True:
            scan = self.most_common()
            if len(scan) == 0:
                self.out("<cyan>No email in the queue to delete! AWESOME!<cr>")
                sys.exit(0)
            q = [[x['hash'], "%6d x [<green>%7s<cr>] %s" % (x['count'], str(x['type']), str(x['value']))] for x in scan[:8]]
            id_to_delete = self.question_prompt("What would you like to delete?", q)
            if not id_to_delete:
                print("\nExiting...")
                sys.exit(0)
            item = [x for x in scan if x['hash'] == id_to_delete][0]
            # release the other groups' id lists before the deletion runs
            scan = None
            self.delete_sync(item['ids'])
# Script entry point: build the cleaner (parses argv), then run it. Ctrl-C
# during construction exits with status 1, Ctrl-C while running with 0.
if __name__ == '__main__':
    try:
        ec = Exiclean()
    except KeyboardInterrupt:
        print("\nCancelling initialization...\nExiting..")
        sys.exit(1)
    else:
        try:
            ec.main()
        except KeyboardInterrupt:
            print("\nExiting...")
            sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment