easter crawler (gist by @dziegler, April 9, 2012)
# -*- coding: utf-8 -*-
#
# A simple spider that hunts for easter eggs on bonobos.com, use at your own peril :)
#
# patch the standard library so urllib, sockets, and time.sleep cooperate with gevent
from gevent import monkey; monkey.patch_all()
import urllib
import re
import urlparse
import time
import gevent
from gevent.queue import JoinableQueue, Empty
from BeautifulSoup import BeautifulSoup
# pages already visited (seeded with the contest page itself)
URLS_CHECKED = set(["http://www.bonobos.com/welcome/easter-egg-hunt-2012/"])
BASE_URL = "http://www.bonobos.com/"
# the prize codes to hunt for in page bodies
CODES = (
    "marshmallow chick",
    "afikoman",
    "chocolate bunny",
    "silver lily",
    "golden egg",
    "bonus",
)
NUM_WORKERS = 100
def urlEncodeNonAscii(b):
    # percent-encode every non-ASCII byte, e.g. '\xe9' -> '%e9'
    return re.sub('[\x80-\xFF]', lambda c: '%%%02x' % ord(c.group(0)), b)
def iriToUri(iri):
    # turn an IRI into a plain-ASCII URI: IDNA-encode the netloc
    # (index 1), percent-encode non-ASCII bytes everywhere else
    parts = urlparse.urlparse(iri)
    return urlparse.urlunparse(
        part.encode('idna') if parti == 1 else urlEncodeNonAscii(part.encode('utf-8'))
        for parti, part in enumerate(parts)
    )
def format_url(url):
    # normalise a link: absolutise it, strip the query string,
    # force a trailing slash, and ASCII-encode the result
    if url.startswith('/'):
        url = BASE_URL[:-1] + url
    url = url.split("?")[0]
    if not url.endswith("/"):
        url += "/"
    return iriToUri(url)
def check_codes(url, doc):
    # scan the page body for each prize code and print it with
    # 200 characters of surrounding context
    for code in CODES:
        idx = doc.lower().find(code)
        if idx != -1:
            print "\n\n=================================================================="
            print u'FOUND {0} on {1}'.format(code, url)
            print doc[idx:idx + 200]
            print "==================================================================\n\n"
def spider(url, dig=True):
    url = format_url(url)
    if url in URLS_CHECKED:
        return
    URLS_CHECKED.add(url)
    if not url.startswith(BASE_URL):
        return
    try:
        c = urllib.urlopen(url)
    except IOError:
        # retry: the URL must leave URLS_CHECKED first, otherwise the
        # recursive call returns immediately and `c` is never bound
        URLS_CHECKED.discard(url)
        spider(url)
        return
    doc = c.read()
    check_codes(url, doc)
    soup = BeautifulSoup(doc)
    if dig:
        # queue every outgoing link for the workers
        for link in soup.findAll("a"):
            link_href = link.get('href', '').strip()
            QUEUE.put(link_href)
def worker():
    # drain the queue until it stays empty for 10 seconds, then exit
    while True:
        try:
            url = QUEUE.get(timeout=10)
        except Empty:
            break
        try:
            spider(url)
        finally:
            QUEUE.task_done()
QUEUE = JoinableQueue()
# start the worker greenlets
for i in xrange(NUM_WORKERS):
    gevent.spawn(worker)
if __name__ == '__main__':
    spider(BASE_URL)
    while True:
        QUEUE.join()
        time.sleep(5)
        print 'waiting...'
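To try it out (a sketch, assuming a Python 2 environment with gevent and the original BeautifulSoup 3 installed; the filename easter_crawler.py is made up here):

    pip install gevent BeautifulSoup
    python easter_crawler.py

The monkey.patch_all() call at the top makes urllib's blocking I/O cooperative, so the 100 worker greenlets fetch pages concurrently instead of one at a time; whenever a page body contains one of the prize codes, the script prints a banner with the code, the URL, and the surrounding text, while the main loop just keeps the process alive between QUEUE.join() rounds.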