Skip to content

Instantly share code, notes, and snippets.

@briancline
Created March 9, 2013 07:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save briancline/5123382 to your computer and use it in GitHub Desktop.
Save briancline/5123382 to your computer and use it in GitHub Desktop.
Queue consumer with some creates/deletes persisted in a Riak bucket
#!/usr/bin/env python
from __future__ import print_function
import json
import ConfigParser
import optparse
import pika
import riak
from pprint import pprint
from pika.adapters import SelectConnection
# Module-level state shared by the callbacks below; all three start out unset:
#   config  - replaced by the parsed configuration in the __main__ block
#   channel - replaced by the opened channel in on_channel_open()
#   bucket  - replaced by the Riak bucket handle in the __main__ block
config = channel = bucket = None
def on_connect(connection):
    """Connection callback: ask the connection for a channel.

    `on_channel_open` is handed over as the callback that receives the
    channel.
    """
    connection.channel(on_channel_open)
def on_channel_open(new_channel):
    """Channel callback: remember the channel and declare the work queue.

    The channel is stored in the module-level `channel` so the later
    callbacks can reach it.  The queue name comes from the `[mq] queue`
    configuration entry, and `on_queue_declare` is registered as the
    callback for the declare request.
    """
    global channel
    channel = new_channel

    queue_name = config.get('mq', 'queue')
    new_channel.queue_declare(
        queue=queue_name,
        durable=True,
        exclusive=False,
        auto_delete=False,
        callback=on_queue_declare,
    )
def on_queue_declare(frame):
    """Queue-declared callback: set the prefetch count and start consuming.

    `frame` is accepted because the callback receives it, but it is not
    used.  Deliveries from the `[mq] queue` queue are routed to `on_consume`.
    """
    channel.basic_qos(prefetch_count=1)

    queue_name = config.get('mq', 'queue')
    channel.basic_consume(on_consume, queue=queue_name)
def on_consume(channel, method, header, body):
    """Handle one delivered message, then acknowledge it.

    The body is expected to be a JSON object with a `meta` mapping.  When
    `meta['type']` is `'post'`, the event is routed to `on_post_delete` if
    `meta['is_deleted']` is truthy and to `on_post_create` otherwise.
    Every other message (empty body, malformed JSON, non-object payload,
    missing or non-mapping `meta`, missing or different `type`) is ignored.

    In all of those cases the delivery is acknowledged.  If one of the
    post handlers raises, the exception propagates and the delivery is
    left unacknowledged.

    Args:
        channel: channel the message arrived on; used for the ack.
        method: delivery metadata; only `delivery_tag` is read.
        header: message properties (unused).
        body: raw message payload.
    """
    event = None
    if body:
        try:
            event = json.loads(body)
        except ValueError as exc:
            # A payload that is not valid JSON can never be processed.
            # Report it and fall through to the ack so it is dropped
            # rather than crashing the consumer and staying on the queue.
            print('discarding malformed message: %s' % exc)

    # Only a JSON object can carry `meta`; anything else is ignored.
    meta = event.get('meta') if isinstance(event, dict) else None

    # .get() keeps a message without `type` from raising KeyError.
    if isinstance(meta, dict) and meta.get('type') == 'post':
        if meta.get('is_deleted'):
            on_post_delete(event)
        else:
            on_post_create(event)

    channel.basic_ack(delivery_tag=method.delivery_tag)
def on_post_delete(event):
    """Delete the post named by `event['meta']['id']` from the Riak bucket.

    The object is fetched first and deleted only when the fetched value
    is truthy.
    """
    stored = bucket.get(event['meta']['id'])
    if not stored:
        return
    stored.delete()
def on_post_create(event):
    """Store the whole event in the Riak bucket under `event['meta']['id']`."""
    bucket.new(event['meta']['id'], data=event).store()
if __name__ == "__main__":
    # The only command-line option is the path of the configuration file
    # (default: "config.conf").
    parser = optparse.OptionParser(usage='usage: %prog [options]')
    parser.set_defaults(config_file="config.conf")
    parser.add_option('-c', '--config', dest='config_file',
                      help='read configuration from FILE', metavar='FILE')
    (options, args) = parser.parse_args()

    # `config` is the module-level global read by the channel callbacks.
    config = ConfigParser.ConfigParser()
    # read() skips files it cannot open and returns the list of files it
    # actually parsed.  Fail here with a clear message instead of hitting a
    # NoSectionError on the first config.get() below.
    if not config.read(options.config_file):
        parser.error('unable to read configuration file: %s'
                     % options.config_file)

    # Riak settings come from the [riak] section: server, port, bucket.
    db = riak.RiakClient(host=config.get('riak', 'server'),
                         port=config.getint('riak', 'port'),
                         transport_class=riak.RiakPbcTransport)
    # `bucket` is the module-level global used by the post handlers.
    bucket = db.bucket(config.get('riak', 'bucket'))

    # Broker settings come from the [mq] section: user, pass, server, port,
    # vhost (and queue, which the callbacks read themselves).
    credentials = pika.PlainCredentials(config.get('mq', 'user'),
                                        config.get('mq', 'pass'))
    conn_params = pika.ConnectionParameters(host=config.get('mq', 'server'),
                                            port=config.getint('mq', 'port'),
                                            virtual_host=config.get('mq',
                                                                    'vhost'),
                                            credentials=credentials)

    # on_connect starts the callback chain once the connection is open.
    connection = pika.SelectConnection(conn_params, on_connect)
    try:
        connection.ioloop.start()
    except KeyboardInterrupt:
        connection.close()
        # NOTE(review): the loop is restarted after close(), presumably so
        # the close request can be processed before exiting - confirm
        # against the pika version in use.
        connection.ioloop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment