Skip to content

Instantly share code, notes, and snippets.

@nballas-wazo
Last active August 12, 2019 17:10
Show Gist options
  • Save nballas-wazo/df4017c164a01eb9043815254996a36d to your computer and use it in GitHub Desktop.
Save nballas-wazo/df4017c164a01eb9043815254996a36d to your computer and use it in GitHub Desktop.
Documentation for Stasis + RabbitMQ Event Forwarding
{
"_copyright": "Copyright (C) 2019 The Wazo Authors (see the AUTHORS file)",
"_author": "Nicolaos Ballas",
"_svn_revision": "$Revision$",
"apiVersion": "2.0.0",
"swaggerVersion": "1.1",
"basePath": "http://localhost:8088/ari",
"resourcePath": "/api-docs/amqp.{format}",
"apis": [
{
"path": "/amqp/{applicationName}",
"description": "Stasis application",
"operations": [
{
"httpMethod": "POST",
"summary": "Create a stasis subscription to AMQP.",
"notes": "Create a Stasis application and subscribe to it's event and forward them to AMQP. The application's name must be unique.",
"nickname": "stasisSubscribe",
"responseClass": "Application",
"parameters": [
{
"name": "applicationName",
"description": "Application's name",
"paramType": "path",
"required": true,
"allowMultiple": false,
"dataType": "string"
}
],
"errorResponses": [
{
"code": 400,
"reason": "Bad request."
}
]
},
{
"httpMethod": "DELETE",
"summary": "Remove a stasis subscription to AMQP.",
"notes": "Remove an internal Stasis application and its associated subscription.",
"nickname": "stasisUnsubscribe",
"responseClass": "Application",
"parameters": [
{
"name": "applicationName",
"description": "Application's name",
"paramType": "path",
"required": true,
"allowMultiple": false,
"dataType": "string"
}
],
"errorResponses": [
{
"code": 400,
"reason": "Bad request."
}
]
}
]
}
],
"models": {
}
}
Dialplan:
-------------------------------------------------
For an application named 'foo'
exten = 6001,1,NoOp
same = n,Voicemail(6001,default,u)
same = n,Answer
same = n,Stasis(foo) ; this will generate events which will be forwarded to AMQP
same = n,Hangup
same = n,return
REST API:
-------------------------------------------------
To create a Stasis Application named 'bar'
1. POST with applicationName=bar # This will create an internal application that will send events to AMQP
To delete the application created above
2. DELETE wtith applicationName=bar # This will delete the application, events will no longer be sent to AMQP
*** Ask if the user prefers EID in stasis event or in its container***
=> Forwarding of Stasis events to a receiver, RabbitMQ
Let's do this right: the receiver can be anyone, not just RabbitMQ.
Asterisk receives
1. Client 2. Asterisk 3. Receiver receives events
sends request creates Application
application and registers it to Stasis
4. Client receives
response
+--- stasis application
|
v | |
[client]-------------->| topic --> (*)--> callback---+ |
| | ^ | |
| | | +-->| ==== event ==> [RabbitMQ Exchange(s)]
| v | |
<--------------| event >>> Stasis -+ |
1. Support for multiple RabbitMQ Exchanges
- The res_stais_amqp module has a global configuartion set when the module is loaded.
Changing the global configuration at runtime does not appear to work; at least not with a simple implementation.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015-2018 The Wazo Authors (see the AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
import argparse
import kombu
import logging
from kombu.mixins import ConsumerMixin
from pprint import pformat
EXCHANGE = kombu.Exchange('xivo', type='topic')
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(process)d] (%(levelname)s) (%(name)s): %(message)s')
logger = logging.getLogger(__name__)
class C(ConsumerMixin):
def __init__(self, connection, routing_key):
self.connection = connection
self.routing_key = routing_key
def get_consumers(self, Consumer, channel):
return [Consumer(kombu.Queue(exchange=EXCHANGE, routing_key=self.routing_key, exclusive=True),
callbacks=[self.on_message])]
def on_message(self, body, message):
logger.info('Received: %s', pformat(body))
message.ack()
def main():
parser = argparse.ArgumentParser('read RabbitMQ xivo exchange')
parser.add_argument('-n', '--hostname', help='RabbitMQ server',
default='localhost')
parser.add_argument('-p', '--port', help='Port of RabbitMQ',
default='5672')
parser.add_argument('-r', '--routing-key', help='Routing key to bind on bus',
dest='routing_key', default='#')
args = parser.parse_args()
url_amqp = 'amqp://guest:guest@%s:%s//' % (args.hostname, args.port)
with kombu.Connection(url_amqp) as conn:
try:
C(conn, args.routing_key).run()
except KeyboardInterrupt:
return
main()
import ari as ari_client
from ari.exceptions import ARINotFound
import logging
import os
import pytest
from hamcrest import *
from hamcrest import assert_that
from hamcrest import calling
from hamcrest import has_property
from requests.exceptions import HTTPError
from xivo_test_helpers import until
from xivo_test_helpers.bus import BusClient
from xivo_test_helpers.asset_launching_test_case import AssetLaunchingTestCase
from xivo_test_helpers.hamcrest.raises import raises
log_level = logging.DEBUG if os.environ.get('TEST_LOGS') == 'verbose' else logging.INFO
logging.basicConfig(level=log_level)
class AssetLauncher(AssetLaunchingTestCase):
assets_root = os.path.join(os.path.dirname(__file__), '..', 'assets')
asset = 'amqp'
service = 'asterisk'
@pytest.fixture()
def ari():
AssetLauncher.kill_containers()
AssetLauncher.rm_containers()
AssetLauncher.launch_service_with_asset()
ari_url = 'http://localhost:{port}'.format(port=AssetLauncher.service_port(5039, 'ari_amqp'))
ari = until.return_(ari_client.connect, ari_url, 'wazo', 'wazo', timeout=5, interval=0.1)
yield ari
AssetLauncher.kill_containers()
AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp')
def test_stasis_amqp_events(ari):
bus_client = BusClient.from_connection_fields(port=AssetLauncher.service_port(5672, 'rabbitmq'))
# AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp')
events = bus_client.accumulator("stasis.app.amqp_gateway")
ari.bridges.create()
def event_received(events):
assert_that(events.accumulate(), has_item(
has_entry('data',
has_entry('type', 'BridgeCreated')
)
))
until.assert_(event_received, events, timeout=5)
subscribe_args = {'applicationName': 'NewStasisApplication'}
def test_app_subscribe(ari):
assert_that(
calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args),
not_(raises(Exception))
)
def test_app_unsubscribe(ari):
ari.amqp.stasisSubscribe(**subscribe_args)
assert_that(
calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args),
not_(raises(Exception))
)
def test_app_subscribe_duplicate_fail(ari):
ari.amqp.stasisSubscribe(**subscribe_args)
assert_that(
calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args),
raises(ARINotFound).matching(has_property('original_error',
has_property('response', has_property('status_code', 409)))
)
)
def test_app_unsubscribe_fail(ari):
assert_that(
calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args),
raises(ARINotFound).matching(has_property('original_error',
has_property('response', has_property('status_code', 404)))
)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment