Created
August 10, 2015 22:43
-
-
Save iffy/60399198de1fb16f6112 to your computer and use it in GitHub Desktop.
Question: How can I get at the `ApplicationSession` instance created through `ApplicationRunner` without polling the private `_session` attribute of the protocol?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ trial ~/Desktop/sample.py | |
sample | |
MyTest | |
test_stuff ... polling :( | |
[OK] | |
------------------------------------------------------------------------------- | |
Ran 1 tests in 0.169s | |
PASSED (successes=1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner | |
from twisted.internet import defer, reactor | |
from twisted.trial.unittest import TestCase | |
import os | |
class MyApplication(object): | |
""" | |
My application code/business logic. | |
""" | |
def __init__(self, session): | |
self._session = session | |
@defer.inlineCallbacks | |
def registerStuff(self): | |
yield self._session.register(self.doThing, 'com.test.doThing') | |
def doThing(self, thing): | |
return thing + ' done' | |
class ComposableSession(ApplicationSession): | |
""" | |
A session that's easy to hand around. See my `opened` and `joined` | |
attributes. | |
""" | |
def __init__(self, *args, **kwargs): | |
ApplicationSession.__init__(self, *args, **kwargs) | |
self.joined = MultiDeferred() | |
@defer.inlineCallbacks | |
def onJoin(self, details): | |
ret = yield ApplicationSession.onJoin(self, details) | |
self.joined.callback(details) | |
defer.returnValue(ret) | |
class MultiDeferred(object): | |
""" | |
Utility for signalling multiple, indepdent things about events. | |
""" | |
def __init__(self): | |
self._outstanding = [] | |
self._result = None | |
self._is_error = False | |
def __call__(self): | |
d = defer.Deferred() | |
if self._result: | |
a, kw = self._result | |
if self._is_error: | |
d.errback(*a, **kw) | |
else: | |
d.callback(*a, **kw) | |
self._outstanding.append(d) | |
return d | |
def callback(self, *args, **kwargs): | |
self._result = (args, kwargs) | |
for d in self._outstanding: | |
d.callback(*args, **kwargs) | |
def errback(self, *args, **kwargs): | |
self._result = (args, kwargs) | |
self._is_error = True | |
for d in self._outstanding: | |
d.errback(*args, **kwargs) | |
#---------------------------------------------------------------------------- | |
# Testing stuff | |
#---------------------------------------------------------------------------- | |
def runComponent(testcase, component_class, extra={}): | |
""" | |
Run a component for the duration of the test. | |
""" | |
url = os.environ['CROSSBAR_URL'] | |
realm = u'realm1' | |
runner = ApplicationRunner(url=url, realm=realm, extra=extra) | |
d = runner.run(component_class, start_reactor=False) | |
@defer.inlineCallbacks | |
def stopThings(proto): | |
if proto.openHandshakeTimeoutCall: | |
yield proto.openHandshakeTimeoutCall.cancel() | |
if proto.transport: | |
yield proto.transport.loseConnection() | |
def getProto(x): | |
_, (_, func, args, kwargs) = x | |
proto = args[0] | |
testcase.addCleanup(stopThings, proto) | |
return proto | |
d.addCallback(getProto) | |
return d | |
def runFunctionWithSession(testcase, func, *args, **kwargs): | |
""" | |
Run a function that accepts a joined ApplicationSession instance | |
as its first argument. Return the result as a Deferred. | |
""" | |
result = defer.Deferred() | |
class RunSession(ApplicationSession): | |
def onJoin(self, details): | |
defer.maybeDeferred(func, self, *args, **kwargs)\ | |
.chainDeferred(result) | |
runComponent(testcase, RunSession) | |
return result | |
class MyTest(TestCase): | |
timeout = 1 | |
@defer.inlineCallbacks | |
def test_stuff(self): | |
protocol = yield runComponent(self, ComposableSession) | |
# poll for session :( | |
d_session = defer.Deferred() | |
poll_interval = 0.1 | |
def pollForSession(protocol, callback): | |
print 'polling :(' | |
session = getattr(protocol, '_session', None) | |
if session: | |
callback(session) | |
else: | |
reactor.callLater(poll_interval, pollForSession, protocol, callback) | |
reactor.callLater(poll_interval, pollForSession, protocol, d_session.callback) | |
session = yield d_session | |
# Use connected session with application | |
app = MyApplication(session) | |
yield session.joined() | |
yield app.registerStuff() | |
# test it | |
@defer.inlineCallbacks | |
def testit(session): | |
result = yield session.call('com.test.doThing', 'hi') | |
self.assertEqual(result, 'hi done') | |
yield runFunctionWithSession(self, testit) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment