@dnozay
Created November 23, 2014 06:01
simple distributed web crawler using flask + scrapy + redis

design

Requests are handled by flask: the submitted urls are stored in the object store (redis) and the job id is pushed onto a queue (redis again) for workers to consume. More workers means more jobs processed in parallel.
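
Distilled, the handoff between the webapp and a worker is just a redis hash for the job data plus a redis list used as a queue. A minimal sketch (the full webapp and worker further below add id allocation, status counters and result storage):

import json
import redis

r = redis.StrictRedis()  # assumes a redis server on localhost

# producer side (the flask view): store the job by id, then enqueue the id.
job = {'job_id': 0, 'urls': ['http://www.example.com/'], 'completed': 0, 'inprogress': 0}
r.hset('jobs', job['job_id'], json.dumps(job))
r.rpush('jobqueue', job['job_id'])

# consumer side (a worker): dequeue an id, load the job and start crawling.
job_id = r.rpop('jobqueue')
job = json.loads(r.hget('jobs', job_id))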

Other possible implementations:

  • multiprocessing module for using all CPUs on one machine.
    • multiprocessing.managers.SyncManager for distributing tasks to other machines (see the sketch after this list).
  • zmq for queue processing
  • django for the webapp
  • having each worker process one url at a time (better balance? keeps more workers busy?)
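
For example, the SyncManager route might look roughly like this. This is a sketch only, not something this gist implements; the host, port and authkey are made up:

from multiprocessing.managers import SyncManager
import Queue

class JobQueueManager(SyncManager):
    pass

# --- manager process: expose a plain Queue over TCP ---
jobqueue = Queue.Queue()
JobQueueManager.register('get_jobqueue', callable=lambda: jobqueue)
manager = JobQueueManager(address=('', 50000), authkey='changeme')
manager.get_server().serve_forever()

# --- worker process (run separately, on any machine that can reach the manager) ---
JobQueueManager.register('get_jobqueue')
client = JobQueueManager(address=('manager-host', 50000), authkey='changeme')
client.connect()
queue = client.get_jobqueue()
job = queue.get()  # blocks until the manager side has a job to hand out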

flask

why? easy to prototype with.

linking containers

redis

why? a container is already available for it.
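
The REDIS_PORT_6379_TCP_ADDR / REDIS_PORT_6379_TCP_PORT variables read by the code below are the ones docker's (legacy) --link injects into a linked container (see "linking containers" above). A rough sketch, assuming the webapp and worker images have been built under the made-up names crawler-webapp and crawler-worker:

# start the official redis image, then link the app containers to it
# under the alias "redis" so the REDIS_* environment variables appear.
docker run -d --name some-redis redis
docker run -d --link some-redis:redis -p 5000:5000 crawler-webapp
docker run -d --link some-redis:redis crawler-worker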

# business_logic.py - fetch a page, extract links and image sources.
from scrapy.selector import Selector
from urlparse import urlparse

import requests

def get_links(page):
    """extract links from page"""
    # note: links may be relative.
    return Selector(text=page).xpath('//a/@href').extract()

def get_images(page):
    """extract image sources from page"""
    # note: links may be relative.
    return Selector(text=page).css('img').xpath('@src').extract()

def retrieve_page(url):
    # XXX: may not handle auth or ssl properly
    body = ""
    try:
        body = requests.get(url).text
    finally:
        return body

def get_base_url(url):
    parsed = urlparse(url)
    return "{0.scheme}://{0.netloc}".format(parsed)

def make_absolute(base, url):
    # already absolute (protocol-relative or with a scheme)? leave it alone.
    if url.startswith('//') or '://' in url:
        return url
    return "{0}{1}".format(base, url)

def make_absolute_list(base, urls):
    return [make_absolute(base, url) for url in urls]

def process_url(url):
    page = retrieve_page(url)
    base = get_base_url(url)
    links = get_links(page)
    images = get_images(page)
    return make_absolute_list(base, links), make_absolute_list(base, images)
#!/bin/bash
# submit a job (POST a list of urls on stdin), then poll its status and results.
curl -X POST -F "urls=@-" localhost:5000 << EOF
http://www.example.com/
http://www2.example.com/
EOF
curl localhost:5000/status/0
curl localhost:5000/result/0
# webapp.py (filename assumed) - flask frontend: accepts jobs, reports status and results.
import os
import redis
from flask import Flask, Response, request, json, abort

# redis docker container linked?
REDIS_HOST = os.getenv('REDIS_PORT_6379_TCP_ADDR', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT_6379_TCP_PORT', 6379))
WEBAPP_PORT = int(os.getenv('WEBAPP_PORT', 5000))

JOBS = 'jobs'
JOBQUEUE = 'jobqueue'
RESULTS = 'results'
NUM_JOBS = 'num_jobs'

app = Flask(__name__)
app.redis = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)
app.redis.flushdb()
app.redis.getset(NUM_JOBS, 0)

def get_job_id():
    job_id = app.redis.get(NUM_JOBS)
    app.redis.incr(NUM_JOBS)
    return job_id

@app.route('/', methods=['POST'])
def post_job():
    urls = request.files['urls'].read().splitlines()
    job_id = get_job_id()
    data = {
        'job_id': job_id,
        'urls': urls,
        'completed': 0,
        'inprogress': 0,
    }
    # store item
    app.redis.hset(JOBS, job_id, json.dumps(data))
    # dispatch to worker(s)
    app.redis.rpush(JOBQUEUE, job_id)
    resp = Response(json.dumps(data), status=200, mimetype='application/json')
    return resp

@app.route('/status/<int:job_id>')
def get_job_status(job_id):
    data = app.redis.hget(JOBS, job_id)
    if not data:
        abort(404)  # unknown job id
    resp = Response(data, status=200, mimetype='application/json')
    return resp

@app.route('/result/<int:job_id>')
def get_job_result(job_id):
    result_key = '{0}:{1}'.format(RESULTS, job_id)
    data = app.redis.lrange(result_key, 0, -1)
    resp = Response('\n'.join(data), status=200, mimetype='application/json')
    return resp

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=WEBAPP_PORT)
# worker.py (filename assumed) - pops one job off the queue and crawls it.
import os
import json
import redis

import business_logic

# redis docker container linked?
REDIS_HOST = os.getenv('REDIS_PORT_6379_TCP_ADDR', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT_6379_TCP_PORT', 6379))

JOBS = 'jobs'
JOBQUEUE = 'jobqueue'
RESULTS = 'results'
NUM_JOBS = 'num_jobs'

store = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0)

def get_work_item():
    # note: the webapp rpush'es job ids, so rpop makes this a LIFO stack;
    # use lpop (or a blocking blpop) for FIFO behaviour.
    job_id = store.rpop(JOBQUEUE)
    data = store.hget(JOBS, job_id)
    job = json.loads(data)
    return job

def incr_field(job, field):
    job[field] = job[field] + 1
    store.hset(JOBS, job['job_id'], json.dumps(job))

def decr_field(job, field):
    job[field] = job[field] - 1
    store.hset(JOBS, job['job_id'], json.dumps(job))

def update_results(job, images, visited):
    job_id = job['job_id']
    result_key = '{0}:{1}'.format(RESULTS, job_id)
    for img in images:
        # do not want duplicates.
        if img in visited:
            continue
        # add it to the results.
        visited.add(img)
        store.rpush(result_key, img)

def work_on_one_item():
    job = get_work_item()
    incr_field(job, 'inprogress')
    urls = job['urls'][:]
    maxlevel = 2
    visited = set()
    imgvisited = set()
    for curr_level in range(maxlevel):
        # check if we are already done
        if not urls:
            break
        next_urls = []
        for url in urls:
            # do not process the same url twice.
            if url in visited:
                continue
            # mark url as visited and process.
            visited.add(url)
            links, images = business_logic.process_url(url)
            next_urls += links
            # update store with results
            update_results(job, images, imgvisited)
        # flip the lists.
        urls = next_urls
    incr_field(job, 'completed')
    decr_field(job, 'inprogress')

if __name__ == '__main__':
    work_on_one_item()
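
As written, the worker handles a single job and exits. To keep one worker process alive and serving jobs, a small wrapper loop could look like this; a sketch, assuming the module above is saved as worker.py:

import time

import worker  # assumption: the worker module above, saved as worker.py

while True:
    if worker.store.llen(worker.JOBQUEUE) > 0:
        worker.work_on_one_item()
    else:
        time.sleep(1)  # nothing queued yet; poll again shortly.

# note: with several worker processes the llen check can race; a blocking
# brpop inside get_work_item() would be the more robust variant.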