Skip to content

Instantly share code, notes, and snippets.

@iomarmochtar
Created March 19, 2018 04:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iomarmochtar/2e4bf7338e462a8df878189b79571b56 to your computer and use it in GitHub Desktop.
Save iomarmochtar/2e4bf7338e462a8df878189b79571b56 to your computer and use it in GitHub Desktop.
Parse zimbra audit file for blocking any massive failed authentication which indicating brute force attempt. if it's found then will do block mechanism and report to admin
#!/usr/bin/env python
__author__ = 'Imam Omar Mochtar <iomarmochtar@gmail.com>'
"""
Parse the Zimbra audit log and block any IP with a large number of failed authentications, which indicates a brute-force attempt.
This script assumes the Zimbra proxy and mailbox services run on the same server and that the original IP (oip) is logged.
Offending IPs are listed in the AUDITWATCH chain. To make the block effective, run this command to add the chain to the INPUT filter:
# iptables -t filter -I INPUT -j AUDITWATCH
"""
import re
import sys
import subprocess
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
from pprint import pprint
### VARIABLES BEGIN
# Audit log that is scanned for failed logins.
AUDIT_FILE = '/opt/zimbra/log/audit.log'
# iptables chain that receives the DROP rules.
CHAIN = 'AUDITWATCH'
# An IP is blocked once its failed attempts exceed this number.
MAX_ATTEMPT = 20

# define white list here: full IPv4 addresses or dotted prefixes
WHITELIST_PREFIXES = [
    '192.100',
]
# Each entry is anchored at the start of the address and must end on an
# octet boundary, so '192.100' covers 192.100.x.x but neither
# '10.192.100.1' nor '192.1001.x.x'. An empty list produces a pattern that
# never matches; the naive '()' would match (and so whitelist) every IP.
if WHITELIST_PREFIXES:
    WHITELIST = re.compile(
        r'^(?:%s)(?:\.|$)' % '|'.join(
            re.escape(prefix.rstrip('.')) for prefix in WHITELIST_PREFIXES
        )
    )
else:
    WHITELIST = re.compile(r'(?!)')

# list of blocked IP
BLOCKED = set()
# this var will hold parsing result: ip -> {'users': set, 'attempt': int}
FAILED_IP = {}

## SMTP
REPORT_FROM = 'admin@mail.com'
REPORT_TO = ['iomarmochtar@gmail.com']
REPORT_SUBJECT = 'Authentication Attempt Report'
REPORT_SMTP = '127.0.0.1'
### VARIABLES END
def run_cmd(command):
    """Run *command* and return its captured ``(stdout, stderr)`` as text.

    A string is handed to the shell, so shell operators such as ``||``
    keep working. A list or tuple is executed directly as an argument
    vector without a shell, which is the safe form when any argument is
    built from untrusted data. The exit status is not reported.
    """
    use_shell = not isinstance(command, (list, tuple))
    process = subprocess.Popen(
        command, universal_newlines=True, shell=use_shell,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return process.communicate()
# Create the chain on first run: the listing command fails when the chain
# does not exist, which triggers the `-N` fallback.
run_cmd("/sbin/iptables -nL {0} || /sbin/iptables -N {0}".format(CHAIN))

# Remember the address of every DROP rule that is already in the chain.
output, err = run_cmd("/sbin/iptables -nL {0}".format(CHAIN))
for rule in output.splitlines():
    if not rule.startswith('DROP'):
        continue
    fields = rule.split()
    # Only five-column rule lines are trusted; the fourth column is taken
    # as the blocked source address.
    if len(fields) == 5:
        BLOCKED.add(fields[3])
oip_parser = re.compile(r'oip=(?P<oip>.*?);.*?for\s+\[(?P<username>.*?)\],\s+invalid password;$')
# The captured address later becomes part of an iptables command line, so
# only accept characters that can occur in an IPv4/IPv6 literal.
valid_ip = re.compile(r'^[0-9A-Fa-f.:]+$')

# Loop through the audit file and tally failed authentications per
# originating IP (OIP). The file is streamed line by line and closed
# deterministically by the context manager.
with open(AUDIT_FILE, 'r') as audit_log:
    for line in audit_log:
        found = oip_parser.search(line)
        if not found:
            continue

        ip = found.group('oip')
        # Drop anything that does not look like an address before it can
        # reach the shell.
        if not valid_ip.match(ip):
            continue

        # if the IP was listed in existing block and ignore list then skip it
        if ip in BLOCKED or WHITELIST.search(ip):
            continue

        entry = FAILED_IP.setdefault(ip, {'users': set(), 'attempt': 0})
        entry['users'].add(found.group('username'))
        entry['attempt'] += 1
# Insert a DROP rule into CHAIN for every IP whose failed attempts exceed
# MAX_ATTEMPT, and build the HTML report body in `text`.
try:
    from html import escape as html_escape
except ImportError:  # Python 2 has no `html` module
    from cgi import escape as html_escape

is_report = False
f_cmd = "/sbin/iptables -I {0} -p tcp -s {1} -j DROP"
report_parts = ["<h3><b>List of auth attempt</b></h3> <br/><br/><br/>"]
for ip, data in FAILED_IP.items():
    if data['attempt'] <= MAX_ATTEMPT:
        continue
    is_report = True
    print("IP {0} has reach max attempt {1} ({2})".format(ip, MAX_ATTEMPT, data['attempt']))

    block_out, block_err = run_cmd(f_cmd.format(CHAIN, ip))
    if block_err:
        # Surface iptables failures (e.g. missing privileges) instead of
        # silently reporting an IP as handled.
        print("iptables: {0}".format(block_err.strip()))

    # IPs and usernames come from the log, i.e. from whoever attempted the
    # login, so escape them before embedding them in HTML.
    report_parts.append(
        "<b><u>{0} for {1} times</u></b> <br/><ul>".format(html_escape(ip), data['attempt']))
    # Sorted so the report is stable between runs.
    for user in sorted(data['users']):
        report_parts.append("<li>{0}</li>".format(html_escape(user)))
    report_parts.append("</ul>")
text = "".join(report_parts)
# Nothing was blocked in this run, so there is nothing to report.
if not is_report:
    sys.exit(0)

print("sending report")
# 'alternative' declares both parts as renderings of the same content.
# Mail clients prefer the last part they can display, so the plain-text
# fallback goes first and the HTML report last.
msg = MIMEMultipart('alternative')
msg['From'] = REPORT_FROM
msg['To'] = COMMASPACE.join(REPORT_TO)
msg['Date'] = formatdate(localtime=True)
msg['Subject'] = REPORT_SUBJECT
msg.attach(MIMEText("Please see this email as HTML", 'plain'))
msg.attach(MIMEText(text, 'html'))

smtp = smtplib.SMTP(REPORT_SMTP)
try:
    smtp.sendmail(REPORT_FROM, REPORT_TO, msg.as_string())
    # End the session politely with QUIT once the message is handed over.
    smtp.quit()
finally:
    # Always release the socket, even when sending failed.
    smtp.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment