Skip to content

Instantly share code, notes, and snippets.

@atodorov
Created November 7, 2014 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save atodorov/2bc1fcd34531ad260ed7 to your computer and use it in GitHub Desktop.
Save atodorov/2bc1fcd34531ad260ed7 to your computer and use it in GitHub Desktop.
Queue, Kombu and Celery tests
#!/usr/bin/env python
import djapp.mem_serializer
from datetime import datetime
from kombu.pools import producers
from kombu import Connection, Exchange
# Sample task message that is published on every iteration of the benchmark.
msg = {u'body': {'expires': None, 'utc': True, 'args': [], 'chord': None, 'callbacks': None, 'errbacks': None, 'taskset': None, 'id': '79ac070e-86fa-4f7a-9ce2-38ad18dbd8d3', 'retries': 0, 'task': 'djapp.celery.debug_task', 'timelimit': (None, None), 'eta': None, 'kwargs': {}}, u'headers': {}, u'content-type': 'application/x-memory', u'properties': {'reply_to': '2b255b23-2b57-3e54-98a9-3ee48645ae02', 'correlation_id': '79ac070e-86fa-4f7a-9ce2-38ad18dbd8d3', 'delivery_mode': 2, u'delivery_info': {u'priority': 0}}, u'content-encoding': 'binary'}

# Number of messages published by a default benchmark run.
MESSAGE_COUNT = 1000000


def messages_per_second(count, delta):
    """Return the average rate of *count* messages over the timedelta *delta*.

    The full elapsed time, including microseconds, is used, so runs shorter
    than one second no longer divide by zero. When no time elapsed at all
    the rate is reported as ``inf`` (or ``0.0`` if nothing was sent).
    """
    seconds = delta.total_seconds()
    if seconds <= 0:
        return float('inf') if count else 0.0
    # total_seconds() is a float, so this is true division on Python 2 too.
    return count / seconds


def run_benchmark(count=MESSAGE_COUNT):
    """Publish ``msg`` *count* times through Kombu and print the throughput.

    A KeyboardInterrupt stops publishing early; the statistics for the
    messages sent so far are still printed.

    Returns the number of messages actually published.
    """
    # The exchange we send our news articles to.
    news_exchange = Exchange('news')
    # The broker where our exchange is.
    # NOTE(review): the 'fastmemory' transport and the 'mem_serializer'
    # serializer are presumably registered by the djapp imports at the top
    # of the file -- confirm.
    connection = Connection('fastmemory://')

    i = 0
    start = datetime.now()
    try:
        with producers[connection].acquire(block=True) as producer:
            for _ in range(count):
                producer.publish(
                    msg,
                    exchange=news_exchange,
                    routing_key='domestic',
                    declare=[news_exchange],
                    # Alternatives to compare: 'json' or 'pickle'.
                    serializer='mem_serializer',
                    compression=None)
                i += 1
    except KeyboardInterrupt:
        # Ctrl-C just ends the run; fall through to the report below.
        pass
    delta = datetime.now() - start

    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("")
    print("%s messages put in queue for %s seconds" % (i, delta))
    print("Msgs per sec: %.2f" % messages_per_second(i, delta))
    return i


if __name__ == '__main__':
    run_benchmark()
#!/usr/bin/env python
from Queue import Queue
from datetime import datetime
# In-process queue that the benchmark fills.
q = Queue()
# Sample task message that is enqueued on every iteration of the benchmark.
msg = {u'body': {'expires': None, 'utc': True, 'args': [], 'chord': None, 'callbacks': None, 'errbacks': None, 'taskset': None, 'id': '79ac070e-86fa-4f7a-9ce2-38ad18dbd8d3', 'retries': 0, 'task': 'djapp.celery.debug_task', 'timelimit': (None, None), 'eta': None, 'kwargs': {}}, u'headers': {}, u'content-type': 'application/x-memory', u'properties': {'reply_to': '2b255b23-2b57-3e54-98a9-3ee48645ae02', 'correlation_id': '79ac070e-86fa-4f7a-9ce2-38ad18dbd8d3', 'delivery_mode': 2, u'delivery_info': {u'priority': 0}}, u'content-encoding': 'binary'}

# Number of messages enqueued by a default benchmark run.
MESSAGE_COUNT = 1000000


def messages_per_second(count, delta):
    """Return the average rate of *count* messages over the timedelta *delta*.

    The full elapsed time, including microseconds, is used, so runs shorter
    than one second no longer divide by zero. When no time elapsed at all
    the rate is reported as ``inf`` (or ``0.0`` if nothing was sent).
    """
    seconds = delta.total_seconds()
    if seconds <= 0:
        return float('inf') if count else 0.0
    # total_seconds() is a float, so this is true division on Python 2 too.
    return count / seconds


def run_benchmark(count=MESSAGE_COUNT):
    """Put ``msg`` on ``q`` *count* times and print the throughput.

    A KeyboardInterrupt stops the loop early; the statistics for the
    messages enqueued so far are still printed.

    Returns the number of messages actually enqueued.
    """
    i = 0
    start = datetime.now()
    try:
        for _ in range(count):
            q.put(msg)
            i += 1
    except KeyboardInterrupt:
        # Ctrl-C just ends the run; fall through to the report below.
        pass
    delta = datetime.now() - start

    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("")
    print("%s messages put in queue for %s seconds" % (i, delta))
    print("Msgs per sec: %.2f" % messages_per_second(i, delta))
    return i


if __name__ == '__main__':
    run_benchmark()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment