Skip to content

Instantly share code, notes, and snippets.

@Sebclem
Last active April 26, 2022 17:04
Show Gist options
  • Save Sebclem/e1e0b386001fb6e18cd55b45a4335fe5 to your computer and use it in GitHub Desktop.
Save Sebclem/e1e0b386001fb6e18cd55b45a4335fe5 to your computer and use it in GitHub Desktop.
Telegraf sshd login fail metrics with GeoIP
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import sys
import os
from datetime import datetime
import subprocess
def get_lines(log_path='/var/log/auth.log',
              state_path='/etc/telegraf/.lastline'):
    '''
    Return the "Failed" log lines that have not been reported yet.

    The log is scanned from the top. Every line containing 'Failed' is
    collected; when a line equal to the one remembered in ``state_path``
    is met, everything collected so far is discarded, so only the lines
    after the remembered one remain. If the remembered line is never
    found, every matching line of the log is returned.

    Parameters:
        log_path: log file to scan. Defaults to the sshd auth log.
        state_path: file whose first line is the last line handled by the
            previous run. It may not exist yet (first run).

    Returns:
        A non-empty list of raw log lines, trailing newline included.

    Raises:
        SystemExit: with status 0 when there is nothing new to report.
    '''
    # Placeholder that cannot match a real log line when no state exists.
    last_line = '11111'
    if os.path.isfile(state_path):
        with open(state_path, 'r') as state_file:
            last_line = state_file.readline()
    marker = last_line.replace('\n', '')

    pending = []
    # The context manager closes the log even if reading fails.
    # errors='replace' keeps a stray undecodable byte (the user name part of
    # a line is supplied by the remote client) from aborting the whole run.
    with open(log_path, 'r', errors='replace') as log_file:
        for line in log_file:
            if 'Failed' not in line:
                continue
            if line.replace('\n', '') == marker:
                # Everything up to and including this line was already sent.
                pending = []
            else:
                pending.append(line)

    # Stop here if there is nothing new
    if not pending:
        sys.exit(0)
    return pending
def main():
    '''
    Print one InfluxDB line-protocol point per new failed password attempt.

    For every line returned by get_lines() that matches the
    "Failed password for ... from <ip>" pattern, a ``sshd_fail`` point is
    written to stdout with the user, the source IP and its GeoIP country
    code as both tags and string fields, stamped with the log time in
    nanoseconds. The last line returned by get_lines() is then stored in
    /etc/telegraf/.lastline so the next run starts after it.

    Returns:
        0, used as the process exit status.
    '''
    # Raw strings: '\s', '\d' and '\.' are invalid escapes in a plain literal.
    pattern = re.compile(
        r'(...\s+\d+ \d\d:\d\d:\d\d).*Failed password for (?:invalid user )?'
        r'\b(.+)\b from ([0-9]*\.[0-9]*\.[0-9]*\.[0-9]*)')

    lines = get_lines()
    now = datetime.now()
    geo_by_ip = {}  # the same address usually shows up on many lines

    for line in lines:
        m = pattern.search(line)
        if m is None:
            continue
        stamp, user, ip = m.groups()

        when = _resolve_log_time(stamp, now)
        if when is None:
            # Skip a line whose date cannot be resolved instead of crashing:
            # a crash would happen before the state file is updated, so the
            # same line would break every following run as well.
            continue

        if ip not in geo_by_ip:
            geo_by_ip[ip] = ip_info(ip)
        geo = geo_by_ip[ip]

        # Whole seconds only, so integer arithmetic gives exact nanoseconds.
        timestamp = int(when.timestamp()) * 10 ** 9

        # The user name comes from the remote client: escape it (and the
        # other values) so it cannot break or inject line-protocol syntax.
        # NOTE(review): the tag key 'goe' looks like a typo for 'geo'; it is
        # kept unchanged so series already stored under it stay queryable.
        print('sshd_fail,goe=' + _escape_tag_value(geo)
              + ',user=' + _escape_tag_value(user)
              + ',ip=' + _escape_tag_value(ip)
              + ' user="' + _escape_field_string(user)
              + '",ip="' + _escape_field_string(ip)
              + '",geo="' + _escape_field_string(geo)
              + '" ' + str(timestamp))

    # Save the last line for the next execution (get_lines() never returns
    # an empty list, it exits the process instead).
    with open('/etc/telegraf/.lastline', 'w') as l_line_file:
        l_line_file.write(lines[-1])
    return 0


def _resolve_log_time(stamp, now):
    '''Return the most recent datetime not after ``now`` matching a year-less
    "Mon DD HH:MM:SS" stamp, or None if the stamp is not a valid date.'''
    stamp = ' '.join(stamp.split())  # collapse the padding of one-digit days
    for year in (now.year, now.year - 1):
        try:
            # Parse with an explicit year: without one strptime assumes 1900
            # and rejects "Feb 29".
            candidate = datetime.strptime(
                '{} {}'.format(year, stamp), '%Y %b %d %H:%M:%S')
        except ValueError:
            continue
        if candidate <= now:
            return candidate
    return None


def _escape_tag_value(value):
    '''Backslash-escape the characters that delimit tags in line protocol.'''
    for char in (',', '=', ' '):
        value = value.replace(char, '\\' + char)
    return value


def _escape_field_string(value):
    '''Backslash-escape backslashes and double quotes for a quoted field.'''
    return value.replace('\\', '\\\\').replace('"', '\\"')
def ip_info(addr=''):
    '''
    Return the ISO country code of an IP address, or "Unknown".

    The address is looked up in the GeoLite2 country database at
    /usr/share/GeoIP/GeoLite2-Country.mmdb.

    Parameters:
        addr: IP address as a string. An empty (or None) address is not
            looked up at all.

    Returns:
        The country ISO code when the lookup yields a string, otherwise
        the literal "Unknown" (empty address, failed lookup, or a record
        without a country code).
    '''
    if not addr:
        return "Unknown"

    # Imported only when a lookup is really needed, so the trivial case
    # above does not depend on the geoip2 package being importable.
    import geoip2.database

    with geoip2.database.Reader('/usr/share/GeoIP/GeoLite2-Country.mmdb') as reader:
        try:
            iso_code = reader.country(addr).country.iso_code
        except Exception:
            # Best effort: any lookup failure is reported as "Unknown"
            # rather than interrupting the metrics run.
            return "Unknown"

    return iso_code if isinstance(iso_code, str) else "Unknown"
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
  1. First, give the telegraf user read access to /var/log/auth.log.
  2. Download the following python script.
  3. Add these lines to your telegraf.conf file:
    [[inputs.exec]]
    commands = [
      "python3 /etc/telegraf/geoip.py"
    ]
    timeout = "40s"
    data_format = "influx"
    
  4. Test before run: sudo -u telegraf telegraf --test --input-filter exec

    Note: Don't run the test as root. The script creates a file, and if that file is created by root, the telegraf user will not be able to open it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment