benchmarking for pyspider, using scrapy==0.24.4 mockserver
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<i@binux.me>
# http://binux.me
# Created on 2014-11-17 23:29:30
import os
import sys
import time
import logging
import logging.config
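
# an appropriate logging.conf must exist in the working directory (pyspider
# ships one); the component loggers are raised to ERROR below so that only the
# benchmark counters are printed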
logging.config.fileConfig("logging.conf")
logging.getLogger('scheduler').setLevel(logging.ERROR)
logging.getLogger('fetcher').setLevel(logging.ERROR)
logging.getLogger('processor').setLevel(logging.ERROR)
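
# Get re-evaluates its factory on every attribute access, so each component
# picks up a fresh database handle instead of sharing a single connection object.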
class Get(object):
    def __init__(self, getter):
        self.getter = getter

    def __get__(self, instance, owner):
        return self.getter()
from multiprocessing import Queue
from pyspider.database import connect_database
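
# Pipeline wiring: scheduler -> scheduler2fetcher -> fetcher -> fetcher2processor
# -> processor -> status_queue / newtask_queue -> back to the scheduler.
# processor2result stays None, so results are dropped; the benchmark measures
# throughput only.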
class g(object):
    taskdb = Get(lambda: connect_database('sqlite+taskdb://'))
    projectdb = Get(lambda: connect_database('sqlite+projectdb:///data/bench/project.db'))
    resultdb = Get(lambda: connect_database('sqlite+resultdb://'))
    newtask_queue = Queue()
    status_queue = Queue()
    scheduler2fetcher = Queue()
    fetcher2processor = Queue()
    processor2result = None
from pyspider.scheduler import Scheduler
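
# The three Bench* subclasses below wrap the stock pyspider components and only
# add a counter that reports pages/min roughly once per second.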
class BenchScheduler(Scheduler):
    def __init__(self, *args, **kwargs):
        super(BenchScheduler, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_task_status(self, task):
        self.done_cnt += 1
        return super(BenchScheduler, self).on_task_status(task)

    def _try_dump_cnt(self):
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            print "Crawled %s pages (at %d pages/min)" % (self.done_cnt, rps*60.0)
        return super(BenchScheduler, self)._try_dump_cnt()
def run_scheduler(g=g):
    taskdb = g.taskdb
    # seed the project with a single ACTIVE 'data:,on_start' task so the
    # scheduler dispatches the handler's on_start callback as soon as it starts
    taskdb.insert('bench', 'data:,on_start', {
        'taskid': 'data:,on_start',
        'project': 'bench',
        'url': 'data:,on_start',
        'status': g.taskdb.ACTIVE,
        'process': {
            'callback': 'on_start',
        }
    })

    scheduler = BenchScheduler(taskdb=taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                               newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                               out_queue=g.scheduler2fetcher)
    scheduler.run()
from pyspider.fetcher.tornado_fetcher import Fetcher
class BenchFetcher(Fetcher):
    def __init__(self, *args, **kwargs):
        super(BenchFetcher, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_result(self, type, task, result):
        self.done_cnt += 1
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            print " "*(40+3), "Fetched %s pages (at %d pages/min)" % (
                self.done_cnt, rps*60.0)
        return super(BenchFetcher, self).on_result(type, task, result)

def run_fetcher(g=g):
    fetcher = BenchFetcher(inqueue=g.scheduler2fetcher, outqueue=g.fetcher2processor, poolsize=100)
    fetcher.run()
from pyspider.processor import Processor
class BenchProcessor(Processor):
    def __init__(self, *args, **kwargs):
        super(BenchProcessor, self).__init__(*args, **kwargs)
        self.done_cnt = 0
        self.start_time = time.time()
        self.last_report = 0

    def on_task(self, task, response):
        self.done_cnt += 1
        now = time.time()
        if now - self.last_report >= 1:
            self.last_report = now
            rps = self.done_cnt * 1.0 / (now - self.start_time)
            print " "*90, "Processed %s pages (at %d pages/min)" % (
                self.done_cnt, rps*60.0)
        return super(BenchProcessor, self).on_task(task, response)

def run_processor(g=g):
    processor = BenchProcessor(projectdb=g.projectdb,
                               inqueue=g.fetcher2processor, status_queue=g.status_queue,
                               newtask_queue=g.newtask_queue, result_queue=g.processor2result)
    processor.run()
total = 10000
bench_script = '''
from pyspider.libs.base_handler import *


class Handler(BaseHandler):
    def on_start(self):
        self.crawl('http://localhost:8998/follow', params={'total': %d, 'show': 20, 'order': 'rand', 'maxlatency': 0.0}, callback=self.index_page)

    def index_page(self, response):
        for each in response.doc('a[href^="http://"]').items():
            self.crawl(each.attr.href, callback=self.index_page)
''' % total
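
# The handler above crawls scrapy's mockserver: /follow serves pages carrying 20
# random links each drawn from a pool of `total` pages, index_page re-crawls
# every absolute link it finds, and pyspider's URL-based task de-duplication
# keeps each page from being fetched more than once.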
if __name__ == '__main__':
    import shutil
    import signal
    from pyspider.libs.utils import run_in_thread, run_in_subprocess

    run_in = run_in_thread
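    # run_in_thread keeps all three components in one process; swap in
    # run_in_subprocess to give each component its own process (the "maximum
    # performance" numbers in the comment below were measured that way)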
    shutil.rmtree('./data/bench', ignore_errors=True)
    os.makedirs('./data/bench')

    # rate/burst are set to `total` so the scheduler's rate limiter never
    # throttles the benchmark
    g.projectdb.insert('bench', {
        'name': 'bench',
        'status': 'RUNNING',
        'script': bench_script,
        'rate': total,
        'burst': total,
        'updatetime': time.time()
    })

    threads = []
    threads.append(run_in(run_processor, g=g))
    threads.append(run_in(run_fetcher, g=g))
    # the scheduler runs in the main thread and blocks until interrupted
    run_scheduler()
    shutil.rmtree('./data/bench', ignore_errors=True)

binux commented Nov 18, 2014

Using the scrapy 0.24.4 mockserver:
python -u -m scrapy.tests.mockserver
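
With the mockserver listening on localhost:8998, the benchmark itself can then be run from the pyspider source root (assuming this gist is saved as bench.py):
python -u bench.py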

The new version gives a clearer view of the performance of each component.

about 5700 pages/min with all components running as threads in one process.
about 4800 pages/min for scrapy bench, for comparison.

maximum performance (running in subprocesses):
scheduler: 17000 pages/min
fetcher: 33000 pages/min (poolsize=100)
processor: 6000 pages/min

  • The fetcher can't exit gracefully while it is still working, but it shuts down fine once all requests have finished.
  • At startup the scheduler sends an on_start and an on_get_info task for each project.
  • I'm using unbounded message queues, so traffic is pushed to the next component as fast as possible.
