Instantly share code, notes, and snippets.

@linw1995 /test.py
Last active Dec 28, 2018

Embed
What would you like to do?
Run Twisted in another thread
import socket
import threading
import pytest
from twisted.application import internet
from twisted.internet import reactor as twisted_reactor
from twisted.internet.protocol import Factory, Protocol
@pytest.fixture(scope="session", autouse=True)
def reactor():
    """Run the Twisted reactor in a background thread for the whole session.

    Yields the reactor object so tests and other fixtures can schedule work
    on it with ``callFromThread``. After the session the reactor is asked
    to stop from within its own thread and the thread is joined.
    """
    # The reactor runs outside the main thread here, so it is started
    # without installing signal handlers.
    run_options = {"installSignalHandlers": False}
    reactor_thread = threading.Thread(
        target=twisted_reactor.run, kwargs=run_options, daemon=True
    )
    reactor_thread.start()

    yield twisted_reactor

    # stop() is scheduled through callFromThread so that it executes in
    # the reactor thread rather than in the calling (test) thread.
    twisted_reactor.callFromThread(twisted_reactor.stop)
    reactor_thread.join()
class Echo(Protocol):
    """Protocol that writes every received chunk straight back out."""

    def dataReceived(self, data):
        """Write ``data`` back on this protocol's transport, unchanged."""
        transport = self.transport
        transport.write(data)
class EchoFactory(Factory):
    """Factory whose ``buildProtocol`` hands out new ``Echo`` instances."""

    def buildProtocol(self, addr):
        """Return a fresh ``Echo`` protocol; ``addr`` is not consulted."""
        protocol = Echo()
        return protocol
@pytest.fixture
def get_unused_port():
    """Provide a callable that returns a TCP port that was free when probed.

    The returned function binds a throwaway IPv4 TCP socket to port 0, so
    the operating system picks a free port, reads the chosen port number
    back and closes the socket again.

    NOTE: the port is released before the caller binds to it, so another
    process could grab it in between. That race is inherent to this
    probing approach.
    """

    def unused_port_getter():
        # The context manager guarantees the probe socket is closed even
        # when bind() or getsockname() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("", 0))
            return probe.getsockname()[1]

    return unused_port_getter
@pytest.fixture
def port(get_unused_port):
    """Return one port number obtained from the ``get_unused_port`` callable."""
    free_port = get_unused_port()
    return free_port
@pytest.fixture
def echo_service(reactor, port):
    """Run a TCP echo server on ``port`` for the duration of one test.

    The service is started and stopped inside the reactor thread while the
    calling (test) thread blocks until each operation has completed.

    Args:
        reactor: the reactor running in a background thread.
        port: TCP port number the echo server listens on.

    Yields:
        The started ``internet.TCPServer`` instance.

    Raises:
        Any exception raised by ``startService`` / ``stopService`` in the
        reactor thread is re-raised here instead of being lost there.
    """
    # Imported locally so this fixture does not depend on the module's
    # top-level import list.
    from twisted.internet.threads import blockingCallFromThread

    service = internet.TCPServer(port, EchoFactory())

    # blockingCallFromThread runs the callable in the reactor thread, waits
    # for its result (including a returned Deferred) and propagates
    # failures, so no Event / callLater polling is needed.
    blockingCallFromThread(reactor, service.startService)

    yield service

    # Also copes with stopService() returning a non-Deferred value, which
    # would break a direct ``.addBoth`` call and leave the test hanging.
    blockingCallFromThread(reactor, service.stopService)
def test_echo_service(echo_service, port):
    """Check that the echo server sends back exactly the bytes it was sent.

    ``echo_service`` is requested only so that the server is running;
    ``port`` is the port it listens on.
    """
    msg = b"abcdefg"
    received = bytearray()

    # The timeout turns a dead or misbehaving server into a test failure
    # instead of an indefinite hang; the ``with`` block closes the socket.
    with socket.create_connection(("", port), timeout=5) as conn:
        # sendall() keeps writing until the whole message has been sent.
        conn.sendall(msg)

        # TCP is a byte stream: one recv() may deliver only part of the
        # reply, so read until everything arrived or the peer closed.
        while len(received) < len(msg):
            chunk = conn.recv(len(msg) - len(received))
            if not chunk:
                break
            received.extend(chunk)

    assert msg == bytes(received)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment