Last active
December 28, 2018 13:44
-
-
Save linw1995/81abfda02b369493c5fee3f4ff1610c7 to your computer and use it in GitHub Desktop.
Run Twisted in another thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
import pytest | |
from twisted.application import internet | |
from twisted.internet import reactor as twisted_reactor | |
from twisted.internet.protocol import Factory, Protocol | |
@pytest.fixture(scope="session", autouse=True) | |
def reactor(): | |
t = threading.Thread( | |
target=twisted_reactor.run, | |
kwargs={"installSignalHandlers": False}, | |
daemon=True, | |
) | |
t.start() | |
yield twisted_reactor | |
twisted_reactor.callFromThread(twisted_reactor.stop) | |
t.join() | |
class Echo(Protocol): | |
def dataReceived(self, data): | |
self.transport.write(data) | |
class EchoFactory(Factory): | |
def buildProtocol(self, addr): | |
return Echo() | |
@pytest.fixture | |
def get_unused_port(): | |
def unused_port_getter(): | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind(("", 0)) | |
addr = s.getsockname() | |
s.close() | |
return addr[1] | |
return unused_port_getter | |
@pytest.fixture | |
def port(get_unused_port): | |
return get_unused_port() | |
@pytest.fixture | |
def echo_service(reactor, port): | |
echo_service = internet.TCPServer(port, EchoFactory()) | |
reactor.callFromThread(echo_service.startService) | |
started = threading.Event() | |
def wait_started(): | |
if echo_service.running: | |
started.set() | |
else: | |
reactor.callLater(0.1, wait_started) | |
reactor.callFromThread(wait_started) | |
started.wait() | |
yield echo_service | |
stoped = threading.Event() | |
def wait_stoped(): | |
echo_service.stopService().addBoth(lambda _: stoped.set()) | |
reactor.callFromThread(wait_stoped) | |
stoped.wait() | |
def test_echo_service(echo_service, port): | |
conn = socket.create_connection(("", port)) | |
msg = b"abcdefg" | |
conn.send(msg) | |
assert msg == conn.recv(len(msg)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment