Skip to content

Instantly share code, notes, and snippets.

@sboily
Forked from nballas-wazo/Examples
Last active August 12, 2019 18:04
Show Gist options
  • Save sboily/6a784942213fd1a18e35737fd1129ad4 to your computer and use it in GitHub Desktop.
Save sboily/6a784942213fd1a18e35737fd1129ad4 to your computer and use it in GitHub Desktop.
Documentation for Stasis + RabbitMQ Event Forwarding
{
"_copyright": "Copyright (C) 2019 The Wazo Authors (see the AUTHORS file)",
"_author": "Nicolaos Ballas",
"_svn_revision": "$Revision$",
"apiVersion": "2.0.0",
"swaggerVersion": "1.1",
"basePath": "http://localhost:8088/ari",
"resourcePath": "/api-docs/amqp.{format}",
"apis": [
{
"path": "/amqp/{applicationName}",
"description": "Stasis application",
"operations": [
{
"httpMethod": "POST",
"summary": "Create a stasis subscription to AMQP.",
"notes": "Create a Stasis application, subscribe to its events and forward them to AMQP. The application's name must be unique.",
"nickname": "stasisSubscribe",
"responseClass": "Application",
"parameters": [
{
"name": "applicationName",
"description": "Application's name",
"paramType": "path",
"required": true,
"allowMultiple": false,
"dataType": "string"
}
],
"errorResponses": [
{
"code": 400,
"reason": "Bad request."
}
]
},
{
"httpMethod": "DELETE",
"summary": "Remove a stasis subscription to AMQP.",
"notes": "Remove an internal Stasis application and its associated subscription.",
"nickname": "stasisUnsubscribe",
"responseClass": "Application",
"parameters": [
{
"name": "applicationName",
"description": "Application's name",
"paramType": "path",
"required": true,
"allowMultiple": false,
"dataType": "string"
}
],
"errorResponses": [
{
"code": 400,
"reason": "Bad request."
}
]
}
]
}
],
"models": {
}
}
Dialplan:
-------------------------------------------------
For an application named 'bar'
exten = 6001,1,NoOp()
same = n,Answer()
same = n,Stasis(bar) ; this will generate events which will be forwarded to stasis (websocket or AMQP)
same = n,Hangup()
REST API:
-------------------------------------------------
To create a Stasis Application named 'bar'
1. POST with applicationName=bar # This will create an internal application that will send events to AMQP
To delete the application created above
2. DELETE with applicationName=bar # This will delete the application, events will no longer be sent to AMQP
*** Ask if the user prefers EID in stasis event or in its container***
=> Forwarding of Stasis events to a receiver, RabbitMQ
Let's do this right: the receiver can be anyone, not just RabbitMQ.
1. Client sends request
2. Asterisk receives the request, creates the application and registers it to Stasis
3. Receiver receives events
4. Client receives response
+--- stasis application
|
v | |
[client]-------------->| topic --> (*)--> callback---+ |
| | ^ | |
| | | +-->| ==== event ==> [RabbitMQ Exchange(s)]
| v | |
<--------------| event >>> Stasis -+ |
To listen to events, listen on the routing key stasis.app.# for an application.
Works with the latest Wazo version or Asterisk 16.5.0.
AMQP client for Asterisk
Clone the repository:
cd /usr/src/
git clone https://github.com/wazo-pbx/wazo-res-amqp
cd wazo-res-amqp
make
make install
Forward Stasis events to AMQP
cd /usr/src/
git clone https://github.com/wazo-pbx/wazo-res-stasis-amqp
cd wazo-res-stasis-amqp
git checkout WAZO-939-Stasis-Event-Forwarding
make
make install
Restart Asterisk
1. Support for multiple RabbitMQ Exchanges
- The res_stasis_amqp module has a global configuration set when the module is loaded.
Changing the global configuration at runtime does not appear to work; at least not with a simple implementation.
After some discussion this will probably not be an issue, but I would like not to lose this information. It looks like it works the same way for the websocket.
2. If you register an application with the websocket, it is possible to disable it through the AMQP ARI endpoint.
3. If we restart Asterisk, we lose the application.
4. If you register an application on the websocket with the same name as an application already registered for AMQP events, the callback goes to the websocket.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015-2018 The Wazo Authors (see the AUTHORS file)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
import argparse
import kombu
import logging
from kombu.mixins import ConsumerMixin
from pprint import pformat
# Topic exchange the consumer binds its queue to.
EXCHANGE = kombu.Exchange('xivo', type='topic')

# Every record is prefixed with timestamp, PID, level and logger name.
logging.basicConfig(
    format='%(asctime)s [%(process)d] (%(levelname)s) (%(name)s): %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class C(ConsumerMixin):
    """Consumer that logs and acknowledges every message it receives.

    Messages come from an exclusive queue bound to ``EXCHANGE`` with the
    routing key given at construction time.
    """

    def __init__(self, connection, routing_key):
        """Keep the broker connection and the routing key to bind with."""
        self.routing_key = routing_key
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a single consumer reading from an exclusive queue."""
        queue = kombu.Queue(
            exchange=EXCHANGE,
            routing_key=self.routing_key,
            exclusive=True,
        )
        consumer = Consumer(queue, callbacks=[self.on_message])
        return [consumer]

    def on_message(self, body, message):
        """Log the pretty-printed message body, then acknowledge it."""
        logger.info('Received: %s', pformat(body))
        message.ack()
def main():
    """Consume messages from the RabbitMQ ``xivo`` exchange and log them.

    Command-line options select the broker host (``-n``), the broker port
    (``-p``) and the routing key to bind with (``-r``, default ``#``).
    The consumer runs until it is interrupted with Ctrl-C.
    """
    # The text describes the tool, it is not a program name: passed
    # positionally it would land in ``prog`` and garble the usage line.
    parser = argparse.ArgumentParser(description='read RabbitMQ xivo exchange')
    parser.add_argument('-n', '--hostname', help='RabbitMQ server',
                        default='localhost')
    # type=int rejects a non-numeric port at parse time instead of failing
    # later while connecting.
    parser.add_argument('-p', '--port', help='Port of RabbitMQ',
                        type=int, default=5672)
    parser.add_argument('-r', '--routing-key', help='Routing key to bind on bus',
                        dest='routing_key', default='#')
    args = parser.parse_args()

    url_amqp = 'amqp://guest:guest@%s:%s//' % (args.hostname, args.port)
    with kombu.Connection(url_amqp) as conn:
        try:
            C(conn, args.routing_key).run()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the consumer.
            return


# Only start consuming when executed as a script, not when imported.
if __name__ == '__main__':
    main()
import ari as ari_client
from ari.exceptions import ARINotFound
import logging
import os
import pytest
from hamcrest import *
from hamcrest import assert_that
from hamcrest import calling
from hamcrest import has_property
from requests.exceptions import HTTPError
from xivo_test_helpers import until
from xivo_test_helpers.bus import BusClient
from xivo_test_helpers.asset_launching_test_case import AssetLaunchingTestCase
from xivo_test_helpers.hamcrest.raises import raises
# Debug logging is opt-in through the TEST_LOGS environment variable.
if os.environ.get('TEST_LOGS') == 'verbose':
    log_level = logging.DEBUG
else:
    log_level = logging.INFO
logging.basicConfig(level=log_level)
class AssetLauncher(AssetLaunchingTestCase):
    """Asset launcher configured with the ``amqp`` asset and the ``asterisk`` service."""

    # The assets directory sits one level above the directory of this file.
    assets_root = os.path.join(os.path.dirname(__file__), '..', 'assets')
    service = 'asterisk'
    asset = 'amqp'
@pytest.fixture()
def ari():
    """Yield an ARI client connected to a freshly launched asset.

    Existing containers are killed and removed before the asset is launched;
    the containers are killed again once the test has finished.
    """
    # Start every test from a clean set of containers.
    AssetLauncher.kill_containers()
    AssetLauncher.rm_containers()
    AssetLauncher.launch_service_with_asset()

    ari_port = AssetLauncher.service_port(5039, 'ari_amqp')
    base_url = 'http://localhost:{port}'.format(port=ari_port)
    # presumably retries the connection every 0.1s for up to 5s - confirm
    # against xivo_test_helpers.until
    client = until.return_(ari_client.connect, base_url, 'wazo', 'wazo', timeout=5, interval=0.1)
    yield client

    AssetLauncher.kill_containers()
    # NOTE(review): this module load is issued after the containers have been
    # killed - confirm whether it was meant to run before the yield instead.
    AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp')
def test_stasis_amqp_events(ari):
    """Creating a bridge must produce a ``BridgeCreated`` event on the bus."""
    rabbitmq_port = AssetLauncher.service_port(5672, 'rabbitmq')
    bus_client = BusClient.from_connection_fields(port=rabbitmq_port)
    # AssetLauncher.docker_exec(["asterisk", "-rx", "module load res_stasis_amqp.so"], service_name='ari_amqp')
    events = bus_client.accumulator("stasis.app.amqp_gateway")

    ari.bridges.create()

    def bridge_created_seen(accumulator):
        # Passes once any accumulated event carries data.type == 'BridgeCreated'.
        received = accumulator.accumulate()
        bridge_created = has_entry('data', has_entry('type', 'BridgeCreated'))
        assert_that(received, has_item(bridge_created))

    until.assert_(bridge_created_seen, events, timeout=5)
# Keyword arguments shared by the subscribe/unsubscribe tests.
subscribe_args = dict(applicationName='NewStasisApplication')
def test_app_subscribe(ari):
    """Subscribing an application to AMQP must not raise."""
    subscribe = calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args)
    assert_that(subscribe, not_(raises(Exception)))
def test_app_unsubscribe(ari):
    """Unsubscribing a previously subscribed application must not raise."""
    # The application has to exist before it can be removed.
    ari.amqp.stasisSubscribe(**subscribe_args)

    unsubscribe = calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args)
    assert_that(unsubscribe, not_(raises(Exception)))
def test_app_subscribe_duplicate_fail(ari):
    """Subscribing the same application a second time must fail with a 409."""
    ari.amqp.stasisSubscribe(**subscribe_args)

    second_subscribe = calling(ari.amqp.stasisSubscribe).with_args(**subscribe_args)
    conflict_response = has_property('response', has_property('status_code', 409))
    # NOTE(review): a 409 is expected to surface as ARINotFound here - confirm
    # against the exception mapping of the ari client.
    assert_that(
        second_subscribe,
        raises(ARINotFound).matching(has_property('original_error', conflict_response)),
    )
def test_app_unsubscribe_fail(ari):
    """Unsubscribing an application that was never subscribed must fail with a 404."""
    unsubscribe = calling(ari.amqp.stasisUnsubscribe).with_args(**subscribe_args)
    missing_response = has_property('response', has_property('status_code', 404))
    assert_that(
        unsubscribe,
        raises(ARINotFound).matching(has_property('original_error', missing_response)),
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment