Skip to content

Instantly share code, notes, and snippets.

@wil
Created February 18, 2010 12:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wil/307604 to your computer and use it in GitHub Desktop.
Save wil/307604 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
### patch socket and tornado
import gevent.monkey; gevent.monkey.patch_all()
import gtornado.monkey; gtornado.monkey.patch_all()
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from amqplib import client_0_8 as amqp_client
from amqplib.client_0_8 import Message
from tornado.options import define, options
"""
1. Run this example.
2. Open http://localhost:8123/ in a browser; the request will stall until 3 messages are received.
3. Publish 3 messages using curl (quote the URL so the shell does not treat '?' as a glob):
$ curl 'http://localhost:8123/pub?q=message1'
$ curl 'http://localhost:8123/pub?q=message2'
$ curl 'http://localhost:8123/pub?q=message3'
"""
# Name of the AMQP exchange shared by the publishing and consuming handlers.
XNAME = "tornado_test"

# Command-line option: TCP port the HTTP server listens on.
define(
    "port",
    default=8123,
    help="run on the given port",
    type=int,
)
class MainHandler(tornado.web.RequestHandler):
    """Hold the request open until a fixed number of AMQP messages arrive.

    Each GET declares a private queue, binds it to the XNAME exchange,
    consumes from it, and writes every delivered message body into the
    HTTP response.
    """

    # Number of chan.wait() rounds performed before the response finishes.
    MESSAGE_COUNT = 3

    def msg_cb(self, msg):
        """Append one delivered message to the response body.

        msg is the object amqplib hands to the consumer callback; only
        its ``body`` attribute is read.
        """
        # Local import keeps this block self-contained.
        from tornado.escape import xhtml_escape

        # The body originates from the public /pub endpoint, so it must be
        # escaped before being embedded in the HTML response.
        self.write("got msg: %s<br>" % xhtml_escape(msg.body))

    def get(self):
        """Consume MESSAGE_COUNT deliveries, then release all AMQP resources."""
        conn = amqp_client.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
        try:
            chan = conn.channel()
            qname = str(id(self))
            chan.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
            chan.queue_bind(queue=qname, exchange=XNAME)
            # Single-argument print(...) emits the same text on Python 2 and 3.
            print("consuming messages")
            tag = chan.basic_consume(queue=qname, no_ack=True, callback=self.msg_cb)
            for _ in range(self.MESSAGE_COUNT):
                print("waiting..")
                chan.wait()
            chan.basic_cancel(tag)
            # The queue is declared with auto_delete=False, so it must be
            # removed explicitly; otherwise it stays bound to the exchange
            # and keeps accumulating messages after this request ends.
            chan.queue_delete(queue=qname)
            chan.close()
        finally:
            # Always release the broker connection, even when consuming fails.
            conn.close()
class PubHandler(tornado.web.RequestHandler):
    """Publish the ``q`` query argument as a message to the XNAME exchange."""

    def get(self):
        """Publish one message built from the required ``q`` argument."""
        # Read the argument before connecting: get_argument raises when "q"
        # is missing, and that must not leave an open broker connection.
        msg = Message(self.get_argument("q"))
        conn = amqp_client.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
        try:
            chan = conn.channel()
            self.write("publishing...")
            chan.basic_publish(msg, exchange=XNAME)
            chan.close()
        finally:
            # Always release the broker connection, even when publishing fails.
            conn.close()
def amqp_setup():
    """Declare the durable fanout exchange XNAME on the local broker.

    The connection is only needed for the declaration, so the channel and
    the connection are closed before returning instead of being left open.
    """
    conn = amqp_client.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
    try:
        chan = conn.channel()
        chan.exchange_declare(exchange=XNAME, type="fanout", durable=True, auto_delete=False)
        chan.close()
    finally:
        conn.close()
def main():
    """Parse options, declare the exchange, and serve HTTP until stopped."""
    tornado.options.parse_command_line()

    # URL routing table: the consumer page and the publish endpoint.
    routes = [
        (r"/", MainHandler),
        (r"/pub", PubHandler),
    ]
    app = tornado.web.Application(routes)

    # The exchange must exist before any handler binds or publishes to it.
    amqp_setup()

    server = tornado.httpserver.HTTPServer(app)
    server.listen(options.port)

    loop = tornado.ioloop.IOLoop.instance()
    loop.start()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment