Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Last active May 28, 2019 09:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Gsantomaggio/bff62fe28210622cfa9d32d164412800 to your computer and use it in GitHub Desktop.
Save Gsantomaggio/bff62fe28210622cfa9d32d164412800 to your computer and use it in GitHub Desktop.
mandatory

Basic Idea

The idea is to add another parameter to the RPCClient constructor, as follows:

client = oslo_messaging.RPCClient(transport, target, options={'mandatory': True})
client.call({}, 'foo', id_value=str(i), test_value="hello oslo")

Inside the _publish function, decode the option value, as:

Then pass the mandatory flag to the publish call.

The on_return function raises a new exception called:

MessageUndeliverable

So, in case the message is not routed to any queue, the call will raise the exception.

in this way:

        try:
            r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
            print("hello" + r + " - number: " + str(number))
        except exceptions.MessageUndeliverable as e:
            print("MessageUndeliverable error, RabbitMQ Exception: %s, routing_key: %s message: %s exchange: %s:" % (
                e.exception, e.routing_key, e.message.body, e.exchange))

Here you can find the file test.

the result would be:

id_value: 1 - test_value: ciao
hello1 - number: 1
id_value: 2 - test_value: ciao
hello2 - number: 1
id_value: 3 - test_value: ciao
hello3 - number: 1
MessageUndeliverable error, RabbitMQ Exception: Basic.return: (312) NO_ROUTE, routing_key: myroutingkey message: {"oslo.version": "2.0", "oslo.message.options": {"mandatory": true}, "oslo.message": "{\"method\": \"foo\", \"args\": {\"id_value\": \"4\", \"test_value\": \"ciao\"}, \"namespace\": \"test\", \"version\": \"2.0\", \"options\": {\"mandatory\": true}, \"_msg_id\": \"a697620caf0c445d90352646caa193bc\", \"_reply_q\": \"reply_4f239e94e1234785952448a79e60a38b\", \"_timeout\": null, \"_unique_id\": \"c27ea03c9b2b4870b3e34a19997cf143\"}"} exchange: myexchange:

how to test it

I execute the test file and then execute this script more than once.

It deletes the queues, so the message is not routed.

@Gsantomaggio
Copy link
Author

Gsantomaggio commented May 14, 2019

import oslo_messaging
from oslo_config import cfg
from oslo_messaging import exceptions
import _thread
import time


class TestEndpoint:
    """Demo RPC endpoint exposing a single ``foo`` method.

    The ``target`` attribute declares the namespace ('test') and version
    ('2.0') this endpoint is published under.
    """

    target = oslo_messaging.Target(namespace='test', version='2.0')

    def __init__(self, server):
        # Remember whatever server object the creator hands in; the demo
        # script passes ``None``.
        self.server = server

    def foo(self, _ctx, id_value, test_value):
        """Print the received arguments and echo ``id_value`` back.

        ``_ctx`` is the request context and is not used here.
        ``test_value`` is concatenated as-is, so it must be a string.
        """
        id_part = "id_value: " + str(id_value)
        print(id_part + " - test_value: " + test_value)
        return id_value


def start_server():
    """Run the demo RPC server and block until it stops.

    Sets 'myexchange' as the default exchange, builds a transport from the
    global configuration, and serves a single ``TestEndpoint`` on topic
    'myroutingkey' (server name 'myserver') using the 'blocking' executor.
    """
    oslo_messaging.set_transport_defaults('myexchange')
    rpc_server = oslo_messaging.get_rpc_server(
        oslo_messaging.get_transport(cfg.CONF),
        oslo_messaging.Target(topic='myroutingkey', server='myserver'),
        [TestEndpoint(None)],
        executor='blocking',
    )
    rpc_server.start()
    rpc_server.wait()


def call(transport, target, number):
    """Send 49 ``foo`` RPC calls with the ``mandatory`` option enabled.

    One call is issued every 3 seconds. A successful reply is printed
    together with ``number``; a ``MessageUndeliverable`` error (raised when
    the message could not be routed) is reported and the loop carries on
    with the next call.

    :param transport: oslo.messaging transport used to build the client.
    :param target: oslo.messaging target the calls are addressed to.
    :param number: label identifying this caller in the printed output.
    """
    client = oslo_messaging.RPCClient(transport, target, options={'mandatory': True})

    for i in range(1, 50):
        time.sleep(3)
        try:
            # Only the RPC call itself can raise MessageUndeliverable, so it
            # is the only statement guarded by the handler.
            r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
        except exceptions.MessageUndeliverable as e:
            print("MessageUndeliverable error, RabbitMQ Exception: %s, routing_key: %s message: %s exchange: %s:" % (
                e.exception, e.routing_key, e.message.body, e.exchange))
        else:
            # Success path: runs only when the call returned a reply.
            print("hello" + r + " - number: " + str(number))


def start_client():
    """Launch one background thread that runs the RPC caller loop.

    Sets 'myexchange' as the default exchange, builds a transport from the
    global configuration, and targets topic 'myroutingkey' in namespace
    'test', version '2.0'. Returns immediately after starting the thread.
    """
    oslo_messaging.set_transport_defaults('myexchange')
    rpc_transport = oslo_messaging.get_transport(cfg.CONF)
    rpc_target = oslo_messaging.Target(
        topic='myroutingkey',
        version='2.0',
        namespace='test',
    )
    # A single caller labelled 1 is started. To exercise several concurrent
    # callers, start further threads the same way with labels 2, 3, ... 6.
    caller_args = (rpc_transport, rpc_target, 1)
    _thread.start_new_thread(call, caller_args)


# Start the RPC server in a background thread, then launch the client thread.
try:
    _thread.start_new_thread(start_server, ())

    # Pause before starting the client; presumably this gives the server
    # time to declare its queues before the first call is published.
    time.sleep(3)
    start_client()
except Exception as exc:
    # Catch Exception rather than using a bare ``except:`` so that
    # KeyboardInterrupt/SystemExit still propagate, and report the cause
    # instead of hiding it.
    print("Error: unable to start thread: %s" % exc)

# Keep the main thread alive so the worker threads keep running. Sleeping
# instead of spinning on ``pass`` avoids pinning a CPU core at 100%.
while True:
    time.sleep(1)

@4383
Copy link

4383 commented May 27, 2019

I think your example is wrong (indentation level and a missing try statement):

       r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
            print("hello" + r + " - number: " + str(number))
        except exceptions.MessageUndeliverable as e:
            print("MessageUndeliverable error, RabbitMQ Exception: %s, routing_key: %s message: %s exchange: %s:" % (
                e.exception, e.routing_key, e.message.body, e.exchange))

Maybe you want to do something like this:

        try:
            r = client.call({}, 'foo', id_value=str(i), test_value="hello oslo")
        except exceptions.MessageUndeliverable as e:
            print("MessageUndeliverable error, RabbitMQ Exception: %s, routing_key: %s message: %s exchange: %s:" % (
                e.exception, e.routing_key, e.message.body, e.exchange))
        else:
            print("hello" + r + " - number: " + str(number))

@4383
Copy link

4383 commented May 27, 2019

Really interesting changes!

We can, for example, use it with tenacity on the customer side to provide retries on call failures, or something like that.

from tenacity import *
import oslo_messaging
...
from oslo_messaging.exceptions import MessageUndeliverable
...

@retry(retry=retry_if_exception_type(MessageUndeliverable))
def call(transport, target, number):
    """Send 49 mandatory ``foo`` RPC calls, retrying on undeliverable messages.

    The ``retry`` decorator wraps the whole function, so when a call raises
    ``MessageUndeliverable`` the function is re-entered from the top: a new
    client is created and the loop starts again at 1.
    """
    rpc_client = oslo_messaging.RPCClient(transport, target, options={'mandatory': True})

    for seq in range(1, 50):
        time.sleep(3)
        reply = rpc_client.call({}, 'foo', id_value=str(seq), test_value="hello oslo")
        print("hello" + reply + " - number: " + str(number))
...

@Gsantomaggio
Copy link
Author

thank you Hervé,
moved the example: https://github.com/Gsantomaggio/rabbitmq-utils/tree/master/openstack/mandatory_test
PRs are welcome :))!!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment