Benchmarking for pyspider, using the scrapy==0.24.4 mockserver.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<i@binux.me>
#         http://binux.me
# Created on 2014-11-17 23:29:30

import os
import sys
import time
import logging
import logging.config

logging.config.fileConfig("logging.conf")
logging.getLogger('scheduler').setLevel(logging.ERROR)
logging.getLogger('fetcher').setLevel(logging.ERROR)
logging.getLogger('processor').setLevel(logging.ERROR)

class Get(object):
    def __init__(self, getter):
        self.getter = getter

    def __get__(self, instance, owner):
        return self.getter()
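
# The Get descriptor re-invokes its getter on every attribute access, so each
# component gets a fresh database connection instead of sharing one across
# threads or subprocesses.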

from multiprocessing import Queue
from pyspider.database import connect_database


class g(object):
    taskdb = Get(lambda: connect_database('sqlite+taskdb://'))
    projectdb = Get(lambda: connect_database('sqlite+projectdb:///data/bench/project.db'))
    resultdb = Get(lambda: connect_database('sqlite+resultdb://'))

    newtask_queue = Queue()
    status_queue = Queue()
    scheduler2fetcher = Queue()
    fetcher2processor = Queue()
    processor2result = None
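
# Queue wiring: scheduler -> scheduler2fetcher -> fetcher -> fetcher2processor
# -> processor, which reports back via status_queue and newtask_queue.
# processor2result is None, so crawl results are simply dropped.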

from pyspider.scheduler import Scheduler


class BenchScheduler(Scheduler):
    def __init__(self, *args, **kwargs):
        super(BenchScheduler, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_task_status(self, task):
        self.done_cnt += 1
        return super(BenchScheduler, self).on_task_status(task)

    def _try_dump_cnt(self):
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            print "Crawled %s pages (at %d pages/min)" % (self.done_cnt, rps * 60.0)
        return super(BenchScheduler, self)._try_dump_cnt()

def run_scheduler(g=g):
    taskdb = g.taskdb
    # seed the project with its first task so the scheduler has work to dispatch
    taskdb.insert('bench', 'data:,on_start', {
        'taskid': 'data:,on_start',
        'project': 'bench',
        'url': 'data:,on_start',
        'status': taskdb.ACTIVE,  # reuse the bound connection; g.taskdb would open a new one
        'process': {
            'callback': 'on_start',
        },
    })
    scheduler = BenchScheduler(taskdb=taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                               newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                               out_queue=g.scheduler2fetcher)
    scheduler.run()

from pyspider.fetcher.tornado_fetcher import Fetcher


class BenchFetcher(Fetcher):
    def __init__(self, *args, **kwargs):
        super(BenchFetcher, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_result(self, type, task, result):
        self.done_cnt += 1
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            # indented so the scheduler/fetcher/processor reports land in
            # separate columns
            print " " * (40 + 3), "Fetched %s pages (at %d pages/min)" % (
                self.done_cnt, rps * 60.0)
        return super(BenchFetcher, self).on_result(type, task, result)


def run_fetcher(g=g):
    fetcher = BenchFetcher(inqueue=g.scheduler2fetcher, outqueue=g.fetcher2processor,
                           poolsize=100)
    fetcher.run()

from pyspider.processor import Processor


class BenchProcessor(Processor):
    def __init__(self, *args, **kwargs):
        super(BenchProcessor, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_task(self, task, response):
        self.done_cnt += 1
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            print " " * 90, "Processed %s pages (at %d pages/min)" % (
                self.done_cnt, rps * 60.0)
        return super(BenchProcessor, self).on_task(task, response)


def run_processor(g=g):
    processor = BenchProcessor(projectdb=g.projectdb,
                               inqueue=g.fetcher2processor, status_queue=g.status_queue,
                               newtask_queue=g.newtask_queue, result_queue=g.processor2result)
    processor.run()
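
# The processor executes the bench project's callbacks (on_start and
# index_page in the script below) against each fetched response.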

total = 10000

bench_script = '''
from pyspider.libs.base_handler import *


class Handler(BaseHandler):
    def on_start(self):
        self.crawl('http://localhost:8998/follow',
                   params={'total': %d, 'show': 20, 'order': 'rand', 'maxlatency': 0.0},
                   callback=self.index_page)

    def index_page(self, response):
        for each in response.doc('a[href^="http://"]').items():
            self.crawl(each.attr.href, callback=self.index_page)
''' % total
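
# The bench script above: on_start requests the mockserver's /follow endpoint,
# which serves pages of links ('show' per page, in random order, out of
# 'total' URLs); index_page then recursively crawls every absolute link.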

if __name__ == '__main__':
    import shutil
    import signal
    from pyspider.libs.utils import run_in_thread, run_in_subprocess

    run_in = run_in_thread

    shutil.rmtree('./data/bench', ignore_errors=True)
    os.makedirs('./data/bench')

    # register the bench project; rate/burst are set high enough that the
    # scheduler never throttles the crawl
    g.projectdb.insert('bench', {
        'name': 'bench',
        'status': 'RUNNING',
        'script': bench_script,
        'rate': total,
        'burst': total,
        'updatetime': time.time(),
    })

    threads = []
    threads.append(run_in(run_processor, g=g))
    threads.append(run_in(run_fetcher, g=g))
    # the scheduler runs in the main thread and blocks until interrupted
    run_scheduler()
    shutil.rmtree('./data/bench', ignore_errors=True)
Using the scrapy==0.24.4 mockserver as the test server:

python -u -m scrapy.tests.mockserver

The new version gives a clear view of the performance of each component:

about 5700 pages/min running in threads within one process
about 4800 pages/min for `scrapy bench` (for comparison)
maximum performance (running in subprocesses; see the sketch below):
scheduler: 17000 pages/min
fetcher: 33000 pages/min (poolsize=100)
processor: 6000 pages/min
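
A minimal sketch of the subprocess switch, using the `run_in_subprocess` helper
the script already imports:

    # in the __main__ block, swap the thread runner for the subprocess runner
    run_in = run_in_subprocess

    # this works because the queues in g are multiprocessing.Queue and the
    # projectdb is a file (./data/bench/project.db) shared on disk; the
    # in-memory taskdb/resultdb are only touched by the scheduler, which
    # stays in the main process.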
Note: `on_start` and `on_get_info` run at the beginning of each project.