public
Created

STOMP example with group id

  • Download Gist
gistfile1.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
#!/usr/bin/env python
 
import sys
import time
import logging
import random
import threading
from stompy import Client
 
# Port passed to Client('127.0.0.1', port) by both the consumers and the
# producer, i.e. the port of the local STOMP broker used by this example.
port = 61613
# Destination the consumers subscribe to and the producer publishes to.
queue_name = "/queue/test"
 
class MyClient(threading.Thread):
    """Consumer thread that polls the test queue and records what it receives.

    Each instance opens its own connection in ``run``, subscribes to
    ``queue_name`` with client acknowledgements, and stores every MESSAGE
    frame in ``self.msgs`` (a dict keyed by destination) as a tab-separated
    line of receive time, client id and message body.
    """

    # Guards the module-level ``total_rcvd`` counter. Several MyClient threads
    # increment it and ``+=`` on a global is not atomic, so without the lock
    # increments can be lost and the main wait loop would never finish.
    _count_lock = threading.Lock()

    def __init__(self, id):
        """Create a consumer.

        id -- number identifying this consumer in the recorded lines.
        """
        threading.Thread.__init__(self)
        self.id = id
        # Set here rather than in run() so that a stop request
        # (``running = False``) issued before the thread is scheduled is not
        # overwritten, and so that ``msgs`` exists even before run() starts.
        self.running = True
        self.msgs = {}

    def frame_received(self, frame):
        """Record a MESSAGE frame; print any other kind of frame.

        frame -- object exposing ``command``, ``headers`` and ``body``.
        """
        global total_rcvd
        if frame.command != 'MESSAGE':
            print(frame)
            return
        with MyClient._count_lock:
            total_rcvd += 1
        dest = frame.headers['destination']
        line = "%.3f\t%d\t%s" % (time.time(), self.id, frame.body)
        self.msgs.setdefault(dest, []).append(line)

    def run(self):
        """Poll the queue until ``running`` is cleared, acking each frame."""
        client = Client('127.0.0.1', port)
        # NOTE(review): ActiveMQ documents activemq.prefetchSize as a
        # SUBSCRIBE header, and it is unclear whether connect() accepts a
        # header dict as its first argument -- confirm this has any effect.
        client.connect({'activemq.prefetchSize': 0})
        try:
            client.subscribe(queue_name, ack='client')
            while self.running:
                try:
                    f = client.get_nowait()
                except Exception:
                    # Nothing available right now (or the receive failed):
                    # back off briefly and poll again.
                    time.sleep(0.01)
                    continue
                try:
                    self.frame_received(f)
                    # Hold the frame for up to one second before acking,
                    # presumably to imitate processing time.
                    time.sleep(random.random())
                    client.ack(f)
                except Exception:
                    # Keep the consumer alive as before, but do not hide
                    # the failure.
                    logging.exception(
                        "client %d failed to process a frame", self.id)
                    time.sleep(0.01)
        finally:
            # Always release the connection, even if subscribing or the
            # polling loop raised.
            client.disconnect()
 
class MyProducer(threading.Thread):
    """Publishes ``count`` test messages to ``queue_name``.

    Every message carries a ``JMSXGroupID`` header of ``campaign-0``,
    ``campaign-1`` or ``campaign-2`` chosen at random, and a tab-separated
    body of send time, group number and sequence number.
    """

    def __init__(self, count=100):
        """Create a producer.

        count -- number of messages ``run`` sends; it may also be assigned
                 to the ``count`` attribute after construction.
        """
        threading.Thread.__init__(self)
        self.count = count

    def run(self):
        """Send the messages, print any frames that come back, report timing."""
        client = Client('127.0.0.1', port)
        client.connect()
        start = time.time()
        try:
            for i in range(self.count):
                group = random.randint(0, 2)
                headers = {'JMSXGroupID': 'campaign-%d' % group}
                msg = '%.3f\t%d\t%d' % (time.time(), group, i)
                client.put(msg, destination=queue_name, conf=headers)

            # Print whatever frames arrive on this connection; give up after
            # ten failed polls spaced 0.1 s apart.
            retries = 10
            while retries > 0:
                try:
                    f = client.get_nowait()
                except Exception:
                    # ``Exception`` rather than a bare except: this method is
                    # also called directly from the main thread, where a bare
                    # except would swallow Ctrl-C.
                    retries -= 1
                    time.sleep(0.1)
                else:
                    print(f)
        finally:
            # Release the connection even if sending failed part-way.
            client.disconnect()
        elapsed = time.time() - start
        print("producer sent %d msgs in %.3f seconds" % (self.count, elapsed))
 
#############################################################################
 
logging.basicConfig(level=logging.INFO)

# Number of MESSAGE frames received so far across all consumers;
# MyClient.frame_received increments it.
total_rcvd = 0

# Build the producer and parse the optional message count before any thread
# is started, so that a bad command-line argument cannot leave consumer
# threads polling forever.
p = MyProducer()
p.count = 100
if len(sys.argv) > 1:
    p.count = int(sys.argv[1])

clients = []
for i in range(5):
    c = MyClient(i)
    c.start()
    clients.append(c)

try:
    # run() is called directly (not start()), so the producer executes
    # synchronously in this thread.
    p.run()
    # Wait until the consumers have seen every message that was sent.
    while total_rcvd < p.count:
        time.sleep(p.count * .05)
except KeyboardInterrupt:
    pass
finally:
    # Always tell the consumers to stop, including when the producer fails,
    # otherwise their polling loops would keep the process alive.
    for c in clients:
        c.running = False

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.