Skip to content

Instantly share code, notes, and snippets.

@dimiro1
Last active January 1, 2016 10:48
Show Gist options
  • Save dimiro1/8133516 to your computer and use it in GitHub Desktop.
Save dimiro1/8133516 to your computer and use it in GitHub Desktop.
Este script ajuda a proteger seus servidores de ataques DOS. Uso netstat para descobrir a quantidade de conexões que determinado IP tem aberto, e uso o iptables como firewall. O script deve rodar como ROOT, já que precisa adicionar regras no firewall. USO: Rode o script com o supervisor, upstart ou systemd, ele tem várias opções de configuração …
#!/usr/bin/env python
# Copyright (C) 2013 by Claudemiro Alves Feitosa Neto
# <dimiro1@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# This script must be executed as root.
# Run with Supervisor.
import time
import argparse
import subprocess
from functools import wraps
import logging
# Command prefixes for the external tools; the helpers below append their
# arguments to these strings. Both can be replaced from the command line.
NETSTAT = "/usr/bin/env netstat"
IPTABLES = "/usr/bin/env iptables"

# Seconds to sleep between two consecutive checks.
INTERVAL = 3

# An IP holding more connections than this limit gets dropped.
MAX_CONNECTIONS = 100

# IPs in this list are never dropped.
WHITELIST = ['127.0.0.1']
def add_to_whitelist(ip):
    """Register ``ip`` in the module-level WHITELIST.

    The list is mutated in place, so the new entry is visible to every
    later whitelist check.
    """
    WHITELIST.extend([ip])
def run_in_a_shell(f):
    """Decorate a command-building function so that calling it runs the command.

    ``f`` must return a complete shell command line as a string. The
    returned wrapper forwards its positional and keyword arguments to
    ``f``, runs the resulting string through the shell and returns the
    command's standard output as text.

    Raises:
        subprocess.CalledProcessError: if the command exits with a
            non-zero status.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        # shell=True is needed because callers pass whole pipelines
        # ("... | awk ... | tail ...") as the command string.
        # universal_newlines=True makes the output text instead of bytes on
        # Python 3; callers use str methods such as find() and split("\n").
        return subprocess.check_output(
            f(*args, **kwargs), shell=True, universal_newlines=True)
    return wrapper
@run_in_a_shell
def iptables(args):
    """Build the iptables command line for ``args``.

    Because of the ``run_in_a_shell`` decorator, calling this function
    executes the command and returns its output rather than the string
    built here.
    """
    binary = IPTABLES
    return "%s %s" % (binary, args)
@run_in_a_shell
def netstat(args):
    """Build the netstat command line for ``args``.

    Because of the ``run_in_a_shell`` decorator, calling this function
    executes the command and returns its output rather than the string
    built here.
    """
    binary = NETSTAT
    return "%s %s" % (binary, args)
def drop(ip):
    """Append an iptables INPUT rule that DROPs traffic coming from ``ip``.

    Whitelisted IPs are left alone; only a debug message is logged for them.
    """
    if ip in WHITELIST:
        logging.debug("IP in WHITELIST")
        return

    rule = "-A INPUT -s %s -j DROP" % ip
    logging.debug(rule)
    iptables(rule)
def free(ip):
    """Delete the iptables INPUT rule that DROPs traffic coming from ``ip``.

    This is the ``-D`` counterpart of the ``-A`` rule added by ``drop``.
    The whitelist is not consulted here.
    """
    rule = '-D INPUT -s %s -j DROP' % ip
    logging.debug(rule)
    iptables(rule)
def check_dropped(ip):
    """Return True if ``ip`` is one of the entries reported by list_dropped().

    The listing is split on whitespace and ``ip`` must equal a whole entry.
    A substring search would report "1.2.3.4" as dropped whenever
    "11.2.3.45" or "1.2.3.40" is in the list.
    """
    logging.debug('Chcking Droppped')
    return ip in list_dropped().split()
def list_dropped():
    """Return the source addresses of the DROP rules in the INPUT chain.

    The result is the raw command output: one source address per line,
    each followed by a newline.
    """
    logging.debug('Listing Dropped')
    # In `iptables -L -v -n` output, column 3 is the rule target and column 8
    # the source address. Filtering on the target keeps ACCEPT/REJECT rules
    # out of the listing; the two header lines never have "DROP" in column 3,
    # so they are skipped by the same condition.
    return iptables("-L INPUT -v -n | awk '$3 == \"DROP\" {print $8}'")
def step():
    """Run one check: drop every IP with more than MAX_CONNECTIONS connections.

    The netstat pipeline yields one "<count> <ip>" line per remote address.
    Each address above the limit that is not already dropped is passed to
    ``drop``.
    """
    netstat_ips = netstat(
        "-ntu | tail -n +3 | awk '{print $5}' | cut -d: -f1 | sort | uniq -c | sort -nr").strip()
    for line in netstat_ips.split("\n"):
        fields = line.split()
        if len(fields) != 2:
            # Empty output (no connections) or a line whose address part is
            # empty, e.g. an IPv6 address cut at its first ':'. Skip it
            # instead of failing the unpacking below.
            continue
        connections, ip = fields
        if int(connections) <= MAX_CONNECTIONS:
            continue
        if not check_dropped(ip):
            logging.info("Dropping %s", ip)
            drop(ip)
if __name__ == '__main__':
    # Command-line entry point: parse the options, apply the configuration
    # overrides, then perform the requested actions.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-r', '--run', action='store_true', help='Run')
    parser.add_argument(
        '-l', '--list', action='store_true', help='List dropped ips')
    parser.add_argument(
        '-d', '--drop', nargs='+', help='Drop ip', metavar='ip')
    parser.add_argument(
        '-f', '--free', nargs='+', help='Free ip', metavar='ip')
    parser.add_argument(
        '-w', '--whitelist', nargs='+', help='Whitelist', metavar='ip')
    parser.add_argument(
        '-i', '--interval', help='Interval, in seconds', type=int)
    parser.add_argument(
        '-m', '--max-connections', help='Max Connections', type=int)
    parser.add_argument(
        '-c', '--check', help='Check ip', metavar='ip')
    parser.add_argument(
        '--netstat-bin', help='Netstat Binary', metavar='location')
    parser.add_argument(
        '--iptables-bin', help='Iptables Binary', metavar='location')
    args = parser.parse_args()
    logging.debug(args)

    # Configuration overrides. These rebind the module-level names that the
    # helper functions read each time they are called.
    # NETSTAT BINARY
    if args.netstat_bin is not None:
        NETSTAT = args.netstat_bin
    # IPTABLES BINARY
    if args.iptables_bin is not None:
        IPTABLES = args.iptables_bin
    # INTERVAL
    if args.interval is not None:
        INTERVAL = args.interval
    # MAX CONNECTIONS
    if args.max_connections is not None:
        MAX_CONNECTIONS = args.max_connections

    # WHITELIST
    # Handled before --drop so that an IP given to both options in the same
    # invocation is protected instead of being dropped first.
    if args.whitelist is not None:
        for ip in args.whitelist:
            add_to_whitelist(ip)
    # DROP
    if args.drop is not None:
        for ip in args.drop:
            drop(ip)
    # FREE
    if args.free is not None:
        for ip in args.free:
            free(ip)
    # LIST DROPPED
    if args.list:
        # Call form of print: valid on both Python 2 and Python 3.
        print(list_dropped())
    # CHECK IP
    if args.check:
        print(check_dropped(args.check))
    # RUN
    if args.run:
        # Loop forever, checking once every INTERVAL seconds.
        while True:
            step()
            time.sleep(INTERVAL)
3 80.81.82.83
101 20.21.22.23
290 90.91.92.93
1 50.51.52.53
import unittest
from mock import patch
from dosblocker import drop, check_dropped, free, list_dropped, add_to_whitelist, netstat, step
class DosBlockerTest(unittest.TestCase):
    """Tests for the dosblocker helpers.

    The tests call the real drop/free/check_dropped/list_dropped functions;
    only ``netstat`` is replaced, and only in test_step. Rules added by a
    test are removed through addCleanup so they do not outlive the test
    when an assertion fails.
    """

    def test_drop(self):
        drop('10.11.12.13')
        self.addCleanup(free, '10.11.12.13')
        self.assertTrue(check_dropped('10.11.12.13'))

    def test_drop_in_whitelist(self):
        drop("127.0.0.1")
        self.assertFalse(check_dropped('127.0.0.1'))

    def test_add_to_whitelist(self):
        # Imported here because the module-level import list does not
        # include WHITELIST.
        from dosblocker import WHITELIST
        add_to_whitelist('12.13.14.15')
        # Undo the global mutation so other tests see the original list.
        self.addCleanup(WHITELIST.remove, '12.13.14.15')
        drop('12.13.14.15')
        self.assertFalse(check_dropped('12.13.14.15'))

    def test_free(self):
        drop('10.11.12.13')
        self.assertTrue(check_dropped('10.11.12.13'))
        free('10.11.12.13')
        self.assertFalse(check_dropped('10.11.12.13'))

    def test_list_droppped(self):
        for ip in ('10.11.12.13', '11.12.13.14'):
            drop(ip)
            self.addCleanup(free, ip)
        self.assertEqual("10.11.12.13\n11.12.13.14\n", list_dropped())

    def test_step(self):
        # Read the fixture inside the test, in a with-block, so the file is
        # closed and a missing fixture fails this test only instead of
        # breaking the import of the whole module.
        with open('netstat.txt') as fixture:
            canned_output = fixture.read()
        with patch('dosblocker.netstat', return_value=canned_output):
            step()
        # The fixture lists these two IPs with more than 100 connections.
        for ip in ("20.21.22.23", "90.91.92.93"):
            self.addCleanup(free, ip)
        self.assertTrue(check_dropped("20.21.22.23"))
        self.assertTrue(check_dropped("90.91.92.93"))
        self.assertFalse(check_dropped("80.81.82.83"))
        self.assertFalse(check_dropped("50.51.52.53"))
# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment