Run Twisted in another thread
import socket | |
import threading | |
import pytest | |
from twisted.application import internet | |
from twisted.internet import reactor as twisted_reactor | |
from twisted.internet.protocol import Factory, Protocol | |
@pytest.fixture(scope="session", autouse=True)
def reactor():
    """Run the global Twisted reactor in a background thread for the session.

    The fixture is session-scoped and autouse, so the reactor is started once
    before the first test and stopped after the last one.

    Yields:
        The global Twisted reactor object (``twisted_reactor``).
    """

    def run_reactor():
        # The reactor runs outside the main thread here, so it is told not
        # to install its signal handlers.
        twisted_reactor.run(installSignalHandlers=False)

    reactor_thread = threading.Thread(target=run_reactor, daemon=True)
    reactor_thread.start()

    yield twisted_reactor

    # Teardown: schedule stop() on the reactor's own thread, then wait for
    # the thread running the reactor to finish.
    twisted_reactor.callFromThread(twisted_reactor.stop)
    reactor_thread.join()
class Echo(Protocol):
    """Protocol that writes every received chunk straight back to the peer."""

    def dataReceived(self, data):
        """Send ``data`` back over the transport it arrived on."""
        transport = self.transport
        transport.write(data)
class EchoFactory(Factory):
    """Factory that hands out a fresh ``Echo`` protocol for each connection."""

    def buildProtocol(self, addr):
        """Return a new ``Echo`` instance; ``addr`` is not used."""
        echo_protocol = Echo()
        return echo_protocol
@pytest.fixture
def get_unused_port():
    """Provide a callable that returns a currently unused TCP port number.

    Returns:
        A zero-argument function. Each call binds a temporary IPv4 TCP socket
        to port 0 (letting the OS pick a free port), reads back the assigned
        port number, closes the socket and returns the number.

    Note:
        The port is released before it is returned, so another process could
        claim it before the caller binds to it.
    """

    def unused_port_getter():
        # The with-block closes the socket even if bind() or getsockname()
        # raises, so no file descriptor is leaked on failure.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(("", 0))
            return sock.getsockname()[1]

    return unused_port_getter
@pytest.fixture
def port(get_unused_port):
    """Return one unused TCP port number obtained from ``get_unused_port``."""
    free_port = get_unused_port()
    return free_port
@pytest.fixture
def echo_service(reactor, port):
    """Start a TCP echo service on ``port`` and stop it after the test.

    The reactor runs in another thread, so the service is started and stopped
    through ``blockingCallFromThread``: it runs the call on the reactor
    thread, blocks this thread until the call (or the Deferred it returns)
    completes, and re-raises any failure here.

    Args:
        reactor: The running Twisted reactor (from the ``reactor`` fixture).
        port: TCP port number the service listens on.

    Yields:
        The started ``internet.TCPServer`` service.

    Raises:
        Any exception raised by ``startService`` (during setup) or
        ``stopService`` (during teardown) is propagated to the test thread
        instead of being lost in the reactor thread.
    """
    # Imported locally so this fixture does not depend on a file-level import.
    from twisted.internet import threads as twisted_threads

    service = internet.TCPServer(port, EchoFactory())

    # Returns only once startService() has finished on the reactor thread.
    twisted_threads.blockingCallFromThread(reactor, service.startService)

    yield service

    # blockingCallFromThread wraps the call in maybeDeferred, so teardown
    # completes whether stopService() returns a Deferred or a plain value;
    # calling .addBoth on the raw result would fail on a non-Deferred and
    # leave this thread waiting forever.
    twisted_threads.blockingCallFromThread(reactor, service.stopService)
def test_echo_service(echo_service, port):
    """Check that the echo service returns exactly the bytes sent to it.

    Args:
        echo_service: The running echo service (requested for its side
            effect of listening on ``port``).
        port: TCP port number the echo service listens on.
    """
    msg = b"abcdefg"

    # Explicit loopback address instead of relying on how the platform
    # resolves an empty host name; the timeout keeps a broken server from
    # hanging the test run. The with-block closes the client socket.
    with socket.create_connection(("127.0.0.1", port), timeout=5) as conn:
        # sendall() keeps writing until the whole message is sent, whereas
        # send() may transmit only part of it.
        conn.sendall(msg)

        # TCP is a byte stream: one recv() may return fewer bytes than
        # requested, so read until the full message arrives or the peer
        # closes the connection.
        received = b""
        while len(received) < len(msg):
            chunk = conn.recv(len(msg) - len(received))
            if not chunk:
                break
            received += chunk

    assert msg == received
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment