Last active Dec 19, 2019

Twisted tests will generally look like:

@twisted_utils.reacts  # marks the test as needing a reactor
# marks the function as a coroutine, a poor imitation of a Python 3 `async def`
# coroutine
@twisted_utils.inlineCallbacks
def test_foo(fixtures):
    server = Server()
    assert (yield server.message("ping")) == "pong"
    assert (yield server.message("GET resource")) == "4"

If you see:

    def wrapper(**kwargs):
        __tracebackhide__ = True
>       assert func(**kwargs) is None
E       assert <Deferred at 0xdeadbeef> is None
E        +  where <Deferred at 0xdeadbeef> = <function test_foo at 0xbbadbeef>(**{})

this means the test completed without testing anything! As with a task or coroutine that is never awaited, the Deferred was never run: add @twisted_utils.reacts to run your test and extract the result.
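
The same failure mode can be reproduced with the standard library alone. The sketch below is an analogy, not Twisted code: asyncio stands in for the reactor, and `ping`, `check_without_running` and `check_with_running` are hypothetical names. Calling the coroutine function only builds a pending object; something has to run it before there is a result to assert on.

```python
import asyncio


async def ping():
    # stand-in for a server round trip
    await asyncio.sleep(0)
    return "pong"


def check_without_running():
    # Calling ping() only creates a coroutine object; nothing has run yet,
    # so comparing it with the expected value can never succeed.
    pending = ping()
    matched = pending == "pong"
    pending.close()  # discard it explicitly to avoid a "never awaited" warning
    return matched


def check_with_running():
    # Running the coroutine on an event loop is what produces the result.
    return asyncio.run(ping()) == "pong"
```

`check_without_running()` compares an object that was never run, which is the asyncio counterpart of asserting on a bare Deferred.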

If you see a TwistedTestUtilError, see the associated advice in test.utils.twisted.

  • Why are there two decorators? They do different things: twisted_utils.inlineCallbacks turns a generator-based coroutine into a function returning a Deferred, while twisted_utils.reacts takes a function that returns a Deferred and extracts the result by running a Twisted reactor. It's similar to the following from Python 3.4:

    @asyncio.coroutine
    def test_bad_because_legacy():
        """
        Support for generator-based coroutines is deprecated and is scheduled
        for removal in Python 3.10.
        """
        yield from asyncio.sleep(1)
  • Why not pytest-twisted?

    • pytest-twisted runs one reactor for the entire test suite, so awaited sockets, LoopingCalls and other timers persist for the whole test suite.
    • pytest-twisted pauses and resumes the reactor using a greenlet, which defeats the entire purpose of using a reactor/event loop for explicit non-blocking IO.
    • pytest-twisted runs the reactor with installSignalHandlers=True, which means that KeyboardInterrupt and the Jenkins interrupt signal will be ignored and will not stop the suite.
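
The division of labour between the two decorators, and the one-reactor-per-test behaviour argued for above, can be sketched with asyncio. This is an analogy under stated assumptions: `runs` is a hypothetical stand-in for twisted_utils.reacts, `async def` plays the role of twisted_utils.inlineCallbacks, and a fresh event loop stands in for a fresh reactor.

```python
import asyncio
import functools


def runs(fn):
    """Hypothetical analogue of twisted_utils.reacts for asyncio."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # every call gets its own loop, so timers and sockets cannot
        # leak from one test into the next
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(fn(*args, **kwargs))
        finally:
            loop.close()

    return wrapper


seen_loops = []


@runs
async def test_one():
    seen_loops.append(asyncio.get_running_loop())
    await asyncio.sleep(0)
    return "one"


@runs
async def test_two():
    seen_loops.append(asyncio.get_running_loop())
    return "two"
```

Each decorated test runs to completion on a loop that is closed afterwards, so nothing scheduled by `test_one` can still be pending when `test_two` starts.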

Python 2 compatible async context management with CPS

When managing async contexts you need to await each of the "enter", "body" and "exit" phases.

For example, in Python 3 you can just create an async context manager:

import contextlib


@contextlib.asynccontextmanager
async def some_service():
    service = SomeService()
    await service.startService()
    try:
        # service is now active only inside the context managed with `async with`
        yield service
    finally:
        await service.stopService()

async def test_foo():
    v = "spam"
    # nested contexts can be managed with a single `async with` statement
    async with some_service() as service, other_service(service) as other_service:
        await other_service.recvText(v)
        assert await service.recvText("foo") == v
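
A runnable variant of the snippet above, as a minimal sketch: `FakeService` is a hypothetical stand-in for `SomeService`, and it records events so the order of the "enter", "body" and "exit" phases can be checked.

```python
import asyncio
import contextlib

events = []


class FakeService:
    """Hypothetical service that records when it starts and stops."""

    async def startService(self):
        await asyncio.sleep(0)
        events.append("start")

    async def stopService(self):
        await asyncio.sleep(0)
        events.append("stop")

    async def recvText(self, text):
        await asyncio.sleep(0)
        events.append("body")
        return text


@contextlib.asynccontextmanager
async def some_service():
    service = FakeService()
    await service.startService()  # "enter" phase
    try:
        yield service  # the "body" runs while this generator is suspended
    finally:
        await service.stopService()  # "exit" phase, even if the body raises


async def main():
    async with some_service() as service:
        return await service.recvText("spam")


result = asyncio.run(main())
```

All three phases are awaited, and the `finally` clause is what guarantees the service is stopped when the body returns early or raises.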

However, in Python 2 we don't have async context managers, so we can either use a blocking context manager with "pytest_twisted.blockon" or build a context using continuation-passing style (CPS):

@twisted_utils.inlineCallbacks
def with_some_service(fn):
    service = SomeService()
    yield service.startService()
    try:
        # service is now active only inside the context managed by the continuation
        defer.returnValue((yield fn(service)))
    finally:
        yield service.stopService()

@twisted_utils.reacts
def test_foo():
    v = "spam"

    @twisted_utils.inlineCallbacks
    def test(some_service, other_service):
        yield other_service.recvText(v)
        assert (yield some_service.recvText("foo")) == v

    # to manage nested contexts ...
    return with_some_service(
        # ... the context must be passed from one continuation ...
        lambda some_service: with_other_service(
            # ... to the next ...
            some_service,
            # ... all the way to the test
            fn=lambda other_service: test(some_service, other_service)
        )
    )
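
The control flow of the CPS version is easier to see with the Deferreds stripped out. The following is a synchronous sketch only: the `with_*` helpers and `body` are hypothetical, and strings stand in for the services. It shows that each context stays active exactly for the duration of its continuation and that contexts unwind in reverse order.

```python
events = []


def with_some_service(fn):
    events.append("start some")
    try:
        # the continuation runs while the service is active
        return fn("some")
    finally:
        events.append("stop some")


def with_other_service(some_service, fn):
    events.append("start other({})".format(some_service))
    try:
        return fn("other")
    finally:
        events.append("stop other")


def body(some_service, other_service):
    events.append("body")
    return (some_service, other_service)


result = with_some_service(
    lambda some_service: with_other_service(
        some_service,
        fn=lambda other_service: body(some_service, other_service),
    )
)
```

The nesting of lambdas plays the role of the comma-separated items in a single `async with` statement: the innermost continuation sees every context opened on the way in.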

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function

import contextlib2
import decorator
import mock
from twisted.internet import defer, interfaces, main, task
from twisted.python import failure
from twisted.python.runtime import platform
from zope import interface as zope_interface
from zope.interface.interface import Attribute, Method

__all__ = ["inlineCallbacks", "reacts"]


@decorator.decorator
def inlineCallbacks(fun, *args, **kw):
    return defer.inlineCallbacks(fun)(*args, **kw)


class TwistedTestUtilError(Exception):
    pass


class UnexpectedStateError(TwistedTestUtilError):
    """
    Something totally unexpected happened.

    There's no easy fix for this, as it should only be possible if mocking out
    vital parts of twisted infrastructure.
    """


class ReactorNotReentrantError(TwistedTestUtilError):
    """
    twisted_utils.reacts was used twice

        @twisted_utils.reacts
        def test_bad():
            twisted_utils.reacts(lambda x: tx_sleep(10))

    twisted_utils.reacts is not a magic tool to turn async io code into blocking
    code, you may want twisted.internet.threads.blockingCallFromThread.
    """


class ReactorNotReadyError(TwistedTestUtilError):
    """
    The test attempted to use the reactor, but the test was not decorated with
    @twisted_utils.reacts
    """


class AlreadyDoneError(TwistedTestUtilError):
    """
    The test was decorated with @twisted_utils.reacts, but didn't actually need
    the reactor because it did no IO at all.

    twisted inlineCallbacks coroutines and Deferreds are totally decoupled from
    the reactor, allowing you to use them for completely synchronous code.

        @twisted_utils.reacts
        @twisted_utils.inlineCallbacks
        def test_bad():
            yield defer.succeed(None)

    Instead, this test could read the Deferred's result directly:
    `d.result if d.called else die()`.

    You should consider carefully whether the test is actually useful, or
    whether it could instead perform and test a real IO action on a reactor.
    """


class NoReactorUseError(TwistedTestUtilError):
    """
    The test was decorated with @twisted_utils.reacts, but didn't actually need
    the reactor because it completed its deferred using some other mechanism.

    See AlreadyDoneError.
    """


def _default_reactor_factory():
    # pick the best reactor class for this platform, falling back to select()
    try:
        if platform.isLinux():
            try:
                from twisted.internet.epollreactor import EPollReactor as Reactor
            except ImportError:
                from twisted.internet.pollreactor import PollReactor as Reactor
        elif platform.getType() == "posix" and not platform.isMacOSX():
            from twisted.internet.pollreactor import PollReactor as Reactor
        else:
            from twisted.internet.selectreactor import SelectReactor as Reactor
    except ImportError:
        from twisted.internet.selectreactor import SelectReactor as Reactor
    return Reactor


class _ReactorProxy(object):
    def __init__(self, get_reactor):
        self.__get_reactor = get_reactor

    @classmethod
    def setup(cls):
        def instrument(name):
            # forward a method call to whichever reactor is current at call time
            def do(self, *args, **kwargs):
                return getattr(self.__get_reactor(), name)(*args, **kwargs)

            return do

        def makeprop(name):
            def set_(self, attr):
                setattr(self.__get_reactor(), name, attr)

            def get(self):
                return getattr(self.__get_reactor(), name)

            return property(get, set_)

        for iname, interface in vars(interfaces).items():
            if not iname.startswith("IReactor"):
                continue
            for attr_name, attr_type in interface.namesAndDescriptions(True):
                if isinstance(attr_type, Method):
                    setattr(cls, attr_name, instrument(attr_name))
                elif isinstance(attr_type, Attribute):  # Method inherits Attribute
                    setattr(cls, attr_name, makeprop(attr_name))


_setup_reactor_proxy = _ReactorProxy.setup
del _ReactorProxy.setup


class _ReactorProxyController(object):
    def __init__(self):
        self.__called = False
        self.__reactor = None
        self.__proxy = None

    def __get_reactor(self):
        if self.__reactor is None:
            raise ReactorNotReadyError("Reactor not ready yet")
        self.__called = True
        return self.__reactor

    def install(self):
        if self.__proxy:
            return  # assumed: this line is missing from the source
        self.__proxy = _ReactorProxy(self.__get_reactor)
        # assumed: the two calls below are missing from the source
        _setup_reactor_proxy()
        main.installReactor(self.__proxy)

    @contextlib2.contextmanager
    def reactor(self, factory=None):
        with contextlib2.ExitStack() as stack:
            if self.__reactor is not None:
                raise ReactorNotReentrantError("reactor() context is not reentrant")
            reactor = self.__reactor = (factory or _default_reactor_factory())()

            @stack.callback  # assumed: registers the cleanup to run on exit
            def unset():
                self.__called = False
                self.__reactor = None
                if getattr(reactor, "running", False):
                    raise UnexpectedStateError("Reactor still running")

            zope_interface.directlyProvides(  # assumed call, lost from the source
                self.__proxy, zope_interface.providedBy(reactor),
            )

            @stack.callback  # assumed: registers the cleanup to run on exit
            def reset_zope():
                del self.__proxy.__provides__

            yield reactor

    def run_until_complete(self, fn):
        with self.reactor() as reactor, mock.patch(
            # twisted clears the traceback information from Failures as they
            # are likely to cause reference cycles.
            # However in tests this information is too valuable to delete, so
            # we disable the method.
            "twisted.python.failure.Failure.cleanFailure",  # assumed target
            new=lambda _: None,
        ):
            started = defer.Deferred()

            @defer.inlineCallbacks
            def go():
                yield started
                try:  # assumed: stop the reactor once fn() has settled
                    defer.returnValue((yield fn()))
                finally:
                    reactor.stop()

            d = go()
            reactor.callWhenRunning(started.callback, None)
            reactor.run(installSignalHandlers=False)  # assumed
            if not d.called:
                raise UnexpectedStateError("deferred never completed")
            result = d.result
            if isinstance(result, failure.Failure):
                result.raiseException()  # assumed
            if not self.__called:
                raise NoReactorUseError("didn't use reactor")
            return result


_reactor_proxy_controller = _ReactorProxyController()
install = _reactor_proxy_controller.install


@contextlib2.contextmanager
def clock():
    with _reactor_proxy_controller.reactor(task.Clock) as c:
        yield c
        # every call scheduled on the clock must have run or been cancelled
        assert c.calls == []


@decorator.decorator
def reacts(fn, *args, **kwargs):
    @defer.inlineCallbacks
    def dfn():
        v = fn(*args, **kwargs)
        if not isinstance(v, defer.Deferred) or (
            v.called and not isinstance(v.result, failure.Failure)
        ):
            raise AlreadyDoneError(
                "fn {fn!r} must return a pending or failed deferred got {v!r}".format(
                    fn=fn, v=v
                )
            )
        defer.returnValue((yield v))

    return _reactor_proxy_controller.run_until_complete(dfn)

graingert commented Dec 19, 2019

Note: we can't use zope.proxy because methods of the reactor are cached, and zope.proxy only proxies the attribute lookup, so the method you get back is bound to the original object:

> p = zope_proxy(o)
> assert p.method is o.method
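
Why that matters can be shown without zope at all. In this sketch every class and name is hypothetical: `AttributeProxy` mimics a proxy that only forwards attribute lookups, while `LateBoundProxy` defines its own method and resolves the target on every call, which is the approach `_ReactorProxy` takes in the module above.

```python
class FakeReactor(object):
    def __init__(self, name):
        self.name = name

    def describe(self):
        return self.name


class AttributeProxy(object):
    """Forwards attribute lookups, so callers get the target's bound method."""

    def __init__(self, get_target):
        self._get_target = get_target

    def __getattr__(self, name):
        return getattr(self._get_target(), name)


class LateBoundProxy(object):
    """Defines its own method and looks the target up on every call."""

    def __init__(self, get_target):
        self._get_target = get_target

    def describe(self, *args, **kwargs):
        return self._get_target().describe(*args, **kwargs)


current = {"reactor": FakeReactor("first")}


def get_reactor():
    return current["reactor"]


# a caller caches the method before the reactor is swapped
cached_from_attribute_proxy = AttributeProxy(get_reactor).describe
cached_from_late_bound_proxy = LateBoundProxy(get_reactor).describe

current["reactor"] = FakeReactor("second")
```

After the swap, the method cached through `AttributeProxy` still talks to the first reactor, while the one cached through `LateBoundProxy` follows the current one.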
