Skip to content

Instantly share code, notes, and snippets.

@vkmc
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vkmc/1816061a3e601a93a42b to your computer and use it in GitHub Desktop.
Save vkmc/1816061a3e601a93a42b to your computer and use it in GitHub Desktop.
Marconi API v1 mapping with AMQP 1.0
Interaction with the broker using Apache Proton library
Apache Proton API http://qpid.apache.org/releases/qpid-proton-0.7/protocol-engine/python/api/index.html
# Queues
AMQP Queues are created in a lazy way (when messages are sent). The put() method in the Messenger class is used with that purpose.
AMQP Messenger class http://qpid.apache.org/releases/qpid-proton-0.7/protocol-engine/python/api/proton.Messenger-class.html.
An AMQP messenger can manage multiple queues. How can we get the queues that a messenger has? How can we get the metadata of those queues? Queues are not a first class citizen in this model, they are just an attribute of messages. More on this idea here http://blog.flaper87.com/post/531cd585d987d24e83f082a5/.
A possible approach to implement this without changing the API that much could be:
We let users know: "Hey, AMQP + Marconi 1.1 queue ops are really no-ops. No need to use them."
POV 1) if users keep this in mind, they won't send those messages
POV 2) we return some code that indicates that this operation is not implemented, some 4xx level code -- problem with this is that it sounds like it'd take some transport-level modifications to achieve that which wouldn't be applicable to the other storage drivers
POV 3) we return some code that indicates that this operation has been successful, a 201/204 (no content) level code.
In all cases, we can translate it at the storage layer as a no-op that just bubbles back up to the transport layer as a success
GET /v1/queues{?marker,limit,detailed}
GET /v1/queues/{queue_name} == HEAD /v1/queues/{queue_name} # existence check
PUT /v1/queues/{name}
DELETE /v1/queues/{name}
# Queue metadata
Since queue metadata depends on queue existence we could apply the same criteria as above. No queues, no queues metadata. Just let the user know about this situation and return a proper code when neccesary.
PUT /v1/queues/{queue_name}/metadata
GET /v1/queues/{queue_name}/metadata
GET /v1/queues/{queue_name}/stats
# Messages
AMQP Message class http://qpid.apache.org/releases/qpid-proton-0.7/protocol-engine/python/api/proton.Message-class.html#JSON
AMQP Messages have metadata that Marconi messages don't have. We may want to add support for the basics and leave other metadata with a default value, finishing the support in future releases.
Marconi Messages Fields | AMQP Messages Fields
href | address
ttl | ttl
age | creation_time
body | body
GET /v1/queues/{queue_name}/messages{?marker,limit,echo,include_claimed}
# Proton snippet
mng = Messenger()
(params ...)
mng.start()
for a in args:
mng.subscribe(a)
msg = Message()
while True:
mng.recv()
while mng.incoming:
try:
mng.get(msg)
except Exception, e:
print e
else:
print msg.address, msg.subject or "(no subject)", msg.properties, msg.body
mng.stop()
# end snippet
GET /v1/queues/{queue_name}/messages/{message_id} (deprecated - under discussion)
GET /v1/queues/{queue_name}/messages{?ids} (deprecated - under discussion)
POST /v1/queues/{queue_name}/messages
# Proton snippet
mng = Messenger()
mng.start()
msg = Message()
msg.address = address
msg.body = unicode(m)
mng.put(msg)
mng.send()
# end snippet
DELETE /v1/queues/{queue_name}/messages/{message_id}{?claim_id} (deprecated - under discussion)
DELETE /v1/queues/{queue_name}/messages{?ids} ===>
# Claims
Messenger disposition operations allow a receiver to accept or reject specific messages, or ranges of messages.
I'm not sure if we could use some attribute (user_id/correlation_id) in proton Message to filter and implement the claim functionality.
This discussion happened in #openstack-marconi in regard to this and AMQP < 1.0
https://gist.github.com/vkmc/65f6939aecf4ebc84965
POST /v1/queues/{queue_name}/claims{?limit}
GET /v1/queues/{queue_name}/claims/{claim_id}
PATCH /v1/queues/{queue_name}/claims/{claim_id}
DELETE /v1/queues/{queue_name}/claims/{claim_id}
## Currently blocked ##
1. Queues
Marconi exposes HTTP APIs that allow queues to be created, deleted, and listed.
Is there a way to create, delete and list queues?
Proton provide an interface to create queues (implicitly, since those are created lazily by adding a parameter in to the broker address) but AFAIK queues cannot be deleted or listed. I think this is not ok and probably a JIRA candidate (if queues can be created it would be logic for the user to be able to delete them)
2. Messages
Marconi exposes HTTP APIs that allow messages to be listed without consuming them. This API cannot be implemented on top of AMQP 1.0 which implements a strict queuing semantics.
Is there a way to get a collection of messages from the client side?
The main problem is that in AMQP 1.0 messages cannot be obtained without consuming them (update: it looks that using topics this is possible, not sure how though). You also can't retrieve the collection of messages in a specific queue/topic.
Marconi exposes HTTP APIs that allow random access to messages by ID. This API cannot be implemented on top of AMQP 1.0 which does not allow random access to messages.
Is there a way to get/delete messages by some attribute? (this also affects if we want to filter by tenant, unless we create different queues for each)
There are some attributes that could be used for this purpose (message-id) but there aren't ways to create filters in Proton. However... this is a possibility from broker side http://activemq.apache.org/selectors.html. Maybe there is a way to do this from Proton, I should check it out again.
Message acks can be managed in an easier way than in AMQP 0.9.
I'm not sure how we manage acks in Marconi... do we sent and ack when the clients GETs a message?
In either case, if we need this, it looks like AMQP 1.0 makes sending acks easier than in AMQP < 1.0
http://qpid.2158936.n2.nabble.com/Acknowledgement-for-messages-sent-using-Proton-C-td7590540.html. I have dig a bit more on this.
3. Claims
Marconi message consumption API creates a “claim ID” for a set of consumed messages, up to a limit.
We could use the correlation-id for this. BUT the correlation-id must be setted on message creation. Messages cannot be modified once they are in the queue :(
http://activemq.apache.org/can-i-modify-messages-on-a-queue.html.I assume this applies for Proton as well.
Bottom line... no, we cannot manage claims the way we do it in Marconi.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment