Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Last active May 21, 2023 03:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sjlongland/5a4fd21047cd505b91fc048eb6831049 to your computer and use it in GitHub Desktop.
Save sjlongland/5a4fd21047cd505b91fc048eb6831049 to your computer and use it in GitHub Desktop.
fail2ban-subnet scraper
#!/usr/bin/env python3
"""
Automatically coalesce IPs from fail2ban into subnets and flag them for
banning.
"""
# © 2023 Stuart Longland <me@vk4msl.com>
# SPDX-License-Identifier: BSD-2-Clause
# Usage, run this periodically from `cron`, then configure your
# `fail2ban` instance to actually act on the bans:
#
# ==> /etc/fail2ban/filter.d/fail2ban-subnet.conf <==
# [INCLUDES]
#
# before = common.conf
#
# [Definition]
#
# failregex = fail2ban-subnet-scrape\.py: <SUBNET> is currently misbehaving$
#
# ==> /etc/fail2ban/jail.d/subnet.conf <==
# [subnet]
# enabled = true
# filter = fail2ban-subnet
# banaction = iptables-allports
# logpath = /var/log/messages
# maxretry = 1
#
# Known bugs:
# - scraping /var/log/fail2ban.log is horrible, it'd be nice to retrieve all
# the "recently seen" IPs from the `fail2ban-client` (or better yet, import
# `fail2ban`'s API since it is Python too), but it works.
# - it'd also be nice to "filter out" the subnets currently banned so we can
# avoid the "WARNING: <subnet> is already banned"
import re
import sys
import os
import time
import datetime
import ipaddress
import sqlite3
import hashlib
import logging
import syslog
# ---- Configuration -------------------------------------------------------

# Absolute path of the fail2ban log that gets scraped.
FAIL2BAN_LOG = "/var/log/fail2ban.log"

# SQLite database in which this script keeps its state between runs.
SUBNET_DB = "/var/lib/fail2ban/subnet.sqlite3"

# Shortest prefix considered (IPv4, then IPv6).  A shorter prefix means a
# bigger network, e.g. 16 → nothing bigger than a /16 is ever blocked.
MIN_SUBNET4_LENGTH = 16
MIN_SUBNET6_LENGTH = 56

# Longest prefix considered (IPv4, then IPv6): nothing smaller than this is
# blocked as a subnet.
MAX_SUBNET4_LENGTH = 28
MAX_SUBNET6_LENGTH = 64

# "Found" lines logged for these jails are skipped.
IGNORE_JAILS = {"subnet"}

# Prefix lengths are walked from the minimum to the maximum in steps of
# this many bits.
SUBNET_LENGTH_STEP = 4

# Strike threshold for a subnet:
#   SUBNET_STRIKES + SUBNET_STRIKES_SIZE_FACTOR ** size
# where `size` grows with the subnet, so bigger subnets require more hits.
SUBNET_STRIKES = 4
SUBNET_STRIKES_SIZE_FACTOR = 2

# Seconds after an address was last seen before its count becomes eligible
# for decrementing.
DECREMENT_DELAY = 14400

# Seconds for which a subnet stays flagged after it was last seen.
SUBNET_BAN_EXPIRY = 86400

# strptime() format of the daemon timestamp at the end of a "Found" line.
# Milliseconds are ignored and system local time is assumed.
LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# Matches fail2ban.filter "Found" lines.  Capture groups, in order:
# jail name, offending address, daemon log date/time.
LOG_FORMAT_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} "  # log timestamp
    r"fail2ban\.filter +\[\d+\]: +INFO +"  # logger name and level
    r"\[([^\[\]]+)\] Found (.+) - (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$"
)
# Initialise logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("fail2ban.subnet")

# Initialise or open the database.
#
# The schema is created with IF NOT EXISTS on every run rather than only
# when the database file is missing: a file that already exists but is
# empty, or one left with a partial schema by an interrupted first run,
# would otherwise make every later run fail on a missing table.
db_new = not os.path.exists(SUBNET_DB)
db = sqlite3.connect(SUBNET_DB)
if db_new:
    logger.debug("Creating new state database %r", SUBNET_DB)

for schema_statement in (
    # One row per scraped log file: where we got up to last time.
    """
    CREATE TABLE IF NOT EXISTS lastrun (
        path TEXT NOT NULL,
        firstlinemd5 TEXT,
        lastfilepos INTEGER,
        mtime INTEGER
    );
    """,
    # Needed by the "ON CONFLICT (path)" upsert.
    """
    CREATE UNIQUE INDEX IF NOT EXISTS lastrun_path ON lastrun(path);
    """,
    # One row per offending address.
    """
    CREATE TABLE IF NOT EXISTS found_ips (
        ip TEXT NOT NULL,
        count INTEGER NOT NULL DEFAULT 0,
        lastseen INTEGER NOT NULL,
        decrement INTEGER NOT NULL
    );
    """,
    # Needed by the "ON CONFLICT (ip)" upsert.
    """
    CREATE UNIQUE INDEX IF NOT EXISTS found_ips_ip ON found_ips(ip);
    """,
):
    db.execute(schema_statement)
# Pick up the last-known state for this log file.
#
# `logfile_filelinepos` is the line position to resume scanning from; it is
# given a default here so that it is always bound, even when there is no
# `lastrun` record yet.  `logfile_lastfilepos` is the position that will be
# recorded at the end of this run.
logfile_firstlinemd5 = None
logfile_lastfilepos = None
logfile_filelinepos = 0
logfile_mtime = 0
lastrun_row = db.execute(
    """
SELECT firstlinemd5, lastfilepos, mtime
FROM lastrun
WHERE path=?
""",
    (FAIL2BAN_LOG,),
).fetchone()
if lastrun_row is not None:
    # The unique index on `path` means there is at most one record.
    (logfile_firstlinemd5, logfile_filelinepos, logfile_mtime) = lastrun_row

# Get the current log file mtime
logfile_mtime_now = os.stat(FAIL2BAN_LOG).st_mtime
if logfile_mtime_now <= logfile_mtime:
    # Nothing changed.
    # NOTE(review): exiting here also skips the decrement/purge and subnet
    # evaluation further down — confirm that is intended.
    logger.info("Log file has not changed")
    sys.exit(0)
# Scan the log file, collecting the newest daemon timestamp seen for each
# offending address into `ips` (ip_address → integer UNIX timestamp).
ips = {}
# Stays negative when the log file contains no lines at all, so the
# empty-file check below works whether or not a previous record exists.
linenum = -1
with open(FAIL2BAN_LOG, "r") as logfile:
    for (linenum, line) in enumerate(logfile):
        if linenum == 0:
            # First line: does it match the MD5 recorded by the last run?
            md5 = hashlib.md5(line.encode()).hexdigest()
            logger.debug(
                "First line MD5: was %s now %s", logfile_firstlinemd5, md5
            )
            if md5 != logfile_firstlinemd5:
                # MD5 has changed, start at the beginning!
                logger.info(
                    "Log file newly rolled over, starting at the beginning."
                )
                logfile_filelinepos = 0
                logfile_firstlinemd5 = md5
            else:
                logger.info(
                    "Log file continues, seeking to position %d.",
                    logfile_filelinepos,
                )

        if linenum < logfile_filelinepos:
            # We've seen this, carry on!
            continue

        match = LOG_FORMAT_RE.match(line)
        if not match:
            logger.debug("Not matching: %r", line)
            continue

        (jail, ip_str, ts_str) = match.groups()
        if jail in IGNORE_JAILS:
            continue

        try:
            ts = datetime.datetime.strptime(ts_str, LOG_TIMESTAMP_FORMAT)
            ips[ipaddress.ip_address(ip_str)] = int(ts.timestamp())
        except ValueError:
            # Unparseable timestamp or address, continue
            logger.debug("Not parsed: %r", line, exc_info=1)
            continue

if linenum < 0:
    # File is empty: nothing to scan and no position worth recording.
    logger.info("Log file is empty")
    sys.exit(0)

# Record our position as the index of the first line NOT yet processed.
# Storing the index of the last processed line instead would make the next
# run process that line a second time and count its address twice.  (A
# position written by an older version is one lower, so the first run after
# upgrading re-reads a single line.)
logfile_lastfilepos = linenum + 1
# Reference time for this run.
NOW = int(time.time())

# Fold every address found during the scan into the found_ips table: the
# stored count goes up by one and lastseen/decrement are refreshed.
for (ip, ts) in ips.items():
    ip_text = str(ip)
    # At most one row per address; no row means this is the first strike.
    existing = db.execute(
        "SELECT count FROM found_ips WHERE ip=?;", (ip_text,)
    ).fetchone()
    count = 1 if existing is None else existing[0] + 1
    # The count may start being decremented from this time onwards.
    decrement = ts + DECREMENT_DELAY
    refreshed = (count, ts, decrement)
    db.execute(
        """
INSERT INTO found_ips
(ip, count, lastseen, decrement)
VALUES
(?, ?, ?, ?)
ON CONFLICT (ip)
DO UPDATE SET
count=?, lastseen=?, decrement=?;
""",
        (ip_text,) + refreshed + refreshed,
    )
    logger.info(
        "%s seen %d time(s), last at %s, decrement at %s",
        ip,
        count,
        ts,
        decrement,
    )
# Remember where this run got up to: insert the lastrun row for this log
# file, or overwrite the existing one.
lastrun_state = (logfile_firstlinemd5, logfile_lastfilepos, logfile_mtime_now)
db.execute(
    """
INSERT INTO lastrun
(path, firstlinemd5, lastfilepos, mtime)
VALUES
(?, ?, ?, ?)
ON CONFLICT (path)
DO UPDATE SET
firstlinemd5=?, lastfilepos=?, mtime=?
""",
    # The state is bound twice: once for INSERT, once for DO UPDATE SET.
    (FAIL2BAN_LOG,) + lastrun_state + lastrun_state,
)
logger.info(
    "recording file %r firstline=%s, pos=%d, mtime=%s",
    FAIL2BAN_LOG,
    *lastrun_state,
)
# Persist both the address counts and the scan position.
db.commit()
# Decay pass over found_ips.
decay_cursor = db.cursor()
# Addresses whose decrement time has passed lose one strike...
decay_cursor.execute(
    "UPDATE found_ips SET count=count-1 WHERE decrement < ?;", (NOW,)
)
# ...and addresses with no strikes left are forgotten entirely.
decay_cursor.execute("DELETE FROM found_ips WHERE count <= 0;")
db.commit()
# Roll every recorded address up into each candidate subnet containing it.
# `subnets` maps network → (total strikes, most recent lastseen).
subnets = {}
for (ip_str, count, lastseen) in db.execute(
    """
SELECT ip, count, lastseen FROM found_ips;
"""
):
    ip = ipaddress.ip_address(ip_str)
    # Prefix-length bounds depend on the address family.
    (min_length, max_length) = (
        (MIN_SUBNET4_LENGTH, MAX_SUBNET4_LENGTH)
        if ip.version == 4
        else (MIN_SUBNET6_LENGTH, MAX_SUBNET6_LENGTH)
    )
    # Adding one step to the stop value makes max_length itself reachable.
    candidate_lengths = range(
        min_length, max_length + SUBNET_LENGTH_STEP, SUBNET_LENGTH_STEP
    )
    for length in candidate_lengths:
        # strict=False → host bits of the address are masked off
        subnet = ipaddress.ip_network((ip, length), strict=False)
        (tally_so_far, latest_so_far) = subnets.get(subnet, (0, 0))
        subnet_count = tally_so_far + count
        subnet_lastseen = max(latest_so_far, lastseen)
        logger.debug(
            "%s seen %d time(s) last at %s",
            subnet,
            subnet_count,
            subnet_lastseen,
        )
        subnets[subnet] = (subnet_count, subnet_lastseen)
# Figure out which subnets are misbehaving: those with enough accumulated
# strikes that were also last seen after the expiry cut-off.
subnet_expiry = NOW - SUBNET_BAN_EXPIRY
misbehaving = set()
for (subnet, (count, lastseen)) in subnets.items():
    # IPv4 or IPv6?  This must be decided from the subnet being examined,
    # not from whichever address an earlier loop happened to process last;
    # otherwise the size (and so the strike threshold) is computed against
    # the wrong family's maximum prefix length.
    if subnet.version == 4:
        max_length = MAX_SUBNET4_LENGTH
    else:
        max_length = MAX_SUBNET6_LENGTH

    # Size factor for the subnet: 0 at the maximum prefix length, +1 for
    # every SUBNET_LENGTH_STEP bits the prefix is shorter.
    size = round((max_length - subnet.prefixlen) / SUBNET_LENGTH_STEP)
    logger.debug("%s is size %d", subnet, size)

    # Compute number of strikes: bigger subnets need more hits.
    strikes = SUBNET_STRIKES + round(pow(SUBNET_STRIKES_SIZE_FACTOR, size))
    logger.debug("will accept %d strikes, has %d strikes", strikes, count)

    if (count >= strikes) and (lastseen > subnet_expiry):
        # Still misbehaving
        logger.info(
            "%s is misbehaving, seen %d time(s), last at %s",
            subnet,
            count,
            lastseen,
        )
        misbehaving.add(subnet)
# Reduce overlapping subnets to the smallest possible set.
#
# Each pass looks for a subnet that contains other flagged subnets:
#   - exactly one contained subnet  → keep that one, drop the container;
#   - several contained subnets     → keep the container, drop the others.
# After any change the scan restarts, until a full pass changes nothing.
done = False
while not done:
    done = True
    logger.debug(
        "Cleaning up overlapping subnets: %d subnet(s)", len(misbehaving)
    )
    # Visit the most specific subnets first, in a fixed order.  Iterating
    # the set directly made the outcome depend on set ordering: a large
    # subnet visited early could swallow a whole chain of nested subnets
    # that would otherwise have collapsed to a much smaller one.  The sort
    # key never compares network objects directly, so mixing IPv4 and IPv6
    # is safe.
    ordered = sorted(
        misbehaving,
        key=lambda net: (net.version, -net.prefixlen, int(net.network_address)),
    )
    for subnet in ordered:
        # supernet_of() raises TypeError for networks of different IP
        # versions, so filter on the version first.  A network is also a
        # supernet of itself, hence the inequality check.
        children = [
            s
            for s in misbehaving
            if s.version == subnet.version
            and s != subnet
            and subnet.supernet_of(s)
        ]
        num_children = len(children)
        if num_children == 1:
            # Keep the child, ditch the parent
            logger.debug("Keeping %s in favour of %s", children[0], subnet)
            misbehaving.discard(subnet)
            done = False
            break
        elif num_children > 1:
            # Prefer the parent since it's clear _multiple_ addresses in
            # this subnet are behaving badly.
            logger.debug(
                "Keeping %s in favour of %s",
                subnet,
                ", ".join([str(s) for s in children]),
            )
            for s in children:
                misbehaving.discard(s)
            done = False
            break
# Report the outcome.  The summary goes to our own logger; each subnet gets
# its own syslog line, which is the text the fail2ban filter's failregex
# (see the usage notes at the top of the file) is written to match.
if misbehaving:
    summary = ", ".join(map(str, misbehaving))
    logger.info("All misbehaving subnets: %s", summary)

syslog_priority = syslog.LOG_NOTICE | syslog.LOG_DAEMON
for subnet in misbehaving:
    syslog.syslog(syslog_priority, "%s is currently misbehaving" % subnet)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment