Skip to content

Instantly share code, notes, and snippets.

@linw1995
Last active December 28, 2018 13:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linw1995/81abfda02b369493c5fee3f4ff1610c7 to your computer and use it in GitHub Desktop.
Save linw1995/81abfda02b369493c5fee3f4ff1610c7 to your computer and use it in GitHub Desktop.
Run Twisted in another thread
import socket
import threading
import pytest
from twisted.application import internet
from twisted.internet import reactor as twisted_reactor
from twisted.internet.protocol import Factory, Protocol
@pytest.fixture(scope="session", autouse=True)
def reactor():
    """Run the global Twisted reactor on a background thread for the session.

    The reactor is started once (session scope, autouse) in a daemon thread
    and yielded to tests.  On teardown the reactor is asked to stop from its
    own thread and the background thread is joined.

    Yields:
        The global ``twisted.internet.reactor`` object.
    """
    # Signal handlers are disabled because the reactor is not being run from
    # the main thread here.
    run_options = {"installSignalHandlers": False}
    reactor_thread = threading.Thread(
        target=twisted_reactor.run,
        kwargs=run_options,
        daemon=True,
    )
    reactor_thread.start()

    yield twisted_reactor

    # ``stop`` has to be executed by the reactor thread itself, so it is
    # scheduled through ``callFromThread`` instead of being called directly.
    twisted_reactor.callFromThread(twisted_reactor.stop)
    reactor_thread.join()
class Echo(Protocol):
    """Protocol that writes every received chunk straight back to the peer."""

    def dataReceived(self, data):
        """Send ``data`` back over the transport it arrived on."""
        transport = self.transport
        transport.write(data)
class EchoFactory(Factory):
    """Factory that creates one ``Echo`` protocol per incoming connection."""

    def buildProtocol(self, addr):
        """Build the protocol instance that will serve a new connection.

        Args:
            addr: Address of the connecting peer (unused).

        Returns:
            A new ``Echo`` instance whose ``factory`` attribute points back
            to this factory.
        """
        protocol = Echo()
        # Twisted's buildProtocol contract expects the returned protocol to
        # carry a ``factory`` attribute referencing its creator; the default
        # ``Factory.buildProtocol`` sets it, so an override should as well.
        protocol.factory = self
        return protocol
@pytest.fixture
def get_unused_port():
    """Provide a callable that returns a currently unused TCP port number.

    Returns:
        A zero-argument function; each call probes for a free port and
        returns its number as an ``int``.
    """

    def unused_port_getter():
        """Bind a throwaway socket to port 0 and report the assigned port."""
        # The context manager closes the probe socket even if ``bind`` or
        # ``getsockname`` raises, so no descriptor is leaked on failure.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            # Port 0 asks the operating system to choose a free port.
            probe.bind(("", 0))
            # NOTE: the port is released when the socket closes, so another
            # process could in principle claim it before the caller does.
            return probe.getsockname()[1]

    return unused_port_getter
@pytest.fixture
def port(get_unused_port):
    """Return a port number obtained by calling ``get_unused_port`` once."""
    free_port = get_unused_port()
    return free_port
@pytest.fixture
def echo_service(reactor, port):
    """Start an echo ``TCPServer`` on ``port`` and stop it after the test.

    The reactor runs in another thread, so every interaction with the
    service is scheduled via ``reactor.callFromThread`` and the test thread
    blocks on a ``threading.Event`` until the reactor side reports back.

    Args:
        reactor: The reactor fixture (running in a background thread).
        port: TCP port number the service should listen on.

    Yields:
        The started ``internet.TCPServer`` instance.
    """
    # Imported locally so this fixture does not depend on a file-level
    # import being added.
    from twisted.internet.defer import maybeDeferred

    service = internet.TCPServer(port, EchoFactory())
    reactor.callFromThread(service.startService)

    started = threading.Event()

    def wait_started():
        # Poll inside the reactor thread until the service reports running.
        if service.running:
            started.set()
        else:
            reactor.callLater(0.1, wait_started)

    reactor.callFromThread(wait_started)
    started.wait()

    yield service

    stopped = threading.Event()

    def stop_and_signal():
        # ``stopService`` may return None rather than a Deferred (and could
        # raise); ``maybeDeferred`` normalises both cases so the event is
        # always set and teardown cannot block forever.
        maybeDeferred(service.stopService).addBoth(lambda _: stopped.set())

    reactor.callFromThread(stop_and_signal)
    stopped.wait()
def test_echo_service(echo_service, port):
    """Check that bytes sent to the echo service come back unchanged.

    Args:
        echo_service: Fixture that keeps the echo server running; it is
            requested only for that effect and not used directly.
        port: TCP port the echo server listens on.
    """
    msg = b"abcdefg"
    # Connect to an explicit loopback address instead of relying on how the
    # platform resolver treats an empty host name.  The ``with`` block
    # closes the client socket even when the assertion fails.
    with socket.create_connection(("127.0.0.1", port)) as conn:
        # ``sendall`` keeps writing until the whole message is sent, whereas
        # ``send`` may transmit only part of it.
        conn.sendall(msg)

        # A single ``recv`` may return fewer bytes than requested, so read
        # until the full length has arrived or the peer closes.
        received = bytearray()
        while len(received) < len(msg):
            chunk = conn.recv(len(msg) - len(received))
            if not chunk:
                break
            received += chunk

    assert msg == bytes(received)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment