Skip to content

Instantly share code, notes, and snippets.

@coopernurse
Created April 16, 2011 16:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save coopernurse/923228 to your computer and use it in GitHub Desktop.
Save coopernurse/923228 to your computer and use it in GitHub Desktop.
STOMP example with group id
#!/usr/bin/env python
import sys
import time
import logging
import random
import threading
from stompy import Client
# TCP port handed to every stompy Client below; presumably the broker's
# STOMP listener (61613 is the conventional STOMP port) -- confirm for your setup.
port = 61613
# Destination the consumers subscribe to and the producer sends to.
queue_name = "/queue/test"
class MyClient(threading.Thread):
    """Consumer thread that reads frames from ``queue_name`` and records them.

    Each instance opens its own stompy ``Client`` connection in :meth:`run`,
    subscribes with ``ack='client'`` and stores one formatted line per MESSAGE
    frame in ``self.msgs`` (a dict keyed by the frame's ``destination``
    header).  Set ``running`` to False from another thread to end the loop.
    """

    # Guards the module-level ``total_rcvd`` counter.  Several MyClient
    # threads increment it and ``+=`` on a global is a read-modify-write,
    # so an unguarded update can be lost.
    _count_lock = threading.Lock()

    def __init__(self, id):
        """Create a consumer.

        id -- value written into every recorded line to identify the consumer.
        """
        threading.Thread.__init__(self)
        self.id = id
        # Created here rather than in run() so that frame_received() and a
        # stop request (running = False) are valid before the thread starts.
        self.running = True
        self.msgs = {}

    def frame_received(self, frame):
        """Record a MESSAGE frame; print any other kind of frame.

        For MESSAGE frames the global ``total_rcvd`` counter is incremented
        and "<receive time>\\t<consumer id>\\t<body>" is appended to
        ``self.msgs[<destination header>]``.
        """
        global total_rcvd
        if frame.command != 'MESSAGE':
            print(frame)
            return
        with MyClient._count_lock:
            total_rcvd += 1
        dest = frame.headers['destination']
        line = "%.3f\t%d\t%s" % (time.time(), self.id, frame.body)
        self.msgs.setdefault(dest, []).append(line)

    def run(self):
        """Connect, subscribe, then poll for frames until ``running`` is False.

        Every received frame is passed to :meth:`frame_received`, followed by
        a random sleep of up to one second (simulated work) and an explicit
        ack.  The connection is always closed on the way out.
        """
        client = Client('127.0.0.1', port)
        client.connect({'activemq.prefetchSize': 0})
        try:
            client.subscribe(queue_name, ack='client')
            while self.running:
                try:
                    frame = client.get_nowait()
                except Exception:
                    # Presumably get_nowait() raises when no frame is ready;
                    # the exact exception class is not visible from this file,
                    # so any error here is treated as "nothing to read yet".
                    frame = None
                if frame is None:
                    time.sleep(0.01)
                    continue
                try:
                    self.frame_received(frame)
                    time.sleep(random.random())
                    client.ack(frame)
                except Exception:
                    # Keep consuming (best effort), but make the failure
                    # visible instead of swallowing it silently.
                    logging.exception("consumer %s failed to process a frame",
                                      self.id)
                    time.sleep(0.01)
        finally:
            client.disconnect()
class MyProducer(threading.Thread):
    """Producer that sends ``count`` messages to ``queue_name``.

    Each message carries a ``JMSXGroupID`` header of ``campaign-0``,
    ``campaign-1`` or ``campaign-2`` (chosen at random) and a body of
    "<send time>\\t<group number>\\t<sequence number>".
    """

    # Number of messages to send.  Callers may override it per instance
    # (``p.count = N``); the default keeps run() usable when they do not.
    count = 100

    def run(self):
        """Send the messages, print any frames sent back, then report timing.

        The reported time covers only the send loop.  The connection is
        always closed, even when sending fails.
        """
        client = Client('127.0.0.1', port)
        client.connect()
        try:
            start = time.time()
            for i in range(self.count):
                group = random.randint(0, 2)
                headers = {'JMSXGroupID': 'campaign-%d' % group}
                body = '%.3f\t%d\t%d' % (time.time(), group, i)
                client.put(body, destination=queue_name, conf=headers)
            # Measured before the drain below so that the fixed polling delay
            # is not counted as time spent sending.
            elapsed = time.time() - start

            # Print whatever the broker sends back; give up after ten polls
            # that yield nothing.
            retries = 10
            while retries > 0:
                try:
                    frame = client.get_nowait()
                except Exception:
                    # Presumably raised when no frame is waiting; the exact
                    # exception class is not visible from this file.
                    frame = None
                if frame is None:
                    retries -= 1
                    time.sleep(0.1)
                else:
                    print(frame)
        finally:
            client.disconnect()
        print("producer sent %d msgs in %.3f seconds" % (self.count, elapsed))
#############################################################################
# Driver: start five consumers, send the messages from the main thread, wait
# until every message has been counted, then stop the consumers.
logging.basicConfig(level=logging.INFO)

# Number of MESSAGE frames received so far, summed over all consumers
# (incremented by MyClient.frame_received).
total_rcvd = 0

clients = []
for i in range(5):
    c = MyClient(i)
    c.start()
    clients.append(c)

p = MyProducer()
p.count = 100
try:
    # Optional first command-line argument: number of messages to send.
    if len(sys.argv) > 1:
        p.count = int(sys.argv[1])
    # run() rather than start(): the producer deliberately executes in the
    # main thread, so the wait below begins only after everything was sent.
    p.run()
    # Stop waiting if every consumer has died; otherwise the count could
    # never be reached and this loop would spin forever.
    while total_rcvd < p.count and any(t.is_alive() for t in clients):
        time.sleep(p.count * .05)
except KeyboardInterrupt:
    pass
finally:
    # Always release the consumers.  They are non-daemon threads, so leaving
    # them running after an error would keep the process alive indefinitely.
    for c in clients:
        c.running = False
    for c in clients:
        c.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment