Created
February 20, 2022 22:05
-
-
Save timmwagener/0aa27404112e8faba98a6f719e0fce7b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.component import Component | |
logger = getLogger(__name__) | |
class Addition(ApplicationSession):
    """Callee session that exposes a two-value addition as a WAMP RPC.

    The procedure URI is taken from ``self.config.extra["uri"]`` once the
    session has joined.
    """

    async def onJoin(self, details):
        """Register the local ``add`` function under the configured URI."""
        # TODO: Pydantic models for extra arguments
        procedure_uri = self.config.extra["uri"]

        def add(x: int, y: int) -> int:
            logger.debug(f"Callee calling add({x}, {y})")
            return x + y

        await self.register(add, procedure_uri)
def create_component(uri: str, *args, **kwargs) -> Component:
    """Build a ``Component`` that serves the addition RPC at ``uri``.

    All remaining positional and keyword arguments are forwarded unchanged
    to the ``Component`` constructor.
    """
    addition_component = Component(*args, **kwargs)

    # The decorator registers the coroutine as the procedure behind ``uri``.
    @addition_component.register(uri)
    async def add(x: int, y: int) -> int:
        logger.debug(f"Callee calling add({x}, {y})")
        return x + y

    return addition_component
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import AbstractEventLoop | |
from autobahn.wamp.types import SessionDetails | |
from autobahn.asyncio.wamp import Session | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.component import Component | |
logger = getLogger(__name__) | |
async def main(loop: AbstractEventLoop, session: Session) -> int:
    """Call the remote ``add`` procedure once and publish the result.

    Components can run a main coroutine after the connection has been
    established; the session is left automatically when it finishes. It is
    a shortcut for running code after connecting, which internally creates
    an onJoin() function on the fly and connects it.

    The procedure URI, both operands and the result queue are read from
    ``session.config.extra``. The result is put on the queue and returned;
    a failing call is logged and re-raised.
    """
    # TODO: Pydantic models for extra arguments
    settings = session.config.extra
    # Same lookup order as before: uri, queue, x, y.
    uri, queue, x, y = (settings[key] for key in ("uri", "queue", "x", "y"))

    logger.debug(f"Caller calling add({x}, {y})")
    try:
        result = await session.call(uri, x, y)
    except Exception as exception_instance:
        logger.error(f"Error: {exception_instance}")
        raise

    # Only reached when the call succeeded, since the handler re-raises.
    logger.debug(f"Result of add: {result}")
    await queue.put(result)
    return result
class Addition(ApplicationSession):
    """Consumer of the RPC add service."""

    async def onJoin(self, details: SessionDetails) -> None:
        """Call the add procedure, hand the result to the queue, then leave.

        URI, result queue and both operands come from ``self.config.extra``.
        A failing call is logged and re-raised, in which case nothing is
        queued and the session is not left here.
        """
        # TODO: Pydantic models for extra arguments
        settings = self.config.extra
        uri = settings["uri"]
        queue = settings["queue"]
        x = settings["x"]
        y = settings["y"]

        logger.debug(f"Caller calling add({x}, {y})")
        try:
            result = await self.call(uri, x, y)
        except Exception as exception_instance:
            logger.error(f"Error: {exception_instance}")
            raise

        # Success path: the handler above always re-raises.
        logger.debug(f"Result of add: {result}")
        await queue.put(result)
        await self.leave()
def create_component(*args, **kwargs) -> Component:
    """Build a ``Component`` whose join handler performs one add call.

    All arguments are forwarded to the ``Component`` constructor. The
    handler reads URI, result queue and operands from
    ``session.config.extra``.
    """
    caller_component = Component(*args, **kwargs)

    @caller_component.on_join
    async def join(session: Session, details: SessionDetails) -> None:
        # TODO: Pydantic models for extra arguments
        settings = session.config.extra
        # Same lookup order as before: uri, queue, x, y.
        uri, queue, x, y = (settings[key] for key in ("uri", "queue", "x", "y"))

        logger.debug(f"Caller calling add({x}, {y})")
        try:
            result = await session.call(uri, x, y)
        except Exception as exception_instance:
            logger.error(f"Error: {exception_instance}")
            raise

        # Success path: the handler above always re-raises.
        logger.debug(f"Result of add: {result}")
        await queue.put(result)
        # TODO: Should we await session.leave()? Some docs do it, but the code
        # seems not awaitable. However there's also no error upon awaiting.
        # SeeAlso: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#longer-example
        await session.leave()

    return caller_component
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from asyncio import Event | |
from typing import Tuple | |
from typing import List | |
from typing import Tuple | |
from autobahn.asyncio.wamp import ApplicationSession | |
logger = getLogger(__name__) | |
class GetCalleesForURI(ApplicationSession):
    """Take a uri from extras and retrieve its callees if any. Return the result
    via given queue.

    SeeAlso:
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/#retrieving-information-about-a-specific-registration
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    def get_data(self) -> Tuple[str, Queue]:
        """Return the ``uri`` and ``queue`` entries of ``self.config.extra``."""
        extra = self.config.extra
        uri = extra["uri"]
        queue = extra["queue"]
        return uri, queue

    async def callees_for_uri(self, uri: str) -> List[int]:
        """Return the session ids currently registered as callees for ``uri``.

        The registration id is looked up first. When the router reports no
        registration for the URI (a null id), an empty list is returned
        instead of passing the null id to ``wamp.registration.list_callees``.
        """
        registration_id = await self.call("wamp.registration.lookup", uri)
        logger.debug(f"{registration_id=}")
        if registration_id is None:
            # Nothing registered under this URI, so there are no callees.
            return []
        list_callees = await self.call(
            "wamp.registration.list_callees", registration_id
        )
        logger.debug(f"{list_callees=}")
        return list_callees

    async def onJoin(self, details):
        """Look up the callees, leave the session and queue the result."""
        uri, queue = self.get_data()
        list_callees = await self.callees_for_uri(uri)
        self.leave()
        await queue.put(list_callees)
class KillCalleesForURI(GetCalleesForURI):
    """Extend retrieval of callees by killing them. This can be useful for
    example in test teardown.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/#killing-a-session
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    async def onJoin(self, details):
        """Kill every callee of the configured URI, then queue their ids."""
        uri, queue = self.get_data()
        callee_ids = await self.callees_for_uri(uri)
        for callee_id in callee_ids:
            logger.debug(f"Kill session: {callee_id=}")
            # The result of the kill call is not needed here.
            await self.call("wamp.session.kill", callee_id)
        self.leave()
        await queue.put(callee_ids)
class KillSessions(ApplicationSession):
    """Kill session for given session ids.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/#killing-a-session
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
    """

    async def onJoin(self, details):
        """Kill each configured session, leave, then signal completion.

        ``session_ids`` and ``event`` are read from ``self.config.extra``;
        the event is set after all kill calls have returned.
        """
        settings = self.config.extra
        session_ids: List[int] = settings["session_ids"]
        done: Event = settings["event"]

        for session_id in session_ids:
            logger.warning(f"Kill session: {session_id=}")
            session_kill = await self.call("wamp.session.kill", session_id)
            logger.warning(f"Session killed: {session_kill=}")

        self.leave()
        done.set()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from json import dumps | |
from asyncio import create_subprocess_exec | |
from asyncio.subprocess import PIPE | |
from pathlib import Path | |
from urllib.parse import urlparse | |
from typing import Tuple | |
import pytest | |
from asyncio_playground.ssl.ssl_certificates import create_ssl_certificates | |
from .utilities import IsResponsiveHTTP | |
from .utilities import IsResponsiveWebsocket | |
from .utilities import wait_until_responsive | |
logger = getLogger(__name__) | |
@pytest.fixture
async def crossbar_directory(tmp_path: Path) -> Path:
    """Provide a fresh ``crossbar`` directory below pytest's ``tmp_path``.

    On teardown the content of ``node.log`` inside that directory is written
    to the debug log, if the file exists.
    """
    # create directory
    directory = tmp_path / "crossbar"
    directory.mkdir(parents=True, exist_ok=False)

    yield directory

    # in debug log mode, log logfile content
    # logger.debug([x for x in directory.iterdir() if x.is_file()])
    logfile = directory / "node.log"
    try:
        log_content = logfile.read_text()
    except FileNotFoundError:
        logger.debug(
            f"Could not log logfile content. Logfile not found. ({logfile})"
        )
        return
    logger.debug(log_content)
@pytest.fixture
def ssl_certificates(crossbar_directory: Path) -> Tuple[Path, Path]:
    """Expose self signed certificates via fixture.

    The certificates are created by ``create_ssl_certificates`` for the
    crossbar directory of the current test.
    """
    certificate_paths = create_ssl_certificates(crossbar_directory)
    return certificate_paths
@pytest.fixture
async def crossbar_router(crossbar_configuration, crossbar_directory):
    """Provide a crossbar router for the duration of the test
    and tear it down afterwards.

    The configuration is serialized to ``configuration.json`` inside the
    crossbar directory and ``crossbar start`` is launched as a subprocess
    that logs to a file in the same directory. Yields ``(url, realm)``.
    """
    configuration, url, realm = crossbar_configuration
    crossbar_directory.mkdir(parents=True, exist_ok=True)
    crossbar_configuration_serialized = dumps(configuration, indent=4)
    crossbar_configuration_file = crossbar_directory / "configuration.json"
    crossbar_configuration_file.write_text(crossbar_configuration_serialized)

    # crossbar router process
    cmd = [
        "crossbar",
        "start",
        "--cbdir",
        str(crossbar_directory),
        "--config",
        str(crossbar_configuration_file),
        "--logdir",
        str(crossbar_directory),
        "--logtofile",
    ]
    process = await create_subprocess_exec(
        *cmd,
        stdout=PIPE,
        stderr=PIPE,
    )

    yield url, realm

    # shutdown process
    try:
        process.terminate()
    except ProcessLookupError:
        # The router already exited on its own (for example because it
        # failed during startup). Do not let teardown hide that failure.
        logger.debug("crossbar process had already exited before teardown")
    stdout, stderr = await process.communicate()
    logger.debug(f"crossbar process exited with return code {process.returncode}")
    if stderr:
        # Output written to the pipe can explain an unexpected exit.
        logger.debug(stderr.decode(errors="replace"))
@pytest.fixture
async def crossbar_router_responsive(crossbar_router):
    """Ensure that router is up and responsive. Uses basic ready check
    by default but can be overridden for specific checks. (For example
    against websocket endpoints).

    The check is chosen by URL scheme (``http`` or ``ws``); any other scheme
    raises ``NotImplementedError``. Yields ``(url, realm)`` once the router
    answered.
    """
    url, realm = crossbar_router
    scheme = urlparse(url).scheme

    # Builders are wrapped in lambdas so only the matching check is created.
    check_builders = {
        "http": lambda: IsResponsiveHTTP("get", url),
        "ws": lambda: IsResponsiveWebsocket(url, realm),
    }
    build_check = check_builders.get(scheme)
    if build_check is None:
        msg = f"No responsiveness check implemented for scheme {scheme}"
        raise NotImplementedError(msg)

    await wait_until_responsive(
        coro_factory=build_check(), timeout=20.0, pause=0.25
    )
    yield url, realm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import annotations | |
import logging | |
from pathlib import Path | |
import pytest | |
logger = logging.getLogger(__name__) | |
# session and module scoped fixtures can be accessed by | |
# function scoped fixtures, but not the other way round | |
@pytest.fixture(scope="session")
def repository_directory():
    """Return the directory three levels above this file."""
    containing_directory = Path(__file__).parent
    return containing_directory.parent.parent
@pytest.fixture(scope="session")
def test_directory(repository_directory):
    """Return the ``tests/systemtests`` directory of the repository."""
    tests_root = repository_directory / "tests"
    return tests_root / "systemtests"
@pytest.fixture(scope="session")
def test_data_directory(test_directory):
    """Return the ``test_data`` directory below the test directory.

    Override in sub-testpackages for more local test data.
    """
    data_directory = test_directory / "test_data"
    return data_directory
# autouse fixtures for the whole test suite. | |
# They are supposed to be overwritten on subpackage level for customization. | |
# At this level, the main goal is test isolation. | |
@pytest.fixture(scope="session", autouse=True)
def reset_state_for_session():
    """Session-scoped autouse hook; does nothing at this level."""
    return None
@pytest.fixture(scope="module", autouse=True)
def reset_state_for_module():
    """Module-scoped autouse hook; does nothing at this level."""
    return None
@pytest.fixture(autouse=True)
def reset_state_for_function():
    """Function-scoped autouse hook; does nothing at this level."""
    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<mxfile host="Electron" modified="2022-02-13T14:46:01.288Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/16.5.1 Chrome/96.0.4664.110 Electron/16.0.7 Safari/537.36" etag="0neny86yiuhX1PNNk_1-" version="16.5.1" type="device"><diagram id="_6lKf6-y1SfZ0dx7U7l8" name="fixtures">7VpRb+I4EP41PF6VxE6Ax0Lb25W62qrV6XT7gtzEgLUmzjlmgf31N04cQmJyBZZAu4rEA57YE+f7vhlPYvfQeLH+U5Jk/kVElPc8J1r30F3P81zseT39c6JNbhkiJzfMJItMp9Lwwn5SYyy6LVlE00pHJQRXLKkaQxHHNFQVG5FSrKrdpoJX75qQGbUMLyHhtvVvFql5bh34Tmn/RNlsXtzZdcyVBSk6G0M6J5FY7ZjQfQ+NpRAq/7dYjynX4BW45OMeGq5uJyZprA4ZIJ7pp+Hz568P9w/fvv6LX4aDf+gf7iB384PwpXniZbxMaTRRYTJJhFRm8mpTIDJXCw7/3B4ambFUKrpunJW7fVYQCRULquQGuhQDhgaeTa29KtHe2uY7SBcMEEPwbOu6xAD+GBiOgWRoQfJXSi0YaBzdan1BSyQ0BjTAYvTrwgOPIpLOaWSQ2gFNimUcZRecLYQ0slRYAxDuLZYypP8z8SLUFJEzqt4k3aZkF/LAwCspJ4r9qM5uH+bG3ZNgMO+SXlSl1x/UaMufyozaVW/NkTd8w1H+1JajTALbZzxdFUUUfzhVIHygKrzgYqpADq6QiYKTVRFUs0fdUduqcH97VXgXUwUOaiHunqgKv+YI1x21rQrPUkUoRZq+EjmBOmXKZksJQInYEkq6YgtOYmiNpiJWL+aKJj+cMx49ko1YarpSRcLvRWs0F5L9hP6k0A1clsoIDOIavDHOx4ILCYZYZDcoB71oZ+Y2kqYw7KmQlFszfSHrSsdHkqpigoJzkqTsNZuyHrgAmFk8EkqJRUXCv1QxeI5fTR54T8UQ7KkY3CLrnL1k8JBFeM8b5VXpDtvoVhfCLLRrKkBDZbRJ8Z3WeNpDHeFsBu7uOJ3qYRpOBiXrrTErkWhnCQlZPHvM+tzh0vJsINEmAWOnPMtMcxZFWW6SQhFFXrfKS3ScZJD5I/gBsmPnxu/5MPExtN2yDT/dXaqxiOFZCMsYpaCRFdU6OYz+o/KQEYEXHKaBot/5JeA3SGApeU48ANLxfm7efe/avOMG3iUlfNEx3xrz/cG1mQ+aV/mISRoqoe8ScA3DhlEeQWNWYtIt+0ct+7j2Anjwsl+kiPMLoN8Q+iX7Wfg/6S9BXfwfHf8N76PvZcVHdubfxj+8tykqu+A/V/AjXK35fXRg8BerxPnZt7N/V++dIerzqHq/UY+b6vyu3muX+atX+sjeKqnl+wnkxwRw0Z/Auiz/61k+6F89y9t7QV2WP0esN+xAvZss37S6d1m+XeavnuWLt8qPt6NjcuCbOzpN62wbOzquf+O5fTR0kYMRHlRr+FM3/RDCN37gbx1XFw3s177yt7zZg+0twFdYdEEOE5Kwybp7CWyrPBi41y4PsNeVB20sErhBCO+mPGj68teVB+0yf/3ywN7p/RjlQbFMvV0eXO5wGEYN73vHVgSWI9e7bA1gfxpQEAl68a8J47yHJ1G/uicyHOxZEd19eyJtHZ7EH/XwJD60fG7KUJeIj1MPT9rxUauITo4PaJYnmPPu5TlwdP8f</diagram></mxfile> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Event | |
from typing import List | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.wamp.request import Subscription | |
logger = getLogger(__name__) | |
class WAMPMetaEvents(ApplicationSession):
    """Log WAMP Meta events

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
        https://crossbar.io/docs/Subscription-Meta-Events-and-Procedures/
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor-sessions.py
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor.html
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    async def onJoin(self, details):
        """Subscribe a logging handler to every meta topic.

        Once all subscriptions exist, the ``asyncio.Event`` found under
        ``self.config.extra["event"]`` is set. An event on ``com.kill.all``
        unsubscribes the meta-topic handlers and leaves the session.
        """
        ready: Event = self.config.extra["event"]

        meta_topics = [
            # Meta Events for Sessions
            "wamp.session.on_join",
            "wamp.session.on_leave",
            # Meta Events for Subscriptions
            "wamp.subscription.on_create",
            "wamp.subscription.on_subscribe",
            "wamp.subscription.on_unsubscribe",
            "wamp.subscription.on_delete",
            # Meta Events for Registrations
            "wamp.registration.on_create",
            "wamp.registration.on_register",
            "wamp.registration.on_unregister",
            "wamp.registration.on_delete",
            # Meta Events for Schemas
            "wamp.schema.on_define",
            "wamp.schema.on_undefine",
        ]

        async def on_event(*args, **kwargs):
            logger.debug("on_event()")
            logger.debug(f"{args=}")
            logger.debug(f"{kwargs=}")

        # subscriptions, created one after another in topic order
        subscriptions: List[Subscription] = []
        for topic in meta_topics:
            subscriptions.append(await self.subscribe(on_event, topic))

        # subscribe to kill and also deregister from router
        async def on_kill_event(*args, **kwargs):
            logger.debug(f"on_kill_event({args}, {kwargs})")
            for subscription in subscriptions:
                await subscription.unsubscribe()
            self.leave()

        await self.subscribe(on_kill_event, "com.kill.all")

        # notify to clients that all subscriptions have happened
        # At this point the WAMP router has the on_event handler registered
        # for all events.
        ready.set()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from asyncio import Event | |
from asyncio import sleep | |
from typing import List | |
from typing import Optional | |
from typing import Tuple | |
from typing import Callable | |
from typing import Any | |
from typing import Awaitable | |
from typing import Coroutine | |
from typing import Iterable | |
from enum import Enum | |
from enum import unique | |
from autobahn.wamp.types import SessionDetails | |
from autobahn.wamp.types import CloseDetails | |
from autobahn.asyncio.websocket import WampWebSocketClientProtocol | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.component import Component | |
from autobahn.asyncio.wamp import Session | |
logger = getLogger(__name__) | |
# Type of the coroutine functions accepted as additional lifecycle listeners:
# two positional arguments in, a coroutine out.
AwaitableFunction = Callable[
    [Any, Any],
    Coroutine[Any, Any, Any],
]
@unique
class LifecycleEvent(Enum):
    """Names of the session lifecycle events that get recorded.

    Each member's value equals its name; ``@unique`` rejects duplicate
    values.
    """

    # connection and authentication phase
    on_connect = "on_connect"
    on_challenge = "on_challenge"

    # session established
    on_join = "on_join"
    on_ready = "on_ready"

    # shutdown phase
    on_leave = "on_leave"
    on_disconnect = "on_disconnect"
class SessionLifecycleTrackerSubclassAPI(ApplicationSession):
    """Track the lifecycle events of a WAMP session using the older subclassing
    based API.

    Every callback appends its ``LifecycleEvent`` to ``lifecycle_events``.
    On disconnect the event from ``config.extra["event"]`` is set and the
    recorded events are put on the queue from ``config.extra["queue"]``.

    SeeAlso:
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logger.warning("__init__")
        self.lifecycle_events: List[LifecycleEvent] = []
        # Both are bound from config.extra on join, or on disconnect at the
        # latest.
        self.event: Optional[Event] = None
        self.queue: Optional[Queue] = None
        # ready needs to be registered manually
        self.on("ready", self.onReady)

    def _bind_reporting_channels(self) -> None:
        """Read ``event`` and ``queue`` from ``self.config.extra``."""
        extra = self.config.extra
        self.event = extra["event"]
        self.queue = extra["queue"]

    async def onConnect(self):
        logger.warning("onConnect")
        self.join(self.config.realm)
        self.lifecycle_events.append(LifecycleEvent.on_connect)

    async def onChallenge(self, challenge):
        logger.warning("onChallenge")
        self.lifecycle_events.append(LifecycleEvent.on_challenge)

    async def onJoin(self, details: SessionDetails):
        logger.warning("onJoin")
        self.lifecycle_events.append(LifecycleEvent.on_join)
        self._bind_reporting_channels()
        await self.leave()

    async def onReady(self, details: SessionDetails):
        logger.warning("onReady")
        self.lifecycle_events.append(LifecycleEvent.on_ready)

    async def onLeave(self, details: CloseDetails):
        logger.warning("onLeave")
        self.lifecycle_events.append(LifecycleEvent.on_leave)
        self.disconnect()

    async def onDisconnect(self):
        logger.warning("onDisconnect")
        self.lifecycle_events.append(LifecycleEvent.on_disconnect)
        if self.event is None or self.queue is None:
            # The session disconnected without ever joining, so onJoin never
            # bound the channels. Bind them now so the recorded events are
            # still reported instead of failing on ``None``.
            self._bind_reporting_channels()
        self.event.set()
        for event_ in self.lifecycle_events:
            await self.queue.put(event_)
class TypecheckingSession(Session):
    """Only for typechecking

    Declares the attributes that the component callbacks attach to a
    session. Instantiating it always raises ``NotImplementedError``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Attribute declarations for the type checker.
        self.queue: Queue = Queue()
        self.lifecycle_events: List[LifecycleEvent] = []
        raise NotImplementedError("Only for typechecking")
def create_component(
    additional_listener: Iterable[Tuple[str, AwaitableFunction]] = tuple(),
    *args,
    **kwargs,
) -> Component:
    """Factory function to create component instances.

    The component records each lifecycle callback on the session as a
    ``LifecycleEvent`` and, on disconnect, puts all recorded events on the
    queue from ``session.config.extra["queue"]``. Every
    ``(event name, coroutine function)`` pair in ``additional_listener`` is
    registered on the component as well. Remaining arguments go to the
    ``Component`` constructor.
    """
    tracker = Component(*args, **kwargs)

    @tracker.on_connect
    async def onConnect(
        session: TypecheckingSession, protocol: WampWebSocketClientProtocol
    ) -> None:
        logger.warning("onConnect")
        logger.warning(f"{session=}")
        logger.warning(f"{protocol=}")
        result_queue: Queue = session.config.extra["queue"]
        # extend session
        session.lifecycle_events = []
        session.queue = result_queue
        session.lifecycle_events.append(LifecycleEvent.on_connect)

    @tracker.on_join
    async def onJoin(session: TypecheckingSession, details: SessionDetails) -> None:
        logger.warning("onJoin")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_join)

    @tracker.on_ready
    async def onReady(session: TypecheckingSession) -> None:
        logger.warning("onReady")
        logger.warning(f"{session=}")
        session.lifecycle_events.append(LifecycleEvent.on_ready)
        await session.leave()

    @tracker.on_leave
    async def onLeave(session: TypecheckingSession, details: CloseDetails) -> None:
        logger.warning("onLeave")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_leave)

    @tracker.on_disconnect
    async def onDisconnect(session: TypecheckingSession, *args, **kwargs) -> None:
        logger.warning("onDisconnect")
        logger.warning(f"{session=}")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")
        session.lifecycle_events.append(LifecycleEvent.on_disconnect)
        for recorded_event in session.lifecycle_events:
            await session.queue.put(recorded_event)

    # register additional passed callbacks
    for event_name, listener in additional_listener:
        tracker.on(event_name, listener)

    return tracker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test invocation of crossbar cli. | |
SeeAlso: | |
https://crossbar.io/docs/ | |
https://crossbar.io/docs/Command-Line/ | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import create_subprocess_exec | |
from asyncio.subprocess import PIPE | |
import pytest | |
logger = getLogger(__name__) | |
async def test_crossbar_command_succeeds():
    """Smoketest check if the crossbar command is installed/works.

    Runs ``crossbar`` without arguments and expects "Crossbar" on stdout
    and nothing on stderr.
    """
    process = await create_subprocess_exec("crossbar", stdout=PIPE, stderr=PIPE)
    raw_stdout, raw_stderr = await process.communicate()

    assert "Crossbar" in raw_stdout.decode()
    assert raw_stderr.decode() == ""
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test a most minimal, simple router setup with crossbar.io | |
SeeAlso: | |
https://crossbar.io/docs/ | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
import pytest | |
from aiohttp import ClientSession | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port) -> Tuple[Dict[str, Any], str, str]:
    """Configuration dictionary of crossbar application.

    Returns ``(configuration, url, realm)``: one router worker with realm
    ``realm1``, an anonymous role allowed to do everything, and a web
    transport on the unused port that serves node info under ``/info``.
    """
    port = unused_tcp_port
    realm = "realm1"
    url = f"http://localhost:{port}/info"

    # The anonymous role may call, register, publish and subscribe on any URI.
    anonymous_permissions = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {
            "caller": False,
            "publisher": False,
        },
        "cache": True,
    }
    anonymous_role = {
        "name": "anonymous",
        "permissions": [anonymous_permissions],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port, "backlog": 1024},
        "paths": {"info": {"type": "nodeinfo"}},
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {
        "$schema": "https://raw.githubusercontent.com/crossbario/crossbar/master/crossbar.json",
        "version": 2,
        "controller": {},
        "workers": [router_worker],
    }
    return configuration, url, realm
async def test_minimal_crossbar_io(crossbar_router_responsive):
    """Create a crossbar router and do the most minimal HTTP io.

    Fetches the router URL and expects status 200 with "Crossbar" in the
    response body.
    """
    url, _realm = crossbar_router_responsive

    # get response from crossbar server
    async with ClientSession() as session, session.get(url) as response:
        body = await response.text()

    # context manager variable assignments happen in main function
    # scope and aren't bound to the context manager block
    assert response.status == 200
    assert "Crossbar" in body
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test a minimal PubSub setup with crossbar.io""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from asyncio import Event | |
from asyncio import wait_for | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from typing import cast | |
import pytest | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from autobahn.wamp.websocket import WampWebSocketProtocol | |
from asyncio_playground.wamp.publisher.time_service import TimeService | |
from asyncio_playground.wamp.publisher.simple_publisher import SimplePublisher | |
from asyncio_playground.wamp.subscriber.recording_subscriber import RecordingSubscriber | |
from asyncio_playground.wamp.meta.meta_events import WAMPMetaEvents | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port) -> Tuple[Dict[str, Any], str, str]:
    """Configuration dictionary of crossbar application.

    Returns ``(configuration, url, realm)``: one router worker with realm
    ``systemtests``, an anonymous role allowed to do everything, and a web
    transport on the unused port with a node info path and a websocket path
    (``/ws``, the returned URL).
    """
    port = unused_tcp_port
    realm = "systemtests"
    url = f"ws://localhost:{port}/ws"

    # The anonymous role may call, register, publish and subscribe on any URI.
    anonymous_permissions = {
        "uri": "",
        "match": "prefix",
        "allow": {
            "call": True,
            "register": True,
            "publish": True,
            "subscribe": True,
        },
        "disclose": {
            "caller": False,
            "publisher": False,
        },
        "cache": True,
    }
    anonymous_role = {
        "name": "anonymous",
        "permissions": [anonymous_permissions],
    }
    web_transport = {
        "type": "web",
        "endpoint": {"type": "tcp", "port": port},
        "paths": {
            "info": {"type": "nodeinfo"},
            "ws": {"type": "websocket"},
        },
    }
    router_worker = {
        "type": "router",
        "realms": [{"name": realm, "roles": [anonymous_role]}],
        "transports": [web_transport],
    }
    configuration = {
        "version": 2,
        "workers": [router_worker],
    }
    return configuration, url, realm
@pytest.fixture
async def api_timeservice(crossbar_router_responsive):
    """Run a meta-event logger and a ``TimeService`` publisher for a test.

    Yields ``(url, realm)`` of the router. On teardown a ``SimplePublisher``
    is started for the ``com.kill.all`` topic and the fixture waits for the
    shared event to be set.
    """
    # data
    url, realm = crossbar_router_responsive
    topic = "com.example.time"
    event = Event()

    # meta_events
    meta_runner = ApplicationRunner(url, realm, extra={"event": event})
    _meta_transport, _meta_protocol = await meta_runner.run(
        WAMPMetaEvents, start_loop=False
    )
    await wait_for(event.wait(), 10)
    # The same event object is handed to the backend below, so reset it.
    event.clear()

    # backend
    backend_runner = ApplicationRunner(
        url, realm, extra={"topic": topic, "event": event}
    )
    _backend_transport, backend_protocol = await backend_runner.run(
        TimeService, start_loop=False
    )
    backend_protocol = cast(WampWebSocketProtocol, backend_protocol)

    yield url, realm

    # TODO: Could this be more elegant? Should this be more simple?
    # I guess we could just call backend_protocol.close() and handle
    # the TransportLost exception in the Session. However, use cleaner
    # deregistration here for now.
    kill_runner = ApplicationRunner(url, realm, extra={"topic": "com.kill.all"})
    _kill_transport, _kill_protocol = await kill_runner.run(
        SimplePublisher, start_loop=False
    )
    await event.wait()
async def test_minimal_pubsub(api_timeservice):
    """Register a TimeService that acts as a publisher and make sure a registered
    subscriber is notified.

    A ``RecordingSubscriber`` is started with a limit of three; once it sets
    the event, the queue must hold exactly that many items.
    """
    # data
    url, realm = api_timeservice
    limit = 3
    queue = Queue()
    event = Event()
    client_extra = {
        "queue": queue,
        "topic": "com.example.time",
        "limit": limit,
        "event": event,
    }

    # client
    client_runner = ApplicationRunner(url, realm, extra=client_extra)
    _transport, _protocol = await client_runner.run(
        RecordingSubscriber, start_loop=False
    )

    # wait till client has recorded a number of events, then assert
    await event.wait()
    timestamps = []
    for _ in range(queue.qsize()):
        timestamps.append(await queue.get())
    assert len(timestamps) == limit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test a minimal RPC caller/callee setup with crossbar.io | |
Components: | |
Router Components: | |
Run in the same process as the router. They should be lightweight, for example like | |
authorization. | |
Container Components: | |
Run in their own process. They can run Python applications natively. They do their | |
own authentication and authorization and can scale independently. | |
Crossbar containers: | |
* Different from docker containers | |
* Describes a Crossbar worker configuration defined in config.json | |
Hosted Container Workers are Python ApplicationSession objects written using twisted. | |
Hosted Container Components cannot be used with asyncio! | |
Workers: | |
Native Workers (running native Python code):
Router | |
Container | |
Guest Workers: | |
Arbitrary processes connecting to router via Autobahn|Python, Autobahn|JS, | |
Autobahn|CPP etc. They can connect with ApplicationSession objects using asyncio.
Session meta events: | |
https://crossbar.io/docs/Session-Metaevents-and-Procedures/ | |
https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/ | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from random import randint | |
import pytest | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend | |
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient | |
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter | |
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent | |
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI | |
from asyncio_playground.wamp.meta.waiter import SessionKiller | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration used by the tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one web transport exposing a node info page
    and a websocket path.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the websocket URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    url = f"ws://localhost:{port}/ws"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "web",
                        "endpoint": {"type": "tcp", "port": port},
                        "paths": {
                            "info": {"type": "nodeinfo"},
                            "ws": {"type": "websocket"},
                        },
                    }
                ],
            },
            # Only twisted components can be hosted containers and started by
            # crossbar.io configuration
            # {
            #     "type": "container",
            #     # "options": {"pythonpath": [".."]},
            #     "components": [
            #         {
            #             "type": "class",
            #             "classname": f"{Component.__module__}.{Component.__qualname__}",
            #             "realm": "systemtests",
            #             "transport": {
            #                 "type": "websocket",
            #                 "endpoint": {
            #                     "type": "tcp",
            #                     "host": "127.0.0.1",
            #                     "port": port,
            #                 },
            #                 "url": f"ws://127.0.0.1:{port}/ws",
            #             },
            #         }
            #     ],
            # },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the addition backend on the router, then yield the router data.

    The backend is launched inside a ``BackendReadyWaiter`` block configured
    for the registration-created meta event of the API URI; the session id
    taken from the waiter is handed to ``SessionKiller``, which wraps the
    ``yield`` so the backend session is dealt with around the test.
    """
    url, realm = crossbar_router_responsive
    uri = "com.example.add"

    # waiter that filters registration-created events by our URI
    registration_filter = FilterRegistrationCreateByURI(uri)
    backend_ready = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=registration_filter,
    )

    # bring up the backend while the waiter is active
    async with backend_ready:
        backend_runner = ApplicationRunner(url, realm, extra={"uri": uri})
        transport_backend, protocol_backend = await backend_runner.run(
            AdditionBackend, start_loop=False
        )
    session_id, event = backend_ready.awaited_event

    # Kill the backend session explicitly. Otherwise terminating the crossbar
    # router process produces log entries like:
    #   Backend:aio.py:162 session closed with reason wamp.close.transport_lost
    #   [WAMP transport was lost without closing the session 456128172270144 before]
    # These look more like noise than an actual problem, but we avoid them
    # where we can.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc(crossbar_router_responsive):
    """Run an addition backend and a client against the router and check the sum.

    The backend receives the procedure URI via ``extra``; the client receives
    the operands, the URI and a queue. One item is taken from that queue and
    must equal ``x + y``.
    """
    url, realm = crossbar_router_responsive
    results = Queue()
    uri = "com.example.add"
    x = randint(0, 100)
    y = randint(0, 100)
    expected = x + y

    # callee side
    backend_runner = ApplicationRunner(url, realm, extra={"uri": uri})
    transport_backend, protocol_backend = await backend_runner.run(
        AdditionBackend, start_loop=False
    )

    # caller side
    client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
    client_runner = ApplicationRunner(url, realm, extra=client_extra)
    transport_client, protocol_client = await client_runner.run(
        AdditionClient, start_loop=False
    )

    # block until the client has delivered a result, then compare
    assert await results.get() == expected
async def test_minimal_rpc_api_fixture(api_add):
    """Call the addition API three times, relying on the fixture for the backend.

    Registering the API in the ``api_add`` fixture keeps the setup in one
    place and leaves this test with the client side only.
    """
    url, realm = api_add
    uri = "com.example.add"
    results = Queue()

    for _ in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        expected = x + y

        # fresh client per call, sharing the same result queue
        client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
        client_runner = ApplicationRunner(url, realm, extra=client_extra)
        transport_client, protocol_client = await client_runner.run(
            AdditionClient, start_loop=False
        )

        # block until the client has delivered a result, then compare
        assert await results.get() == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Components also have a decorator based API. These tests are using it. | |
SeeAlso: | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#longer-example | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from random import randint | |
import pytest | |
from autobahn.asyncio.component import Component | |
from asyncio_playground.wamp.callee.addition import ( | |
create_component as create_addition_backend, | |
) | |
from asyncio_playground.wamp.caller.addition import ( | |
create_component as create_addition_client, | |
) | |
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter | |
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent | |
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI | |
from asyncio_playground.wamp.meta.waiter import SessionKiller | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration used by the tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one web transport exposing a node info page
    and a websocket path.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the websocket URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    url = f"ws://localhost:{port}/ws"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "web",
                        "endpoint": {"type": "tcp", "port": port},
                        "paths": {
                            "info": {"type": "nodeinfo"},
                            "ws": {"type": "websocket"},
                        },
                    }
                ],
            },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the decorator-based addition backend, then yield the router data.

    The backend component is built by ``create_addition_backend`` and started
    inside a ``BackendReadyWaiter`` block configured for the
    registration-created meta event of the API URI. The session id taken from
    the waiter is handed to ``SessionKiller``, which wraps the ``yield``.
    """
    # data
    url, realm = crossbar_router_responsive
    uri = "com.example.add"

    # backend
    filterfunc = FilterRegistrationCreateByURI(uri)
    waiter = BackendReadyWaiter(
        url, realm, WAMPMetaEvent.wamp_registration_on_create, filterfunc=filterfunc
    )
    component = create_addition_backend(
        uri,
        transports=[{"url": url}],
        realm=realm,
    )
    async with waiter:
        # start() is deliberately not awaited; the reference is kept so the
        # returned future stays alive for the lifetime of the fixture
        future_ = component.start()
    session_id, event = waiter.awaited_event

    # Kill the backend session explicitly. Otherwise terminating the crossbar
    # router process produces log entries like:
    #   Backend:aio.py:162 session closed with reason wamp.close.transport_lost
    #   [WAMP transport was lost without closing the session 456128172270144 before]
    # These look more like noise than an actual problem, but we avoid them
    # where we can.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_decorators(api_add):
    """Call the addition API three times via the decorator-based client component."""
    url, realm = api_add
    uri = "com.example.add"
    results = Queue()

    for _ in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        expected = x + y

        client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
        client = create_addition_client(
            transports=[{"url": url}],
            realm=realm,
            extra=client_extra,
        )

        # awaiting start() blocks until the component has finished
        outcome = await client.start()
        assert outcome is None

        # the client's result must be on the queue by now
        assert await results.get() == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Components can also execute a "main" coroutine after transport has connected. | |
This functionality is tested here. | |
SeeAlso: | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from random import randint | |
import pytest | |
from autobahn.asyncio.component import Component | |
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend | |
from asyncio_playground.wamp.caller.addition import main as addition_client_main | |
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter | |
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent | |
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI | |
from asyncio_playground.wamp.meta.waiter import SessionKiller | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration used by the tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one web transport exposing a node info page
    and a websocket path.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the websocket URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    url = f"ws://localhost:{port}/ws"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "web",
                        "endpoint": {"type": "tcp", "port": port},
                        "paths": {
                            "info": {"type": "nodeinfo"},
                            "ws": {"type": "websocket"},
                        },
                    }
                ],
            },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the addition backend as a Component, then yield the router data.

    The backend session class is passed as ``session_factory`` and the
    component is started inside a ``BackendReadyWaiter`` block configured for
    the registration-created meta event of the API URI. The session id taken
    from the waiter is handed to ``SessionKiller``, which wraps the ``yield``.
    """
    url, realm = crossbar_router_responsive
    uri = "com.example.add"

    # waiter that filters registration-created events by our URI
    registration_filter = FilterRegistrationCreateByURI(uri)
    backend_ready = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=registration_filter,
    )

    backend = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=AdditionBackend,
        extra={"uri": uri},
    )

    # start() is not awaited here; the future is only kept referenced
    async with backend_ready:
        future_ = backend.start()
    session_id, event = backend_ready.awaited_event

    # Kill the backend session explicitly. Otherwise terminating the crossbar
    # router process produces log entries like:
    #   Backend:aio.py:162 session closed with reason wamp.close.transport_lost
    #   [WAMP transport was lost without closing the session 456128172270144 before]
    # These look more like noise than an actual problem, but we avoid them
    # where we can.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_main(api_add):
    """Call the addition API three times via a Component with a ``main`` coroutine.

    A Component accepts a ``main`` keyword argument: a coroutine that runs to
    completion once a transport has been established. With this approach the
    "on_*" handlers are usually not needed.
    """
    url, realm = api_add
    uri = "com.example.add"
    results = Queue()

    for _ in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        expected = x + y

        client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
        client = Component(
            main=addition_client_main,
            transports=[{"url": url}],
            realm=realm,
            extra=client_extra,
        )

        # awaiting start() blocks until the main coroutine has finished
        outcome = await client.start()
        assert outcome is None

        # the client's result must be on the queue by now
        assert await results.get() == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test a minimal RPC caller/callee setup with crossbar.io using components and reusing | |
the subclassing API of ApplicationSession objects and "session_factory=" pattern. | |
A WAMP distributed application is made up of multiple components that register with a | |
central broker/dealer. That central broker/dealer is the router and it is application | |
agnostic/generic. | |
So far the older ApplicationSession subclassing API has been used alongside the older | |
runner ApplicationRunner. The docs mentioned in numerous locations that this API is | |
still fine, however the newer Component based approach with the run() method is clearly | |
preferred for newer code. | |
SeeAlso: | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from random import randint | |
import pytest | |
from autobahn.asyncio.component import Component | |
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend | |
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient | |
from asyncio_playground.wamp.meta.waiter import BackendReadyWaiter | |
from asyncio_playground.wamp.meta.waiter import WAMPMetaEvent | |
from asyncio_playground.wamp.meta.waiter import FilterRegistrationCreateByURI | |
from asyncio_playground.wamp.meta.waiter import SessionKiller | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration used by the tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one web transport exposing a node info page
    and a websocket path.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the websocket URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    url = f"ws://localhost:{port}/ws"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "web",
                        "endpoint": {"type": "tcp", "port": port},
                        "paths": {
                            "info": {"type": "nodeinfo"},
                            "ws": {"type": "websocket"},
                        },
                    }
                ],
            },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive):
    """Start the addition backend as a Component, then yield the router data.

    The backend session class is passed as ``session_factory`` and the
    component is started inside a ``BackendReadyWaiter`` block configured for
    the registration-created meta event of the API URI. The session id taken
    from the waiter is handed to ``SessionKiller``, which wraps the ``yield``.
    """
    url, realm = crossbar_router_responsive
    uri = "com.example.add"

    # waiter that filters registration-created events by our URI
    registration_filter = FilterRegistrationCreateByURI(uri)
    backend_ready = BackendReadyWaiter(
        url,
        realm,
        WAMPMetaEvent.wamp_registration_on_create,
        filterfunc=registration_filter,
    )

    backend = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=AdditionBackend,
        extra={"uri": uri},
    )

    # start() is not awaited here; the future is only kept referenced
    async with backend_ready:
        future_ = backend.start()
    session_id, event = backend_ready.awaited_event

    # Kill the backend session explicitly. Otherwise terminating the crossbar
    # router process produces log entries like:
    #   Backend:aio.py:162 session closed with reason wamp.close.transport_lost
    #   [WAMP transport was lost without closing the session 456128172270144 before]
    # These look more like noise than an actual problem, but we avoid them
    # where we can.
    async with SessionKiller(url, realm, [session_id]):
        yield url, realm
async def test_minimal_rpc_api_component_subclass(api_add):
    """Call the addition API three times via a Component wrapping a session class.

    ApplicationSession subclasses can be run through a Component as well; the
    subclass is passed via the ``session_factory`` keyword argument.
    """
    url, realm = api_add
    uri = "com.example.add"
    results = Queue()

    for _ in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        expected = x + y

        client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
        client = Component(
            transports=[{"url": url}],
            realm=realm,
            session_factory=AdditionClient,
            extra=client_extra,
        )

        # Start without blocking. Awaiting start() would require the component
        # to finish somehow: by failing, succeeding or .leave() being called.
        future_ = client.start()

        # block until the client has delivered a result, then compare
        assert await results.get() == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test a minimal RPC caller/callee setup with crossbar.io using tls for encryption in | |
transit. | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
import ssl | |
from logging import getLogger | |
from asyncio import Queue | |
from asyncio import wait_for | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
from random import randint | |
from pathlib import Path | |
import pytest | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from asyncio_playground.wamp.callee.addition import Addition as AdditionBackend | |
from asyncio_playground.wamp.caller.addition import Addition as AdditionClient | |
from asyncio_playground.wamp.meta.callee_introspection import KillCalleesForURI | |
from .utilities import wait_until_responsive | |
from .utilities import IsResponsiveWebsocket | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(
    unused_tcp_port: int, ssl_certificates: Tuple[Path, Path]
) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration for the TLS tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one websocket transport whose TCP endpoint is
    configured with the given TLS key and certificate.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.
        ssl_certificates: ``(key file, certificate file)`` paths.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the ``wss://`` URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    ssl_key_file, ssl_cert_file = ssl_certificates
    url = f"wss://localhost:{port}"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "websocket",
                        "id": "systemtests_tls_websocket",
                        "endpoint": {
                            "type": "tcp",
                            "port": port,
                            "tls": {
                                "key": str(ssl_key_file),
                                "certificate": str(ssl_cert_file),
                            },
                        },
                    }
                ],
            },
        ],
    }
    return configuration, url, realm
@pytest.fixture
async def crossbar_router_responsive(
    crossbar_router, ssl_certificates: Tuple[Path, Path]
):
    """Yield the router data once its TLS websocket endpoint responds.

    The endpoint is probed with ``IsResponsiveWebsocket`` using an SSL context
    that loads the test certificate as CA file; ``wait_until_responsive`` is
    called with a 20 second timeout and a 0.25 second pause.
    """
    url, realm = crossbar_router
    _key_path, cert_path = ssl_certificates
    tls_context = ssl.create_default_context(cafile=cert_path)

    # poll the websocket endpoint until it answers
    probe = IsResponsiveWebsocket(url, realm, tls_context)
    await wait_until_responsive(coro_factory=probe, timeout=20.0, pause=0.25)

    yield url, realm
@pytest.fixture
async def api_add(crossbar_router_responsive, ssl_certificates):
    """Start the addition backend over TLS, then yield the router data.

    After the test, a ``KillCalleesForURI`` session is run against the same
    URI and its result is awaited from a queue, to avoid transport-lost
    errors for the backend session.

    Raises:
        asyncio.TimeoutError: If the kill session does not report within
            5 seconds during teardown.
    """
    # data
    url, realm = crossbar_router_responsive
    _, ssl_cert_file = ssl_certificates
    uri = "com.example.add"
    queue = Queue()
    ssl_context = ssl.create_default_context(cafile=ssl_cert_file)

    # backend
    runner_backend = ApplicationRunner(url, realm, extra={"uri": uri}, ssl=ssl_context)
    coro_backend = runner_backend.run(AdditionBackend, start_loop=False)
    transport_backend, protocol_backend = await coro_backend

    yield url, realm

    # kill backend registration to avoid transport lost error
    runner_kill_backend = ApplicationRunner(
        url, realm, extra={"uri": uri, "queue": queue}, ssl=ssl_context
    )
    coro_kill_backend = runner_kill_backend.run(KillCalleesForURI, start_loop=False)
    transport_kill_backend, protocol_kill_backend = await coro_kill_backend
    # Bound the wait so a kill session that never reports cannot hang the
    # whole test run; 5 seconds matches the timeout the test itself uses.
    await wait_for(queue.get(), 5)
async def test_minimal_rpc_tls(api_add, ssl_certificates):
    """Call the addition API three times over a TLS-encrypted websocket."""
    url, realm = api_add
    _key_path, cert_path = ssl_certificates
    uri = "com.example.add"
    results = Queue()
    tls_context = ssl.create_default_context(cafile=cert_path)

    for _ in range(3):
        x = randint(0, 100)
        y = randint(0, 100)
        expected = x + y

        # fresh client per call, sharing the same result queue
        client_extra = {"queue": results, "x": x, "y": y, "uri": uri}
        client_runner = ApplicationRunner(
            url, realm, extra=client_extra, ssl=tls_context
        )
        transport_client, protocol_client = await client_runner.run(
            AdditionClient, start_loop=False
        )

        # give the client at most 5 seconds to deliver its result
        assert await wait_for(results.get(), 5) == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Test assumptions about the WAMP Session lifecycle. | |
SeeAlso: | |
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle | |
""" | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Queue | |
from asyncio import Event | |
from asyncio import wait_for | |
from asyncio import sleep | |
from typing import Tuple | |
from typing import Any | |
from typing import Dict | |
import pytest | |
from autobahn.wamp.types import CloseDetails | |
from autobahn.asyncio.websocket import WampWebSocketClientProtocol | |
from autobahn.wamp.types import SessionDetails | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from autobahn.asyncio.component import Component | |
from asyncio_playground.wamp.meta.session_lifecycle import TypecheckingSession | |
from asyncio_playground.wamp.meta.session_lifecycle import LifecycleEvent | |
from asyncio_playground.wamp.meta.session_lifecycle import ( | |
SessionLifecycleTrackerSubclassAPI, | |
) | |
from asyncio_playground.wamp.meta.session_lifecycle import create_component | |
logger = getLogger(__name__) | |
@pytest.fixture
def crossbar_configuration(unused_tcp_port: int) -> Tuple[Dict[str, Any], str, str]:
    """Build the crossbar node configuration used by the tests of this module.

    The configuration declares a single router worker with one realm whose
    ``anonymous`` role may call, register, publish and subscribe on every URI
    (empty prefix match), plus one web transport exposing a node info page
    and a websocket path.

    Args:
        unused_tcp_port: TCP port the router transport is configured with.

    Returns:
        A ``(configuration, url, realm)`` tuple: the configuration dictionary,
        the websocket URL built from the port, and the realm name.
    """
    port = unused_tcp_port
    url = f"ws://localhost:{port}/ws"
    realm = "systemtests"
    configuration = {
        "version": 2,
        "workers": [
            {
                "type": "router",
                "realms": [
                    {
                        "name": realm,
                        "roles": [
                            {
                                "name": "anonymous",
                                "permissions": [
                                    {
                                        "uri": "",
                                        "match": "prefix",
                                        "allow": {
                                            "call": True,
                                            "register": True,
                                            "publish": True,
                                            "subscribe": True,
                                        },
                                        "disclose": {
                                            "caller": False,
                                            "publisher": False,
                                        },
                                        "cache": True,
                                    },
                                ],
                            }
                        ],
                    }
                ],
                "transports": [
                    {
                        "type": "web",
                        "endpoint": {"type": "tcp", "port": port},
                        "paths": {
                            "info": {"type": "nodeinfo"},
                            "ws": {"type": "websocket"},
                        },
                    }
                ],
            },
        ],
    }
    return configuration, url, realm
async def test_session_lifecycle_subclassing_api_and_application_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle events of an ApplicationSession run by ApplicationRunner.

    The session is handed an event and a queue via ``extra``. After the event
    is set (within 10 seconds), one queue item per expected lifecycle event is
    collected and the sequence is compared.
    """
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    finished = Event()
    recorded_events = Queue()

    # run the tracking session
    session_extra = {"event": finished, "queue": recorded_events}
    session_runner = ApplicationRunner(url, realm, extra=session_extra)
    transport_session, protocol_session = await session_runner.run(
        SessionLifecycleTrackerSubclassAPI, start_loop=False
    )
    await wait_for(finished.wait(), 10)

    # drain exactly as many events as we expect, 5 seconds each at most
    observed = []
    for _ in expected:
        observed.append(await wait_for(recorded_events.get(), 5))
    assert observed == expected
async def test_session_lifecycle_subclassing_api_and_component_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle events of an ApplicationSession run by a Component.

    The Component runner is newer and generally preferred over the older
    ApplicationRunner. It does not seem to matter much whether one writes an
    ApplicationSession subclass and passes it as ``session_factory`` or uses
    the Component decorators with a factory function.
    """
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    finished = Event()
    recorded_events = Queue()

    # wrap the tracking session in a component
    tracker = Component(
        transports=[{"url": url}],
        realm=realm,
        session_factory=SessionLifecycleTrackerSubclassAPI,
        extra={"event": finished, "queue": recorded_events},
    )

    # No need to wait on the Event with components: start() returns once the
    # component has finished.
    await wait_for(tracker.start(), 10)

    # drain exactly as many events as we expect, 5 seconds each at most
    observed = []
    for _ in expected:
        observed.append(await wait_for(recorded_events.get(), 5))
    assert observed == expected
async def test_session_lifecycle_decorator_api_and_component_runner(
    crossbar_router_responsive,
):
    """Check the lifecycle events of a component built with the decorator API.

    According to the docs, the decorator API seems to be the most modern and
    preferred one.
    """
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
    ]
    recorded_events = Queue()

    # build the component through its factory function
    tracker = create_component(
        transports=[{"url": url}],
        realm=realm,
        extra={"queue": recorded_events},
    )

    # No Event needed with components: start() returns once the component
    # has finished.
    await wait_for(tracker.start(), 10)

    # drain exactly as many events as we expect, 5 seconds each at most
    observed = []
    for _ in expected:
        observed.append(await wait_for(recorded_events.get(), 5))
    assert observed == expected
async def test_session_lifecycle_decorator_additional_listeners(
    crossbar_router_responsive,
):
    """Check that multiple listeners registered for one event are all invoked.

    Extra listeners for connect, join (registered twice), leave and disconnect
    are passed to ``create_component``. Each of them logs its arguments and
    appends its event to ``session.lifecycle_events``; only the extra
    disconnect listener additionally puts its event on ``session.queue``.
    """
    # data
    url, realm = crossbar_router_responsive
    expected = [
        LifecycleEvent.on_connect,
        LifecycleEvent.on_connect,
        LifecycleEvent.on_join,
        LifecycleEvent.on_join,
        LifecycleEvent.on_join,
        LifecycleEvent.on_ready,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_leave,
        LifecycleEvent.on_disconnect,
        LifecycleEvent.on_disconnect,
    ]
    queue = Queue()

    # additional listeners
    async def onConnectAdditional(
        session: TypecheckingSession, protocol: WampWebSocketClientProtocol
    ) -> None:
        logger.warning("onConnectAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{protocol=}")
        session.lifecycle_events.append(LifecycleEvent.on_connect)

    async def onJoinAdditional(
        session: TypecheckingSession, details: SessionDetails
    ) -> None:
        logger.warning("onJoinAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        # delay before the event is recorded
        await sleep(5)
        session.lifecycle_events.append(LifecycleEvent.on_join)

    async def onLeaveAdditional(
        session: TypecheckingSession, details: CloseDetails
    ) -> None:
        logger.warning("onLeaveAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{details=}")
        session.lifecycle_events.append(LifecycleEvent.on_leave)

    async def onDisconnectAdditional(
        session: TypecheckingSession, *args, **kwargs
    ) -> None:
        logger.warning("onDisconnectAdditional")
        logger.warning(f"{session=}")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")
        event_ = LifecycleEvent.on_disconnect
        session.lifecycle_events.append(event_)
        await session.queue.put(event_)

    # session
    component = create_component(
        additional_listener=[
            ("connect", onConnectAdditional),
            ("join", onJoinAdditional),
            ("join", onJoinAdditional),
            ("leave", onLeaveAdditional),
            ("disconnect", onDisconnectAdditional),
        ],
        transports=[{"url": url}],
        realm=realm,
        extra={"queue": queue},
    )

    # we don't need an Event with components. start() will
    # return when the component finished
    await wait_for(component.start(), 10)

    # wait till client has produced result and assert
    results = [await wait_for(queue.get(), 5) for _ in expected]
    assert results == expected
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
import ssl | |
from logging import getLogger | |
from pathlib import Path | |
from uuid import uuid4 | |
import pytest | |
from asyncio_playground.ssl.ssl_certificates import create_ssl_certificates | |
logger = getLogger(__name__) | |
async def test_create_ssl_certificates(tmp_path: Path):
    """Check that ``create_ssl_certificates`` writes a key/certificate pair.

    The target directory must not exist beforehand. Afterwards it must be a
    directory, both returned paths must be files, and the certificate's
    subject must carry the requested common name.
    """
    # precondition: nothing has been created yet
    target_dir = tmp_path / "ssl"
    assert not target_dir.exists()

    # create the certificate with a fresh UUID as common name
    cn = f"{uuid4()}"
    ssl_key_file, ssl_cert_file = create_ssl_certificates(target_dir, cn=cn)

    # directory and both files must now be present
    assert target_dir.exists()
    assert target_dir.is_dir()
    assert ssl_key_file.is_file()
    assert ssl_cert_file.is_file()

    # load the certificate as CA and read the subject back
    # SeeAlso: https://docs.python.org/3/library/ssl.html#ssl.SSLContext.get_ca_certs
    ssl_context = ssl.create_default_context(cafile=ssl_cert_file)
    subject = ssl_context.get_ca_certs()[0]["subject"]
    common_names = []
    for name_part in subject:
        for key, value in name_part:
            if key == "commonName":
                common_names.append(value)
    assert common_names[0] == cn
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
import ssl | |
import socket | |
from logging import getLogger | |
from asyncio import sleep | |
from typing import Tuple | |
from typing import Optional | |
from asyncio import wait_for | |
from asyncio import Event | |
from contextlib import closing | |
from typing import Awaitable | |
from itertools import count | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from aiohttp import ClientSession | |
logger = getLogger(__name__) | |
def find_free_port():
    """Return a TCP port number that was free at the time of the call.

    A temporary IPv4 stream socket is bound to port ``0`` so the operating
    system picks the port; the socket is closed again before returning.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(sock):
        sock.bind(("", 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = sock.getsockname()
        return port
async def request(
    method: str, url: str, ssl_context: Optional[ssl.SSLContext] = None
) -> Tuple[int, str]:
    """Send one HTTP request and return ``(status, body text)``.

    A new ``ClientSession`` is opened for the request and closed again
    before returning. ``ssl_context`` is passed through as the ``ssl``
    argument of the request.
    """
    async with ClientSession() as client:
        async with client.request(method, url, ssl=ssl_context) as reply:
            status = reply.status
            body = await reply.text()
            return status, body
async def wait_until_responsive(coro_factory, timeout, pause):
    """Wait until a service is responsive.

    Args:
        coro_factory: Zero-argument callable returning a fresh awaitable on
            every call. An attempt counts as successful only when the
            awaitable resolves to exactly ``True``.
        timeout: Overall time budget in seconds for all attempts together.
        pause: Seconds to sleep between two attempts.

    Raises:
        RuntimeError: If no attempt succeeded within ``timeout``. The
            original timeout exception is attached as ``__cause__``.
    """
    # Before Python 3.11 ``asyncio.wait_for`` raises ``asyncio.TimeoutError``,
    # which is a different class than the builtin ``TimeoutError``. Catching
    # only the builtin let the timeout escape unconverted on those versions.
    from asyncio import TimeoutError as AsyncioTimeoutError

    async def check_():
        for index in count(start=1):
            logger.debug(f"Responsiveness check: {index}")
            if await coro_factory() is True:
                return
            await sleep(pause)

    try:
        await wait_for(check_(), timeout)
    except (AsyncioTimeoutError, TimeoutError) as exception_instance:
        raise RuntimeError(
            "Timeout reached while waiting on service"
        ) from exception_instance
class IsResponsiveHTTP:
    """Callable responsiveness probe for an HTTP endpoint.

    Every call returns a new coroutine that resolves to ``True`` when the
    request answers with status 200 and to ``False`` in every other case.
    """

    def __init__(
        self, method: str, url: str, ssl_context: Optional[ssl.SSLContext] = None
    ):
        self.method, self.url, self.ssl_context = method, url, ssl_context

    def __call__(self, *args, **kwargs) -> Awaitable:
        """Return a new coroutine when called."""
        return self.is_responsive()

    async def is_responsive(self) -> bool:
        """Issue the configured request once and report whether it got a 200."""
        try:
            status, _ = await request(self.method, self.url, self.ssl_context)
        except Exception:
            # any failure while requesting counts as "not responsive"
            return False
        return status == 200
class IsResponsiveWebsocket:
    """Callable responsiveness probe for a WAMP router reached via websocket.

    Every call returns a new coroutine that starts a ``Ping`` session and
    resolves to ``True`` once the shared event has been set, or to ``False``
    when anything on the way fails.
    """

    def __init__(
        self, url: str, realm: str, ssl_context: Optional[ssl.SSLContext] = None
    ):
        self.url, self.realm, self.ssl_context = url, realm, ssl_context
        # handed to the Ping session through its "extra" configuration
        self.event = Event()

    def __call__(self, *args, **kwargs) -> Awaitable:
        """Return a new coroutine when called."""
        return self.is_responsive()

    async def is_responsive(self) -> bool:
        """Start one ``Ping`` session and wait up to 5 seconds for its signal."""
        try:
            logger.debug("is_responsive()")
            runner = ApplicationRunner(
                self.url,
                self.realm,
                ssl=self.ssl_context,
                extra={"event": self.event},
            )
            # the awaited result is unpacked into two values which are not
            # needed any further
            _transport, _protocol = await runner.run(Ping, start_loop=False)
            await wait_for(self.event.wait(), 5)
        except Exception as exception_instance:
            logger.debug(f"No. {exception_instance=}")
            return False
        logger.debug("Yes")
        return True
class Ping(ApplicationSession):
    """Check if an address can be connected to."""

    async def onJoin(self, details):
        """Leave again right after joining and signal that the join worked.

        The event to set is read from the ``"event"`` entry of the
        session's extra configuration.
        """
        logger.debug(f"onJoin({details})")
        joined: Event = self.config.extra["event"]
        # leave first, then tell the waiting side
        self.leave()
        joined.set()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals, print_function, division, absolute_import | |
from logging import getLogger | |
from asyncio import Event, wait_for | |
from enum import Enum | |
from typing import Callable | |
from typing import Optional | |
from typing import Any | |
from typing import List | |
from asyncio import Queue | |
from asyncio import Event | |
from autobahn.asyncio.wamp import ApplicationSession | |
from autobahn.asyncio.wamp import ApplicationRunner | |
from autobahn.wamp.request import Subscription | |
from .callee_introspection import KillSessions | |
logger = getLogger(__name__) | |
# wamp.registration.on_create | |
# { | |
# 'id': 4280645366663556, | |
# 'created': '2022-02-13T19:43:06.805Z', | |
# 'uri': 'com.example.add', | |
# 'match': 'exact', | |
# 'invoke': 'single' | |
# } | |
def passthrough(*args, **kwargs) -> bool:
    """Filter that accepts everything: log the arguments, return ``True``."""
    message = f"passthrough({args=}, {kwargs=})"
    logger.warning(message)
    return True
class FilterRegistrationCreateByURI:
    """Filter that accepts only events carrying one specific ``"uri"``.

    An instance is called with the positional arguments of an event handler
    and answers ``True`` only for exactly two positionals whose second one
    is a dict with a ``"uri"`` entry equal to the configured URI.
    """

    def __init__(self, uri: str) -> None:
        self.uri = uri

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Return ``True`` if the event matches the URI, ``False`` otherwise."""
        logger.warning("FilterRegistrationCreateByURI: __call__")
        logger.warning(f"{args=}")
        logger.warning(f"{kwargs=}")

        # anything but exactly two positionals cannot be unpacked
        try:
            _, event = args
        except ValueError:
            return False

        # the second positional has to be a dict with a "uri" entry
        if not (isinstance(event, dict) and "uri" in event):
            return False

        mismatch = event["uri"] != self.uri
        return not mismatch
class WAMPMetaEvent(str, Enum):
    """Topic URIs of WAMP meta events, grouped by what they report on.

    Members mix in ``str``, so every member compares equal to its URI
    string; ``.value`` yields the plain URI.
    """

    # Meta Events for Sessions
    wamp_session_on_join = "wamp.session.on_join"
    wamp_session_on_leave = "wamp.session.on_leave"
    # Meta Events for Subscriptions
    wamp_subscription_on_create = "wamp.subscription.on_create"
    wamp_subscription_on_subscribe = "wamp.subscription.on_subscribe"
    wamp_subscription_on_unsubscribe = "wamp.subscription.on_unsubscribe"
    wamp_subscription_on_delete = "wamp.subscription.on_delete"
    # Meta Events for Registrations
    wamp_registration_on_create = "wamp.registration.on_create"
    wamp_registration_on_register = "wamp.registration.on_register"
    wamp_registration_on_unregister = "wamp.registration.on_unregister"
    wamp_registration_on_delete = "wamp.registration.on_delete"
    # Meta Events for Schemas
    wamp_schema_on_define = "wamp.schema.on_define"
    wamp_schema_on_undefine = "wamp.schema.on_undefine"
class WaitForMetaEvent(ApplicationSession):
    """Session that waits for one meta event, optionally filtered.

    After joining, the session subscribes to the configured meta event. The
    first event accepted by the filter function ends the wait: the session
    unsubscribes, puts the event arguments onto the queue, leaves and sets
    the event.

    SeeAlso:
        https://crossbar.io/docs/Session-Metaevents-and-Procedures/
        https://crossbar.io/docs/Registration-Meta-Events-and-Procedures/
        https://crossbar.io/docs/Subscription-Meta-Events-and-Procedures/
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor-sessions.py
        https://github.com/crossbario/crossbar-examples/blob/master/metaapi/monitor.html
        https://autobahn.readthedocs.io/en/latest/wamp/programming.html#session-lifecycle
    """

    async def onJoin(self, details):
        """Subscribe to the meta event and signal that the subscription exists.

        Everything needed is read from the session's extra configuration:
        ``meta_event``, ``event_subscription``, ``event``, ``queue`` and
        ``filterfunc``.
        """
        self.subscription_: Optional[Subscription] = None

        settings = self.config.extra
        meta_event: WAMPMetaEvent = settings["meta_event"]
        event_subscription: Event = settings["event_subscription"]
        event: Event = settings["event"]
        queue: Queue = settings["queue"]
        filterfunc: Callable[[Any], bool] = settings["filterfunc"]
        topic: str = meta_event.value

        async def on_meta_event(*args, **kwargs):
            logger.warning("WaitForMetaEvent: on_meta_event()")
            logger.warning(f"{args=}")
            logger.warning(f"{kwargs=}")

            # only the positional arguments are handed to the filter
            logger.warning("WaitForMetaEvent: apply filterfunc")
            accepted = filterfunc(*args)
            if not accepted:
                # not the event we are after, keep waiting
                logger.warning("WaitForMetaEvent: filterfunc false")
                return

            logger.warning("WaitForMetaEvent: finish wait")
            await self.subscription_.unsubscribe()
            await queue.put(args)
            self.leave()
            event.set()

        # subscribe first, then announce that the subscription is in place
        self.subscription_ = await self.subscribe(on_meta_event, topic)
        event_subscription.set()
class BackendReadyWaiter:
    """Async. context manager that waits for a meta event on its way out.

    Entering starts a ``WaitForMetaEvent`` session and waits until its
    subscription is in place. Leaving without an exception waits for the
    awaited event and stores it in ``awaited_event``.
    """

    def __init__(
        self,
        url: str,
        realm: str,
        meta_event: WAMPMetaEvent,
        timeout: float = 20.0,
        filterfunc: Callable[[Any], bool] = passthrough,
    ):
        self.url = url
        self.realm = realm
        self.meta_event = meta_event
        self.timeout = timeout
        self.filterfunc = filterfunc

        self._subscription: Optional[Subscription] = None
        # shared with the WaitForMetaEvent session via its extra configuration
        self.queue = Queue()
        self.event_subscription = Event()
        self.event = Event()
        self.awaited_event: Optional[Any] = None

    async def __aenter__(self):
        """Start the waiting session and return once it has subscribed."""
        logger.warning("BackendReadyWaiter: __aenter__")

        # subscribe to meta event
        session_extra = {
            "meta_event": self.meta_event,
            "event": self.event,
            "queue": self.queue,
            "event_subscription": self.event_subscription,
            "filterfunc": self.filterfunc,
        }
        runner = ApplicationRunner(self.url, self.realm, extra=session_extra)
        _transport, _protocol = await runner.run(
            WaitForMetaEvent, start_loop=False
        )

        # and make sure subscription happened before entering context
        logger.warning("BackendReadyWaiter: wait for subscription")
        await wait_for(self.event_subscription.wait(), self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Runs upon leaving of context upon finishing or exception."""
        logger.warning("BackendReadyWaiter: __aexit__")

        # exception occurred? bubble up
        if exc_val:
            return

        # enclosed context ran without error, wait for meta event
        logger.warning("BackendReadyWaiter: wait for event")
        await wait_for(self.event.wait(), self.timeout)
        logger.warning("BackendReadyWaiter: awaited event from queue")
        self.awaited_event = await self.queue.get()
class SessionKiller:
    """Async. context manager that kills the given sessions on its way out.

    Nothing happens on entering. Leaving without an exception starts a
    ``KillSessions`` session for the configured ids and waits until it has
    set the shared event.
    """

    def __init__(
        self,
        url: str,
        realm: str,
        session_ids: List[int],
        timeout: float = 20.0,
    ):
        self.url, self.realm = url, realm
        self.session_ids = session_ids
        self.timeout = timeout
        # handed to the KillSessions session via its extra configuration
        self.event = Event()

    async def __aenter__(self):
        """Log the entry and return the manager itself."""
        logger.warning("SessionKiller: __aenter__")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Runs upon leaving of context upon finishing or exception."""
        logger.warning("SessionKiller: __aexit__")

        # exception occurred? bubble up
        if exc_val:
            return

        # enclosed context ran without error, kill sessions
        logger.warning("SessionKiller: kill sessions")
        runner = ApplicationRunner(
            self.url,
            self.realm,
            extra={"session_ids": self.session_ids, "event": self.event},
        )
        _transport, _protocol = await runner.run(KillSessions, start_loop=False)

        logger.warning("SessionKiller: wait for event")
        await wait_for(self.event.wait(), self.timeout)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment