Created
May 24, 2011 08:57
-
-
Save fabware/988375 to your computer and use it in GitHub Desktop.
Webdis: slow publish throughput (requests per second) compared with a raw Redis client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--------- webdis_consumer.py ---------- | |
# -*- coding: utf-8 -*-
import simplejson | |
import httplib | |
def check_callable(cb):
    """Verify that ``cb`` can be used as a callback.

    Args:
        cb: Candidate callback object.

    Raises:
        TypeError: If ``cb`` is not callable.
    """
    if not callable(cb):
        raise TypeError("callback isn't a callable")
class IterableResponse(httplib.HTTPResponse, object):
    """HTTP response that can be iterated one chunk body at a time.

    Iterating yields the payload of every chunk of a
    ``Transfer-Encoding: chunked`` body as soon as it has been read, instead
    of buffering the whole body first.
    """

    def __iter__(self):
        """Yield each chunk's payload until the zero-size terminating chunk.

        After the last chunk, trailer lines are consumed up to the blank
        line (or EOF) and the response is closed.

        Raises:
            httplib.HTTPException: If the response is not chunked, or its
                headers have not been read yet.
            ValueError: If a chunk-size line is not a hexadecimal number
                (this includes the empty line returned at EOF).
        """
        # Explicit check rather than ``assert``: asserts vanish under -O,
        # and comparing against the _UNKNOWN sentinel alone would let a
        # plain (non-chunked) response through.
        if self.chunked == httplib._UNKNOWN or not self.chunked:
            raise httplib.HTTPException("Response is not chunked!")

        try:
            chunk_left = self.chunk_left
            while True:
                if chunk_left is None:
                    # Chunk header: "<hex size>[;extension...]\r\n".
                    # Bytes literals equal ``str`` on Python 2 and match
                    # what the socket file returns.
                    line = self.fp.readline()
                    i = line.find(b';')
                    if i >= 0:
                        line = line[:i]  # drop chunk extensions
                    chunk_left = int(line, 16)
                    if chunk_left == 0:
                        break
                yield self._safe_read(chunk_left)
                self._safe_read(2)  # discard the CRLF that ends the chunk
                chunk_left = None

            # Skip trailer lines up to the terminating blank line or EOF.
            while True:
                line = self.fp.readline()
                if not line:
                    break
                if line == b'\r\n':
                    break
        except Exception:
            # The stream position is no longer trustworthy; release the
            # response before propagating the original error unchanged.
            self.close()
            raise

        self.close()
class WebdisConsumer(object):
    """Consume messages from a channel via an HTTP ``/SUBSCRIBE/`` request.

    The response is read chunk by chunk; every chunk is decoded as JSON and
    handed to a callback. Presumably the server is Webdis streaming one
    published message per chunk -- confirm against the deployment.
    """

    def __init__(self, host, channel, **kwargs):
        """Prepare the connection; nothing is sent until ``wait`` is called.

        Args:
            host: Host (optionally ``host:port``) given to ``HTTPConnection``.
            channel: Channel name; it is prefixed with ``ch:``.
            **kwargs: Accepted for compatibility; currently unused.
        """
        self.connection = httplib.HTTPConnection(host)
        # Have getresponse() return responses that iterate chunk by chunk.
        self.connection.response_class = IterableResponse
        self.channel = 'ch:%s' % channel

    def wait(self, cb, **params):
        """Subscribe and call ``cb`` with every decoded message.

        Blocks until the response stream ends. A failure while decoding or
        handling a single message is printed and does not stop consumption.

        Args:
            cb: Callable invoked with each JSON-decoded message.
            **params: Accepted for compatibility; currently unused.

        Raises:
            TypeError: If ``cb`` is not callable.
        """
        import traceback

        check_callable(cb)
        subscribe_path = "/SUBSCRIBE/%s" % self.channel
        try:
            self.connection.request('GET', subscribe_path)
            resp = self.connection.getresponse()
            for raw in resp:
                try:
                    cb(simplejson.loads(raw))
                except Exception:
                    # Not a bare ``except``: KeyboardInterrupt/SystemExit
                    # raised inside the callback must still stop the loop.
                    traceback.print_exc()
        finally:
            # Release the socket on every exit path; HTTPConnection
            # reconnects on the next request() if wait() is called again.
            self.connection.close()
-------- test_webdis_subscribe.py ---------- | |
# -*- coding: utf-8 -*- | |
import os | |
import time | |
import simplejson | |
import base64 | |
import logging | |
import logging.handlers | |
from webdis_consumer import WebdisConsumer | |
import settings | |
# Number of messages handled so far by process_msg().
counter = 0


def process_msg(msg):
    """Print ``msg`` prefixed with its sequence number, then count it.

    Sleeps 10 ms per message to simulate processing work.

    Args:
        msg: Decoded message to display.
    """
    # Must precede any use of ``counter`` in this scope: declaring it after
    # the first read is a SyntaxWarning on Python 2 and a SyntaxError on 3.
    global counter
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print('[%s] %s' % (counter, msg))
    time.sleep(0.01)
    counter += 1
if __name__ == '__main__':
    # Subscribe forever: build a fresh consumer for each attempt and log
    # any failure instead of letting it end the process.
    while True:
        try:
            print('wait..')
            ground_consumer = WebdisConsumer(
                settings.APP_MESSAGER_WEBDIS_HOST, 'test')
            ground_consumer.wait(process_msg)
        except Exception as e:
            logging.error('consumer wait error: %s', e)
            # NOTE(review): the flattened original does not show whether
            # this back-off sat inside the handler or at loop level; it is
            # kept in the handler here -- confirm against the original gist.
            time.sleep(10)
-------- test_redispy_publish.py --------- | |
# -*- coding: utf-8 -*- | |
import sys | |
import time | |
from redis import Redis | |
import settings | |
# Module-level Redis client used by publish(); connection details come from
# the project's settings module.
temp_rclient = Redis(
    host=settings.TEMP_REDIS_SERVER,
    port=settings.TEMP_REDIS_PORT,
    db=0,
)
def publish(num_msg):
    """Publish the message 'hi' to channel 'ch:test' ``num_msg`` times.

    Args:
        num_msg: Number of messages to send through ``temp_rclient``.
    """
    for _ in range(num_msg):
        temp_rclient.publish('ch:test', 'hi')
if __name__ == '__main__':
    # Usage: <script> NUM_MESSAGES
    # Publishes NUM_MESSAGES messages and reports the achieved rate.
    if len(sys.argv) < 2:
        # Give a usage hint instead of a bare IndexError traceback.
        sys.exit('usage: %s NUM_MESSAGES' % sys.argv[0])
    num_msg = int(sys.argv[1])

    print('starting...')
    start_time = time.time()
    publish(num_msg)
    elapsed = time.time() - start_time

    # On coarse clocks a short run can measure zero elapsed time; avoid
    # the ZeroDivisionError the unguarded division would raise.
    if elapsed > 0:
        rate = num_msg / elapsed
    else:
        rate = float('inf') if num_msg else 0.0
    print('done: %s rps' % rate)
-------- test_redispy_subscribe.py --------- | |
# -*- coding: utf-8 -*- | |
import time | |
import sys | |
from redis import Redis | |
import settings | |
# Baseline subscriber that talks to Redis directly through redis-py.
temp_rclient = Redis(
    host=settings.TEMP_REDIS_SERVER,
    port=settings.TEMP_REDIS_PORT,
    db=0,
)
temp_rclient.subscribe('ch:test')

# Sequence number of the next message to be printed.
counter = 0
for msg in temp_rclient.listen():
    # Single-argument print(...) emits the same text on Python 2 and 3.
    print('[%d] %s' % (counter, msg['data']))
    time.sleep(0.01)  # same 10 ms per-message delay as the Webdis consumer
    counter += 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment