Last active
April 2, 2018 12:34
-
-
Save manuthu/499b934646f09edba1cd07983ed08a71 to your computer and use it in GitHub Desktop.
Block any IP that holds too many connections to the server on ports 80, 443, or 22. If an IP has more than the specified number of connections, use iptables to drop its connections.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/root/env/bin/python | |
import argparse | |
import iptc | |
import logging | |
import time | |
from bash import bash | |
from ipwhois import IPWhois | |
# Logging: basicConfig sets the root logger to DEBUG, and the '::IPBLOCK::'
# logger additionally gets a file handler writing to /var/log/ipblock.log
# with a "<time> <logger name> <message>" layout.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('::IPBLOCK::')
formatter = logging.Formatter('%(asctime)s %(name)s %(message)s')
h = logging.FileHandler('/var/log/ipblock.log')
h.setFormatter(formatter)
h.setLevel(logging.DEBUG)
logger.addHandler(h)

# Shell pipeline that counts connections per address:
#   1. list sockets with `netstat -an` and keep lines matching :80, :443 or :22
#   2. take field 5 of each line and cut it at the first ':'
#   3. count identical values (`sort | uniq -c`), order the counts ascending
#      and keep only what `tail` prints (the highest counts).
# Each output line therefore looks like "<count> <address>".
COMMAND = "netstat -an | egrep ':80|:443|:22' | awk {'print $5'} | awk -F : '{print $1}' | sort | uniq -c | sort -n | tail"
def get_offending_ips(threshold=10):
    """Return the addresses whose connection count reaches *threshold*.

    Runs COMMAND through ``bash`` and parses every ``<count> <address>`` line
    of its output.

    Lines that do not consist of exactly a numeric count followed by an
    address are skipped instead of raising.  Such lines do occur: when the
    address part is empty (for example the ``:::*`` entries that
    ``awk -F :`` reduces to an empty string) ``uniq -c`` emits a line that
    holds only the count.

    :param threshold: minimum number of connections for an address to be
        reported.
    :return: list of address strings, in the order the pipeline printed them.
    """
    offenders = []
    output = bash(COMMAND)
    for line in output.value().splitlines():
        # split() without an argument copes with the variable padding that
        # `uniq -c` puts in front of and after the count.
        fields = line.split()
        if len(fields) != 2:
            # Blank line, or a count without an address: nothing to block.
            continue
        count, ip = fields
        try:
            connections = int(count)
        except ValueError:
            # Not a "<count> <address>" line after all.
            continue
        if connections >= threshold:
            offenders.append(ip)
    return offenders
def get_ip_range(ip):
    """Look up the WHOIS network range that *ip* belongs to.

    Only the first network (``nets[0]``) of the WHOIS result is used.

    :param ip: address to look up.
    :return: the range as ``"<start>-<end>"``, with the whitespace around
        each part removed.
    :raises ValueError: if the WHOIS result lists no networks or the first
        network carries no range.
    """
    logger.info('Get IP range ==>> %s', ip)
    whois = IPWhois(ip).lookup_whois()
    nets = whois.get('nets') or []
    whoisrange = nets[0].get('range') if nets else None
    if not whoisrange:
        # Fail with a readable message rather than an AttributeError on None.
        raise ValueError('No WHOIS range found for %s' % ip)
    # "a - b" -> "a-b": drop empty pieces, then trim each remaining one.
    return '-'.join(part.strip() for part in whoisrange.split('-') if part)
def block_iprange(iprange):
    """Insert a DROP rule for a source address range into filter/INPUT.

    :param iprange: range string assigned to the ``src_range`` of an
        ``iprange`` match.
    :return: the result of ``Chain.insert_rule``.
    """
    logger.info('Block IP Range ==>> %s', iprange)
    filter_table = iptc.Table(iptc.Table.FILTER)
    input_chain = iptc.Chain(filter_table, "INPUT")

    drop_rule = iptc.Rule()
    # Match packets whose source address falls inside the given range.
    range_match = iptc.Match(drop_rule, "iprange")
    range_match.src_range = iprange
    drop_rule.add_match(range_match)
    drop_rule.target = iptc.Target(drop_rule, "DROP")

    return input_chain.insert_rule(drop_rule)
def block_ip(ip):
    """Insert a DROP rule for a single source address into filter/INPUT.

    :param ip: value assigned to the rule's ``src``.
    :return: the result of ``Chain.insert_rule``.
    """
    logger.debug('Blocking IP ===>>> %s', ip)
    filter_table = iptc.Table(iptc.Table.FILTER)
    input_chain = iptc.Chain(filter_table, "INPUT")

    drop_rule = iptc.Rule()
    drop_rule.src = ip
    drop_rule.target = iptc.Target(drop_rule, "DROP")

    return input_chain.insert_rule(drop_rule)
def run(connection_threshold=100, should_block_range=False, should_exit=True, frequency=10):
    """Find offending addresses and block them, once or repeatedly.

    Each pass asks ``get_offending_ips`` for the addresses at or above
    *connection_threshold* and blocks every one that this call has not
    blocked before.  A failure while blocking one address is logged and the
    pass moves on to the next address.

    :param connection_threshold: connection count from which an address is
        treated as an offender.
    :param should_block_range: when truthy, block the WHOIS range of the
        offender (``get_ip_range`` + ``block_iprange``) instead of the
        single address (``block_ip``).
    :param should_exit: when truthy, raise ``SystemExit(0)`` after the first
        pass; otherwise keep looping.
    :param frequency: seconds to sleep between two passes.
    :raises SystemExit: after the first pass when *should_exit* is truthy.
    """
    # Kept across passes: a blocked address can still show up in later
    # passes, and blocking it again would only add a duplicate DROP rule.
    blocked_offenders = set()
    while True:
        for offender in get_offending_ips(threshold=connection_threshold):
            offender = offender.strip()
            logger.info('Possible Offender ===>>> %s', offender)
            if offender in blocked_offenders:
                logger.info('Offender already blocked continue ===>>> %s', offender)
                continue
            # Both the lookup and the rule insertion are guarded so that one
            # bad address cannot stop the loop.
            try:
                if should_block_range:
                    block_iprange(get_ip_range(offender))
                else:
                    block_ip(offender)
            except Exception as e:
                logger.error(e)
                continue
            logger.info('Blocked Offender ===>>> %s', offender)
            blocked_offenders.add(offender)
        # Exit clause if you just want to exit immediately.  SystemExit is
        # raised directly because the exit() helper is only installed by the
        # site module.
        if should_exit:
            raise SystemExit(0)
        # Sleep only between passes, so a one-shot run is not delayed.
        time.sleep(frequency)
def add_args():
    """Build the command-line parser of the IP blocker.

    Every option defaults to ``argparse.SUPPRESS``, so an option that is not
    given on the command line is absent from the parsed namespace.

    :return: the configured ``argparse.ArgumentParser``.
    """
    # (short flag, long flag, extra add_argument keywords), in help order.
    option_specs = (
        ('-i', '--ip', {
            'help': 'IP to block',
        }),
        ('-I', '--iprange', {
            'help': 'IP to block. This blocks the entire range',
        }),
        ('-c', '--connections', {
            'help': 'The connections to port 80,443,22 from a single IP considered offensive',
        }),
        ('-B', '--background', {
            'action': 'store_true',
            'help': 'Daemon mode. Run in the background',
        }),
        ('-f', '--frequency', {
            'help': 'Should be set while running in daemon mode. How often to run',
        }),
    )

    parser = argparse.ArgumentParser(prog='IP Blocker :: ')
    for short_flag, long_flag, extra in option_specs:
        parser.add_argument(short_flag, long_flag, default=argparse.SUPPRESS, **extra)
    return parser
if __name__ == '__main__':
    # Parse the command line and record exactly what was received.
    parsed_args = add_args().parse_args()
    logger.info('Raw Args %s', parsed_args)

    # Keep only the options with a truthy value, so the .get() calls below
    # fall back to their defaults for everything else.
    args = dict(
        (name, value) for name, value in vars(parsed_args).items() if value)
    logger.info('Parsed Args %s', args)

    # --iprange is only used for its truthiness here: it switches the
    # blocker from single addresses to whole WHOIS ranges.
    run_options = dict(
        should_block_range=args.get('iprange'),
        should_exit=not args.get('background'),
        connection_threshold=int(args.get('connections', 100)),
        frequency=int(args.get('frequency', 10)),
    )

    ip = args.get('ip')
    if not ip:
        # No explicit address: scan for offenders and block them.
        logger.info('---- Run Blocker ----')
        run(**run_options)
    else:
        # An explicit address was given: block just that one.
        logger.info('Blocking IP ===>>> %s', ip)
        block_ip(ip)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage:
Example