Skip to content

Instantly share code, notes, and snippets.

@theduderog
Created February 1, 2011 17:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save theduderog/806225 to your computer and use it in GitHub Desktop.
Save theduderog/806225 to your computer and use it in GitHub Desktop.
Basic setup for how you consume from one queue and produce into another
import logging
import simplejson
from twisted.internet import reactor, defer
from stompest.async import StompConfig, StompCreator
class IncrementTransformer(object):
IN_QUEUE = '/queue/testIn'
OUT_QUEUE = '/queue/testOut'
ERROR_QUEUE = '/queue/testTransformerError'
def __init__(self, config=None):
if config is None:
config = StompConfig('localhost', 61613)
self.config = config
@defer.inlineCallbacks
def run(self):
#Establish connection
stomp = yield StompCreator(self.config).getConnection()
#Subscribe to inbound queue
headers = {
#client-individual mode is only supported in AMQ >= 5.2 but necessary for concurrent processing
'ack': 'client-individual',
#this is the maximum messages the broker will let you work on at the same time
'activemq.prefetchSize': 100,
}
stomp.subscribe(self.IN_QUEUE, self.addOne, headers, errorDestination=self.ERROR_QUEUE)
def addOne(self, stomp, frame):
"""
NOTE: you can return a Deferred here
"""
data = simplejson.loads(frame['body'])
data['count'] += 1
stomp.send(self.OUT_QUEUE, simplejson.dumps(data))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
IncrementTransformer().run()
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment