Skip to content

Instantly share code, notes, and snippets.

@techouse
Created January 6, 2017 11:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save techouse/601ca78d88aff3ece751adaa026a36fa to your computer and use it in GitHub Desktop.
Save techouse/601ca78d88aff3ece751adaa026a36fa to your computer and use it in GitHub Desktop.
Add a set of IPs to an ipset. Requires RHEL/CentOS/Fedora Linux, FirewallD and IPset.
#!/usr/bin/env python3
__author__ = "Klemen Tušar"
__email__ = "techouse@gmail.com"
__copyright__ = "GPL"
__version__ = "1.0.2"
__date__ = "2017-01-06"
__status__ = "Production"
import re, subprocess
from urllib.request import urlopen, Request, URLError
from datetime import datetime
from random import choice
class Blacklist:
def __init__(self, **kwargs):
self._url = kwargs.get('url', None)
self._file = kwargs.get('file', None)
self._text = kwargs.get('text', None)
self._whitelist = tuple()
self._timeout = 3600
self._ipset = False
self._ipv4_pattern = re.compile('^[\d+\.]+(/\d+)?')
self._useragents = (
'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 6.3; rv:36.0) Gecko/20100101 Firefox/36.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10; rv:33.0) Gecko/20100101 Firefox/33.0',
'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20130401 Firefox/31.0',
'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0',
'Opera/9.80 (X11; Linux i686; Ubuntu/14.10) Presto/2.12.388 Version/12.16',
'Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14',
'Opera/12.80 (Windows NT 5.1; U; en) Presto/2.10.289 Version/12.02',
'Opera/12.0(Windows NT 5.2;U;en)Presto/22.9.168 Version/12.00',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
'Mozilla/5.0 (iPad; CPU OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5355d Safari/8536.25',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_8) AppleWebKit/537.13+ (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/534.55.3 (KHTML, like Gecko) Version/5.1.3 Safari/534.53.10',
'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko',
'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)',
'Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)',
'Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; GTB7.4; InfoPath.2; SV1; .NET CLR 3.3.69573; WOW64; en-US)',
'Mozilla/5.0 (Windows; U; MSIE 7.0; Windows NT 6.0; en-US)'
)
self._bin = {
'firewall-cmd': '/usr/bin/firewall-cmd',
'ipset': '/usr/sbin/ipset'
}
if not self._bin['firewall-cmd']:
raise OSError('firewall-cmd binary missing! Unable to continue.')
if not self._bin['ipset']:
raise OSError('ipset binary missing! Unable to continue.')
self._ports = (80, 443)
@property
def url(self):
return self._url
@url.setter
def url(self, url):
self._url = url
@url.deleter
def url(self):
self._url = False
@property
def file(self):
return self._file
@file.setter
def file(self, file):
self._file = file
@file.deleter
def file(self):
self._file = False
@property
def text(self):
return self._text
@text.setter
def text(self, text):
self._text = text
@text.deleter
def text(self):
self._text = False
@property
def whitelist(self):
return self._whitelist
@whitelist.setter
def whitelist(self, whitelist):
if isinstance(whitelist, tuple):
self._whitelist = whitelist
else:
raise TypeError('whitelist must be a tuple')
@whitelist.deleter
def whitelist(self):
self._whitelist = tuple()
@property
def ipset(self):
return self._ipset
@ipset.setter
def ipset(self, ipset):
self._ipset = ipset
@ipset.deleter
def ipset(self):
self._ipset = False
@property
def timeout(self):
return self._timeout
@timeout.setter
def timeout(self, timeout):
self._timeout = int(timeout)
@timeout.deleter
def timeout(self):
self._timeout = 3600
@property
def ports(self):
return self._ports
@ports.setter
def ports(self, ports):
if isinstance(ports, tuple):
self._ports = ports
else:
raise TypeError('ports must be a tuple')
@ports.deleter
def ports(self):
self._ports = (80, 443)
def _get_new_ips(self):
if self._url:
try:
with urlopen(Request(self._url, headers={'User-Agent': choice(self._useragents)})) as response:
for ip in response.read().decode('utf-8').splitlines():
if ip.strip():
ipv4 = self._ipv4_pattern.match(ip)
if ipv4:
yield ipv4.group().strip()
except URLError as err:
raise ValueError('Invalid URL: {}'.format(err))
elif self._file:
try:
with open(self._file, 'r') as fh:
for ip in fh:
if ip.strip():
ipv4 = self._ipv4_pattern.match(ip)
if ipv4:
yield ipv4.group().strip()
except IOError as err:
raise IOError('Error reading file {}: {}'.format(self._file, err))
elif self._text:
for ip in self._text.splitlines():
if ip.strip():
ipv4 = self._ipv4_pattern.match(ip)
if ipv4:
yield ipv4.group().strip()
else:
raise ValueError('Please provide a URL, FILE or STDIN lines of data!')
def _ipset_exists(self):
if not self._ipset:
raise ValueError('Invalid ipset name!')
return False if subprocess.getoutput('{} list {}'.format(
self._bin['ipset'],
self._ipset)
).find('The set with the given name does not exist') != -1 else True
def _create_ipset(self):
if not self._ipset:
raise ValueError('Invalid ipset name!')
return subprocess.call(
'{} create {} hash:net timeout {}'.format(
self._bin['ipset'],
self._ipset,
self._timeout
),
shell=True
)
def _rule_exists(self):
if not self._ipset:
raise ValueError('Invalid ipset name!')
return True if subprocess.getoutput(
'{} --direct --query-rule ipv4 filter INPUT_direct 0 -p tcp -m multiport --dports {} -m set --match-set {} src -j DROP'.format(
self._bin['firewall-cmd'],
','.join(str(port).strip() for port in self._ports),
self._ipset
)
) == 'yes' else False
def _create_rule(self):
if not self._ipset:
raise ValueError('Invalid ipset name!')
return subprocess.call(
'{} --direct --add-rule ipv4 filter INPUT_direct 0 -p tcp -m multiport --dports {} -m set --match-set {} src -j DROP'.format(
self._bin['firewall-cmd'],
','.join(str(port).strip() for port in self._ports),
self._ipset
),
shell=True
)
def _reject_ipset(self, ip):
if not self._ipset:
raise ValueError('Invalid ipset name!')
return subprocess.call(
'{} add {} {} timeout {} -exist'.format(
self._bin['ipset'],
self._ipset,
ip,
self._timeout
),
shell=True
)
def process(self):
if not self._ipset_exists():
self._create_ipset()
if not self._rule_exists():
self._create_rule()
for ip in self._get_new_ips():
if ip not in self._whitelist:
self._reject_ipset(ip)
print('COMPLETED: {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
def main():
import sys, argparse
parser = argparse.ArgumentParser()
parser.add_argument('--url', dest='url', default=None, help='URL to a list of IPs')
parser.add_argument('--file', dest='file', default=None, help='A text file with a list of IPs')
parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help='A list of IPs')
parser.add_argument('--ipset', dest='ipset', default=None, help='ipset name', required=True)
parser.add_argument('--ports', dest='ports', nargs='*', default=(80, 443), help='Affected ports')
parser.add_argument('--timeout', dest='timeout', default=3600, help='Ban timeout')
parser.add_argument('--whitelist', dest='whitelist', nargs='*', default=None, help='Whitelisted list of IPs')
args = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
exit(1)
if args.url is None and args.file is None and sys.stdin.isatty():
parser.error("You have to provide either an URL or a FILE or STDIN with a list of IPs")
exit(1)
try:
blacklist = Blacklist()
if args.url:
blacklist.url = args.url
elif args.file:
blacklist.file = args.file
elif not sys.stdin.isatty():
blacklist.text = sys.stdin.read().strip()
blacklist.whitelist = tuple(args.whitelist) if args.whitelist else tuple()
blacklist.ipset = args.ipset
blacklist.ports = tuple(args.ports) if args.ports else (80, 443)
blacklist.timeout = int(args.timeout)
blacklist.process()
except (ValueError, TypeError, OSError, IOError) as err:
print(err)
exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment