Last active
August 29, 2015 13:57
-
-
Save mathuin/9516017 to your computer and use it in GitHub Desktop.
Trying to run this test against this code. I get an error about the reactor being unclean. What does that mean in this context?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet.endpoints import TCP4ServerEndpoint | |
from twisted.internet.error import CannotListenError | |
from twisted.internet.protocol import Protocol, Factory | |
from twisted.internet import reactor | |
import simplejson as json | |
# Ports available for proxy purposes. | |
low_port = 5000 | |
high_port = 5100 | |
class EchoFactory(Factory): | |
def __init__(self, proxy): | |
self.proxy = proxy | |
def buildProtocol(self, addr): | |
return Echo(self) | |
class Echo(Protocol): | |
def __init__(self, factory): | |
self.factory = factory | |
def connectionMade(self): | |
self.transport.write(self.factory.proxy.output) | |
self.transport.loseConnection() | |
def connectionLost(self, reason): | |
self.factory.proxy.open_port.stopListening() | |
self.factory.proxy.ports.append(self.factory.proxy.dport) | |
class ProxyFactory(Factory): | |
def buildProtocol(self, addr): | |
return Proxy(self) | |
class Proxy(Protocol): | |
ports = [x for x in xrange(low_port, high_port)] | |
in_use = [] | |
def __init__(self, factory): | |
self.dport = None | |
self.factory = factory | |
def connectionMade(self): | |
while not self.dport: | |
try: | |
self.dport = Proxy.ports.pop() | |
except IndexError: | |
self.transport.write("No ports available!") | |
self.transport.abortConnection() | |
try: | |
self.endpoint = TCP4ServerEndpoint(reactor, self.dport, | |
backlog=1) | |
except CannotListenError: | |
self.dport = None | |
def dataReceived(self, data): | |
self.output = data | |
# abort connection if json is invalid? | |
self.transport.write(str(self.dport)) | |
self.endpoint.listen(EchoFactory(self)).addCallback(self.set_open_port) | |
self.transport.loseConnection() | |
def set_open_port(self, result): | |
self.open_port = result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from proxyjson_server import ProxyFactory, EchoFactory | |
from twisted.trial import unittest | |
from twisted.test import proto_helpers | |
from mock import patch, MagicMock | |
class ProxyJsonFactoryTestCase(unittest.TestCase): | |
def setUp(self): | |
factory = ProxyFactory() | |
self.proto = factory.buildProtocol(('127.0.0.1', 0)) | |
self.tr = proto_helpers.StringTransport() | |
self.proto.makeConnection(self.tr) | |
# The following test mocks up a connection to the port returned by | |
# the proxy. The object that listens for the connection has the | |
# correct return value in it, so we just check for that value. | |
@patch('proxyjson_server.Echo') | |
def test_proxyjson(self, proxy_echo): | |
instring = 'hi' | |
self.proto.dataReceived(instring) | |
inport = int(self.tr.value()) | |
echof = EchoFactory(self.proto) | |
self.echop = echof.buildProtocol(('127.0.0.1', inport)) | |
args, kwargs = proxy_echo.call_args | |
self.assertEqual(args[0].proxy.output, instring) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(twistedauthproxy)jmt@cyan:~/crazy/twistedauthproxy$ trial test.test_proxyjson | |
test.test_proxyjson | |
ProxyJsonFactoryTestCase | |
test_proxyjson ... [OK] | |
[ERROR] | |
=============================================================================== | |
[ERROR] | |
Traceback (most recent call last): | |
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean. | |
Selectables: | |
<<class 'twisted.internet.tcp.Port'> of proxyjson_server.EchoFactory on 5099> | |
test.test_proxyjson.ProxyJsonFactoryTestCase.test_proxyjson | |
------------------------------------------------------------------------------- | |
Ran 1 tests in 0.007s | |
FAILED (errors=1, successes=1) | |
(twistedauthproxy)jmt@cyan:~/crazy/twistedauthproxy$ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment