Skip to content

Instantly share code, notes, and snippets.

@masroore
Created August 5, 2014 05:50
Show Gist options
  • Save masroore/c30f59e2d6996afdfcc8 to your computer and use it in GitHub Desktop.
Save masroore/c30f59e2d6996afdfcc8 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import

import multiprocessing
import Queue
import traceback
import types

from grab.spider.error import FatalError
class WorkerBackendInterface(object):
    """Empty base class that worker backends in this module derive from."""
    # The base used to be written as ``self``, which is not a defined name
    # at module level and made the class statement raise NameError.
    pass
def async_handler(cls_path, handler_name, queue, grab, task):
    """Import a spider class, run one of its handlers and queue the results.

    :param cls_path: dotted ``package.module.ClassName`` path of the spider
        class; it is instantiated with ``container_mode=True``
    :param handler_name: name of the handler method to call on the instance
    :param queue: object with a ``put()`` method that receives
        ``(handler_name, result, task)`` tuples
    :param grab: first argument passed to the handler
    :param task: second argument passed to the handler; also echoed back in
        every queued tuple

    If the handler returns a generator, every yielded item is queued
    separately; otherwise the single return value is queued.

    Nothing is raised to the caller: handler failures are reported through
    the spider instance's ``process_handler_error()``, and any other failure
    (import, instantiation, the error reporting itself) has its traceback
    printed.
    """
    try:
        mod_path, cls_name = cls_path.rsplit('.', 1)
        # A non-empty fromlist makes __import__ return the leaf module
        # instead of the top-level package.
        mod = __import__(mod_path, None, None, ['foo'])
        spider = getattr(mod, cls_name)(container_mode=True)
        handler = getattr(spider, handler_name)
        try:
            result = handler(grab, task)
            if isinstance(result, types.GeneratorType):
                for item in result:
                    queue.put((handler_name, item, task))
            else:
                queue.put((handler_name, result, task))
        except Exception as ex:
            # This is a plain function: there is no ``self`` or ``res`` here,
            # so report via the spider instance and the ``task`` argument.
            spider.process_handler_error(handler_name, ex, task)
    except Exception:
        # Best-effort reporting only; the exception is not propagated.
        print(traceback.format_exc())
class WorkerBackend(WorkerBackendInterface):
    """Worker backend that runs spider handlers in a separate process.

    Handlers are submitted to a ``multiprocessing`` pool and push their
    results onto a manager-backed queue, which ``get_results()`` drains.
    ``prepare()`` must be called before any other method.
    """

    def __init__(self, spider, **kwargs):
        """
        :param spider: spider object; its ``import_path`` attribute is read
            (and filled in via ``build_import_path()`` when empty)
        :param kwargs: accepted but not used
        """
        self.spider = spider

    def prepare(self):
        """Create the process pool, the result queue and the request list."""
        # A single worker process runs the handlers.
        self.process_pool = multiprocessing.Pool(1)
        # The queue is created through a Manager so that it can be passed
        # as an argument to the pool worker.
        self.manager = multiprocessing.Manager()
        self.handler_result_queue = self.manager.Queue()
        # Async results of every handler submitted so far.
        self.multi_request_list = []

    def active(self):
        """Return True while any handler is still running or any result
        is still waiting in the queue."""
        return (any(not req.ready() for req in self.multi_request_list) or
                self.handler_result_queue.qsize() > 0)

    def process_response(self, handler, grab, task):
        """Schedule ``handler`` to run in the worker process.

        :param handler: handler callable; only its ``__name__`` is sent to
            the worker, which looks the method up on its own spider instance
        :param grab: first argument forwarded to the handler
        :param task: second argument forwarded to the handler
        """
        if not self.spider.import_path:
            self.spider.import_path = self.spider.build_import_path()
        # Forward this method's own ``grab``/``task`` arguments (the code
        # previously indexed an undefined ``res`` dict here).
        req = self.process_pool.apply_async(
            async_handler,
            (self.spider.import_path, handler.__name__,
             self.handler_result_queue, grab, task)
        )
        self.multi_request_list.append(req)

    def get_results(self):
        """
        Iterates over handler results which are done so far.

        Yields ``(handler_name, task_result, task)`` tuples and stops as
        soon as the queue is empty; ``Queue.Empty`` is handled internally
        and never reaches the caller.
        """
        while True:
            try:
                # Non-blocking get: an empty queue ends the iteration.
                handler_name, task_result, task = self.handler_result_queue.get(False)
            except Queue.Empty:
                return
            else:
                yield (handler_name, task_result, task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment