@jorotenev
Created March 24, 2017 15:44
Given a list of domains in a csv file, try to find the websites which pass a given predicate.
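For example, the expected input looks like this (three quoted columns per line; the second row is a hypothetical placeholder):

  "apple","be",""
  "example","be",""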
import queue
import threading
from datetime import datetime as dt

from requests import get
# Given a csv file with domain names, retrieve the content of each webpage and, given a predicate, add the domain
# name to a result set if the response from the server passes the check of the predicate.
# Results are written to a timestamped file in the same directory as the script.
### Predicates
# Given a requests.Response object, return True if the URL should be added to the result.
#
def page_errors_out(response):
    return response.status_code > 403

def site_is_for_sale(response):
    return "domain is for sale" in response.text.lower()

def sponsored_links(response):
    # True if either marker phrase appears in the page text
    text = response.text.lower()
    return "sponsored listings" in text or "gerelateerde links" in text

def blogging(response):
    return "blogger" in response.text.lower()

def under_construction(response):
    return "under construction" in response.text.lower()
def predicate_true_for_url(url, predicate):
    """
    Given a url and a predicate, make a request to the url and pass the response to the predicate.
    :returns: True if the predicate returns True for the Response, False otherwise
    """
    try:
        candidates = [url, "www." + url]
        return any(predicate(get("http://" + c, timeout=2)) for c in candidates)
    except Exception:
        return False
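# For example (hypothetical domain): predicate_true_for_url("apple.be", blogging)
# fetches http://apple.be and http://www.apple.be and returns True if either
# response body contains "blogger".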
###
### Helpers
def load_urls(file_name):
    """
    Read a csv file where each line has the format:
    "<domain name>","<domain extension>"[,""...]
    e.g.
    "apple","be",""
    :param file_name: path to the csv file, one domain per line
    :returns: a list of urls, e.g. ["apple.be"]
    """
    lines = []
    with open(file_name) as file:
        for line in file:
            l = line.strip()
            if not l:
                continue
            l = l.replace("\"", "")
            parts = l.split(",")  # it's a csv file with three columns
            lines.append(parts[0] + "." + parts[1])  # first column is the domain, second the extension (e.g. "be")
    return lines
def thread_run(urls, predicate, max_results):
    """
    A function to be executed by a thread.
    Given a list of urls, see which ones satisfy a given predicate.
    The result is kept in a global thread-safe queue.
    :param urls: list of URL addresses
    :param predicate: function taking one parameter (a requests.Response object) and returning a bool. On True, the url is added to the result
    :param max_results: int. when to stop: if the result contains more than this number of entries, the thread will exit.
    :return: None
    """
    global result
    print("Thread %s started" % urls[0])
    for url in urls:
        if predicate_true_for_url(url, predicate):
            result.put(url)
        if result.qsize() > max_results:
            break
    print("thread %s is ready" % urls[0])
def export_result():
    """
    Write the contents of the result to a file. The result is a collection of URLs, thus the file
    will contain one URL per line.
    """
    global result
    with open('result_%s.txt' % dt.now().isoformat(), 'a') as f:
        while not result.empty():
            f.write(result.get() + "\n")
###
def start(file_path, predicate, max_entries=15):
    """
    Main driver of the script.
    Load the input URLs and split them into chunks, which are then fed into different threads.
    Then wait until all threads are ready. Write the result (contained in a thread-safe queue) to a file.
    """
    all_urls = load_urls(file_path)
    chunk_size = max(1, len(all_urls) // 10)  # target ~10 chunks (best effort; might be one more), e.g. 100 urls -> chunks of 10
    print("Urls loaded")
    # split the urls into evenly sized chunks, each of which will be passed to a thread
    chunks = [all_urls[i:i + chunk_size] for i in range(0, len(all_urls), chunk_size)]
    threads = []
    for chunk in chunks:
        t = threading.Thread(target=thread_run, args=(chunk, predicate, max_entries))
        # t.daemon = True
        t.start()
        threads.append(t)
    print('Waiting for all threads to finish')
    for t in threads:
        t.join()
    print("Found any results? [%s]" % (not result.empty()))
    export_result()
if __name__ == "__main__":
    # thread-safe structure to hold the result
    result = queue.Queue()

    ## Script configuration
    # the file with the input URLs
    urls_file_path = "domainlabels_empty.csv"
    # predicate from the Predicates section above
    predicate = blogging

    start(file_path=urls_file_path,
          predicate=predicate,
          max_entries=10)
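    # To hunt for parked domains instead, point the predicate at another
    # check from the Predicates section before running, e.g.:
    #   predicate = site_is_for_sale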