
@funilrys
Created March 21, 2018 08:44
This script compares the given link (-l LINK) or the given file (-f FILE) with the current version of Ultimate-Hosts-Blacklist.
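For reference, a typical invocation looks like the following (the file name compare.py is an assumption; the -l, -f and -c flags come from the argument parser at the bottom of the script):

    python3 compare.py -l https://example.com/hosts.txt
    python3 compare.py -f my_local_list.txt -c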
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module has been written because we wanted to know how many new entries
a list can offer to the already working system.
Authors:
- @Funilrys, Nissar Chababy <contactTAfunilrysTODcom>
Contributors:
Let's contribute !
@GitHubUsername, Name, Email (optional)
"""
# pylint: disable=too-many-lines
import argparse
from itertools import repeat
from os import path
from re import compile as comp
from re import escape
from re import sub as substrings
from subprocess import PIPE, Popen
from sys import stdout
from colorama import Fore, Style
from colorama import init as initiate
from requests import get
class Settings(object): # pylint: disable=too-few-public-methods
"""
This class saves all data so that it can be accessed from anywhere in the code.
"""
# This variable sets the username used to authenticate against the GitHub API.
github_api_username = ''
# This variable sets the token used to authenticate against the GitHub API.
github_api_token = ''
# This variable sets the GitHub organisation slug.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
github_org_slug = 'Ultimate-Hosts-Blacklist'
# This variable sets the name of the whitelist repository.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
whitelist_repo_name = 'whitelist'
# This variable sets the GitHub API URL.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
github_api_url = 'https://api.github.com'
# This variable sets the GitHub raw URL.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
github_raw_url = 'https://raw.githubusercontent.com/'
# This variable sets the deploy raw URL.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
deploy_raw_url = 'https://hosts.ubuntu101.co.za/update_hosts.php'
# This variable sets the partial URL used when attempting to get the
# raw file.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
raw_link = github_raw_url + github_org_slug + '/%s/master/'
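# Once formatted with a repository name this yields, for example:
# raw_link % 'whitelist' == 'https://raw.githubusercontent.com/Ultimate-Hosts-Blacklist/whitelist/master/'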
# This variable sets the organisation URL.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
github_org_url = '%s/orgs/%s' % (github_api_url, github_org_slug)
# This variable saves the list of repositories.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by Initiate()
repositories = []
# This variable sets the repositories to ignore.
repo_to_ignore = ['repository-structure', 'whitelist']
# This variable saves the list of all domains.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by Initiate()
domains = []
# This variable saves the list of all IPs.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by Initiate()
ips = []
# This variable saves the list of all whitelisted domains.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by Initiate()
whitelist = []
# This variable saves the whitelisted entries as a single regex string.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by Initiate()
regex_whitelist = ''
# This variable is used to set the marker that we use to say that we
# match all occurrences of the domain or IP.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
whitelist_all_marker = 'ALL '
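# Example: the whitelist line 'ALL .example.org' whitelists every entry ending
# with '.example.org', while a plain 'example.org' line only whitelists that
# exact domain (see _whitelist_parser() below).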
# This variable is used to save the link to compare.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by ARGS
link = ''
# This variable is used to save the file to compare.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
# Note: This variable is auto updated by ARGS
file = ''
# This variable sets the regex used to catch IPv4 addresses.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
regex_ip4 = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?|[0-9]{1,}\/[0-9]{1,})$' # pylint: disable=line-too-long
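# Example: matches plain IPv4 addresses such as '192.168.0.1'; the last group
# also accepts a CIDR-style suffix, e.g. '10.0.0.0/8'.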
# This variable sets the regex used to catch domains.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
regex_domain = r'^(?=.{0,253}$)(([a-z0-9][a-z0-9-]{0,61}[a-z0-9]|[a-z0-9])\.)+((?=.*[^0-9])([a-z0-9][a-z0-9-]{0,61}[a-z0-9]|[a-z0-9]))$' # pylint: disable=line-too-long
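# Example: matches lowercase domains such as 'example.com' or 'ads.tracker.co.uk';
# the (?=.*[^0-9]) lookahead rejects a purely numeric last label so bare IPs do
# not slip through.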
# This variable sets the character to use when something is done.
done = Fore.GREEN + Style.BRIGHT + '✔'
# This variable sets the character to use when an error occurs.
error = Fore.RED + Style.BRIGHT + '✘'
# This variable sets whether we use/generate the cache.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
cache = True
# This variable saves the location of our cache file.
#
# Note: DO NOT TOUCH UNLESS YOU KNOW WHAT IT MEANS!
cache_location = github_org_slug + '.cache'
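# With the default organisation slug this resolves to
# 'Ultimate-Hosts-Blacklist.cache', created in the current working directory.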
class Initiate(object):
"""
This class is used as the main entry point of the script.
Please note that this class also initiates several actions before being
used or called.
Argument:
- init: bool
If False we do not run the initiation process.
"""
def __init__(self, init=True):
if init and (Settings.link or Settings.file):
generation = True
if Settings.cache and path.isfile(Settings.cache_location):
choice = input(
"Do you want to use cached data instead of live data ? [y/n] ")
if choice in ['y', 'Y']:
list(map(self.data_parser, Helpers.File(
Settings.cache_location).to_list()))
generation = False
if generation:
self.get_whitelist()
self.list_of_input_sources()
self.data_extractor()
@classmethod
def _whitelist_parser(cls, line):
"""
This method will get and parse all whitelisted domains into
Settings.whitelist.
Argument:
- line: str
The extracted line.
"""
if line and not line.startswith('#'):
if line.startswith(Settings.whitelist_all_marker):
to_check = line.split(Settings.whitelist_all_marker)[1]
regex_whitelist = escape(to_check) + '$'
else:
to_check = line
regex_whitelist = '^%s$' % escape(line)
if Helpers.Regex(
to_check,
Settings.regex_ip4,
return_data=False).match() or Helpers.Regex(
to_check,
Settings.regex_domain,
return_data=False).match() or line.startswith(
Settings.whitelist_all_marker):
Settings.whitelist.append(regex_whitelist)
def get_whitelist(self):
"""
This method will get the list of whitelisted domains.
"""
domains_url = (Settings.raw_link +
'domains.list') % Settings.whitelist_repo_name
req = get(domains_url)
print("Getting %s" % Settings.whitelist_repo_name, end=" ")
if req.status_code == 200:
list(map(self._whitelist_parser, req.text.split('\n')))
Settings.whitelist = Helpers.List(Settings.whitelist).format()
Settings.regex_whitelist = '|'.join(Settings.whitelist)
print(Settings.done)
else:
print(Settings.error)
@classmethod
def list_of_input_sources(cls):
"""
This method gets the list of input sources to check.
"""
url_to_get = Settings.github_org_url + '/repos'
if Settings.github_api_username and Settings.github_api_token:
pages_finder = get(url_to_get,
auth=(
Settings.github_api_username,
Settings.github_api_token))
else:
pages_finder = get(url_to_get)
if pages_finder.status_code == 200:
last_page = int(
Helpers.Regex(
pages_finder.headers['Link'],
r'.*page=(.*)>.*',
return_data=True,
rematch=True).match()[
-1])
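# The GitHub API exposes pagination through the 'Link' response header; the
# regex above extracts the last available page number from it.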
current_page = 1
print("Getting the list of input sources", end=" ")
while current_page <= last_page:
params = {
'page': str(current_page)
}
if Settings.github_api_username and Settings.github_api_token:
req = get(
url_to_get,
params=params,
auth=(
Settings.github_api_username,
Settings.github_api_token))
else:
req = get(
url_to_get,
params=params)
if req.status_code == 200:
for repo in req.json():
name = repo['name']
if name not in Settings.repo_to_ignore:
Settings.repositories.append(name)
else:
print(Settings.error)
raise Exception(
'Impossible to get information about the organisation. Is GitHub down ? (%s)' % # pylint: disable=line-too-long
req.status_code)
current_page += 1
Settings.repositories = Helpers.List(
Settings.repositories).format()
print(Settings.done)
else:
raise Exception(
'Impossible to get the number of pages to read. Is GitHub down ? (%s) (%s/%s %s)' %
(pages_finder.status_code,
pages_finder.headers['X-RateLimit-Remaining'],
pages_finder.headers['X-RateLimit-Limit'],
pages_finder.headers['X-RateLimit-Reset']))
@classmethod
def _format_line(cls, line):
"""
This method formats a line so that we abstract away anything that is not
a domain or an IP.
Argument:
- line: str
The line to format.
"""
tabs = '\t'
space = ' '
tabs_position, space_position = (
line.find(tabs), line.find(space))
if tabs_position > -1 and space_position > -1:
if space_position < tabs_position:
separator = space
else:
separator = tabs
elif tabs_position > -1:
separator = tabs
elif space_position > -1:
separator = space
else:
separator = ''
if separator:
splited_line = line.split(separator)
index = 1
while index < len(splited_line):
if splited_line[index]:
break
index += 1
return splited_line[index].split('\r')[0]
return line.split('\r')[0]
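# Example: a hosts-file style line such as '0.0.0.0    example.com' (or
# '127.0.0.1 example.com') is reduced to 'example.com'; a line that is already a
# bare domain or IP is returned unchanged, minus any trailing '\r'.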
def data_parser(self, line, return_data=False):
"""
Given the extracted line, this method appends the data
to its final location.
Arguments:
- line: str
The extracted line.
- return_data: bool
If True we return the data, otherwise we append it to its final location.
"""
type_of_extracted = ''
if line and not line.startswith('#'):
line = self._format_line(line)
if Helpers.Regex(
line,
Settings.regex_ip4,
return_data=False).match():
type_of_extracted = 'ips'
# print("\rParsing %s" % Style.BRIGHT + line, end="")
if not return_data:
Settings.ips.append(line)
elif Helpers.Regex(line, Settings.regex_domain, return_data=False).match():
# print("\rParsing %s" % Style.BRIGHT + line, end="")
type_of_extracted = 'domains'
if not return_data:
Settings.domains.append(line)
stdout.flush()
if return_data and type_of_extracted:
return [line, type_of_extracted]
elif return_data:
return "well what so say ..."
return ""
def data_extractor(self, repo=None):
"""
This method will read all domains.list or clean.list and append each
domain to Settings.domains and each IP to Settings.ips.
"""
if not repo:
list(map(self.data_extractor, Settings.repositories))
Settings.domains = Helpers.List(Settings.domains).format()
Settings.ips = Helpers.List(Settings.ips).format()
overall = ''
overall += '\n'.join(Settings.domains)
overall += '\n' + '\n'.join(Settings.ips)
Helpers.File(
Settings.cache_location).write(
overall,
overwrite=True)
else:
domains_url = (Settings.raw_link + 'domains.list') % repo
clean_url = (Settings.raw_link + 'clean.list') % repo
clean_url_data = get(clean_url)
domains_url_data = get(domains_url)
if clean_url_data.status_code == 200:
data = clean_url_data
elif domains_url_data.status_code == 200:
data = domains_url_data
else:
print(Settings.error)
data = ""
if data:
list(map(self.data_parser, data.text.split('\n')))
print(
"\nSplit domains and ips from %s (%s)" %
(Style.BRIGHT + repo, data.url.split('/')[-1]), end=" ")
Settings.domains = Helpers.List(
Helpers.Regex(
Settings.domains,
Settings.regex_whitelist).not_matching_list()).format()
Settings.ips = Helpers.List(
Helpers.Regex(
Settings.ips,
Settings.regex_whitelist).not_matching_list()).format()
print(Settings.done)
class Compare(object): # pylint: disable=too-many-instance-attributes
"""
This class compares a list with our core list.
"""
def __init__(self):
if Settings.link or Settings.file:
self.domains = []
self.ips = []
self.not_present_domains = 0
self.not_present_ips = 0
self.length_core_domains = 0
self.length_core_ips = 0
self.length_domains = 0
self.length_ips = 0
self.length_not_present_domains = 0
self.length_not_present_ips = 0
self.length_overall = 0
self.length_core_overall = 0
self.percentage_not_present_domains = 0
self.percentage_not_present_ips = 0
self.data_extractor()
def _filter_data(self, info):
"""
This method assigns data from Initiate().data_parser()
to self.domains or self.ips.
"""
if isinstance(info, list):
current_data = getattr(self, info[1])
current_data.append(info[0])
setattr(self, info[1], current_data)
def calculation(self):
"""
This method will calculate everything that is needed.
"""
self.not_present_domains = list(
set(self.domains) - set(Settings.domains))
self.not_present_ips = list(set(self.ips) - set(Settings.ips))
self.length_core_domains = len(Settings.domains)
self.length_core_ips = len(Settings.ips)
self.length_domains = len(self.domains)
self.length_ips = len(self.ips)
self.length_not_present_domains = len(self.not_present_domains)
self.length_not_present_ips = len(self.not_present_ips)
self.length_overall = self.length_domains + self.length_ips
self.length_core_overall = self.length_core_domains + self.length_core_ips
self.percentage_not_present_domains = int(
(self.length_not_present_domains * 100) / self.length_overall)
self.percentage_not_present_ips = int(
(self.length_not_present_ips * 100) / self.length_overall)
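# Note: both percentages above are computed against the overall size of the
# compared list (domains + IPs), not against each category separately.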
def data_extractor(self):
"""
This method extracts everything from the given link or file.
"""
if Settings.link:
data_from_url = get(Settings.link)
if data_from_url.status_code == 200:
data = data_from_url.text.split('\n')
else:
raise Exception(
'Is the link wrong ? (%s)' %
data_from_url.status_code)
elif Settings.file:
data = Helpers.File(Settings.file).to_list()
parsed = list(map(Initiate(False).data_parser, data, repeat(True)))
list(map(self._filter_data, parsed))
self.calculation()
print('\n')
# pylint: disable=anomalous-backslash-in-string
print(Fore.GREEN + """
###########################################################################
# _ _ _ _ #
# /\ /\| | |_(_)_ __ ___ __ _| |_ ___ #
# / / \ \ | __| | '_ ` _ \ / _` | __/ _ \ #
# \ \_/ / | |_| | | | | | | (_| | || __/ #
# \___/|_|\__|_|_| |_| |_|\__,_|\__\___| #
# _ ___ _ _ _ _ _ #
# /\ /\___ ___| |_ ___ / __\ | __ _ ___| | _| (_)___| |_ #
# / /_/ / _ \/ __| __/ __| /__\// |/ _` |/ __| |/ / | / __| __| #
# / __ / (_) \__ \ |_\__ \ / \/ \ | (_| | (__| <| | \__ \ |_ #
# \/ /_/ \___/|___/\__|___/ \_____/_|\__,_|\___|_|\_\_|_|___/\__| #
# #
###########################################################################
""" + Fore.RESET)
print("Number of entries: %s" % format(self.length_core_overall, ',d'))
print("Number of domains: %s" % format(self.length_core_domains, ',d'))
print("Number of ips: %s" % format(self.length_core_ips, ',d'))
print('\n')
print(Fore.CYAN + """
#####################################################
# _____ _ _ __ _ _ #
# /__ \___ ___| |_ ___ __| | / /(_)___| |_ #
# / /\/ _ \/ __| __/ _ \/ _` | / / | / __| __| #
# / / | __/\__ \ || __/ (_| | / /__| \__ \ |_ #
# \/ \___||___/\__\___|\__,_| \____/_|___/\__| #
# #
#####################################################
""" + Fore.RESET)
print("Number of entries: %s" % format(self.length_overall, ',d'))
print("Number of domains: %s" % format(self.length_domains, ',d'))
print(
"Number of new domains: %s (%s%%)" %
(format(
self.length_not_present_domains,
',d'),
self.percentage_not_present_domains))
print("Number of ips: %s" % format(self.length_ips, ',d'))
print(
"Number of new ips: %s (%s%%)" %
(format(
self.length_not_present_ips,
',d'),
self.percentage_not_present_ips))
# print('\n',list(self.not_present_domains))
class Helpers(object): # pylint: disable=too-few-public-methods
"""
Well, thanks to those helpers :-)
"""
class List(object): # pylint: disable=too-few-public-methods
"""
List manipulation.
"""
def __init__(self, main_list=None):
if main_list is None:
self.main_list = []
else:
self.main_list = main_list
def format(self):
"""
Return a well formatted list. Basically, it sorts the list and removes duplicates.
"""
try:
return sorted(list(set(self.main_list)), key=str.lower)
except TypeError:
return self.main_list
class File(object): # pylint: disable=too-few-public-methods
"""
File treatment/manipulations.
Arguments:
file: str
Path to the file to manipulate.
"""
def __init__(self, file):
self.file = file
def write(self, data_to_write, overwrite=False):
"""
Write or append data into the given file path.
:param data_to_write: A string, the data to write.
"""
if data_to_write is not None and isinstance(
data_to_write, str):
if overwrite or not path.isfile(self.file):
with open(self.file, 'w', encoding="utf-8") as file:
file.write(data_to_write)
else:
with open(self.file, 'a', encoding="utf-8") as file:
file.write(data_to_write)
def to_list(self):
"""
Read a file path and return each line as a list element.
"""
result = []
with open(self.file) as file:
    for read in file:
        result.append(read.rstrip('\n').strip())
return result
class Regex(object): # pylint: disable=too-few-public-methods
"""A simple implementation ot the python.re package
:param data: A string, the data to regex check
:param regex: A string, the regex to match
:param return_data: A boolean, if True, return the matched string
:param group: An integer, the group to return
:param rematch: A boolean, if True, return the matched groups into a
formatted list. (implementation of Bash ${BASH_REMATCH})
:param replace_with: A string, the value to replace the matched regex with.
:param occurences: An int, the number of occurrences to replace.
"""
def __init__(self, data, regex, **args):
# We initiate the needed variables so that they are usable all over the
# class.
self.data = data
# We assign the default value of our optional arguments
optional_arguments = {
"escape": False,
"group": 0,
"occurences": 0,
"rematch": False,
"replace_with": None,
"return_data": True
}
# We initiate our optional_arguments so that they are usable all over the
# class.
for (arg, default) in optional_arguments.items():
setattr(self, arg, args.get(arg, default))
if self.escape: # pylint: disable=no-member
self.regex = escape(regex)
else:
self.regex = regex
def match(self):
"""Used to get exploitable result of re.search"""
# We initiate this variable which is going to contain the returned data
result = []
# We compile the regex string
to_match = comp(self.regex)
# In case we have to use the implementation of ${BASH_REMATCH}, we use
# re.findall; otherwise, we use re.search.
if self.rematch: # pylint: disable=no-member
pre_result = to_match.findall(self.data)
else:
pre_result = to_match.search(self.data)
if self.return_data and pre_result is not None: # pylint: disable=no-member
if self.rematch: # pylint: disable=no-member
for data in pre_result:
if isinstance(data, tuple):
result.extend(list(data))
else:
result.append(data)
if self.group != 0: # pylint: disable=no-member
return result[self.group] # pylint: disable=no-member
else:
result = pre_result.group(
self.group).strip() # pylint: disable=no-member
return result
elif not self.return_data and pre_result is not None: # pylint: disable=no-member
return True
return False
def not_matching_list(self):
"""
This method returns a list of strings which don't match the
given regex.
"""
pre_result = comp(self.regex)
return list(
filter(
lambda element: not pre_result.search(element),
self.data))
def replace(self):
"""Used to replace a matched string with another."""
if self.replace_with is not None: # pylint: disable=no-member
return substrings(
self.regex,
self.replace_with, # pylint: disable=no-member
self.data,
self.occurences) # pylint: disable=no-member
return self.data
class Command(object):
"""
Shell command execution.
Arguments:
command: A string, the command to execute.
allow_stdout: A bool. If True, stdout is printed normally; otherwise stdout
is passed to PIPE.
"""
def __init__(self, command, allow_stdout=True):
self.decode_type = 'utf-8'
self.command = command
self.stdout = allow_stdout
def decode_output(self, to_decode):
"""Decode the output of a shell command in order to be readable.
Arguments:
to_decode: byte(s), Output of a command to decode.
"""
if to_decode is not None:
# return to_decode.decode(self.decode_type)
return str(to_decode, self.decode_type)
return False
def execute(self):
"""Execute the given command."""
if not self.stdout:
process = Popen(
self.command,
stdout=PIPE,
stderr=PIPE,
shell=True)
else:
process = Popen(self.command, stderr=PIPE, shell=True)
(output, error) = process.communicate()
if process.returncode != 0:
decoded = self.decode_output(error)
if not decoded:
return 'Unknown error for %s' % (self.command)
print(decoded)
exit(1)
return self.decode_output(output)
if __name__ == '__main__':
initiate(autoreset=True)
PARSER = argparse.ArgumentParser(
description='A script to compare a given link or file to the \
Ultimate.Hosts.Blacklist list.',
epilog="Crafted with %s by %s" %
(Fore.RED +
'♥' +
Fore.RESET,
Style.BRIGHT + Fore.CYAN +
'Nissar Chababy (Funilrys)'))
PARSER.add_argument(
'-l',
'--link',
type=str,
help='Link to compare.')
PARSER.add_argument(
'-f',
'--file',
type=str,
help='File to compare.')
PARSER.add_argument(
'-c',
'--cache',
action='store_true',
help='Cache the list of Ultimate.Hosts.Blacklist entries')
ARGS = PARSER.parse_args()
if ARGS.link:
Settings.link = ARGS.link
elif ARGS.file:
Settings.file = ARGS.file
if ARGS.cache:
Settings.cache = ARGS.cache
Initiate()
Compare()
@mitchellkrogza

mitchellkrogza commented Mar 21, 2018

Nice work 👍

@xxcriticxx

i will test over the weekend +5 points

@xxcriticxx

@funilrys

  1. first i had fun installing colorama
  2. first run took 29 min
  3. how do i read result?

[screenshot: capture]

@funilrys
Author

funilrys commented Apr 8, 2018

@xxcriticxx Sorry for the waiting time.

first i had fun installing colorama

Indeed I should have mentioned that before.

first run took 29 min

We are dealing with almost 2 million entries, so the time depends on your hardware and sometimes internet (:arrow_down:) speed.

But we are soon going to set up a repository which you can fork and send a PR to, so that we use the Travis CI infrastructure to compare.

how do i read result?

This script has been written to tell us the difference so we can decide whether or not to add an input source, as we keep the upstream URL.
So, for now, this script does not provide the output of the comparison.

But in the future, once that previously mentioned repository is live, I will improve this Gist so it can generate the results.
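For reference, a minimal sketch of how the new entries can already be pulled out of the Compare() instance from a Python shell. This is only an illustration, not part of the Gist: the file name compare.py and the input file name are assumptions.

    # Assumes the Gist above is saved as compare.py in the current directory.
    from compare import Compare, Initiate, Settings

    Settings.file = 'my_list.txt'   # or: Settings.link = 'https://example.com/hosts.txt'
    Initiate()                      # builds (or loads from cache) the core domain/IP lists
    comparison = Compare()          # runs the comparison and prints the summary
    # Entries of the given list that are not yet in Ultimate-Hosts-Blacklist:
    print('\n'.join(comparison.not_present_domains))
    print('\n'.join(comparison.not_present_ips))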

@xxcriticxx

ok
