Skip to content

Instantly share code, notes, and snippets.

@foxx
Created May 27, 2014 12:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save foxx/120cfe6dab28516e73e0 to your computer and use it in GitHub Desktop.
Save foxx/120cfe6dab28516e73e0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import re
import datetime
import smtplib
import itertools
from email.mime.text import MIMEText
class ScanAuthLog(object):
"""
ssh auth.log scanner
Scans ssh authentication log and sends a few detected failures via email.
This script should NOT be used in production for the following reasons;
* String matching rules have only been checked against Ubuntu 12.x, there
are no unit tests, no additional checks, no malformed string checks,
and will almost certainly crash if more data is thrown at it
* Log scanning should never use regex. The amount of data being parsed usually
means its better to custom write the parsers using string extraction, manual
index offsets, and hinting. This gets more noticeable when scanning huge
log files, e.g. 20GB+
* Sending log events via email is not production worthy. Seriously, Splunk or
any other log collection service would be much more suited for the job
* Fail2ban and other daemons already handle a lot of the necessary parsing and
string detection, they will be more mature and also handle email notifications.
* This does not work with multiple lines, it's not optimized for memory usage,
and would probably crash and burn if used on anything more than 1 file
* No pid state, and no duplication detection (e.g. if i run twice, i get the same
results twice). The longer the file grows, the more results you'll get. You could
store last known ts or byte offset in a state file, but again, why bother when you
have better tools that can already do this
"""
# Define once, speed improvement
re_ts_match = re.compile(r'(?P<timestamp>\w+ \w+ \d{2}:\d{2}:\d{2}) (?P<host>.+) (?P<proc>.+)', re.IGNORECASE)
# Failed password for invalid user user1 from 127.0.0.1 port 53024 ssh2
# Failed password for root from 127.0.0.1 port 53025 ssh2
re_match1 = re.compile(r'Failed password for (?:invalid user |)(?P<username>.+) from (?P<host>.+) port (?P<port>.+) (?P<service>.+)', re.IGNORECASE)
# Invalid user user1 from 127.0.0.1
re_match2 = re.compile(r'Invalid user (?P<username>.+) from (?P<host>.+)', re.IGNORECASE)
# input_userauth_request: invalid user roohfhfdhfdhfdt [preauth]
re_match3 = re.compile(r'input_userauth_request: invalid user (?P<username>.+) \[preauth\]', re.IGNORECASE)
def check(self, path):
results = []
with open(path, 'rb') as fh:
for line in fh.readlines():
try:
r = self.process_line(line)
if r:
results.append(r)
except:
print "Error processing line: " + line
raise
# create simple email
e_subject = "Authentication failures: %s" % ( datetime.datetime.now(), )
e_from = "XXX"
e_to = "XXX"
e_msg = "Please see failed auth events below\r\n"
# build email text, grouped by description
skey = lambda x: x['error_desc']
results = sorted(results, key=skey)
for error_desc, events in itertools.groupby(results, skey):
e_msg += "\r\nEvent type: %s\r\n" % ( error_desc, )
for event in events:
items = []
items += [ "=".join(x) for x in event['info'].items() ]
items += [ "=".join(x) for x in event['meta'].items() ]
msg_line = " " + ", ".join(items) + "\r\n"
e_msg += msg_line
# create email
msg = MIMEText(e_msg)
msg['Subject'] = e_subject
msg['From'] = e_from
msg['To'] = e_to
s = smtplib.SMTP('localhost')
s.sendmail(e_from, [e_to], msg.as_string())
s.quit()
def process_line(self, line):
# break into components
assert line.count(": "), "invalid line"
line = line.strip()
part1, part2 = line.split(": ", 1)
# no datetime parsing, not without modifying the ssh logger
meta = self.re_ts_match.match(part1).groupdict()
# avoid hitting regex on each line (hinting)
if 'Failed password' in part2:
m = self.re_match1.match(part2)
assert m, "invalid match"
return dict(error_desc="Failed password authentication", meta=meta, info=m.groupdict())
elif 'Invalid user' in part2:
m = self.re_match2.match(part2)
assert m, "invalid match"
return dict(error_desc="Invalid user", meta=meta, info=m.groupdict())
elif 'input_userauth_request: invalid user' in part2:
m = self.re_match3.match(part2)
assert m, "invalid match"
return dict(error_desc="Invalid user", meta=meta, info=m.groupdict())
#(on another screen testing regex, sec)
if __name__ == '__main__':
s = ScanAuthLog()
s.check('/var/log/auth.log')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment