Skip to content

Instantly share code, notes, and snippets.

@timmwagener
Created February 20, 2022 22:05
Show Gist options
  • Save timmwagener/0aa27404112e8faba98a6f719e0fce7b to your computer and use it in GitHub Desktop.
Save timmwagener/0aa27404112e8faba98a6f719e0fce7b to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.component import Component
logger = getLogger(__name__)
class Addition(ApplicationSession):
    """Callee session that offers a two-value addition as a WAMP RPC.

    The procedure URI is taken from ``self.config.extra["uri"]``.
    """

    async def onJoin(self, details):
        """Register the ``add`` procedure under the configured URI."""

        def add(x: int, y: int) -> int:
            logger.debug(f"Callee calling add({x}, {y})")
            return x + y

        # TODO: Pydantic models for extra arguments
        procedure_uri = self.config.extra["uri"]
        await self.register(add, procedure_uri)
def create_component(uri: str, *args, **kwargs) -> Component:
    """Build a ``Component`` that serves an addition procedure under *uri*.

    All remaining positional and keyword arguments are passed through to the
    ``Component`` constructor unchanged.
    """
    addition_component = Component(*args, **kwargs)

    async def add(x: int, y: int) -> int:
        logger.debug(f"Callee calling add({x}, {y})")
        return x + y

    # Explicit form of decorating ``add`` with ``@component.register(uri)``.
    addition_component.register(uri)(add)
    return addition_component
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import AbstractEventLoop
from autobahn.wamp.types import SessionDetails
from autobahn.asyncio.wamp import Session
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.component import Component
logger = getLogger(__name__)
async def main(loop: AbstractEventLoop, session: Session) -> int:
    """Call the remote add procedure once and publish the result to a queue.

    Intended as the main coroutine of a Component: it is run after the
    connection has been established, and the session is left automatically
    once it finishes. It is a shortcut for running code after connecting,
    which internally creates an onJoin() handler on the fly.

    Reads ``uri``, ``queue``, ``x`` and ``y`` from ``session.config.extra``.
    Any exception raised by the call is logged and re-raised.
    """
    # TODO: Pydantic models for extra arguments
    extra = session.config.extra
    uri, queue = extra["uri"], extra["queue"]
    x, y = extra["x"], extra["y"]

    logger.debug(f"Caller calling add({x}, {y})")
    try:
        result = await session.call(uri, x, y)
    except Exception as exception_instance:
        logger.error(f"Error: {exception_instance}")
        raise

    # Only reached when the call succeeded.
    logger.debug(f"Result of add: {result}")
    await queue.put(result)
    return result
class Addition(ApplicationSession):
    """Caller session that consumes the RPC add service."""

    async def onJoin(self, details: SessionDetails) -> None:
        """Call the add procedure, queue its result and leave the session.

        Reads ``uri``, ``queue``, ``x`` and ``y`` from ``self.config.extra``.
        A failing call is logged and re-raised; the session is not left in
        that case.
        """
        # TODO: Pydantic models for extra arguments
        extra = self.config.extra
        uri, queue = extra["uri"], extra["queue"]
        x, y = extra["x"], extra["y"]

        logger.debug(f"Caller calling add({x}, {y})")
        try:
            result = await self.call(uri, x, y)
        except Exception as exception_instance:
            logger.error(f"Error: {exception_instance}")
            raise

        # Only reached when the call succeeded.
        logger.debug(f"Result of add: {result}")
        await queue.put(result)
        await self.leave()
def create_component(*args, **kwargs) -> Component:
    """Build a ``Component`` that calls the add procedure once it has joined.

    All arguments are forwarded to the ``Component`` constructor. The join
    handler reads ``uri``, ``queue``, ``x`` and ``y`` from the session's
    ``config.extra``.
    """
    caller_component = Component(*args, **kwargs)

    async def join(session: Session, details: SessionDetails) -> None:
        # TODO: Pydantic models for extra arguments
        extra = session.config.extra
        uri, queue = extra["uri"], extra["queue"]
        x, y = extra["x"], extra["y"]

        logger.debug(f"Caller calling add({x}, {y})")
        try:
            result = await session.call(uri, x, y)
        except Exception as exception_instance:
            logger.error(f"Error: {exception_instance}")
            raise

        # Only reached when the call succeeded.
        logger.debug(f"Result of add: {result}")
        await queue.put(result)
        # TODO: Should we await session.leave()? Some docs do it, but the code
        # seems not awaitable. However there's also no error upon awaiting.
        # SeeAlso: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#longer-example
        await session.leave()

    # Explicit form of decorating ``join`` with ``@component.on_join``.
    caller_component.on_join(join)
    return caller_component
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from asyncio import Event
from typing import Tuple
from typing import List
from typing import Tuple
from autobahn.asyncio.wamp import ApplicationSession
logger = getLogger(__name__)
class GetCalleesForURI(ApplicationSession):
    """Take a uri from extras and retrieve its callees if any. Return the result
    via given queue.

    SeeAlso:
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/#retrieving-information-about-a-specific-registration
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    def get_data(self) -> Tuple[str, Queue]:
        """Return the ``uri`` and ``queue`` entries of ``self.config.extra``."""
        config = self.config
        extra = config.extra
        uri = extra["uri"]
        queue = extra["queue"]
        return uri, queue

    async def callees_for_uri(self, uri: str) -> List[int]:
        """Return the session ids of all callees registered for *uri*.

        An empty list is returned when the router knows no registration for
        the uri.
        """
        registration_id = await self.call("wamp.registration.lookup", uri)
        logger.debug(f"{registration_id=}")
        if registration_id is None:
            # The lookup yields null for an unregistered uri. Passing that on
            # to list_callees would fail inside onJoin, and the consumer
            # waiting on the queue would never receive a result.
            return []
        list_callees = await self.call(
            "wamp.registration.list_callees", registration_id
        )
        logger.debug(f"{list_callees=}")
        return list_callees

    async def onJoin(self, details):
        """Look up the callees, leave the session and queue the result."""
        uri, queue = self.get_data()
        list_callees = await self.callees_for_uri(uri)
        self.leave()
        await queue.put(list_callees)
class KillCalleesForURI(GetCalleesForURI):
    """Retrieve the callees of a uri and kill each of their sessions.

    This can be useful for example in test teardown.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/#killing-a-session
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    async def onJoin(self, details):
        """Kill every callee session, leave, then queue the killed ids."""
        uri, queue = self.get_data()
        callee_ids = await self.callees_for_uri(uri)
        # The loop variable name is part of the log output ("callee_id=...").
        for callee_id in callee_ids:
            logger.debug(f"Kill session: {callee_id=}")
            await self.call("wamp.session.kill", callee_id)
        self.leave()
        await queue.put(callee_ids)
class KillSessions(ApplicationSession):
    """Kill the sessions whose ids are passed in via extras.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/#killing-a-session
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    async def onJoin(self, details):
        """Kill each session in ``extra["session_ids"]`` and set ``extra["event"]``."""
        extra = self.config.extra
        session_ids: List[int] = extra["session_ids"]
        event: Event = extra["event"]

        # Local names are part of the log output ("session_id=...").
        for session_id in session_ids:
            logger.warning(f"Kill session: {session_id=}")
            session_kill = await self.call("wamp.session.kill", session_id)
            logger.warning(f"Session killed: {session_kill=}")

        self.leave()
        # Tell whoever waits on the event that all kill calls have returned.
        event.set()
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from json import dumps
from asyncio import create_subprocess_exec
from asyncio.subprocess import PIPE
from pathlib import Path
from urllib.parse import urlparse
from typing import Tuple
import pytest
from asyncio_playground.ssl.ssl_certificates import create_ssl_certificates
from .utilities import IsResponsiveHTTP
from .utilities import IsResponsiveWebsocket
from .utilities import wait_until_responsive
logger = getLogger(__name__)
@pytest.fixture
async def crossbar_directory(tmp_path: Path) -> Path:
    """Provide a fresh ``crossbar`` directory below ``tmp_path``.

    After the test, the content of ``node.log`` in that directory is written
    to the debug log, or a notice if the file does not exist.
    """
    directory = tmp_path / "crossbar"
    # exist_ok=False: the directory must not exist yet.
    directory.mkdir(parents=True, exist_ok=False)

    yield directory

    # in debug log mode, log logfile content
    # logger.debug([x for x in crossbar_directory.iterdir() if x.is_file()])
    logfile = directory / "node.log"
    try:
        message = logfile.read_text()
    except FileNotFoundError:
        message = f"Could not log logfile content. Logfile not found. ({logfile})"
    logger.debug(message)
@pytest.fixture
def ssl_certificates(crossbar_directory: Path) -> Tuple[Path, Path]:
    """Expose self-signed certificates created for the crossbar directory."""
    certificate_paths = create_ssl_certificates(crossbar_directory)
    return certificate_paths
@pytest.fixture
async def crossbar_router(crossbar_configuration, crossbar_directory):
    """Provide a crossbar router for the duration of the test
    and tear it down afterwards.

    The configuration is serialized to ``configuration.json`` inside
    ``crossbar_directory`` and a ``crossbar start`` subprocess is launched
    with that directory as cbdir and logdir. Yields ``(url, realm)`` taken
    from ``crossbar_configuration``.
    """
    from contextlib import suppress

    configuration, url, realm = crossbar_configuration
    crossbar_directory.mkdir(parents=True, exist_ok=True)
    crossbar_configuration_serialized = dumps(configuration, indent=4)
    crossbar_configuration_file = crossbar_directory / "configuration.json"
    crossbar_configuration_file.write_text(crossbar_configuration_serialized)
    # crossbar router process
    cmd = [
        "crossbar",
        "start",
        "--cbdir",
        str(crossbar_directory),
        "--config",
        str(crossbar_configuration_file),
        "--logdir",
        str(crossbar_directory),
        "--logtofile",
    ]
    process = await create_subprocess_exec(
        *cmd,
        stdout=PIPE,
        stderr=PIPE,
    )
    yield url, realm
    # shutdown process
    # The router may already have exited (for example on a broken
    # configuration). terminate() then raises ProcessLookupError, which would
    # hide the actual test failure behind a teardown error.
    with suppress(ProcessLookupError):
        process.terminate()
    stdout, stderr = await process.communicate()
    # Surface whatever the process wrote to its pipes to ease debugging.
    if stdout:
        logger.debug(f"crossbar stdout: {stdout.decode(errors='replace')}")
    if stderr:
        logger.debug(f"crossbar stderr: {stderr.decode(errors='replace')}")
@pytest.fixture
async def crossbar_router_responsive(crossbar_router):
    """Yield ``(url, realm)`` only after the router answers on its endpoint.

    The check is chosen by url scheme: ``http`` uses ``IsResponsiveHTTP``,
    ``ws`` uses ``IsResponsiveWebsocket``. Can be overridden for more
    specific checks.

    Raises:
        NotImplementedError: for any other url scheme.
    """
    url, realm = crossbar_router
    scheme = urlparse(url).scheme

    # Reject unsupported schemes first, then pick the matching check.
    if scheme not in ("http", "ws"):
        msg = f"No responsiveness check implemented for scheme {scheme}"
        raise NotImplementedError(msg)
    if scheme == "http":
        responsiveness_check = IsResponsiveHTTP("get", url)
    else:
        responsiveness_check = IsResponsiveWebsocket(url, realm)

    await wait_until_responsive(
        coro_factory=responsiveness_check, timeout=20.0, pause=0.25
    )
    yield url, realm
# -*- coding: utf-8 -*-
from __future__ import annotations
import logging
from pathlib import Path
import pytest
logger = logging.getLogger(__name__)
# session and module scoped fixtures can be accessed by
# function scoped fixtures, but not the other way round
@pytest.fixture(scope="session")
def repository_directory():
    """Return the directory three levels above this file."""
    containing_directory = Path(__file__).parent
    return containing_directory.parent.parent
@pytest.fixture(scope="session")
def test_directory(repository_directory):
    """Return ``tests/systemtests`` below the repository directory."""
    return repository_directory.joinpath("tests", "systemtests")
@pytest.fixture(scope="session")
def test_data_directory(test_directory):
    """Return ``test_data`` below the test directory.

    Override in sub-testpackages for more local test data.
    """
    return test_directory.joinpath("test_data")
# autouse fixtures for the whole test suite.
# They are supposed to be overridden at subpackage level for customization.
# At this level, the main goal is test isolation.
@pytest.fixture(scope="session", autouse=True)
def reset_state_for_session():
    """Session-scoped autouse placeholder; does nothing at this level."""
    return None
@pytest.fixture(scope="module", autouse=True)
def reset_state_for_module():
    """Module-scoped autouse placeholder; does nothing at this level."""
    return None
@pytest.fixture(autouse=True)
def reset_state_for_function():
    """Function-scoped autouse placeholder; does nothing at this level."""
    return None
<mxfile host="Electron" modified="2022-02-13T14:46:01.288Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/16.5.1 Chrome/96.0.4664.110 Electron/16.0.7 Safari/537.36" etag="0neny86yiuhX1PNNk_1-" version="16.5.1" type="device"><diagram id="_6lKf6-y1SfZ0dx7U7l8" name="fixtures">7VpRb+I4EP41PF6VxE6Ax0Lb25W62qrV6XT7gtzEgLUmzjlmgf31N04cQmJyBZZAu4rEA57YE+f7vhlPYvfQeLH+U5Jk/kVElPc8J1r30F3P81zseT39c6JNbhkiJzfMJItMp9Lwwn5SYyy6LVlE00pHJQRXLKkaQxHHNFQVG5FSrKrdpoJX75qQGbUMLyHhtvVvFql5bh34Tmn/RNlsXtzZdcyVBSk6G0M6J5FY7ZjQfQ+NpRAq/7dYjynX4BW45OMeGq5uJyZprA4ZIJ7pp+Hz568P9w/fvv6LX4aDf+gf7iB384PwpXniZbxMaTRRYTJJhFRm8mpTIDJXCw7/3B4ambFUKrpunJW7fVYQCRULquQGuhQDhgaeTa29KtHe2uY7SBcMEEPwbOu6xAD+GBiOgWRoQfJXSi0YaBzdan1BSyQ0BjTAYvTrwgOPIpLOaWSQ2gFNimUcZRecLYQ0slRYAxDuLZYypP8z8SLUFJEzqt4k3aZkF/LAwCspJ4r9qM5uH+bG3ZNgMO+SXlSl1x/UaMufyozaVW/NkTd8w1H+1JajTALbZzxdFUUUfzhVIHygKrzgYqpADq6QiYKTVRFUs0fdUduqcH97VXgXUwUOaiHunqgKv+YI1x21rQrPUkUoRZq+EjmBOmXKZksJQInYEkq6YgtOYmiNpiJWL+aKJj+cMx49ko1YarpSRcLvRWs0F5L9hP6k0A1clsoIDOIavDHOx4ILCYZYZDcoB71oZ+Y2kqYw7KmQlFszfSHrSsdHkqpigoJzkqTsNZuyHrgAmFk8EkqJRUXCv1QxeI5fTR54T8UQ7KkY3CLrnL1k8JBFeM8b5VXpDtvoVhfCLLRrKkBDZbRJ8Z3WeNpDHeFsBu7uOJ3qYRpOBiXrrTErkWhnCQlZPHvM+tzh0vJsINEmAWOnPMtMcxZFWW6SQhFFXrfKS3ScZJD5I/gBsmPnxu/5MPExtN2yDT/dXaqxiOFZCMsYpaCRFdU6OYz+o/KQEYEXHKaBot/5JeA3SGApeU48ANLxfm7efe/avOMG3iUlfNEx3xrz/cG1mQ+aV/mISRoqoe8ScA3DhlEeQWNWYtIt+0ct+7j2Anjwsl+kiPMLoN8Q+iX7Wfg/6S9BXfwfHf8N76PvZcVHdubfxj+8tykqu+A/V/AjXK35fXRg8BerxPnZt7N/V++dIerzqHq/UY+b6vyu3muX+atX+sjeKqnl+wnkxwRw0Z/Auiz/61k+6F89y9t7QV2WP0esN+xAvZss37S6d1m+XeavnuWLt8qPt6NjcuCbOzpN62wbOzquf+O5fTR0kYMRHlRr+FM3/RDCN37gbx1XFw3s177yt7zZg+0twFdYdEEOE5Kwybp7CWyrPBi41y4PsNeVB20sErhBCO+mPGj68teVB+0yf/3ywN7p/RjlQbFMvV0eXO5wGEYN73vHVgSWI9e7bA1gfxpQEAl68a8J47yHJ1G/uicyHOxZEd19eyJtHZ7EH/XwJD60fG7KUJeIj1MPT9rxUauITo4PaJYnmPPu5TlwdP8f</diagram></mxfile>
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Event
from typing import List
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.wamp.request import Subscription
logger = getLogger(__name__)
class WAMPMetaEvents(ApplicationSession):
    """Subscribe to the WAMP meta topics and log every event received.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
        https://crossbar.io/docs/Subscription-Meta-Events-and-Procedures/
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor-sessions.py
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor.html
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    async def onJoin(self, details):
        """Subscribe to all meta topics and the kill topic, then set the event.

        ``self.config.extra["event"]`` is set once every subscription call
        has returned.
        """
        event: Event = self.config.extra["event"]

        meta_topics = (
            # Meta Events for Sessions
            "wamp.session.on_join",
            "wamp.session.on_leave",
            # Meta Events for Subscriptions
            "wamp.subscription.on_create",
            "wamp.subscription.on_subscribe",
            "wamp.subscription.on_unsubscribe",
            "wamp.subscription.on_delete",
            # Meta Events for Registrations
            "wamp.registration.on_create",
            "wamp.registration.on_register",
            "wamp.registration.on_unregister",
            "wamp.registration.on_delete",
            # Meta Events for Schemas
            "wamp.schema.on_define",
            "wamp.schema.on_undefine",
        )

        async def on_event(*args, **kwargs):
            logger.debug("on_event()")
            logger.debug(f"{args=}")
            logger.debug(f"{kwargs=}")

        # Subscribe one topic after the other, keeping the handles so they
        # can be released again on the kill event.
        subscriptions: List[Subscription] = []
        for topic in meta_topics:
            subscriptions.append(await self.subscribe(on_event, topic))

        # subscribe to kill and also deregister from router
        async def on_kill_event(*args, **kwargs):
            logger.debug(f"on_kill_event({args}, {kwargs})")
            for subscription in subscriptions:
                await subscription.unsubscribe()
            self.leave()

        await self.subscribe(on_kill_event, "com.kill.all")

        # notify clients that all subscriptions have happened
        # At this point the WAMP router has the on_event handler registered
        # for all events.
        event.set()
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from asyncio import Event
from asyncio import sleep
from typing import List
from typing import Optional
from typing import Tuple
from typing import Callable
from typing import Any
from typing import Awaitable
from typing import Coroutine
from typing import Iterable
from enum import Enum
from enum import unique
from autobahn.wamp.types import SessionDetails
from autobahn.wamp.types import CloseDetails
from autobahn.asyncio.websocket import WampWebSocketClientProtocol
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.component import Component
from autobahn.asyncio.wamp import Session
logger = getLogger(__name__)
AwaitableFunction = Callable[[Any, Any], Coroutine[Any, Any, Any]]
@unique
class LifecycleEvent(Enum):
    """Names of the session lifecycle events recorded by the trackers.

    Every member's value equals its name.
    """

    # Connection set-up
    on_connect = "on_connect"
    on_challenge = "on_challenge"
    # Session established
    on_join = "on_join"
    on_ready = "on_ready"
    # Tear-down
    on_leave = "on_leave"
    on_disconnect = "on_disconnect"
class SessionLifecycleTrackerSubclassAPI(ApplicationSession):
    """Track the lifecycle events of a WAMP session using the older subclassing
    based API.

    Each callback appends its ``LifecycleEvent`` to ``lifecycle_events``. On
    disconnect the event from extras is set and all recorded events are put
    on the queue from extras.

    SeeAlso:
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logger.warning(f"__init__")
        self.lifecycle_events: List[LifecycleEvent] = []
        # Both are only populated in onJoin from config.extra.
        self.event: Optional[Event] = None
        self.queue: Optional[Queue] = None
        # ready needs to be registered manually
        self.on("ready", self.onReady)

    async def onConnect(self):
        logger.warning(f"onConnect")
        self.join(self.config.realm)
        self.lifecycle_events.append(LifecycleEvent.on_connect)

    async def onChallenge(self, challenge):
        logger.warning(f"onChallenge")
        self.lifecycle_events.append(LifecycleEvent.on_challenge)

    async def onJoin(self, details: SessionDetails):
        logger.warning(f"onJoin")
        self.lifecycle_events.append(LifecycleEvent.on_join)
        config = self.config
        extra = config.extra
        event = extra["event"]
        queue = extra["queue"]
        self.event = event
        self.queue = queue
        await self.leave()

    async def onReady(self, details: SessionDetails):
        logger.warning(f"onReady")
        self.lifecycle_events.append(LifecycleEvent.on_ready)

    async def onLeave(self, details: CloseDetails):
        # Annotated with CloseDetails, consistent with the decorator-based
        # on_leave handler in this module.
        logger.warning(f"onLeave")
        self.lifecycle_events.append(LifecycleEvent.on_leave)
        self.disconnect()

    async def onDisconnect(self):
        logger.warning(f"onDisconnect")
        self.lifecycle_events.append(LifecycleEvent.on_disconnect)
        # event and queue stay None when the session disconnects without
        # ever having joined; skip them instead of failing on None.
        if self.event is not None:
            self.event.set()
        if self.queue is not None:
            for event_ in self.lifecycle_events:
                await self.queue.put(event_)
class TypecheckingSession(Session):
    """Annotation-only session type declaring the tracker attributes.

    Only for typechecking: instantiation always raises NotImplementedError.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Declared here so handlers annotated with this class type-check.
        self.lifecycle_events: List[LifecycleEvent] = list()
        self.queue: Queue = Queue()
        raise NotImplementedError("Only for typechecking")
def create_component(
    additional_listener: Iterable[Tuple[str, AwaitableFunction]] = tuple(),
    *args,
    **kwargs,
) -> Component:
    """Build a ``Component`` that records its session lifecycle events.

    Each handler appends its ``LifecycleEvent`` to the session; on disconnect
    all recorded events are put on the queue taken from ``config.extra``.

    Args:
        additional_listener: ``(event, coroutine_function)`` pairs that are
            registered via ``component.on`` after the tracking handlers.
        *args: Forwarded to the ``Component`` constructor.
        **kwargs: Forwarded to the ``Component`` constructor.
    """
    tracked_component = Component(*args, **kwargs)

    async def onConnect(
        session: TypecheckingSession, protocol: WampWebSocketClientProtocol
    ) -> None:
        logger.warning("onConnect")
        logger.warning(f"{session=}")
        logger.warning(f"{protocol=}")
        queue: Queue = session.config.extra["queue"]
        # extend session
        session.lifecycle_events = []
        session.queue = queue
        session.lifecycle_events.append(LifecycleEvent.on_connect)

    async def onJoin(session: TypecheckingSession, details: SessionDetails) -> None:
        logger.warning("onJoin")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_join)

    async def onReady(session: TypecheckingSession) -> None:
        logger.warning("onReady")
        logger.warning(f"{session=}")
        session.lifecycle_events.append(LifecycleEvent.on_ready)
        await session.leave()

    async def onLeave(session: TypecheckingSession, details: CloseDetails) -> None:
        logger.warning("onLeave")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_leave)

    async def onDisconnect(session: TypecheckingSession, *args, **kwargs) -> None:
        logger.warning("onDisconnect")
        logger.warning(f"{session=}")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")
        session.lifecycle_events.append(LifecycleEvent.on_disconnect)
        for event_ in session.lifecycle_events:
            await session.queue.put(event_)

    # Explicit form of the ``@component.on_*`` decorators.
    tracked_component.on_connect(onConnect)
    tracked_component.on_join(onJoin)
    tracked_component.on_ready(onReady)
    tracked_component.on_leave(onLeave)
    tracked_component.on_disconnect(onDisconnect)

    # register additional passed callbacks
    for event, coroutine_function in additional_listener:
        tracked_component.on(event, coroutine_function)
    return tracked_component
# -*- coding: utf-8 -*-
"""Test invocation of crossbar cli.
SeeAlso:
https://crossbar.io/docs/
https://crossbar.io/docs/Command-Line/
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import create_subprocess_exec
from asyncio.subprocess import PIPE
import pytest
logger = getLogger(__name__)
async def test_crossbar_command_succeeds():
    """Smoke test: the bare ``crossbar`` command runs and identifies itself.

    Expects "Crossbar" somewhere in stdout and nothing at all on stderr.
    """
    process = await create_subprocess_exec("crossbar", stdout=PIPE, stderr=PIPE)
    raw_stdout, raw_stderr = await process.communicate()
    assert "Crossbar" in raw_stdout.decode()
    assert raw_stderr.decode() == ""
# -*- coding: utf-8 -*-
"""Test a most minimal, simple router setup with crossbar.io
SeeAlso:
https://crossbar.io/docs/
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from typing import Tuple
from typing import Any
from typing import Dict
import pytest
from aiohttp import ClientSession
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a minimal HTTP-only router.

    The router listens on ``unused_tcp_port`` and serves a node info page
    under ``/info``, which is also the returned url.
    """
    port = unused_tcp_port
    realm = "realm1"
    url = f"http://localhost:{port}/info"

    # The anonymous role is allowed all four actions on every URI.
    anonymous_role = {
        "name": "anonymous",
        "permissions": [
            {
                "uri": "",
                "match": "prefix",
                "allow": {
                    "call": True,
                    "register": True,
                    "publish": True,
                    "subscribe": True,
                },
                "disclose": {
                    "caller": False,
                    "publisher": False,
                },
                "cache": True,
            }
        ],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port, "backlog": 1024},
        "paths": {"info": {"type": "nodeinfo"}},
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {
        "$schema": "https://raw.githubusercontent.com/crossbario/crossbar/master/crossbar.json",
        "version": 2,
        "controller": {},
        "workers": [router_worker],
    }
    return configuration, url, realm
async def test_minimal_crossbar_io(crossbar_router_responsive):
    """Fetch the router's url over HTTP and check status and body.

    Expects status 200 and the text "Crossbar" in the response body.
    """
    url, _realm = crossbar_router_responsive

    # get response from crossbar server
    async with ClientSession() as session, session.get(url) as response:
        body = await response.text()
        status = response.status

    assert status == 200
    assert "Crossbar" in body
# -*- coding: utf-8 -*-
"""Test a minimal PubSub setup with crossbar.io"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from asyncio import Event
from asyncio import wait_for
from typing import Tuple
from typing import Any
from typing import Dict
from typing import cast
import pytest
from autobahn.asyncio.wamp import ApplicationRunner
from autobahn.wamp.websocket import WampWebSocketProtocol
from asyncio_playground.wamp.publisher.time_service import TimeService
from asyncio_playground.wamp.publisher.simple_publisher import SimplePublisher
from asyncio_playground.wamp.subscriber.recording_subscriber import RecordingSubscriber
from asyncio_playground.wamp.meta.meta_events import WAMPMetaEvents
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a router with a websocket path.

    The router listens on ``unused_tcp_port`` and serves a node info page
    under ``/info`` and a websocket endpoint under ``/ws``; the returned url
    points at the latter.
    """
    port = unused_tcp_port
    realm = "systemtests"
    url = f"ws://localhost:{port}/ws"

    # The anonymous role is allowed all four actions on every URI.
    anonymous_role = {
        "name": "anonymous",
        "permissions": [
            {
                "uri": "",
                "match": "prefix",
                "allow": {
                    "call": True,
                    "register": True,
                    "publish": True,
                    "subscribe": True,
                },
                "disclose": {
                    "caller": False,
                    "publisher": False,
                },
                "cache": True,
            },
        ],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, url, realm
@pytest.fixture
async def api_timeservice(crossbar_router_responsive):
    """Start a meta event logger and a TimeService backend on the router.

    Yields ``(url, realm)``. On teardown a SimplePublisher is started with
    the topic "com.kill.all" and the fixture waits for the shared event.
    Both waits on the event are bounded by the same timeout, so a missing
    signal fails the test instead of hanging the run.
    """
    # data
    url, realm = crossbar_router_responsive
    uri = "com.example.time"
    event = Event()
    timeout = 10
    # meta_events
    runner_meta_events = ApplicationRunner(url, realm, extra={"event": event})
    coro_meta_events = runner_meta_events.run(WAMPMetaEvents, start_loop=False)
    transport_meta_events, protocol_meta_events = await coro_meta_events
    await wait_for(event.wait(), timeout)
    # Reset so the same event can be waited on again during teardown.
    event.clear()
    # backend
    runner_backend = ApplicationRunner(url, realm, extra={"topic": uri, "event": event})
    coro_backend = runner_backend.run(TimeService, start_loop=False)
    transport_backend, protocol_backend = await coro_backend
    protocol_backend = cast(WampWebSocketProtocol, protocol_backend)
    yield url, realm
    # TODO: Could this be more elegant? Should this be more simple?
    # I guess we could just call protocol_backend.close() and handle
    # the TransportLost exception in the Session. However, use cleaner
    # deregistration here for now.
    runner_kill_backend = ApplicationRunner(url, realm, extra={"topic": "com.kill.all"})
    coro_kill_backend = runner_kill_backend.run(SimplePublisher, start_loop=False)
    transport_kill_backend, protocol_kill_backend = await coro_kill_backend
    # Bounded like the wait during setup; an unbounded wait would block
    # forever if the event is never set.
    await wait_for(event.wait(), timeout)
async def test_minimal_pubsub(api_timeservice):
    """Run a RecordingSubscriber against the time topic and count its events.

    The subscriber is given a queue, a limit and an event via extras. Once
    the event is set, the queue must hold exactly ``limit`` entries.
    """
    url, realm = api_timeservice
    limit = 3
    queue = Queue()
    event = Event()
    extra = {"queue": queue, "topic": "com.example.time", "limit": limit, "event": event}

    # client
    runner_client = ApplicationRunner(url, realm, extra=extra)
    transport_client, protocol_client = await runner_client.run(
        RecordingSubscriber, start_loop=False
    )

    # wait till client has recorded a number of events, then assert
    await event.wait()
    recorded_count = queue.qsize()
    timestamps = []
    for _ in range(recorded_count):
        timestamps.append(await queue.get())
    assert len(timestamps) == limit
# -*- coding: utf-8 -*-
"""Test a minimal RPC caller/callee setup with crossbar.io
Components:
Router Components:
Run in the same process as the router. They should be lightweight, for example like
authorization.
Container Components:
Run in their own process. They can run Python applications natively. They do their
own authentication and authorization and can scale independently.
Crossbar containers:
* Different from docker containers
* Describes a Crossbar worker configuration defined in config.json
Hosted Container Workers are Python ApplicationSession objects written using twisted.
Hosted Container Components cannot be used with asyncio!
Workers:
Native Workers (running native Python Code):
Router
Container
Guest Workers:
Arbitrary processes connecting to router via Autobahn|Python, Autobahn|JS,
Autobahn|CPP etc. They can connect with ApplicationSession objects using asyncio.
Session meta events:
https://crossbar.io/docs/Session-Metaevents-and-Procedures/
https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from typing import Tuple
from typing import Any
from typing import Dict
from random import randint
import pytest
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI
from asyncio_playground.wamp.meta.waiter import SessionKiller
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a router with a websocket path.

    The router listens on ``unused_tcp_port`` and serves a node info page
    under ``/info`` and a websocket endpoint under ``/ws``; the returned url
    points at the latter.
    """
    port = unused_tcp_port
    realm = "systemtests"
    url = f"ws://localhost:{port}/ws"

    # The anonymous role is allowed all four actions on every URI.
    anonymous_role = {
        "name": "anonymous",
        "permissions": [
            {
                "uri": "",
                "match": "prefix",
                "allow": {
                    "call": True,
                    "register": True,
                    "publish": True,
                    "subscribe": True,
                },
                "disclose": {
                    "caller": False,
                    "publisher": False,
                },
                "cache": True,
            },
        ],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {
        "version": 2,
        "workers": [
            router_worker,
            # Only twisted components can be hosted containers and started by
            # crossbar.io configuration
            # {
            #     "type": "container",
            #     # "options": {"pythonpath": [".."]},
            #     "components": [
            #         {
            #             "type": "class",
            #             "classname": f"{Component.__module__}.{Component.__qualname__}",
            #             "realm": "systemtests",
            #             "transport": {
            #                 "type": "websocket",
            #                 "endpoint": {
            #                     "type": "tcp",
            #                     "host": "127.0.0.1",
            #                     "port": port,
            #                 },
            #                 "url": f"ws://127.0.0.1:{port}/ws",
            #             },
            #         }
            #     ],
            # },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Register the addition backend on the router, then yield ``(url, realm)``.

    The backend is started inside a ``BackendReadyWaiter`` filtered on the
    registration-create meta event for the uri. The test body runs inside a
    ``SessionKiller`` for the backend's session id.
    """
    url, realm = crossbar_router_responsive
    uri = "com.example.add"

    # backend
    waiter = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=FilterRegistrationCreateByURI(uri),
    )
    async with waiter:
        runner_backend = ApplicationRunner(url, realm, extra={"uri": uri})
        transport_backend, protocol_backend = await runner_backend.run(
            AdditionBackend, start_loop=False
        )
    session_id, _event = waiter.awaited_event

    # kill the registration of backend to avoid error on crossbar router
    # process termination like this:
    #   Backend:aio.py:162 session closed with reason wamp.close.transport_lost
    #   [WAMP transport was lost without closing the session 456128172270144 before]
    # These errors seem more like noise than an actual problem, however, let's
    # avoid them if we can.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc(crossbar_router_responsive):
    """Start backend and client by hand and check one addition over websockets.

    The client puts the call result on a queue; it must equal ``x + y``.
    """
    url, realm = crossbar_router_responsive
    uri = "com.example.add"
    queue = Queue()
    x, y = randint(0, 100), randint(0, 100)

    # backend
    runner_backend = ApplicationRunner(url, realm, extra={"uri": uri})
    transport_backend, protocol_backend = await runner_backend.run(
        AdditionBackend, start_loop=False
    )

    # client
    client_extra = {"queue": queue, "x": x, "y": y, "uri": uri}
    runner_client = ApplicationRunner(url, realm, extra=client_extra)
    transport_client, protocol_client = await runner_client.run(
        AdditionClient, start_loop=False
    )

    # wait till client has produced result and assert
    result = await queue.get()
    assert result == x + y
async def test_minimal_rpc_api_fixture(api_add):
    """Call the fixture-registered addition API three times.

    Registering the API in the fixture dries up the setup code, makes it more
    robust through waiters for proper registration, and keeps the test
    focused on the calls themselves.
    """
    url, realm = api_add
    uri = "com.example.add"
    queue = Queue()
    call_count = 3

    # run 3 calls against API
    for _ in range(call_count):
        x, y = randint(0, 100), randint(0, 100)

        # client
        client_extra = {"queue": queue, "x": x, "y": y, "uri": uri}
        runner_client = ApplicationRunner(url, realm, extra=client_extra)
        transport_client, protocol_client = await runner_client.run(
            AdditionClient, start_loop=False
        )

        # wait till client has produced result and assert
        result = await queue.get()
        assert result == x + y
# -*- coding: utf-8 -*-
"""Components also have a decorator based API. These tests are using it.
SeeAlso:
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#longer-example
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from typing import Tuple
from typing import Any
from typing import Dict
from random import randint
import pytest
from autobahn.asyncio.component import Component
from asyncio_playground.wamp.callee.addition import (
create_component as create_addition_backend,
)
from asyncio_playground.wamp.caller.addition import (
create_component as create_addition_client,
)
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI
from asyncio_playground.wamp.meta.waiter import SessionKiller
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a router with a websocket path.

    The router listens on ``unused_tcp_port`` and serves a node info page
    under ``/info`` and a websocket endpoint under ``/ws``; the returned url
    points at the latter.
    """
    port = unused_tcp_port
    realm = "systemtests"
    url = f"ws://localhost:{port}/ws"

    # The anonymous role is allowed all four actions on every URI.
    anonymous_role = {
        "name": "anonymous",
        "permissions": [
            {
                "uri": "",
                "match": "prefix",
                "allow": {
                    "call": True,
                    "register": True,
                    "publish": True,
                    "subscribe": True,
                },
                "disclose": {
                    "caller": False,
                    "publisher": False,
                },
                "cache": True,
            },
        ],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the decorator-based addition backend and yield ``(url, realm)``.

    The backend component is started inside a ``BackendReadyWaiter`` context that
    listens for a ``wamp.registration.on_create`` meta event filtered by URI. The
    session id taken from the awaited event is handed to ``SessionKiller``, which
    wraps the ``yield``.
    """
    url, realm = crossbar_router_responsive
    procedure = "com.example.add"
    registration_waiter = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=FilterRegistrationCreateByURI(procedure),
    )
    backend = create_addition_backend(
        procedure,
        transports=[{"url": url}],
        realm=realm,
    )
    async with registration_waiter:
        # start() is not awaited, so the fixture does not block on the backend finishing
        _backend_done = backend.start()
    session_id, _event = registration_waiter.awaited_event
    # Kill the backend session explicitly on the way out. Otherwise terminating
    # the router process logs "wamp.close.transport_lost" errors, which look
    # more like noise than an actual problem but are easy to avoid.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_decorators(api_add):
    """Run the addition example with a client built by the decorator-based factory."""
    url, realm = api_add
    procedure = "com.example.add"
    results = Queue()
    for _round in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        client = create_addition_client(
            transports=[{"url": url}],
            realm=realm,
            extra={"queue": results, "x": x, "y": y, "uri": procedure},
        )
        # start() is awaited here, so this blocks until the component has finished
        outcome = await client.start()
        assert outcome is None
        # the client reports the RPC result through the queue
        answer = await results.get()
        assert answer == x + y
# -*- coding: utf-8 -*-
"""Components can also execute a "main" coroutine after transport has connected.
This functionality is tested here.
SeeAlso:
https://autobahn.readthedocs.io/en/latest/wamp/programming.html
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from typing import Tuple
from typing import Any
from typing import Dict
from random import randint
import pytest
from autobahn.asyncio.component import Component
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend
from asyncio_playground.wamp.caller.addition import main as addition_client_main
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI
from asyncio_playground.wamp.meta.waiter import SessionKiller
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a crossbar router on a free port.

    The configuration declares one router worker with a single realm, an
    ``anonymous`` role that may call, register, publish and subscribe, and a web
    transport exposing a websocket under ``/ws``.
    """
    port = unused_tcp_port
    realm = "systemtests"
    # one permission entry: empty URI with prefix matching
    anonymous_permission = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {"caller": False, "publisher": False},
        "cache": True,
    }
    anonymous_role = {"name": "anonymous", "permissions": [anonymous_permission]}
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, f"ws://localhost:{port}/ws", realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the ``AdditionBackend`` session via a Component and yield ``(url, realm)``.

    The component is started inside a ``BackendReadyWaiter`` context that listens
    for a ``wamp.registration.on_create`` meta event filtered by URI. The session
    id taken from the awaited event is handed to ``SessionKiller``, which wraps
    the ``yield``.
    """
    url, realm = crossbar_router_responsive
    procedure = "com.example.add"
    registration_waiter = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=FilterRegistrationCreateByURI(procedure),
    )
    backend = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=AdditionBackend,
        extra={"uri": procedure},
    )
    async with registration_waiter:
        # start() is not awaited, so the fixture does not block on the backend finishing
        _backend_done = backend.start()
    session_id, _event = registration_waiter.awaited_event
    # Kill the backend session explicitly on the way out. Otherwise terminating
    # the router process logs "wamp.close.transport_lost" errors, which look
    # more like noise than an actual problem but are easy to avoid.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_main(api_add):
    """Drive the addition RPC through a Component that is given a ``main`` coroutine.

    A Component accepts a ``main`` keyword argument: a coroutine that runs to
    completion once a transport has been established. With this approach the
    ``on_*`` handlers are usually not needed.
    """
    url, realm = api_add
    procedure = "com.example.add"
    results = Queue()
    for _round in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        client = Component(
            main=addition_client_main,
            transports=[{"url": url}],
            realm=realm,
            extra={"queue": results, "x": x, "y": y, "uri": procedure},
        )
        # awaiting start() blocks until the passed main coroutine has finished
        outcome = await client.start()
        assert outcome is None
        # the client reports the RPC result through the queue
        answer = await results.get()
        assert answer == x + y
# -*- coding: utf-8 -*-
"""Test a minimal RPC caller/callee setup with crossbar.io using components and reusing
the subclassing API of ApplicationSession objects and "session_factory=" pattern.
A WAMP distributed application is made up of multiple components that register with a
central broker/dealer. That central broker/dealer is the router and it is application
agnostic/generic.
So far the older ApplicationSession subclassing API has been used alongside the older
runner ApplicationRunner. The docs mention in numerous places that this API is
still fine; however, the newer Component-based approach with the run() method is clearly
preferred for new code.
SeeAlso:
https://autobahn.readthedocs.io/en/latest/wamp/programming.html
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from typing import Tuple
from typing import Any
from typing import Dict
from random import randint
import pytest
from autobahn.asyncio.component import Component
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI
from asyncio_playground.wamp.meta.waiter import SessionKiller
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a crossbar router on a free port.

    The configuration declares one router worker with a single realm, an
    ``anonymous`` role that may call, register, publish and subscribe, and a web
    transport exposing a websocket under ``/ws``.
    """
    port = unused_tcp_port
    realm = "systemtests"
    # one permission entry: empty URI with prefix matching
    anonymous_permission = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {"caller": False, "publisher": False},
        "cache": True,
    }
    anonymous_role = {"name": "anonymous", "permissions": [anonymous_permission]}
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, f"ws://localhost:{port}/ws", realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the ``AdditionBackend`` session via a Component and yield ``(url, realm)``.

    The component is started inside a ``BackendReadyWaiter`` context that listens
    for a ``wamp.registration.on_create`` meta event filtered by URI. The session
    id taken from the awaited event is handed to ``SessionKiller``, which wraps
    the ``yield``.
    """
    url, realm = crossbar_router_responsive
    procedure = "com.example.add"
    registration_waiter = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=FilterRegistrationCreateByURI(procedure),
    )
    backend = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=AdditionBackend,
        extra={"uri": procedure},
    )
    async with registration_waiter:
        # start() is not awaited, so the fixture does not block on the backend finishing
        _backend_done = backend.start()
    session_id, _event = registration_waiter.awaited_event
    # Kill the backend session explicitly on the way out. Otherwise terminating
    # the router process logs "wamp.close.transport_lost" errors, which look
    # more like noise than an actual problem but are easy to avoid.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_subclass(api_add):
    """Run an ``ApplicationSession`` subclass as the client through a Component.

    Subclass-based sessions can be reused with Components by passing the subclass
    as the ``session_factory`` keyword argument.
    """
    url, realm = api_add
    procedure = "com.example.add"
    results = Queue()
    for _round in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        client = Component(
            transports=[{"url": url}],
            realm=realm,
            session_factory=AdditionClient,
            extra={"queue": results, "x": x, "y": y, "uri": procedure},
        )
        # Start without awaiting: awaiting would require the component to finish
        # somehow (failure, success, or .leave() being called).
        _client_done = client.start()
        # the client reports the RPC result through the queue
        answer = await results.get()
        assert answer == x + y
# -*- coding: utf-8 -*-
"""Test a minimal RPC caller/callee setup with crossbar.io using tls for encryption in
transit.
"""
from __future__ import unicode_literals, print_function, division, absolute_import
import ssl
from logging import getLogger
from asyncio import Queue
from asyncio import wait_for
from typing import Tuple
from typing import Any
from typing import Dict
from random import randint
from pathlib import Path
import pytest
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient
from asyncio_playground.wamp.meta.callee_introspection import KillCalleesForURI
from .utilities import wait_until_responsive
from .utilities import IsResponsiveWebsocket
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(
    unused_tcp_port: int, ssl_certificates: Tuple[Path, Path]
) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a TLS-enabled crossbar router.

    Differs from the plain variant in that the transport is a bare websocket whose
    TCP endpoint carries a ``tls`` section pointing at the key and certificate
    files, and in that the URL uses the ``wss`` scheme.
    """
    port = unused_tcp_port
    key_file, cert_file = ssl_certificates
    realm = "systemtests"
    # one permission entry: empty URI with prefix matching
    anonymous_permission = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {"caller": False, "publisher": False},
        "cache": True,
    }
    anonymous_role = {"name": "anonymous", "permissions": [anonymous_permission]}
    tls_endpoint = {
        "type": "tcp",
        "port": port,
        # paths are passed as plain strings
        "tls": {"key": str(key_file), "certificate": str(cert_file)},
    }
    websocket_transport = {
        "type": "websocket",
        "id": "systemtests_tls_websocket",
        "endpoint": tls_endpoint,
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [websocket_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, f"wss://localhost:{port}", realm
@pytest.fixture
async def crossbar_router_responsive(
    crossbar_router, ssl_certificates: Tuple[Path, Path]
):
    """Yield ``(url, realm)`` once the router answers on its TLS websocket.

    Polls with ``IsResponsiveWebsocket`` every 0.25 seconds for up to 20 seconds,
    using an SSL context that trusts the test certificate.
    """
    url, realm = crossbar_router
    _key_file, cert_file = ssl_certificates
    tls_context = ssl.create_default_context(cafile=cert_file)
    # keep pinging until the websocket endpoint is available
    await wait_until_responsive(
        coro_factory=IsResponsiveWebsocket(url, realm, tls_context),
        timeout=20.0,
        pause=0.25,
    )
    yield url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive, ssl_certificates):
    """Register the addition backend over TLS and yield ``(url, realm)``.

    After the test, a ``KillCalleesForURI`` session is run against the same URI
    and the fixture waits for it to put a value on the queue.
    """
    url, realm = crossbar_router_responsive
    _key_file, cert_file = ssl_certificates
    procedure = "com.example.add"
    killed = Queue()
    tls_context = ssl.create_default_context(cafile=cert_file)
    # backend
    backend_runner = ApplicationRunner(
        url, realm, extra={"uri": procedure}, ssl=tls_context
    )
    _backend_transport, _backend_protocol = await backend_runner.run(
        AdditionBackend, start_loop=False
    )
    yield url, realm
    # kill the backend registration to avoid a "transport lost" error
    kill_runner = ApplicationRunner(
        url, realm, extra={"uri": procedure, "queue": killed}, ssl=tls_context
    )
    _kill_transport, _kill_protocol = await kill_runner.run(
        KillCalleesForURI, start_loop=False
    )
    _session_ids = await killed.get()
async def test_minimal_rpc_tls(api_add, ssl_certificates):
    """Call the addition RPC three times over a TLS-encrypted websocket."""
    url, realm = api_add
    _key_file, cert_file = ssl_certificates
    procedure = "com.example.add"
    results = Queue()
    tls_context = ssl.create_default_context(cafile=cert_file)
    for _round in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        client_runner = ApplicationRunner(
            url,
            realm,
            extra={"queue": results, "x": x, "y": y, "uri": procedure},
            ssl=tls_context,
        )
        _transport, _protocol = await client_runner.run(
            AdditionClient, start_loop=False
        )
        # give the client at most 5 seconds to deliver its result
        answer = await wait_for(results.get(), 5)
        assert answer == x + y
# -*- coding: utf-8 -*-
"""Test assumptions about the WAMP Session lifecycle.
SeeAlso:
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
"""
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Queue
from asyncio import Event
from asyncio import wait_for
from asyncio import sleep
from typing import Tuple
from typing import Any
from typing import Dict
import pytest
from autobahn.wamp.types import CloseDetails
from autobahn.asyncio.websocket import WampWebSocketClientProtocol
from autobahn.wamp.types import SessionDetails
from autobahn.asyncio.wamp import ApplicationRunner
from autobahn.asyncio.component import Component
from asyncio_playground.wamp.meta.session_lifecycle import TypecheckingSession
from asyncio_playground.wamp.meta.session_lifecycle import LifecycleEvent
from asyncio_playground.wamp.meta.session_lifecycle import (
SessionLifecycleTrackerSubclassAPI,
)
from asyncio_playground.wamp.meta.session_lifecycle import create_component
logger = getLogger(__name__)
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Return ``(configuration, url, realm)`` for a crossbar router on a free port.

    The configuration declares one router worker with a single realm, an
    ``anonymous`` role that may call, register, publish and subscribe, and a web
    transport exposing a websocket under ``/ws``.
    """
    port = unused_tcp_port
    realm = "systemtests"
    # one permission entry: empty URI with prefix matching
    anonymous_permission = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {"caller": False, "publisher": False},
        "cache": True,
    }
    anonymous_role = {"name": "anonymous", "permissions": [anonymous_permission]}
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {"version": 2, "workers": [router_worker]}
    return configuration, f"ws://localhost:{port}/ws", realm
async def test_session_lifecycle_subclassing_api_and_application_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle event order of an ApplicationSession run by ApplicationRunner."""
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    event = Event()
    queue = Queue()
    # session
    session_runner = ApplicationRunner(
        url, realm, extra={"event": event, "queue": queue}
    )
    _transport, _protocol = await session_runner.run(
        SessionLifecycleTrackerSubclassAPI, start_loop=False
    )
    # wait on the event handed to the session via ``extra``
    await wait_for(event.wait(), 10)
    # drain exactly as many lifecycle events as are expected
    results = []
    for _ in expected:
        results.append(await wait_for(queue.get(), 5))
    assert results == expected
async def test_session_lifecycle_subclassing_api_and_component_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle event order of an ApplicationSession run by a Component.

    The Component runner is newer and generally preferred over ApplicationRunner.
    It does not seem to matter much whether one writes an ApplicationSession
    subclass and passes it as ``session_factory``, or uses Component decorators
    with a factory function.
    """
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    event = Event()
    queue = Queue()
    # session
    component = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=SessionLifecycleTrackerSubclassAPI,
        extra={"event": event, "queue": queue},
    )
    # No Event wait needed with components: start() returns once the
    # component has finished.
    await wait_for(component.start(), 10)
    # drain exactly as many lifecycle events as are expected
    results = []
    for _ in expected:
        results.append(await wait_for(queue.get(), 5))
    assert results == expected
async def test_session_lifecycle_decorator_api_and_component_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle event order using the decorator-based Component API.

    According to the docs, the decorator API seems to be the most modern and
    preferred one.
    """
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    queue = Queue()
    # session
    component = create_component(
        transports=[{"url": url}],
        realm=realm,
        extra={"queue": queue},
    )
    # No Event wait needed with components: start() returns once the
    # component has finished.
    await wait_for(component.start(), 10)
    # drain exactly as many lifecycle events as are expected
    results = []
    for _ in expected:
        results.append(await wait_for(queue.get(), 5))
    assert results == expected
async def test_session_lifecycle_decorator_additional_listeners(
    crossbar_router_responsive,
):
    """Register extra lifecycle listeners and check the resulting event sequence.

    Several listeners can be registered for the same event. The expected sequence
    therefore contains repeated entries for connect, join, leave and disconnect.
    """
    url, realm = crossbar_router_responsive
    expected = (
        [LifecycleEvent.on_connect] * 2
        + [LifecycleEvent.on_join] * 3
        + [LifecycleEvent.on_ready]
        + [LifecycleEvent.on_leave] * 2
        + [LifecycleEvent.on_disconnect] * 2
    )
    queue = Queue()

    # additional listeners
    async def onConnectAdditional(
        session: TypecheckingSession, protocol: WampWebSocketClientProtocol
    ) -> None:
        logger.warning("onConnectAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{protocol=}")
        session.lifecycle_events.append(LifecycleEvent.on_connect)

    async def onJoinAdditional(
        session: TypecheckingSession, details: SessionDetails
    ) -> None:
        logger.warning("onJoinAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        # delay before recording the join event
        await sleep(5)
        session.lifecycle_events.append(LifecycleEvent.on_join)

    async def onLeaveAdditional(
        session: TypecheckingSession, details: CloseDetails
    ) -> None:
        logger.warning("onLeaveAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_leave)

    async def onDisconnectAdditional(
        session: TypecheckingSession, *args, **kwargs
    ) -> None:
        logger.warning("onDisconnectAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")
        event_ = LifecycleEvent.on_disconnect
        session.lifecycle_events.append(event_)
        # this is the only additional listener that also writes to the session queue
        await session.queue.put(event_)

    # the join listener is deliberately listed twice
    listeners = [
        ("connect", onConnectAdditional),
        ("join", onJoinAdditional),
        ("join", onJoinAdditional),
        ("leave", onLeaveAdditional),
        ("disconnect", onDisconnectAdditional),
    ]
    # session
    component = create_component(
        additional_listener=listeners,
        transports=[{"url": url}],
        realm=realm,
        extra={"queue": queue},
    )
    # No Event wait needed with components: start() returns once the
    # component has finished.
    await wait_for(component.start(), 10)
    # drain exactly as many lifecycle events as are expected
    results = []
    for _ in expected:
        results.append(await wait_for(queue.get(), 5))
    assert results == expected
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
import ssl
from logging import getLogger
from pathlib import Path
from uuid import uuid4
import pytest
from asyncio_playground.ssl.ssl_certificates import create_ssl_certificates
logger = getLogger(__name__)
async def test_create_ssl_certificates(tmp_path: Path):
    """Check that ``create_ssl_certificates`` writes a usable self-signed certificate.

    The target directory must not exist beforehand. Afterwards the key and
    certificate files exist and the certificate's commonName equals the
    requested ``cn``.
    """
    target = tmp_path / "ssl"
    assert not target.exists()
    # create certificate with a unique common name
    cn = f"{uuid4()}"
    ssl_key_file, ssl_cert_file = create_ssl_certificates(target, cn=cn)
    # the directory and both files must exist now
    assert target.exists()
    assert target.is_dir()
    assert ssl_key_file.is_file()
    assert ssl_cert_file.is_file()
    # Load the certificate as a CA and inspect its subject.
    # SeeAlso: https://docs.python.org/3/library/ssl.html#ssl.SSLContext.get_ca_certs
    context = ssl.create_default_context(cafile=ssl_cert_file)
    first_ca = context.get_ca_certs()[0]
    common_names = []
    # "subject" is a tuple of tuples of (key, value) pairs
    for relative_name in first_ca["subject"]:
        for key, value in relative_name:
            if key == "commonName":
                common_names.append(value)
    assert common_names[0] == cn
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
import ssl
import socket
from logging import getLogger
from asyncio import sleep
from typing import Tuple
from typing import Optional
from asyncio import wait_for
from asyncio import Event
from contextlib import closing
from typing import Awaitable
from itertools import count
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from aiohttp import ClientSession
logger = getLogger(__name__)
def find_free_port():
    """Return a TCP port number that the OS reported as free at call time.

    A temporary IPv4 stream socket is bound to port 0 so the OS picks the port.
    The socket is closed again before returning.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe) as sock:
        sock.bind(("", 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        address = sock.getsockname()
        return address[1]
async def request(
    method: str, url: str, ssl_context: Optional[ssl.SSLContext] = None
) -> Tuple[int, str]:
    """Perform one HTTP request and return ``(status, body_text)``.

    A fresh ``ClientSession`` is opened for the request and closed afterwards.
    ``ssl_context`` is forwarded as the ``ssl`` argument of the request.
    """
    async with ClientSession() as session:
        async with session.request(method, url, ssl=ssl_context) as response:
            status = response.status
            body = await response.text()
            return status, body
async def wait_until_responsive(coro_factory, timeout, pause):
    """Poll a service until it reports being responsive.

    Args:
        coro_factory: Zero-argument callable returning a new awaitable on each
            call. The awaitable must resolve to ``True`` once the service is up.
        timeout: Maximum total time in seconds to keep polling.
        pause: Seconds to sleep between two consecutive checks.

    Raises:
        RuntimeError: If no check resolved to ``True`` within ``timeout``
            seconds. The original timeout exception is chained as the cause.
    """
    # ``wait_for`` raises ``asyncio.TimeoutError``. Before Python 3.11 that is a
    # class distinct from the builtin ``TimeoutError``, so catching only the
    # builtin lets the timeout escape unwrapped. Imported locally under an alias
    # to avoid shadowing the builtin.
    from asyncio import TimeoutError as AsyncTimeoutError

    async def check_():
        for index in count(start=1):
            logger.debug(f"Responsiveness check: {index}")
            result = await coro_factory()
            # only a literal True counts as "responsive"
            if result is True:
                return
            await sleep(pause)

    try:
        await wait_for(check_(), timeout)
    except (AsyncTimeoutError, TimeoutError) as exception_instance:
        exception = RuntimeError("Timeout reached while waiting on service")
        raise exception from exception_instance
class IsResponsiveHTTP:
    """Callable responsiveness check for an HTTP endpoint.

    Calling the instance returns a new coroutine that resolves to ``True`` when
    the request answers with status 200, and to ``False`` on any other status or
    on any exception.
    """

    def __init__(
        self, method: str, url: str, ssl_context: Optional[ssl.SSLContext] = None
    ):
        self.method = method
        self.url = url
        self.ssl_context = ssl_context

    def __call__(self, *args, **kwargs) -> Awaitable:
        """Return a new coroutine when called."""
        return self.is_responsive()

    async def is_responsive(self) -> bool:
        """Issue the request once and report whether it returned status 200."""
        try:
            status, _body = await request(self.method, self.url, self.ssl_context)
        except Exception:
            # any failure counts as "not responsive yet"
            return False
        return status == 200
class IsResponsiveWebsocket:
    """Callable responsiveness check for a WAMP websocket endpoint.

    Calling the instance returns a new coroutine that runs a ``Ping`` session
    against the router. It resolves to ``True`` once the shared event has been
    set within 5 seconds, and to ``False`` on any exception.
    """

    def __init__(
        self, url: str, realm: str, ssl_context: Optional[ssl.SSLContext] = None
    ):
        self.url = url
        self.realm = realm
        self.ssl_context = ssl_context
        # one event instance is shared by every check made through this object
        self.event = Event()

    def __call__(self, *args, **kwargs) -> Awaitable:
        """Return a new coroutine when called."""
        return self.is_responsive()

    async def is_responsive(self) -> bool:
        """Connect a ``Ping`` session and wait for it to set the event."""
        try:
            logger.debug("is_responsive()")
            ping_runner = ApplicationRunner(
                self.url,
                self.realm,
                ssl=self.ssl_context,
                extra={"event": self.event},
            )
            _transport, _protocol = await ping_runner.run(Ping, start_loop=False)
            await wait_for(self.event.wait(), 5)
        except Exception as exception_instance:
            # connection failures and timeouts both mean "not responsive yet"
            logger.debug(f"No. {exception_instance=}")
            return False
        logger.debug("Yes")
        return True
class Ping(ApplicationSession):
    """Session that leaves right after joining and signals the join via an event."""

    async def onJoin(self, details):
        """Leave the session, then set the event passed in ``config.extra``."""
        logger.debug(f"onJoin({details})")
        joined: Event = self.config.extra["event"]
        self.leave()
        joined.set()
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function, division, absolute_import
from logging import getLogger
from asyncio import Event, wait_for
from enum import Enum
from typing import Callable
from typing import Optional
from typing import Any
from typing import List
from asyncio import Queue
from asyncio import Event
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from autobahn.wamp.request import Subscription
from .callee_introspection import KillSessions
logger = getLogger(__name__)
# wamp.registration.on_create
# {
# 'id': 4280645366663556,
# 'created': '2022-02-13T19:43:06.805Z',
# 'uri': 'com.example.add',
# 'match': 'exact',
# 'invoke': 'single'
# }
def passthrough(*args, **kwargs) -> bool:
    """Filter function that accepts every event.

    Logs the received arguments at warning level and always returns ``True``.
    """
    message = f"passthrough({args=}, {kwargs=})"
    logger.warning(message)
    return True
class FilterRegistrationCreateByURI:
    """Filter that accepts a registration-create event only for one URI.

    The instance is called with the positional arguments of the meta event. It
    returns ``True`` only when exactly two arguments were passed, the second is a
    dict, and its ``"uri"`` entry equals the configured URI.
    """

    def __init__(self, uri: str) -> None:
        self.uri = uri

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Return whether the event arguments describe the configured URI."""
        logger.warning("FilterRegistrationCreateByURI: __call__")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")
        # expected shape: (id, event dict)
        if len(args) == 2:
            _id, details = args
            return (
                isinstance(details, dict)
                and "uri" in details
                and details["uri"] == self.uri
            )
        return False
class WAMPMetaEvent(str, Enum):
    """Topic URIs of WAMP meta events, grouped by the kind of object they concern.

    Members are also ``str`` instances; ``.value`` holds the topic URI.
    """

    # --- sessions ---
    wamp_session_on_join = "wamp.session.on_join"
    wamp_session_on_leave = "wamp.session.on_leave"

    # --- subscriptions ---
    wamp_subscription_on_create = "wamp.subscription.on_create"
    wamp_subscription_on_subscribe = "wamp.subscription.on_subscribe"
    wamp_subscription_on_unsubscribe = "wamp.subscription.on_unsubscribe"
    wamp_subscription_on_delete = "wamp.subscription.on_delete"

    # --- registrations ---
    wamp_registration_on_create = "wamp.registration.on_create"
    wamp_registration_on_register = "wamp.registration.on_register"
    wamp_registration_on_unregister = "wamp.registration.on_unregister"
    wamp_registration_on_delete = "wamp.registration.on_delete"

    # --- schemas ---
    wamp_schema_on_define = "wamp.schema.on_define"
    wamp_schema_on_undefine = "wamp.schema.on_undefine"
class WaitForMetaEvent(ApplicationSession):
    """Session that waits for one meta event, optionally filtered.

    On join it subscribes to the configured meta event topic. When an event
    passes the filter function, the session unsubscribes, puts the event
    arguments on the queue, leaves and sets the completion event.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
        https://crossbar.io/docs/Subscription-Meta-Events-and-Procedures/
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor-sessions.py
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor.html
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    async def onJoin(self, details):
        """Subscribe to the meta event and signal that the subscription exists."""
        self.subscription_: Optional[Subscription] = None
        # everything the session needs is handed over via ``config.extra``
        extra = self.config.extra
        meta_event: WAMPMetaEvent = extra["meta_event"]
        subscribed: Event = extra["event_subscription"]
        finished: Event = extra["event"]
        results: Queue = extra["queue"]
        accepts: Callable[[Any], bool] = extra["filterfunc"]
        topic: str = meta_event.value

        async def on_meta_event(*args, **kwargs):
            logger.warning("WaitForMetaEvent: on_meta_event()")
            logger.warning(f"{args=}")
            logger.warning(f"{kwargs=}")
            logger.warning("WaitForMetaEvent: apply filterfunc")
            # only the positional arguments are passed to the filter
            if accepts(*args):
                logger.warning("WaitForMetaEvent: finish wait")
                await self.subscription_.unsubscribe()
                await results.put(args)
                self.leave()
                finished.set()
            else:
                # not the event we are waiting for; keep listening
                logger.warning("WaitForMetaEvent: filterfunc false")

        # subscriptions
        self.subscription_ = await self.subscribe(on_meta_event, topic)
        subscribed.set()
class BackendReadyWaiter:
    """Async context manager that waits for a meta event caused by its body.

    Entering starts a ``WaitForMetaEvent`` session and returns once that session
    has subscribed. Leaving without an exception waits for the matching event and
    stores its arguments in ``awaited_event``.
    """

    def __init__(
        self,
        url: str,
        realm: str,
        meta_event: WAMPMetaEvent,
        timeout: float = 20.0,
        filterfunc: Callable[[Any], bool] = passthrough,
    ):
        self.url = url
        self.realm = realm
        self.meta_event = meta_event
        self.timeout = timeout
        self.filterfunc = filterfunc
        self._subscription: Optional[Subscription] = None
        # channels shared with the WaitForMetaEvent session
        self.queue = Queue()
        self.event_subscription = Event()
        self.event = Event()
        # filled in by __aexit__ once the event has arrived
        self.awaited_event: Optional[Any] = None

    async def __aenter__(self):
        """Start the listening session and wait until it has subscribed."""
        logger.warning("BackendReadyWaiter: __aenter__")
        session_extra = {
            "meta_event": self.meta_event,
            "event": self.event,
            "queue": self.queue,
            "event_subscription": self.event_subscription,
            "filterfunc": self.filterfunc,
        }
        # subscribe to meta event
        listener_runner = ApplicationRunner(self.url, self.realm, extra=session_extra)
        _transport, _protocol = await listener_runner.run(
            WaitForMetaEvent, start_loop=False
        )
        # the subscription must exist before the context body runs
        logger.warning("BackendReadyWaiter: wait for subscription")
        await wait_for(self.event_subscription.wait(), self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Wait for the meta event unless the context body raised."""
        logger.warning("BackendReadyWaiter: __aexit__")
        if not exc_val:
            # enclosed context ran without error, wait for meta event
            logger.warning("BackendReadyWaiter: wait for event")
            await wait_for(self.event.wait(), self.timeout)
            logger.warning("BackendReadyWaiter: awaited event from queue")
            self.awaited_event = await self.queue.get()
        # on an exception: return None so that it propagates
class SessionKiller:
    """Async context manager that kills the given sessions when it is left.

    Nothing happens on entry. On a clean exit a ``KillSessions`` session is run
    with the stored session ids and the manager waits for its event to be set.
    """

    def __init__(
        self,
        url: str,
        realm: str,
        session_ids: List[int],
        timeout: float = 20.0,
    ):
        self.url = url
        self.realm = realm
        self.session_ids = session_ids
        self.timeout = timeout
        # handed to the KillSessions session via ``extra``
        self.event = Event()

    async def __aenter__(self):
        """Return the manager itself; no work is done on entry."""
        logger.warning("SessionKiller: __aenter__")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Kill the sessions unless the context body raised."""
        logger.warning("SessionKiller: __aexit__")
        if not exc_val:
            # enclosed context ran without error, kill sessions
            logger.warning("SessionKiller: kill sessions")
            kill_runner = ApplicationRunner(
                self.url,
                self.realm,
                extra={"session_ids": self.session_ids, "event": self.event},
            )
            _transport, _protocol = await kill_runner.run(
                KillSessions, start_loop=False
            )
            logger.warning("SessionKiller: wait for event")
            await wait_for(self.event.wait(), self.timeout)
        # on an exception: return None so that it propagates
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment