Skip to content

Instantly share code, notes, and snippets.

@iomarmochtar
Created May 23, 2018 09:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save iomarmochtar/6687e0b9c013569c443449fa17d3e419 to your computer and use it in GitHub Desktop.
Save iomarmochtar/6687e0b9c013569c443449fa17d3e419 to your computer and use it in GitHub Desktop.
Simple script to parse zimbra's audit.log for failed authentication then it will report and block the IP, yes it's a simple alternative to fail2ban
#!/usr/bin/env python
__author__ = 'Imam Omar Mochtar <iomarmochtar@gmail.com>'
"""
this script assume zimbra smtp and mailbox service in same server with original ip (oip) is logged
the attempted/blocked IP will listed in AUDITWATCH chain. for applying block to it add rule in INPUT filter.
Example:
# iptables -t filter -I INPUT -j AUDITWATCH
"""
import re
import sys
import subprocess
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
from pprint import pprint
### VARIABLES BEGIN
AUDIT_FILE = '/opt/zimbra/log/audit.log'
CHAIN = 'AUDITWATCH'
MAX_ATTEMPT = 5
# define white list here
WHITELIST = re.compile( '(%s)' % '|'.join( map(lambda x: x.replace('.', '\.'), [
'192.100'
])))
# list of blocked IP
BLOCKED = set()
# this var will hold parsing result
FAILED_IP = {}
## SMTP
REPORT_FROM = 'admin@mail.com'
REPORT_TO = ['system.support@mail.com', 'iomarmochtar@gmail.com']
REPORT_SUBJECT = 'Authentication Attempt Report'
REPORT_SMTP = '127.0.0.1'
### VARIABLES END
def run_cmd(command):
return subprocess.Popen(
command, universal_newlines=True, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
# make sure chain are exists
run_cmd("/sbin/iptables -nL {0} || /sbin/iptables -N {0}".format(CHAIN))
# grab listed of blocked IP
output, err = run_cmd("/sbin/iptables -nL {0}".format(CHAIN))
for txt in output.splitlines():
if txt.startswith('DROP'):
brk = txt.split()
if len(brk) != 5:
continue
BLOCKED.add(brk[3])
continue
oip_parser = re.compile(r'oip=(?P<oip>.*?);.*?for\s+\[(?P<username>.*?)\],\s+invalid password;$')
# loop trough audit file to parse any failed authentication, capture OIP params
for line in open(AUDIT_FILE, 'r').readlines():
oip = oip_parser.search(line)
if not oip:
continue
data = oip.groupdict()
ip = data['oip']
# if the IP was listed in existing block and ignore list then skip it
if ip in BLOCKED or WHITELIST.search(ip):
continue
if not FAILED_IP.has_key(ip):
FAILED_IP[ip] = {'users': set(), 'attempt': 0}
FAILED_IP[ip]['users'].add(data['username'])
FAILED_IP[ip]['attempt'] += 1
# input IP in CHAIN if has reach maximum var
is_report = False
text = "<h3><b>List of auth attempt</b></h3> <br/><br/><br/>"
f_cmd = "/sbin/iptables -I {0} -p tcp -s {1} -j DROP"
for ip, data in FAILED_IP.items():
if data['attempt'] <= MAX_ATTEMPT:
continue
is_report = True
cmd = f_cmd.format(CHAIN, ip)
print("IP {0} has reach max attempt {1} ({2})".format(ip, MAX_ATTEMPT, data['attempt']))
run_cmd(cmd)
text += "<b><u>{0} for {1} times</u></b> <br/><ul>".format(ip, data['attempt'])
for user in data['users']:
text += "<li>{0}</li>".format(user)
text += "</ul>"
if not is_report:
sys.exit(0)
print("sending report")
msg = MIMEMultipart()
msg['From'] = REPORT_FROM
msg['To'] = COMMASPACE.join(REPORT_TO)
msg['Date'] = formatdate(localtime=True)
msg['Subject'] = REPORT_SUBJECT
msg.attach(MIMEText(text, 'html'))
msg.attach(MIMEText("Please see this email as HTML", 'plain'))
smtp = smtplib.SMTP(REPORT_SMTP)
smtp.sendmail(REPORT_FROM, REPORT_TO, msg.as_string())
smtp.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment