Skip to content

Instantly share code, notes, and snippets.

@soul9
Created April 4, 2011 12:27
Show Gist options
  • Save soul9/901555 to your computer and use it in GitHub Desktop.
Save soul9/901555 to your computer and use it in GitHub Desktop.
This (unfinished) fail2ban script adaptively bans recurring dictionary attackers for progressively longer periods.
#!/usr/bin/env python
import sqlite3, subprocess, time, random, sys, os
# Lock file guarding against concurrent runs of this script. It is created
# here and removed by the command dispatch at the end of the script.
lfile = '/var/run/fail2ban/fail2ban-sqlite.lock'
try:
    # O_CREAT | O_EXCL makes "check and create" a single atomic step, so two
    # invocations started at the same moment cannot both acquire the lock
    # (a separate os.path.exists() test followed by open() can race).
    os.close(os.open(lfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o666))
except OSError:
    if os.path.exists(lfile):
        # Another invocation holds the lock: quietly do nothing.
        sys.exit(0)
    # Anything else (missing directory, permissions, ...) is a real error.
    raise

# SQLite database holding the ``banned`` and ``stats`` tables; every function
# below shares this connection and the cursor ``c``.
dbfile = "/var/db/fail2ban.sqlite3"
conn = sqlite3.connect(dbfile)
c = conn.cursor()
def selectbanned():
    """Query the hosts whose ban is still in force.

    Executes the SELECT on the shared module-level cursor ``c`` and returns
    the value of ``c.execute`` (the cursor), yielding one row per host with
    the host IP in column 0.  Because the cursor is shared, running another
    statement on ``c`` replaces any rows not yet read.
    """
    active_query = """SELECT hostip FROM banned WHERE unbantime > STRFTIME('%s', 'now');"""
    return c.execute(active_query)
def selectunbanned():
    """Query the hosts whose ban has already expired.

    Executes the SELECT on the shared module-level cursor ``c`` and returns
    the value of ``c.execute`` (the cursor), yielding one row per host with
    the host IP in column 0.  Because the cursor is shared, running another
    statement on ``c`` replaces any rows not yet read.
    """
    expired_query = """SELECT hostip FROM banned WHERE unbantime < STRFTIME('%s', 'now');"""
    return c.execute(expired_query)
def cleanup():
    """Delete every expired row from the ``banned`` table and commit.

    Only the database is touched here; no iptables rule is removed and the
    ``stats`` table is left unchanged.
    """
    purge_expired = """DELETE FROM banned WHERE unbantime < STRFTIME('%s', 'now');"""
    c.execute(purge_expired)
    conn.commit()
def unban_timeup():
    """Lift every ban whose ``unbantime`` has passed.

    For each expired host this removes its DNAT rule from the
    ``fail2ban-sqlite`` chain of the nat table, deletes its row from
    ``banned`` and stamps ``stats.lastunban`` with the current time.
    The exit status of iptables is not checked.
    """
    # Read the whole result set before doing anything else: selectunbanned()
    # returns the shared cursor ``c``, and the DELETE/UPDATE below are run on
    # that same cursor.  Iterating it lazily would drop all remaining rows
    # after the first host, so only one ban would be lifted per call.
    expired_hosts = [row[0] for row in selectunbanned().fetchall()]

    for hostip in expired_hosts:
        cmd = ["/sbin/iptables", "-t", "nat", "-D", "fail2ban-sqlite", "-s", hostip, "-j", "DNAT", "--to-destination", "10.0.0.4"]
        subprocess.call(cmd, shell=False)
        c.execute("""DELETE FROM banned WHERE hostip=?;""", (hostip, ))
        c.execute("""UPDATE stats SET lastunban=STRFTIME('%s', 'now') WHERE hostip=?;""", (hostip, ))
        # One commit per host keeps the two table changes together.
        conn.commit()
def checkstats():
    """Apply a long ban to repeat offenders that are not already banned.

    A host qualifies when its ``stats`` row has ``numbans > 3`` and its last
    ban is more recent than its last unban.  Qualifying hosts that have no
    row in ``banned`` get one with an expiry of ``numbans ** 2 * 600``
    seconds from now, and a DNAT rule is inserted at the top of the
    ``fail2ban-sqlite`` chain.  The exit status of iptables is not checked.
    """
    c.execute("""SELECT * FROM stats WHERE numbans > 3 AND lastunban < lastban;""")
    # Fetch everything up front: conn.commit() is called inside the loop, and
    # committing while a SELECT on the same connection is only partly read
    # can invalidate that cursor on some Python releases.
    offenders = c.fetchall()

    # A second cursor, so these lookups never disturb the shared cursor ``c``.
    lookup = conn.cursor()
    for row in offenders:
        # Column order of ``stats`` is hostip, numbans, lastban, lastunban.
        hostip, numbans = row[0], row[1]
        lookup.execute("""SELECT count(hostip) FROM banned WHERE hostip=?;""", (hostip, ))
        if lookup.fetchone()[0] != 0:
            # Already banned: nothing to insert, no duplicate iptables rule.
            continue
        unbantime = int(time.time()) + (numbans ** 2) * 600
        lookup.execute("""INSERT INTO banned VALUES(?, ?);""", (hostip, unbantime, ))
        conn.commit()
        subprocess.call(["/sbin/iptables", "-t", "nat", "-I", "fail2ban-sqlite", "1", "-s", hostip, "-j", "DNAT", "--to-destination", "10.0.0.4"], shell=False)
def start():
    """Set up the iptables chain and the database, then restore active bans.

    Steps, in order:
      1. create the ``fail2ban-sqlite`` chain in the nat table, append a
         RETURN rule to it and insert a jump to it into PREROUTING;
      2. create the ``stats`` and ``banned`` tables if they are missing;
      3. purge expired bans via cleanup();
      4. insert a DNAT rule for every ban that is still in force.
    The exit status of iptables is never checked.
    """
    nat = ["/sbin/iptables", "-t", "nat"]
    chain_setup = (
        ["-N", "fail2ban-sqlite"],
        ["-A", "fail2ban-sqlite", "-j", "RETURN"],
        ["-I", "PREROUTING", "-p", "all", "-j", "fail2ban-sqlite"],
    )
    for extra_args in chain_setup:
        subprocess.call(nat + extra_args, shell=False)

    # (existence probe, DDL) pairs, processed in this order.
    schema = (
        ("""SELECT count(name) FROM sqlite_master WHERE type='table' AND name='stats';""",
         """CREATE TABLE stats(hostip TEXT, numbans INTEGER, lastban INTEGER, lastunban INTEGER);"""),
        ("""SELECT count(name) FROM sqlite_master WHERE type='table' AND name='banned';""",
         """CREATE TABLE banned(hostip TEXT, unbantime INTEGER);"""),
    )
    for probe, ddl in schema:
        c.execute(probe)
        table_missing = c.fetchone()[0] == 0
        if table_missing:
            c.execute(ddl)
            conn.commit()

    cleanup()

    for active in selectbanned():
        hostip = active[0]
        subprocess.call(nat + ["-I", "fail2ban-sqlite", "1", "-s", hostip, "-j", "DNAT", "--to-destination", "10.0.0.4"], shell=False)
def banaction(ip):
    """Record one more ban for ``ip`` and re-evaluate all bans.

    If ``stats`` holds exactly one row for ``ip`` its ``numbans`` is
    incremented and ``lastban`` set to the current time; otherwise a new row
    is inserted with ``numbans`` 1 and ``lastunban`` 0.  After committing,
    expired bans are lifted (unban_timeup) and repeat offenders are banned
    (checkstats).
    """
    c.execute("""select count(hostip) from stats where hostip=?;""", (ip, ))
    (matches,) = c.fetchone()
    if matches != 1:
        c.execute("""INSERT INTO stats VALUES(?, 1, strftime('%s', 'now'), 0);""", (ip, ))
    else:
        c.execute("""UPDATE stats SET numbans=numbans+1,lastban=STRFTIME('%s', 'now') WHERE hostip=?;""", (ip, ))
    conn.commit()
    unban_timeup()
    checkstats()
def unbanaction():
    """Lift expired bans, then re-check the statistics for repeat offenders."""
    for step in (unban_timeup, checkstats):
        step()
def stop():
    """Tear down the iptables setup made by start().

    Deletes the PREROUTING jump to ``fail2ban-sqlite``, flushes the chain and
    then deletes it.  The exit status of iptables is not checked, and the
    database is left untouched.
    """
    nat = ["/sbin/iptables", "-t", "nat"]
    teardown = (
        ["-D", "PREROUTING", "-p", "all", "-j", "fail2ban-sqlite"],
        ["-F", "fail2ban-sqlite"],
        ["-X", "fail2ban-sqlite"],
    )
    for extra_args in teardown:
        subprocess.call(nat + extra_args, shell=False)
def test(numtest):
    """Call banaction() ``numtest`` times with random 10.10.x.y addresses.

    Both x and y are drawn from 1..254 inclusive; x is drawn first.
    """
    for _ in range(numtest):
        third_octet = str(random.randint(1, 254))
        fourth_octet = str(random.randint(1, 254))
        banaction('10.10.' + third_octet + '.' + fourth_octet)
# Command dispatch.  argv[1] selects the action ('start', 'ban <ip>', 'unban'
# or 'stop'); any other command is silently ignored.
#
# The lock file must be removed however the action ends.  Without the
# try/finally, a missing argument or an exception inside an action would
# leave a stale lock behind, and every later invocation would exit silently.
try:
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s start|ban <ip>|unban|stop\n" % sys.argv[0])
        sys.exit(2)
    command = sys.argv[1]
    if command == 'start':
        start()
    elif command == 'ban':
        if len(sys.argv) < 3:
            sys.stderr.write("usage: %s ban <ip>\n" % sys.argv[0])
            sys.exit(2)
        banaction(sys.argv[2])
    elif command == 'unban':
        unbanaction()
    elif command == 'stop':
        stop()
finally:
    os.remove(lfile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment