Skip to content

Instantly share code, notes, and snippets.

@dexterbt1
Created May 23, 2018 10:16
Show Gist options
  • Save dexterbt1/e5c49a04abdd0d54e04ff73b53d22204 to your computer and use it in GitHub Desktop.
Save dexterbt1/e5c49a04abdd0d54e04ff73b53d22204 to your computer and use it in GitHub Desktop.
stomp worker pattern
import sys
import uuid
import threading
import stomp
class BcastMTWorkerListener(stomp.ConnectionListener):
def __init__(self, conn, receipt_waiting):
self.conn = conn
self.receipt_waiting = receipt_waiting
# ---
self.tx_id = None
self.tx_receipt = None
def on_error(self, headers, message):
print('%s: error "%s"' % (threading.current_thread(), message))
sys.exit(1)
# logger.error
def on_message(self, message_headers, message):
print('%s: on_message start ... got message "%s"' % (threading.current_thread(), message))
# logger.debug
self.tx_cond = threading.Condition()
self.tx_receipt = 'commit-' + str(uuid.uuid4())
self.tx_id = self.conn.begin()
# TODO: FIXME: do translations here ...
out_message = message
self.conn.send('/queue/hello-reply',
out_message,
content_type='application/octet-stream',
headers={'persistent': 'true'})
self.conn.ack(
message_headers['message-id'],
message_headers['subscription'])
self.conn.commit(self.tx_id, headers={'receipt': self.tx_receipt})
with self.receipt_waiting:
self.receipt_waiting.notify()
print('%s: on_message done' % threading.current_thread())
# logger.info('successfully ack+sent ....')
# ---
def on_receipt(self, headers, body):
if 'receipt-id' in headers and headers['receipt-id'] == self.tx_receipt:
with self.tx_cond:
self.tx_cond.notify()
def wait_on_tx_receipt(self):
with self.tx_cond:
self.tx_cond.wait()
def loop(self, receipt_waiting_cond):
while True:
# print('%s: active threads: %s' % (threading.current_thread(), threading.active_count()))
with receipt_waiting_cond:
receipt_waiting_cond.wait()
print("%s: waiting for tx_receipt ..." % threading.current_thread())
self.wait_on_tx_receipt()
print("%s: got tx commit receipt." % threading.current_thread())
# -- main
receipt_waiting_cond = threading.Condition()
conn = stomp.Connection([('localhost', '61613')], vhost='bcast')
worker = BcastMTWorkerListener(conn, receipt_waiting_cond)
conn.set_listener('', worker)
conn.start()
conn.connect('bcast', 'bcast', wait=True)
conn.subscribe(destination='/queue/hello',
ack='client-individual',
id='subscription-' + str(uuid.uuid4()),
headers={
'prefetch-size': '1',
'persistent': 'true',
})
worker.loop(receipt_waiting_cond)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment