Created
January 6, 2017 11:27
-
-
Save techouse/601ca78d88aff3ece751adaa026a36fa to your computer and use it in GitHub Desktop.
Add a set of IPs to an ipset. Requires RHEL/CentOS/Fedora Linux, FirewallD and IPset.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""Add a set of IPs to an ipset and drop their traffic with a FirewallD rule.

Requires RHEL/CentOS/Fedora Linux, FirewallD and ipset.
"""

__author__ = "Klemen Tušar"
__email__ = "techouse@gmail.com"
__copyright__ = "GPL"
__version__ = "1.0.2"
__date__ = "2017-01-06"
__status__ = "Production"
import ipaddress
import re
import shutil
import subprocess
from datetime import datetime
from random import choice
from urllib.request import Request, URLError, urlopen
class Blacklist:
    """Ban a list of IPv4 addresses/networks using ipset and FirewallD.

    The addresses come from exactly one source, checked in this order:
    ``url`` (downloaded), ``file`` (read from disk) or ``text`` (a string
    with one entry per line).  Every valid entry that is not whitelisted is
    added to the configured ipset, and a FirewallD direct rule drops TCP
    traffic from that set on the configured ports.

    Keyword Args:
        url: URL of a plain-text list of IPs.
        file: Path to a text file with a list of IPs.
        text: String holding the list of IPs, one per line.

    Raises:
        OSError: If the ``firewall-cmd`` or ``ipset`` binary cannot be found.
    """

    # Seconds to wait for the blocklist download before giving up.
    _HTTP_TIMEOUT = 30

    # (name, preferred absolute path) of the external tools, checked in order.
    _DEFAULT_BIN = (
        ('firewall-cmd', '/usr/bin/firewall-cmd'),
        ('ipset', '/usr/sbin/ipset'),
    )

    # A dotted quad with an optional /prefix at the start of a line.  The
    # trailing lookahead rejects entries that keep going (e.g. "1.2.3.4.5" or
    # "1.2.3.4/245") instead of silently truncating them.
    _ipv4_pattern = re.compile(
        r'(\d{1,3}(?:\.\d{1,3}){3}(?:/\d{1,2})?)(?![\d./])'
    )

    # User-Agent headers; one is picked at random for every download.
    _useragents = (
        'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10; rv:33.0) Gecko/20100101 Firefox/33.0',
        'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20130401 Firefox/31.0',
        'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0',
        'Opera/9.80 (X11; Linux i686; Ubuntu/14.10) Presto/2.12.388 Version/12.16',
        'Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14',
        'Opera/12.80 (Windows NT 5.1; U; en) Presto/2.10.289 Version/12.02',
        'Opera/12.0(Windows NT 5.2;U;en)Presto/22.9.168 Version/12.00',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
        'Mozilla/5.0 (iPad; CPU OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5355d Safari/8536.25',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/537.13+ (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/534.55.3 (KHTML, like Gecko) Version/5.1.3 Safari/534.53.10',
        'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko',
        'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)',
        'Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)',
        'Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; GTB7.4; InfoPath.2; SV1; .NET CLR 3.3.69573; WOW64; en-US)',
        'Mozilla/5.0 (Windows; U; MSIE 7.0; Windows NT 6.0; en-US)',
    )

    def __init__(self, **kwargs):
        self._url = kwargs.get('url', None)
        self._file = kwargs.get('file', None)
        self._text = kwargs.get('text', None)
        self._whitelist = tuple()
        self._timeout = 3600
        self._ipset = False
        self._ports = (80, 443)

        # Prefer the well-known absolute path, fall back to a PATH lookup and
        # fail early when a tool is really absent (the previous check tested
        # a hard-coded non-empty string and therefore could never fail).
        self._bin = {}
        for name, default_path in self._DEFAULT_BIN:
            resolved = shutil.which(default_path) or shutil.which(name)
            if not resolved:
                raise OSError(
                    '{} binary missing! Unable to continue.'.format(name)
                )
            self._bin[name] = resolved

    @property
    def url(self):
        """URL the list of IPs is downloaded from."""
        return self._url

    @url.setter
    def url(self, url):
        self._url = url

    @url.deleter
    def url(self):
        self._url = False

    @property
    def file(self):
        """Path of the text file the list of IPs is read from."""
        return self._file

    @file.setter
    def file(self, file):
        self._file = file

    @file.deleter
    def file(self):
        self._file = False

    @property
    def text(self):
        """Raw text holding the list of IPs, one entry per line."""
        return self._text

    @text.setter
    def text(self, text):
        self._text = text

    @text.deleter
    def text(self):
        self._text = False

    @property
    def whitelist(self):
        """Tuple of entries that are never added to the ipset."""
        return self._whitelist

    @whitelist.setter
    def whitelist(self, whitelist):
        if isinstance(whitelist, tuple):
            self._whitelist = whitelist
        else:
            raise TypeError('whitelist must be a tuple')

    @whitelist.deleter
    def whitelist(self):
        self._whitelist = tuple()

    @property
    def ipset(self):
        """Name of the ipset the entries are added to."""
        return self._ipset

    @ipset.setter
    def ipset(self, ipset):
        self._ipset = ipset

    @ipset.deleter
    def ipset(self):
        self._ipset = False

    @property
    def timeout(self):
        """Timeout value passed to ipset for the set and each entry."""
        return self._timeout

    @timeout.setter
    def timeout(self, timeout):
        self._timeout = int(timeout)

    @timeout.deleter
    def timeout(self):
        self._timeout = 3600

    @property
    def ports(self):
        """Tuple of TCP destination ports the DROP rule applies to."""
        return self._ports

    @ports.setter
    def ports(self, ports):
        if isinstance(ports, tuple):
            self._ports = ports
        else:
            raise TypeError('ports must be a tuple')

    @ports.deleter
    def ports(self):
        self._ports = (80, 443)

    def _parse_lines(self, lines):
        """Yield the valid IPv4 address/network found at the start of each line.

        Surrounding whitespace is ignored.  Lines that do not start with a
        syntactically valid IPv4 address or CIDR network are skipped.
        """
        for line in lines:
            found = self._ipv4_pattern.match(line.strip())
            if not found:
                continue
            candidate = found.group(1)
            try:
                # Rejects out-of-range octets and prefixes (999.1.1.1, /33).
                ipaddress.ip_network(candidate, strict=False)
            except ValueError:
                continue
            yield candidate

    def _get_new_ips(self):
        """Yield the entries from the configured URL, file or text source.

        Raises:
            ValueError: If the URL cannot be fetched or no source is set.
            IOError: If the file cannot be read.
        """
        if self._url:
            request = Request(
                self._url,
                headers={'User-Agent': choice(self._useragents)}
            )
            try:
                with urlopen(request, timeout=self._HTTP_TIMEOUT) as response:
                    body = response.read().decode('utf-8')
            except URLError as err:
                raise ValueError('Invalid URL: {}'.format(err)) from err
            yield from self._parse_lines(body.splitlines())
        elif self._file:
            try:
                with open(self._file, 'r') as fh:
                    yield from self._parse_lines(fh)
            except IOError as err:
                raise IOError(
                    'Error reading file {}: {}'.format(self._file, err)
                ) from err
        elif self._text:
            yield from self._parse_lines(self._text.splitlines())
        else:
            raise ValueError('Please provide a URL, FILE or STDIN lines of data!')

    def _require_ipset(self):
        """Return the ipset name as a string or raise ValueError if unset."""
        if not self._ipset:
            raise ValueError('Invalid ipset name!')
        return str(self._ipset)

    def _rule_spec(self):
        """Return the direct-rule arguments shared by the query and add calls."""
        set_name = self._require_ipset()
        return [
            'ipv4', 'filter', 'INPUT_direct', '0',
            '-p', 'tcp',
            '-m', 'multiport',
            '--dports', ','.join(str(port).strip() for port in self._ports),
            '-m', 'set', '--match-set', set_name, 'src',
            '-j', 'DROP',
        ]

    def _ipset_exists(self):
        """Return True when ``ipset list <name>`` succeeds."""
        set_name = self._require_ipset()
        # Argument lists (no shell) keep the set name from being interpreted
        # by a shell; the exit status is independent of the message wording.
        status = subprocess.call(
            [self._bin['ipset'], 'list', set_name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )
        return status == 0

    def _create_ipset(self):
        """Create the hash:net ipset; return the exit status of ipset."""
        set_name = self._require_ipset()
        return subprocess.call([
            self._bin['ipset'], 'create', set_name,
            'hash:net', 'timeout', str(self._timeout),
        ])

    def _rule_exists(self):
        """Return True when firewall-cmd answers 'yes' to the rule query."""
        rule = self._rule_spec()
        result = subprocess.run(
            [self._bin['firewall-cmd'], '--direct', '--query-rule'] + rule,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True
        )
        return result.stdout.strip() == 'yes'

    def _create_rule(self):
        """Add the DROP rule; return the exit status of firewall-cmd."""
        rule = self._rule_spec()
        return subprocess.call(
            [self._bin['firewall-cmd'], '--direct', '--add-rule'] + rule
        )

    def _reject_ipset(self, ip):
        """Add ``ip`` to the ipset; return the exit status of ipset."""
        set_name = self._require_ipset()
        return subprocess.call([
            self._bin['ipset'], 'add', set_name, str(ip),
            'timeout', str(self._timeout), '-exist',
        ])

    def process(self):
        """Ensure the ipset and DROP rule exist, then add every new entry.

        Raises:
            ValueError: If no ipset name or no usable source is configured.
            OSError: If the ipset or the firewall rule cannot be created.
        """
        if not self._ipset_exists() and self._create_ipset() != 0:
            raise OSError('Unable to create ipset {}'.format(self._ipset))
        if not self._rule_exists() and self._create_rule() != 0:
            raise OSError(
                'Unable to create firewall rule for ipset {}'.format(self._ipset)
            )
        # Built once so the membership test does not rescan the tuple per IP.
        whitelisted = frozenset(self._whitelist)
        for ip in self._get_new_ips():
            if ip not in whitelisted:
                self._reject_ipset(ip)
        print('COMPLETED: {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
def main():
    """Command-line entry point: parse the arguments and run the blacklist.

    The list of IPs is taken from ``--url``, else ``--file``, else the
    positional ``infile`` argument, which defaults to standard input.
    Exits with status 1 when called without arguments or when processing
    fails; argparse exits on its own for invalid arguments.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument('--url', dest='url', default=None, help='URL to a list of IPs')
    parser.add_argument('--file', dest='file', default=None, help='A text file with a list of IPs')
    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help='A list of IPs')
    parser.add_argument('--ipset', dest='ipset', default=None, help='ipset name', required=True)
    parser.add_argument('--ports', dest='ports', nargs='*', type=int, default=(80, 443), help='Affected ports')
    parser.add_argument('--timeout', dest='timeout', type=int, default=3600, help='Ban timeout')
    parser.add_argument('--whitelist', dest='whitelist', nargs='*', default=None, help='Whitelisted list of IPs')

    # Must run before parse_args(): with the required --ipset option,
    # parsing an empty command line exits before help could be shown.
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    args = parser.parse_args()

    # Text input is usable when a file was named explicitly or when stdin is
    # piped/redirected rather than an interactive terminal.
    infile_given = args.infile is not sys.stdin
    has_text_input = infile_given or not sys.stdin.isatty()

    if args.url is None and args.file is None and not has_text_input:
        # parser.error() prints the message and exits by itself.
        parser.error("You have to provide either an URL or a FILE or STDIN with a list of IPs")

    try:
        blacklist = Blacklist()
        if args.url:
            blacklist.url = args.url
        elif args.file:
            blacklist.file = args.file
        elif has_text_input:
            blacklist.text = args.infile.read().strip()
        if infile_given:
            args.infile.close()
        blacklist.whitelist = tuple(args.whitelist) if args.whitelist else tuple()
        blacklist.ipset = args.ipset
        blacklist.ports = tuple(args.ports) if args.ports else (80, 443)
        blacklist.timeout = int(args.timeout)
        blacklist.process()
    except (ValueError, TypeError, OSError) as err:
        # IOError is an alias of OSError on Python 3, so it is covered here.
        print(err, file=sys.stderr)
        sys.exit(1)
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment