@odashi
Created March 2, 2014 17:44
Simple Web crawler
# coding: utf-8
import chardet
import datetime
import html.parser
import optparse
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request


def perror(text):
    sys.stderr.write('ERROR: ' + text + '\n')


class Crowler:
    # Simple crawler: visits pages under a URI prefix, waiting `interval`
    # seconds before each HTTP request.
    def __init__(self, interval=1.0, depth_first=False):
        self.__interval = interval
        self.__depth_first = depth_first

    def __get_contenttype(self, page):
        return page.getheader('Content-Type').split(';')[0].strip()

    def __get_encoding(self, data):
        return chardet.detect(data)['encoding']

    def __request(self, uri):
        # Fetch a single page; return its decoded text, or None if the page
        # is not HTML or the request fails.
        time.sleep(self.__interval)
        try:
            page = urllib.request.urlopen(uri)
            contenttype = self.__get_contenttype(page)
            if contenttype != 'text/html':
                return None
            data = page.read()
            encoding = self.__get_encoding(data)
            data = data.decode(encoding)
        except Exception as ex:
            perror('retrieving page failed: %s' % str(ex))
            return None
        return data

    def __expand_uri(self, text, uri, prefix):
        # Parse the page and collect every <a href="..."> target that starts
        # with the given prefix, resolved against the page's own URI.
        class Expander(html.parser.HTMLParser):
            def __init__(self):
                html.parser.HTMLParser.__init__(self)
                self.uris = []
                self.__ignorelist = ['javascript:', 'mailto:']

            def __ignore(self, text):
                for pattern in self.__ignorelist:
                    if text[:len(pattern)] == pattern:
                        return True
                return False

            def __make_uri(self, base, target):
                uri = urllib.parse.urljoin(base, target)
                return uri.split('#')[0]  # remove anchor

            def handle_starttag(self, tag, attrs):
                if tag.lower() == 'a':
                    for attrname, attrdata in attrs:
                        if attrname.lower() != 'href':
                            continue
                        if self.__ignore(attrdata):
                            continue
                        newuri = self.__make_uri(uri, html.unescape(attrdata))
                        if newuri[:len(prefix)] != prefix:
                            continue
                        self.uris.append(newuri)

        try:
            expander = Expander()
            expander.feed(text)
        except Exception as ex:
            perror('bad HTML: %s' % str(ex))
            return []
        return expander.uris

    def crowl(self, seed, prefix=''):
        # Generator: yields one dict per successfully retrieved page.
        # `openlist` holds URIs still to visit, `closedlist` those already seen.
        openlist = [seed]
        closedlist = set()
        while openlist:
            # pop from the tail for depth-first search, from the head for breadth-first
            uri = openlist.pop() if self.__depth_first else openlist.pop(0)
            closedlist.add(uri)
            data = self.__request(uri)
            if data is not None:
                for expuri in self.__expand_uri(data, uri, prefix):
                    if expuri not in closedlist and expuri not in openlist:
                        openlist.append(expuri)
                yield {'data': data, 'uri': uri, 'num_open': len(openlist), 'num_closed': len(closedlist)}


def parse_options():
    parser = optparse.OptionParser(usage='crowl.py [options] -s <seed URI> -o <output directory>')
    parser.add_option('-s', '--seed', dest='seed', type=str, default='', metavar='STR',
                      help='[required] seed URI (start crawling from this URI)')
    parser.add_option('-p', '--prefix', dest='prefix', type=str, default='', metavar='STR',
                      help='URI prefix (pages whose URI does not start with this are ignored)')
    parser.add_option('-o', '--output', dest='output', type=str, default='', metavar='STR',
                      help='[required] output directory')
    parser.add_option('-I', '--interval', dest='interval', type=float, default=1.0, metavar='FLOAT',
                      help='interval time between HTTP requests')
    parser.add_option('-D', '--depth-first', dest='depth_first', action='store_true', default=False,
                      help='use depth-first search (default is breadth-first)')
    options, args = parser.parse_args()
    return options


def check_options(options):
    ok = True
    required = []
    if not options.seed:
        required.append('-s (--seed)')
    if not options.output:
        required.append('-o (--output)')
    if required:
        perror('required options are not satisfied: %s' % ', '.join(required))
        ok = False
    if options.interval <= 0.0:
        perror('interval must be greater than 0.0')
        ok = False
    if ok:
        print('seed URI         : ' + options.seed)
        print('URI prefix       : ' + options.prefix)
        print('output directory : ' + options.output)
        print('request interval : ' + str(options.interval))
        print('strategy         : ' + ('depth-first' if options.depth_first else 'breadth-first'))
        print()
    return ok


def check_outputdir(outputdir):
    try:
        if not os.path.exists(outputdir):
            os.makedirs(outputdir)
    except Exception as ex:
        perror('could not create directory: %s' % str(ex))
        return False
    if not os.path.isdir(outputdir):
        perror('specified output is not a directory')
        return False
    if not os.access(outputdir, os.X_OK):
        perror('could not access output directory')
        return False
    return True


def main():
    options = parse_options()
    if not check_options(options):
        return
    if not check_outputdir(options.output):
        return

    crowler = Crowler(interval=options.interval, depth_first=options.depth_first)
    for info in crowler.crowl(options.seed, options.prefix):
        data = info['data']
        uri = info['uri']
        num_open = info['num_open']
        num_closed = info['num_closed']
        # derive a flat file name from the URI by replacing every '/' with '@'
        filename = options.output + '/' + uri.replace('/', '@')
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('%s (open: %d, closed: %d) - %s' % (timestamp, num_open, num_closed, uri))
        with open(filename, 'w', encoding='utf-8') as fp:
            fp.write(data)


if __name__ == '__main__':
    main()
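
The script is normally run from the command line with the -s and -o options, but the Crowler class can also be used directly. A minimal usage sketch (not part of the original gist), assuming the file above is saved as crowl.py and the chardet package is installed; the example.com URIs are placeholders:

from crowl import Crowler

crowler = Crowler(interval=2.0, depth_first=False)
for info in crowler.crowl('http://example.com/', prefix='http://example.com/'):
    # each yielded dict carries the page text plus crawl-frontier statistics
    print('%s (open: %d, closed: %d)' % (info['uri'], info['num_open'], info['num_closed']))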