Skip to content

Instantly share code, notes, and snippets.

@bcoe
Created July 22, 2010 00:43
Show Gist options
  • Save bcoe/485405 to your computer and use it in GitHub Desktop.
Save bcoe/485405 to your computer and use it in GitHub Desktop.
from multiprocessing import Process, Queue
from amqplib import client_0_8 as amqp
from hackwars import SETTINGS
import simplejson as json
import threading
# Maps a client's access token to the socket it connected on.
# handle_request() stores an entry for every connection it accepts and
# handle_message() reads it to pick the socket a message is relayed to.
CONNECTION_LOOKUP = {}
def handle_message(message):
    """
    Relay a consumed RabbitMQ message to the client socket it belongs to.

    Passed as the ``callback`` of ``basic_consume``. ``message.body`` is
    expected to be a JSON object with an ``access_token`` key; the token
    selects a socket from ``CONNECTION_LOOKUP`` and the raw body is sent
    to that socket unchanged.

    Delivery is best-effort: a body that is not valid JSON, a payload
    without a usable ``access_token``, a token with no registered socket,
    or a failed send all drop the message and return ``None`` rather than
    raising out of the callback.
    """
    try:
        payload = json.loads(message.body)
        incoming_socket = CONNECTION_LOOKUP[payload['access_token']]
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not JSON. KeyError: no access_token field or
        # no socket registered for it. TypeError: payload is not a mapping
        # (or the body / token has an unusable type).
        return
    try:
        # Send something useful here
        incoming_socket.send(message.body)
    except Exception:
        # The client may already be gone. Unlike a bare ``except`` this
        # still lets KeyboardInterrupt and SystemExit propagate.
        return
def handle_request(incoming_socket, incoming_address):
    """
    Serve one client connection for as long as its channel wait lasts.

    Reads up to 36 bytes from the socket as the client's access token,
    echoes the token back, and registers the socket in
    ``CONNECTION_LOOKUP``. It then declares a durable queue named after
    the token, binds it to the direct exchange ``client`` with the token
    as routing key, and starts consuming with ``handle_message`` as the
    callback. The call blocks in ``spawn_channel_wait_process`` and tears
    everything down once that returns.

    incoming_socket -- connected client socket (must offer recv/send).
    incoming_address -- peer address; accepted for the caller's benefit
                        and not used here.

    Cleanup (lookup entry, consumer, queue, channel, connection) runs in a
    ``finally`` block, so it also happens when setup or the wait raises;
    the exception itself still propagates to the caller.
    """
    # We bind the routing_key to the access token.
    access_token = incoming_socket.recv(36)
    if not access_token:
        # Nothing was received (e.g. the peer closed first); an empty
        # token must not be used as a queue name or lookup key.
        return
    incoming_socket.send(access_token)
    # Store a reference
    CONNECTION_LOOKUP[access_token] = incoming_socket
    conn = None
    chan = None
    try:
        # Connect to RabbitMQ.
        # NOTE(review): port and guest/guest credentials are hard-coded.
        conn = amqp.Connection(
            host=SETTINGS['rabbit_host'] + ":5672",
            userid="guest",
            password="guest",
            virtual_host="/",
            insist=False
        )
        chan = conn.channel()
        chan.queue_declare(queue=access_token, durable=True,
                           exclusive=False, auto_delete=False)
        chan.exchange_declare(exchange='client', type="direct",
                              durable=True, auto_delete=False)
        chan.queue_bind(queue=access_token, exchange='client',
                        routing_key=access_token)
        chan.basic_consume(queue=access_token, no_ack=True,
                           callback=handle_message, consumer_tag='tag',
                           exclusive=False)
        # Wait on channel in sub-process so that we do not
        # block indefinitely.
        spawn_channel_wait_process(chan)
    finally:
        # Cleanup.
        # Drop our entry so the lookup table does not accumulate dead
        # sockets; leave it alone if the token has since been re-registered
        # by a different socket.
        if CONNECTION_LOOKUP.get(access_token) is incoming_socket:
            del CONNECTION_LOOKUP[access_token]
        try:
            if chan is not None:
                # Cancel the consumer before deleting the queue it reads.
                chan.basic_cancel(consumer_tag='tag')
                chan.queue_delete(queue=access_token)
                chan.close()
        finally:
            # Close the connection even if channel cleanup failed.
            if conn is not None:
                conn.close()
            print('Comet thread killed.')
def spawn_channel_wait_process(chan, timeout=30):
    """
    Wait on a channel.

    Starts ``_channel_wait`` in a child process and blocks the caller,
    draining the status items the child reports. Returns ``None`` once
    ``timeout`` seconds pass without a new item (or the queue read fails
    for any other reason).

    chan -- channel object handed to the child, which calls ``wait()`` on it.
    timeout -- seconds to wait for each status item; defaults to the
               previously hard-coded 30.

    The child is always terminated and joined before this function
    returns or propagates an exception.
    """
    # A queue is used to copy necessary state information back
    # and forth between processes.
    queue = Queue()
    p = Process(target=_channel_wait, args=(queue, chan))
    p.start()
    try:
        while True:
            # Do something with result from spawned process?
            queue.get(block=True, timeout=timeout)
    except Exception:
        # A timeout was hit while executing the script (the expected way
        # out of the loop); any other read failure also ends the wait.
        pass
    finally:
        # Runs on every exit path, including KeyboardInterrupt, so the
        # child is never left behind; join() reaps it after the signal.
        p.terminate()
        p.join()
def _channel_wait(queue, chan):
"""
Spawned as a sub-process, listens on the
RabbitMQ queue.
"""
while True:
chan.wait()
queue.put({
'status': 'ok'
})
@bcoe
Copy link
Author

bcoe commented Jul 22, 2010

This thread talks to JavaScript over the orbited connection and dispatches inbound messages from RabbitMQ.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment