Created
August 5, 2014 05:47
-
-
Save masroore/7cb66428a009e281a6c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pycurl | |
from cStringIO import StringIO | |
import time | |
from collections import deque | |
from itertools import islice | |
from urlparse import urlsplit | |
import select | |
# curl_multi_perform -> transfer data on ready sockets & get num of active handlers
# curl_multi_fdset -> extract handlers to use in select/poll
# curl_multi_info_read -> info about handlers that completed their work
DEFAULT_POOL_SIZE = 200 | |
# Idea:
# A task generator (in terms of a Spider) and a task queue are both
# just task generators of different kinds. They both produce tasks
# and therefore should have the same interface:
# * get_task
# * is_active
class Crawler(object): | |
def __init__(self, pool_size=DEFAULT_POOL_SIZE): | |
self.pool_size = pool_size | |
self._timer = { | |
'stat': time.time(), | |
'watchdog': time.time(), | |
} | |
self._timer_watchdog = time.time() | |
self.init_multi_interface() | |
self.build_task_generators() | |
def build_task_generators(self): | |
self.task_gens = [] | |
if hasattr(self, 'task_generator'): | |
self.task_gens.append(CallbackTaskGenerator(self.task_generator())) | |
def init_multi_interface(self): | |
self.handler_reg = {} | |
self.free_handlers = deque() | |
self.active_handlers = deque() | |
self.multi = pycurl.CurlMulti() | |
for x in xrange(self.pool_size): | |
handler = pycurl.Curl() | |
self.free_handlers.append(handler) | |
self._next_timeout = None | |
def process_handler_data(self, handler, task, body): | |
try: | |
result = body.splitlines()[0] | |
except IndexError: | |
result = 'NA' | |
print task['url'], '-->', result | |
def add_task_to_work(self, task): | |
handler = self.free_handlers.pop() | |
handler.setopt(pycurl.URL, task['url']) | |
handler.setopt(pycurl.FOLLOWLOCATION, True) | |
handler.setopt(pycurl.MAXREDIRS, 5) | |
TIMEOUT = 15 | |
handler.setopt(pycurl.TIMEOUT, TIMEOUT) | |
handler.setopt(pycurl.CONNECTTIMEOUT, 5) | |
buf = StringIO() | |
self.handler_reg[handler] = { | |
'buffer': buf, | |
'task': task, | |
'time_start': time.time(), | |
'timeout': TIMEOUT, | |
} | |
handler.setopt(pycurl.WRITEFUNCTION, buf.write) | |
self.multi.add_handle(handler) | |
self.active_handlers.append(handler) | |
def trigger_stat_logging(self, now): | |
if now - self._timer['stat'] > 2: | |
self._timer['stat'] = now | |
print '===>>>' | |
print 'Active handlers:' | |
for handler in self.active_handlers: | |
info = self.handler_reg[handler] | |
age = time.time() - info['start_time'] | |
print '* %s --> %s [%.3f]' % (id(handler), info['task']['url'], age) | |
print '<<<===' | |
def trigger_watchdog(self, now): | |
if now - self._timer['watchdog'] > 1: | |
killed_handlers = set() | |
kill_list = [] | |
self._timer['watchdog'] = now | |
for handler in self.active_handlers: | |
info = self.handler_reg[handler] | |
if now - info['time_start'] > info['timeout']: | |
kill_list.append((handler, info)) | |
for handler, info in kill_list: | |
print 'Explicitly kill timed out handler' | |
self.process_completed_handler(handler, -1, 'Watchdog timeout (%d)' % info['timeout']) | |
killed_handlers.add(handler) | |
return killed_handlers | |
def trigger_data_io(self): | |
rlist, wlist, xlist = self.multi.fdset() | |
if rlist or wlist or xlist: | |
timeout = self.multi.timeout() | |
if timeout: | |
select.select(rlist, wlist, xlist, timeout / 1000.0) | |
else: | |
time.sleep(0.1) | |
# Trigger data transfers | |
while True: | |
opcode, active_num = self.multi.perform() | |
if opcode != pycurl.E_CALL_MULTI_PERFORM: | |
break | |
def get_completed_handlers(self): | |
more_count, ok_handlers, fails = self.multi.info_read() | |
for ok_handler in ok_handlers: | |
yield ok_handler, None, None | |
for fail_handler, err_no, err_msg in fails: | |
yield fail_handler, err_no, err_msg | |
def process_completed_handler(self, handler, err_no, err_msg): | |
info = self.handler_reg[handler] | |
if err_no is None: | |
info['task']['callback'](handler, info['task'], info['buffer'].getvalue()) | |
else: | |
print u'ERROR [%s]: %s' % (info['task']['url'], err_msg) | |
self.active_handlers.remove(handler) | |
self.multi.remove_handle(handler) | |
handler.reset() | |
self.free_handlers.append(handler) | |
def has_active_handlers(self): | |
return len(self.active_handlers) > 0 | |
def get_next_task(self): | |
task = None | |
for gen in self.task_gens: | |
if gen.is_active(): | |
task = gen.get_task() | |
if task is not None: | |
break | |
return task | |
def run(self): | |
dump_time = time.time() | |
while self.has_active_handlers() or any(x.is_active() for x in self.task_gens): | |
if self.free_handlers: | |
task = self.get_next_task() | |
if task is not None: | |
self.add_task_to_work(task) | |
now = time.time() | |
self.trigger_stat_logging(now) | |
killed_handlers = self.trigger_watchdog(now) | |
self.trigger_data_io() | |
for handler, err_no, err_msg in self.get_completed_handlers(): | |
if killed_handlers and handler in killed_handlers: | |
pass | |
else: | |
self.process_completed_handler(handler, err_no, err_msg) | |
class CallbackTaskGenerator(object):
    """Adapts a plain generator of tasks to the task-source interface
    (`get_task` / `is_active`) expected by Crawler.

    Becomes permanently inactive once the wrapped generator is exhausted.
    """

    def __init__(self, gen):
        self.gen = gen
        self._active = True

    def get_task(self):
        """Return the next task, or None once the generator is exhausted
        (or after it has been marked inactive)."""
        if self._active:
            try:
                # FIX: use the next() builtin (Python 2.6+) instead of the
                # Python-2-only gen.next() method; behavior is identical.
                return next(self.gen)
            except StopIteration:
                self._active = False
        return None

    def is_active(self):
        return self._active
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment