Skip to content

Instantly share code, notes, and snippets.

@FirefighterBlu3
Created January 19, 2017 23:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FirefighterBlu3/d3fd3f51fcc8d7ba9c24cbe06d22cabf to your computer and use it in GitHub Desktop.
Save FirefighterBlu3/d3fd3f51fcc8d7ba9c24cbe06d22cabf to your computer and use it in GitHub Desktop.
free threaded WAMP client that runs separate from any main process
#!/usr/bin/env python
__version__ = '1.5'
__released__ = '2016-Jul-6 12:50A EST'
__author__ = 'david@blue-labs.org'
__license__ = 'Apache 2.0'
import asyncio
import configparser
import sys
import time
import txaio
import signal
import socket
import logging
import threading
import functools
import types
import datetime
import traceback
import warnings
def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
traceback.print_stack()
log = file if hasattr(file,'write') else sys.stderr
log.write(warnings.formatwarning(message, category, filename, lineno, line))
warnings.showwarning = warn_with_traceback
# 3rd party aio library for asyncio friendly Queue
# https://github.com/aio-libs/janus.git
import janus
from autobahn.websocket.util import parse_url
from autobahn.asyncio.websocket import WampWebSocketClientFactory
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.wamp.types import (ComponentConfig, SessionDetails, EventDetails, CallDetails, PublishOptions, CallOptions,
CloseDetails, Challenge, SubscribeOptions, RegisterOptions)
from concurrent.futures import CancelledError
from concurrent.futures import ProcessPoolExecutor
class _Component(ApplicationSession):
close_reason = None
_shutdown = False
_loop = None
def __init__(self, realm:str, extra:dict, loop, join_future:asyncio.Future):
super().__init__(ComponentConfig(realm, extra))
self.__join_future = join_future
self.event_loop = loop
self.log = logging.getLogger()
def onConnect(self):
self.close_reason = None
self.cfg = self.config.extra['cfg']
realm = self.cfg.get('wamp', 'realm')
authid = self.cfg.get('wamp', 'cli username')
self.log.debug("\x1b[1;34mClientSession connected: Joining realm <{}> under authid <{}>\x1b[0m".format(realm, authid))
self.join(realm, ['ticket'], authid)
def onChallenge(self, challenge):
self.log.debug("ClientSession received: {}".format(challenge))
if challenge.method == 'ticket':
return self.cfg.get('wamp','cli password')
else:
raise Exception("Invalid authmethod {}".format(challenge.method))
@asyncio.coroutine
def onJoin(self, details:SessionDetails):
self.log.debug('onJoin')
self.__join_future.set_result(details)
self.__join_future = None
def _publish_msg(self, channel, message):
try:
if not self.is_attached():
self.log.warning('lost session with router')
else:
self.publish(channel, message)
except Exception as e:
self.log.critical(traceback.format_exc())
def onLeave(self, details):
# also, autobahn-python's .leave() method doesn't
# propagate the reason or log_message, #todo, file bug
super().onLeave(details)
self.close_reason = details.reason
if not self.close_reason in ('wamp.close.logout','wamp.close.normal'):
print('unexpected communication loss from router:',self.close_reason)
self.event_loop.stop()
def onDisconnect(self, *args, **kwargs):
super().onDisconnect()
class WampClient():
# we won't use the built in ApplicationRunner as we need to be able
# to intersperse within the loop
def __init__(self, id=None):
self.log = logging.getLogger()
self.extra={}
self.extra['cfg'] = configparser.ConfigParser()
self.extra['cfg'].read('/var/bluelabs/etc/hive.conf')
self.session = None
self.session_details = None
self.wamp_eventloop = None
self.starttime = datetime.datetime.now(tz=datetime.timezone.utc)
self.wamp_established = False
self.last_publish_ts = None
self._shutdown = False
if not id:
self.id = socket.getfqdn()
exit_ = False
for k in ('__version__','irl','realm','cli username','cli password','join timeout'):
if not k in self.extra['cfg']['wamp']:
exit_ = True
print("section [wamp]; required config option '{}' not found".format(k))
if exit_:
raise KeyError('missing required config values')
self.realm = self.extra['cfg'].get('wamp','realm')
self.irl = self.extra['cfg'].get('wamp','irl')
self.client_version = self.extra['cfg'].get('wamp','__version__')
self.join_timeout = int(self.extra['cfg'].get('wamp','join timeout'))
self.loop = asyncio.get_event_loop()
self.q = janus.Queue(loop=self.loop)
tA = self.loop.run_in_executor(None, self.log_pusher, self.q.sync_q)
tB = self.loop.run_in_executor(None, self.app)
def log_pusher(self, queue):
''' this runs in its own thread so the main process runs freely as does
the WAMP client (freely: within the context of python threading)
'''
waits = 0
while True:
while not self.session:
self.log.debug('not connected to WAMP router')
time.sleep(.25)
timeout = waits < 30 and .1 or 10
while True:
try:
c,w,p,m = queue.get(timeout=timeout)
args=c,{'v':self.client_version, 'ts':w.strftime('%F %T +0000'),'priority':p,'msg':m}
self.session._publish_msg(*args)
self.last_publish_ts = datetime.datetime.now(tz=datetime.timezone.utc)
queue.task_done()
waits=0
except janus.SyncQueueEmpty:
waits += 1
break
except Exception as e:
log.critical('queue error: {}'.format(e))
break
if self._shutdown:
break
def publish(self, message=None, channel=None, priority=None):
if message:
if not channel:
channel = 'hive.system.debug'
if not priority:
priority = 'info'
ts = datetime.datetime.now(tz=datetime.timezone.utc)
self.loop.run_until_complete(self.q.async_q.put((channel,ts,priority,message)))
def app(self) -> None:
"""
We use our own ApplicationRunner here which is almost an identical copy of
wamp.ApplicationRunner. The difference being that we need to:
a) explicitly get a new asyncio event loop because we aren't running
in the main thread - we'll get a RuntimeError: There is no current
event loop in thread <thread name>, and
b) don't set a signal handler for SIGTERM, also because we're not
running in the main thread
"""
#txaio.start_logging(level='debug')
isSecure, host, port, resource, path, params = parse_url(self.irl)
ssl = True
serializers = None
loop = txaio.config.loop = asyncio.new_event_loop()
self.wamp_eventloop = loop
asyncio.set_event_loop(loop)
async def fuck(loop):
while True:
self.log.debug('Connecting to router ')
join_future = asyncio.Future()
session_factory = functools.partial(_Component, self.realm, self.extra, loop, join_future)
transport_factory = WampWebSocketClientFactory(
session_factory, url=self.irl, serializers=serializers, loop=loop)
transport, protocol = await loop.create_connection(
transport_factory, host, port, ssl=ssl)
try:
# Connection established; wait for onJoin to finish
self.session_details = await asyncio.wait_for(join_future, timeout=3.0, loop=loop)
self.session = protocol._session
break
except (asyncio.TimeoutError,):
self.log.warning('router connection timeout')
# absorb the concurrent.futures._base.CancelledError error
try:
await asyncio.wait([join_future])
except Exception as e:
self.log.critical('unexpected error while connecting to router: {}'.format(e))
transport.close()
continue
except CancelledError:
try:
await asyncio.wait([join_future])
except Exception as e:
self.log.critical('unexpected error while connecting to router: {}'.format(e))
break
except Exception as e:
self.log.critical(traceback.format_exc())
transport.close()
break
while True:
self.session = None
tasks = [ asyncio.ensure_future(fuck(loop)), ]
try:
loop.run_until_complete(asyncio.wait(tasks))
self.wamp_established = True
except CancelledError:
break
except Exception as e:
self.log.critical('unexpected error while connecting to router: {} {}**'.format(e.__class__, e))
break
try:
loop.run_forever()
if self.session.close_reason:
self.log.critical('session close reason: {}'.format(self.session.close_reason))
except Exception as e:
self.log.critical(traceback.format_exc())
if self.session.close_reason == 'wamp.close.transport_lost':
continue
break
# cleanup
try:
loop.run_until_complete(asyncio.wait(tasks))
except:
pass
if self.wamp_eventloop.is_running():
self.wamp_eventloop.stop()
self.wamp_eventloop.close()
def shutdown(self):
now = datetime.datetime.now(tz=datetime.timezone.utc)
whence = self.starttime + datetime.timedelta(seconds=self.join_timeout)
self._shutdown = True
if not self.q.async_q.empty():
if not self.wamp_established and now < whence:
self.log.debug('waiting for publish queue to be sent')
# wait up to 10 seconds for client to connect
while datetime.datetime.now(tz=datetime.timezone.utc) < whence:
time.sleep(.5)
if self.wamp_established:
self.log.debug('session joined, publishing')
# wait a few more seconds
while not self.q.async_q.empty():
time.sleep(.5)
break
if not self.wamp_established:
self.log.warning('WAMP session still not established, giving up')
else:
self.log.warning('publish queue unable to be sent within timeout, losing messages')
# pause for at least one second to let the provider get our message(s)
# and query the router about our session details before we leave and
# the session details vanish (bug? crossbar should cache them)
now = datetime.datetime.now(tz=datetime.timezone.utc)
if self.last_publish_ts and now - self.last_publish_ts < datetime.timedelta(seconds=1):
time.sleep(1)
if self.session:
self.session.leave('wamp.close.logout', 'logged out')
tasks = asyncio.Task.all_tasks(loop=self.wamp_eventloop)
for t in tasks:
if not t.done():
self.wamp_eventloop.call_soon_threadsafe(t.cancel)
# how do we a) test if messages are still in queue or b) understand that
# we do have messages in queue but can't send them? or c) we ran so fast
# that we haven't even connected to the router yet?
self.wamp_eventloop.stop()
if __name__ == '__main__':
logging.basicConfig()
#logging.captureWarnings(True)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
log.debug('\x1b[1;37m'+'='*80+'\x1b[0m')
try:
wc = WampClient()
for n in range(10):
wc.publish('test message with lots of foo diddly doo hum etc so we can make a really long long long long line that is intended to wrap. oohhhh, almost. just a few more words ought to do it! {}'.format(n))
except KeyboardInterrupt:
print()
except Exception as e:
log.critical(traceback.format_exc())
finally:
log.debug('shutting down client')
try:
wc.shutdown()
except Exception as e:
log.critical(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment