Last active
March 28, 2018 19:23
-
-
Save rdebroiz/1854c2bab7da929e376c273d262c805a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# usage: python autobahn_start_AS_two_times.py host port realm | |
import asyncio | |
import sys | |
from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession | |
class CheckConnectionSession(ApplicationSession): | |
""" | |
A dummy WAMP Session which disconnect from the server as soon as it joins the | |
configured realm. | |
""" | |
async def onJoin(self, details): | |
self.leave() | |
self.disconnect() | |
async def onDisconnect(self): | |
loop = asyncio.get_event_loop() | |
loop.stop() | |
def get_wamp_url(host, port): | |
""" | |
:param host: WAMP server host | |
:type host: str | |
:param port: WAMP server port | |
:type port: int | |
:return: A url to use with ``Autobahn|Python`` | |
""" | |
return f"ws://{host}:{port}/ws" | |
if __name__ == '__main__': | |
url = get_wamp_url(sys.argv[1], sys.argv[2]) | |
realm = sys.argv[3] | |
runner = ApplicationRunner(url, realm) | |
loop = asyncio.get_event_loop() | |
runner.run(CheckConnectionSession) | |
loop.close() | |
# Uncomment to make it works | |
# if loop.is_closed(): | |
# asyncio.set_event_loop(asyncio.new_event_loop()) | |
runner.run(CheckConnectionSession) | |
loop.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment