Skip to content

Instantly share code, notes, and snippets.

@manuthu
Created December 4, 2018 16:03
Show Gist options
  • Save manuthu/401e4c340cdfd26eda181be747278d26 to your computer and use it in GitHub Desktop.
Save manuthu/401e4c340cdfd26eda181be747278d26 to your computer and use it in GitHub Desktop.
IP Block with signal handling
#!/opt/venv/ipblock/env/bin/python
__author__ = 'Mike Manuthu <muragumichael@gmail.com>'
"""
A simple script to disable the number of connections to the server.
Uses IPTables to block the IPS with more than the threshold connections.
Uses Fail2Ban as the default IP Blocker.
usage: IP Blocker :: [-h] [-i IP] [-I IPRANGE] [-c CONNECTIONS] [-B]
[-f FREQUENCY]
optional arguments:
-h, --help show this help message and exit
-i IP, --ip IP IP to block
-I IPRANGE, --iprange IPRANGE
IP to block. This blocks the entire range
-c CONNECTIONS, --connections CONNECTIONS
The connections to port 80,443,22 from a single IP
considered offensive
-B, --background Daemon mode. Run in the background
-f FREQUENCY, --frequency FREQUENCY
Should be set while running in daemon mode. How often
to run
Example:
./ipblock.py -i xxx.xx.xx.xxxx
"""
import argparse
import iptc
import logging
import signal
import time
from bash import bash
from ipwhois import IPWhois
LOG_FILE = '/var/log/ipblock.log'
COMMAND = "netstat -an | egrep ':80|:443|:22' | awk {'print $5'} | awk -F : '{print $1}' | sort | uniq -c | sort -n "
WHITELIST = ['127.0.01', ]
logging.basicConfig(level=logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(name)s %(message)s')
h = logging.FileHandler(LOG_FILE)
h.setLevel(logging.DEBUG)
h.setFormatter(formatter)
logger = logging.getLogger('::IPBLOCK::')
logger.addHandler(h)
class GracefulInterruptHandler(object):
def __init__(self, signals=(signal.SIGINT, signal.SIGTERM)):
self.signals = signals
self.original_handlers = {}
def __enter__(self):
self.interrupted = False
self.released = False
for sig in self.signals:
self.original_handlers[sig] = signal.getsignal(sig)
signal.signal(sig, self.handler)
return self
def handler(self, signum, frame):
self.release()
self.interrupted = True
def __exit__(self, type, value, tb):
self.release()
def release(self):
if self.released:
return False
for sig in self.signals:
signal.signal(sig, self.original_handlers[sig])
self.released = True
return True
def get_offending_ips(threshold=10):
#logger.info('Get offending IPS ==>>')
offenders = list()
output = bash(COMMAND)
for line in output.value().splitlines():
#print 'Got line ===>>> %s', line
try:
count, ip = line.strip().split(' ')
except:
continue
if int(count) >= threshold:
offenders.append(ip.strip())
return offenders
def get_ip_range(ip):
logger.info('Get IP range ==>> %s', ip)
if ip in WHITELIST:
logger.info('Operation not permitted! Range for IP in whitelist not allowed ==> %s', offender)
return
obj = IPWhois(ip)
whois = obj.lookup_whois()
whoisrange = whois.get('nets')[0].get('range')
return '-'.join([i.strip() for i in whoisrange.split('-') if i])
def block_iprange(iprange):
logger.info('Block IP Range ==>> %s', iprange)
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
rule = iptc.Rule()
match = iptc.Match(rule, "iprange")
match.src_range = iprange
rule.add_match(match)
rule.target = iptc.Target(rule, "DROP")
return chain.insert_rule(rule)
def block_ip(ip, should_use_fail2ban=True):
if ip in WHITELIST:
logger.info('Operation not permitted! IP cannot be banned ==> %s', offender)
return
if should_use_fail2ban:
logger.debug('Blocking IP using Fail2Ban ===>>> %s', ip)
return block_ip_fail2ban(ip)
logger.debug('Blocking IP ===>>> %s', ip)
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
rule = iptc.Rule()
rule.src = ip
rule.target = iptc.Target(rule, "DROP")
return chain.insert_rule(rule)
def block_ip_fail2ban(ip):
cmd = 'fail2ban-client -vvv set ipblock banip %s' % ip
output = bash(cmd)
return output.value()
def mk_handler():
''' Dummy handler. Pass the handler to properly handle the signals '''
class _handler(object):
interrupted = False
return _handler()
def run(
connection_threshold=100,
should_block_range=False,
should_exit=True,
frequency=10,
handler=None):
if not handler:
handler = mk_handler()
while True:
if handler.interrupted:
logger.warning('Interrupted. Exiting gracefully')
exit(0)
time.sleep(frequency) # Sleep for frequency in seconds
blocked_offenders = list()
try:
offenders = get_offending_ips(threshold=connection_threshold)
except Exception as e:
offenders = list()
logger.error(e)
for offender in offenders:
offender = offender.strip()
logger.info('Possible Offender ===>>> %s', offender)
if offender in blocked_offenders:
logger.info('Offender already blocked continue ===>>> %s', offender)
continue
# We dont want to block the server and ourselves from the server.
if offender in WHITELIST:
logger.info('Offender in Whitelist ==> %s', offender)
continue
if should_block_range:
try:
iprange = get_ip_range(offender)
except Exception as e:
logger.error(e)
continue
block_iprange(iprange)
else:
try:
block_ip(offender)
except Exception as e:
logger.error(e)
continue
logger.info('Blocked Offender ===>>> %s', offender)
blocked_offenders.append(offender)
# Exit clause if you just want to exit immediately
if should_exit:
exit(0)
def add_args():
parser = argparse.ArgumentParser(prog='IP Blocker :: ')
parser.add_argument(
'-i',
'--ip',
default=argparse.SUPPRESS,
help='IP to block')
parser.add_argument(
'-I',
'--iprange',
default=argparse.SUPPRESS,
help='IP to block. This blocks the entire range')
parser.add_argument(
'-c',
'--connections',
default=argparse.SUPPRESS,
help='The connections to port 80,443,22 from a single IP considered offensive')
parser.add_argument(
'-B',
'--background',
action='store_true',
default=argparse.SUPPRESS,
help='Daemon mode. Run in the background')
parser.add_argument(
'-f',
'--frequency',
default=argparse.SUPPRESS,
help='Should be set while running in daemon mode. How often to run')
return parser
if __name__ == '__main__':
parser = add_args()
parsed_args = parser.parse_args()
logger.info('Raw Args %s', parsed_args)
#args = vars(parsed_args)
args = {arg[0]:arg[1] for arg in vars(parsed_args).items() if arg[1]}
logger.info('Parsed Args %s', args)
should_block_range = args.get('iprange')
should_exit = not args.get('background')
connection_threshold = int(args.get('connections', 100))
frequency = int(args.get('frequency', 10))
ip = args.get('ip')
iprange = args.get('iprange')
if ip:
logger.info('Blocking IP ===>>> %s', ip)
block_ip(ip)
elif iprange:
logger.info('Blocking IP Range ===>>> %s', ip)
block_iprange(get_ip_range(iprange))
else:
logger.info('---- Run Blocker ----')
with GracefulInterruptHandler() as handler:
run(
connection_threshold=connection_threshold,
should_block_range=should_block_range,
should_exit=should_exit,
frequency=frequency,
handler=handler)
logger.info('Done !!')
@manuthu
Copy link
Author

manuthu commented Dec 4, 2018

Integrating with Fail2Ban

def block_ip_fail2ban(ip):
    cmd = 'fail2ban-client -vvv set ipblock banip %s' % ip
    output = bash(cmd)
    return output.value()

Had to manually create the necessary configs

Created a custom config

(env)[root@simba ~]# cat /etc/fail2ban/jail.d/jail.ipblock.local
[ipblock]
port    = http,https
bantime = 3600
enabled = true

Create a filter for the jail

(env)[root@simba ~]# cat /etc/fail2ban/filter.d/ipblock.conf
[INCLUDES]
before = common.conf

[Definition]

ignoreregex =

Testing if command line works

(env)[root@simba ~]# fail2ban-client -vvv set ipblock banip 196.106.242.22
INFO   Loading configs for fail2ban under /etc/fail2ban
DEBUG  Reading configs for fail2ban under /etc/fail2ban
DEBUG  Reading config files: /etc/fail2ban/fail2ban.conf
INFO     Loading files: ['/etc/fail2ban/fail2ban.conf']
Level 7     Reading file: /etc/fail2ban/fail2ban.conf
INFO     Loading files: ['/etc/fail2ban/fail2ban.conf']
Level 7     Shared file: /etc/fail2ban/fail2ban.conf
INFO   Using socket file /var/run/fail2ban/fail2ban.sock
DEBUG  OK : '196.106.242.22'
DEBUG  Beautify '196.106.242.22' with ['set', 'ipblock', 'banip', '196.106.242.22']
196.106.242.22

@manuthu
Copy link
Author

manuthu commented Dec 4, 2018

[root@simba ~]# ipblock -i 145.255.3.201
INFO:::IPBLOCK:::Raw Args Namespace(ip='145.255.3.201')
INFO:::IPBLOCK:::Parsed Args {'ip': '145.255.3.201'}
INFO:::IPBLOCK:::Blocking IP ===>>> 145.255.3.201
DEBUG:::IPBLOCK:::Blocking IP using Fail2Ban ===>>> 145.255.3.201
INFO:::IPBLOCK:::Done !!
[root@simba ~]# iptables -L -n -v | grep 145.255.3.201
    0     0 REJECT     all  --  *      *       145.255.3.201       0.0.0.0/0            reject-with icmp-port-unreachable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment