Skip to content

Instantly share code, notes, and snippets.

@leoken
Forked from ozgurakan/marconi-sample.py
Created July 31, 2013 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leoken/6125742 to your computer and use it in GitHub Desktop.
Save leoken/6125742 to your computer and use it in GitHub Desktop.
import requests
import json
username = 'my-user'
apikey = 'my-api-key'
url = 'https://test.my-marconi-server.com:443'
class Queue_Connection(object):
def __init__(self, username, apikey):
url = 'https://identity.api.rackspacecloud.com/v2.0/tokens'
payload = {"auth":{"RAX-KSKEY:apiKeyCredentials":{"username": username , "apiKey": apikey }}}
headers = {'Content-Type': 'application/json'}
r = requests.post(url, data=json.dumps(payload), headers=headers)
self.token = r.json()['access']['token']['id']
self.headers = {'X-Auth-Token' : self.token, 'Content-Type': 'application/json', 'Client-ID': 'QClient1'}
def token(self):
return self.token
def get(self, url, payload=None):
r = requests.get(url, data=json.dumps(payload), headers=self.headers)
return [r.status_code, r.headers, r.content]
def post(self, url, payload=None):
r = requests.post(url, data=json.dumps(payload), headers=self.headers)
return [r.status_code, r.headers, r.content]
def put(self, url, payload=None):
r = requests.put(url, data=json.dumps(payload), headers=self.headers)
return [r.status_code, r.headers, r.content]
def delete(self, url, payload=None):
r = requests.delete(url, data=json.dumps(payload), headers=self.headers)
return [r.status_code, r.headers, r.content]
class Producer(Queue_Connection):
def __init__(self, url, username, apikey):
super(Producer, self).__init__(username, apikey)
self.base_url = url
def queue_name():
def fget(self):
return self._queue_name
def fset(self, value):
self._queue_name = value
def fdel(self):
del self._queue_name
return locals()
queue_name = property(**queue_name())
def queue_exists(self):
url = self.base_url + '/v1/queues/' + self.queue_name + '/stats'
if self.get(url)[0] == 200:
return True
return False
def create_queue(self, payload=None):
url = self.base_url + "/v1/queues/" + self.queue_name
res = self.put(url, payload)
if res[0] == 200:
print '%s created' % self.queue_name
elif res[0] == 204:
print 'A queue named %s is present' % self.queue_name
else:
print 'Problem with queue creation,'
def post_messages(self, payload):
url = self.base_url + '/v1/queues/' + self.queue_name + '/messages'
res = self.post(url, payload)
if res[0] == 201:
return json.loads(res[2])['resources']
else:
print "Couldn't post messages"
class Consumer(Queue_Connection):
def __init__(self, url, username, apikey):
super(Consumer, self).__init__(username, apikey)
self.base_url = url
def claim_messages(self, payload, limit=1):
url = self.base_url + '/v1/queues/' + self.queue_name + '/claims?limit=' + str(limit)
res = self.post(url, payload)
if res[0] == 200:
return json.loads(res[2])
else:
print "Couldn't claim messages"
def delete_message(self, url):
url = self.base_url + url
res = self.delete(url)
if res[0] == 204:
print "Message deleted"
""" create a Producer instance """
pub = Producer(url, username, apikey)
pub.queue_name = 'testqueue'
if not pub.queue_exists():
print "Creating queue", pub.queue_name
pub.create_queue({"metadata": "My Queue"})
""" create and post two messages """
data = [{"ttl": 60,"body": {"task":"one"}},{"ttl": 60,"body": {"task":"two"}}]
for message in pub.post_messages(data):
print "message: ", message
""" create a Consumer instance """
con = Consumer(url, username, apikey)
""" define ttl and grace times for the claim """
data = {"ttl":60, "grace":60}
con.queue_name = 'testqueue'
messages = con.claim_messages(data, 2)
for message in messages:
print "task : ", message['body']['task']
print message['href']
"""
do something with the messages
when done delete
"""
for message in messages:
con.delete_message(message['href'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment