Skip to content

Instantly share code, notes, and snippets.

@fabware
Created May 24, 2011 08:57
Show Gist options
  • Save fabware/988375 to your computer and use it in GitHub Desktop.
Save fabware/988375 to your computer and use it in GitHub Desktop.
webdis slow publish rps compared with raw client
--------- webdis_consumer.py ----------
# -*- coding: utf-8 -
import simplejson
import httplib
def check_callable(cb):
if not callable(cb):
raise TypeError("callback isn't a callable")
class IterableResponse(httplib.HTTPResponse, object):
def __iter__(self):
assert self.chunked != httplib._UNKNOWN, "Response is not chunked!"
chunk_left = self.chunk_left
while 1:
if chunk_left is None:
line = self.fp.readline()
i = line.find(';')
if i >= 0:
line = line[:i]
chunk_left = int(line, 16)
if chunk_left == 0:
break
yield self._safe_read(chunk_left)
self._safe_read(2)
chunk_left = None
while 1:
line = self.fp.readline()
if not line:
break
if line == '\r\n':
break
self.close()
class WebdisConsumer():
def __init__(self, host, channel, **kwargs):
self.connection = httplib.HTTPConnection(host)
self.connection.response_class = IterableResponse
self.channel = 'ch:%s'%channel
def wait(self, cb, **params):
check_callable(cb)
subscribe_path = "/SUBSCRIBE/%s"%self.channel
self.connection.request('GET', subscribe_path)
resp = self.connection.getresponse()
for msg in resp:
try:
msg = simplejson.loads(msg)
cb(msg)
except:
import traceback; traceback.print_exc()
-------- test_webdis_subscribe.py ----------
# -*- coding: utf-8 -*-
import os
import time
import simplejson
import base64
import logging
import logging.handlers
from webdis_consumer import WebdisConsumer
import settings
counter = 0
def process_msg(msg):
print '[%s]'%counter, msg
time.sleep(0.01)
global counter
counter += 1
if __name__ == '__main__':
while True:
try:
print 'wait..'
ground_consumer = WebdisConsumer(settings.APP_MESSAGER_WEBDIS_HOST, 'test')
ground_consumer.wait(process_msg)
except Exception, e:
logging.error('consumer wait error: %s'%e)
time.sleep(10)
-------- test_redispy_publish.py ---------
# -*- coding: utf-8 -*-
import sys
import time
from redis import Redis
import settings
temp_rclient = Redis(host=settings.TEMP_REDIS_SERVER, port=settings.TEMP_REDIS_PORT, db=0)
def publish(num_msg):
for i in range(num_msg):
temp_rclient.publish('ch:test', 'hi')
if __name__ == '__main__':
num_msg = int(sys.argv[1])
print 'starting...'
start_time = time.time()
publish(num_msg)
end_time = time.time()
print 'done: %s rps'%(num_msg / (end_time - start_time))
-------- test_redispy_subscribe.py ---------
# -*- coding: utf-8 -*-
import time
import sys
from redis import Redis
import settings
temp_rclient = Redis(host=settings.TEMP_REDIS_SERVER, port=settings.TEMP_REDIS_PORT, db=0)
temp_rclient.subscribe('ch:test')
counter = 0
for msg in temp_rclient.listen():
print '[%d]'%counter, msg['data']
time.sleep(0.01)
counter += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment