@ClaireNeveu
Created November 9, 2017 20:32
Phone Number Crawler
from multiprocessing import Pool
import argparse
from queue import Queue
import sqlite3
import re
from urllib.parse import urljoin, urlparse
import functools

from bs4 import BeautifulSoup
import requests
from attr import attrs, attrib

# Pulled from https://stackoverflow.com/questions/123559/a-comprehensive-regex-for-phone-number-validation
_phone_number_regex = re.compile(r'''(?:(?:\+?1\s*(?:[.-]\s*)?)?(?:\(\s*([2-9]1[02-9]|[2-9][02-8]1|[2-9][02-8][02-9])\s*\)|([2-9]1[02-9]|[2-9][02-8]1|[2-9][02-8][02-9]))\s*(?:[.-]\s*)?)?([2-9]1[02-9]|[2-9][02-9]1|[2-9][02-9]{2})\s*(?:[.-]\s*)?([0-9]{4})(?:\s*(?:#|x\.?|ext\.?|extension)\s*(\d+))?''')
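# re.findall on this pattern returns one 5-tuple per match: (parenthesised area
# code, bare area code, exchange, line number, extension). ''.join(...) in
# crawl() below collapses that tuple into a bare digit string, e.g.
# '(212) 555-0182 x12' normalises to '212555018212'.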
def normalize_link(referrer, link):
    '''Converts a possibly relative link into an absolute link.'''
    return urljoin(referrer, link)


def crawl(durl, max_depth, same_site):
    '''Fetches one page and returns (the Url crawled, links found, phone numbers found).'''
    url = durl.url
    urls = []
    phone_numbers = []
    try:
        response = requests.get(url)
    except requests.RequestException:
        return durl, [], []
    if response.status_code != 200:
        return durl, [], []
    soup = BeautifulSoup(response.text, 'html.parser')
    for link in soup.find_all('a'):
        link_url = link.get('href')
        if link_url and ((max_depth is None) or durl.depth < max_depth):
            abs_url = normalize_link(url, link_url)
            # Optionally restrict the crawl to the current page's host.
            if not same_site or urlparse(url).netloc == urlparse(abs_url).netloc:
                urls.append(abs_url)
    for text in soup.find_all(string=_phone_number_regex):
        for match in _phone_number_regex.findall(text):
            phone_numbers.append(''.join(match))
    return durl, urls, phone_numbers
def main():
    arg_parser = argparse.ArgumentParser(description='Crawl the web for phone numbers')
    arg_parser.add_argument(
        'urls',
        metavar='URLS',
        type=str,
        nargs='+',
        help='URLs to begin crawl with.')
    arg_parser.add_argument(
        '-j', '--jobs',
        type=int,
        default=16,
        help='Number of worker processes to run.')
    arg_parser.add_argument(
        '-d', '--max-depth',
        type=int,
        default=None,
        help='Maximum page depth to traverse.')
    arg_parser.add_argument(
        '-s', '--same-site',
        action='store_true',
        help='Whether to exclude pages from other sites from the crawl.')
    args = arg_parser.parse_args()

    pool = Pool(args.jobs)
    db_conn = sqlite3.connect('crawler.db')
    db_cursor = db_conn.cursor()
    db_cursor.execute('''CREATE TABLE IF NOT EXISTS phone_numbers (phone_number text)''')

    url_queue = Queue()
    crawled = set(args.urls)
    for url in args.urls:
        url_queue.put(Url(url, 0))
    def step():
        if url_queue.empty():
            return None
        else:
            return url_queue.get()

    process_job = functools.partial(
        crawl,
        max_depth=args.max_depth,
        same_site=args.same_site)

    while not url_queue.empty():
        iterator = iter(step, None)
        for last_url, urls, phone_numbers in pool.imap_unordered(process_job, iterator):
            print('Crawled {}'.format(last_url.url))
            for url in urls:
                if url not in crawled:
                    crawled.add(url)
                    url_queue.put(Url(url, last_url.depth + 1))
            if len(phone_numbers) > 0:
                db_cursor.executemany('INSERT INTO phone_numbers VALUES (?)', [[x] for x in phone_numbers])
                db_conn.commit()

    db_conn.close()
    pool.close()


@attrs(slots=True)
class Url:
    url = attrib()
    depth = attrib()


if __name__ == '__main__':
    main()

attrs==17.3.0
beautifulsoup4==4.6.0
certifi==2017.11.5
chardet==3.0.4
idna==2.6
requests==2.18.4
urllib3==1.22
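A quick way to check results, sketched under a couple of assumptions: the script above is saved as crawler.py (the filename is an assumption, as is the example start URL), the pinned dependencies listed above are installed, and a crawl is started with something like "python crawler.py https://example.com --jobs 8 --max-depth 2 --same-site". The numbers it stores can then be read back from the crawler.db SQLite file it creates:

import sqlite3

# Open the database written by the crawler and print every number it stored.
# 'crawler.db' and the phone_numbers table both come from the script above.
conn = sqlite3.connect('crawler.db')
for (number,) in conn.execute('SELECT phone_number FROM phone_numbers'):
    print(number)
conn.close()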