-
-
Save kelciour/8186c66d891f20866fa24053bc074aca to your computer and use it in GitHub Desktop.
waitress
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import errno | |
import socket | |
import unittest | |
dummy_app = object() | |
class TestWSGIServer(unittest.TestCase): | |
def _makeOne( | |
self, | |
application=dummy_app, | |
host="127.0.0.1", | |
port=0, | |
_dispatcher=None, | |
adj=None, | |
map=None, | |
_start=True, | |
_sock=None, | |
_server=None, | |
): | |
from waitress.server import create_server | |
self.inst = create_server( | |
application, | |
host=host, | |
port=port, | |
map=map, | |
_dispatcher=_dispatcher, | |
_start=_start, | |
_sock=_sock, | |
) | |
return self.inst | |
def _makeOneWithMap( | |
self, adj=None, _start=True, host="127.0.0.1", port=0, app=dummy_app | |
): | |
sock = DummySock() | |
task_dispatcher = DummyTaskDispatcher() | |
map = {} | |
return self._makeOne( | |
app, | |
host=host, | |
port=port, | |
map=map, | |
_sock=sock, | |
_dispatcher=task_dispatcher, | |
_start=_start, | |
) | |
def _makeOneWithMulti( | |
self, adj=None, _start=True, app=dummy_app, listen="127.0.0.1:0 127.0.0.1:0" | |
): | |
sock = DummySock() | |
task_dispatcher = DummyTaskDispatcher() | |
map = {} | |
from waitress.server import create_server | |
self.inst = create_server( | |
app, | |
listen=listen, | |
map=map, | |
_dispatcher=task_dispatcher, | |
_start=_start, | |
_sock=sock, | |
) | |
return self.inst | |
def _makeWithSockets( | |
self, | |
application=dummy_app, | |
_dispatcher=None, | |
map=None, | |
_start=True, | |
_sock=None, | |
_server=None, | |
sockets=None, | |
): | |
from waitress.server import create_server | |
_sockets = [] | |
if sockets is not None: | |
_sockets = sockets | |
self.inst = create_server( | |
application, | |
map=map, | |
_dispatcher=_dispatcher, | |
_start=_start, | |
_sock=_sock, | |
sockets=_sockets, | |
) | |
return self.inst | |
def tearDown(self): | |
if self.inst is not None: | |
self.inst.close() | |
def test_ctor_app_is_None(self): | |
self.inst = None | |
self.assertRaises(ValueError, self._makeOneWithMap, app=None) | |
def test_ctor_start_true(self): | |
inst = self._makeOneWithMap(_start=True) | |
self.assertEqual(inst.accepting, True) | |
self.assertEqual(inst.socket.listened, 1024) | |
def test_ctor_makes_dispatcher(self): | |
inst = self._makeOne(_start=False, map={}) | |
self.assertEqual( | |
inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher" | |
) | |
def test_ctor_start_false(self): | |
inst = self._makeOneWithMap(_start=False) | |
self.assertEqual(inst.accepting, False) | |
def test_get_server_multi(self): | |
inst = self._makeOneWithMulti() | |
self.assertEqual(inst.__class__.__name__, "MultiSocketServer") | |
def test_run(self): | |
inst = self._makeOneWithMap(_start=False) | |
inst.asyncore = DummyAsyncore() | |
inst.task_dispatcher = DummyTaskDispatcher() | |
inst.run() | |
self.assertTrue(inst.task_dispatcher.was_shutdown) | |
def test_run_base_server(self): | |
inst = self._makeOneWithMulti(_start=False) | |
inst.asyncore = DummyAsyncore() | |
inst.task_dispatcher = DummyTaskDispatcher() | |
inst.run() | |
self.assertTrue(inst.task_dispatcher.was_shutdown) | |
def test_pull_trigger(self): | |
inst = self._makeOneWithMap(_start=False) | |
inst.trigger.close() | |
inst.trigger = DummyTrigger() | |
inst.pull_trigger() | |
self.assertEqual(inst.trigger.pulled, True) | |
def test_add_task(self): | |
task = DummyTask() | |
inst = self._makeOneWithMap() | |
inst.add_task(task) | |
self.assertEqual(inst.task_dispatcher.tasks, [task]) | |
self.assertFalse(task.serviced) | |
def test_readable_not_accepting(self): | |
inst = self._makeOneWithMap() | |
inst.accepting = False | |
self.assertFalse(inst.readable()) | |
def test_readable_maplen_gt_connection_limit(self): | |
inst = self._makeOneWithMap() | |
inst.accepting = True | |
inst.adj = DummyAdj | |
inst._map = {"a": 1, "b": 2} | |
self.assertFalse(inst.readable()) | |
self.assertTrue(inst.in_connection_overflow) | |
def test_readable_maplen_lt_connection_limit(self): | |
inst = self._makeOneWithMap() | |
inst.accepting = True | |
inst.adj = DummyAdj | |
inst._map = {} | |
self.assertTrue(inst.readable()) | |
self.assertFalse(inst.in_connection_overflow) | |
def test_readable_maplen_toggles_connection_overflow(self): | |
inst = self._makeOneWithMap() | |
inst.accepting = True | |
inst.adj = DummyAdj | |
inst._map = {"a": 1, "b": 2} | |
self.assertFalse(inst.in_connection_overflow) | |
self.assertFalse(inst.readable()) | |
self.assertTrue(inst.in_connection_overflow) | |
inst._map = {} | |
self.assertTrue(inst.readable()) | |
self.assertFalse(inst.in_connection_overflow) | |
def test_readable_maintenance_false(self): | |
import time | |
inst = self._makeOneWithMap() | |
then = time.time() + 1000 | |
inst.next_channel_cleanup = then | |
L = [] | |
inst.maintenance = lambda t: L.append(t) | |
inst.readable() | |
self.assertEqual(L, []) | |
self.assertEqual(inst.next_channel_cleanup, then) | |
def test_readable_maintenance_true(self): | |
inst = self._makeOneWithMap() | |
inst.next_channel_cleanup = 0 | |
L = [] | |
inst.maintenance = lambda t: L.append(t) | |
inst.readable() | |
self.assertEqual(len(L), 1) | |
self.assertNotEqual(inst.next_channel_cleanup, 0) | |
def test_writable(self): | |
inst = self._makeOneWithMap() | |
self.assertFalse(inst.writable()) | |
def test_handle_read(self): | |
inst = self._makeOneWithMap() | |
self.assertEqual(inst.handle_read(), None) | |
def test_handle_connect(self): | |
inst = self._makeOneWithMap() | |
self.assertEqual(inst.handle_connect(), None) | |
def test_handle_accept_wouldblock_socket_error(self): | |
inst = self._makeOneWithMap() | |
ewouldblock = socket.error(errno.EWOULDBLOCK) | |
inst.socket = DummySock(toraise=ewouldblock) | |
inst.handle_accept() | |
self.assertEqual(inst.socket.accepted, False) | |
def test_handle_accept_other_socket_error(self): | |
inst = self._makeOneWithMap() | |
eaborted = socket.error(errno.ECONNABORTED) | |
inst.socket = DummySock(toraise=eaborted) | |
inst.adj = DummyAdj | |
def foo(): | |
raise OSError | |
inst.accept = foo | |
inst.logger = DummyLogger() | |
inst.handle_accept() | |
self.assertEqual(inst.socket.accepted, False) | |
self.assertEqual(len(inst.logger.logged), 1) | |
def test_handle_accept_noerror(self): | |
inst = self._makeOneWithMap() | |
innersock = DummySock() | |
inst.socket = DummySock(acceptresult=(innersock, None)) | |
inst.adj = DummyAdj | |
L = [] | |
inst.channel_class = lambda *arg, **kw: L.append(arg) | |
inst.handle_accept() | |
self.assertEqual(inst.socket.accepted, True) | |
self.assertEqual(innersock.opts, [("level", "optname", "value")]) | |
self.assertEqual(L, [(inst, innersock, None, inst.adj)]) | |
def test_maintenance(self): | |
inst = self._makeOneWithMap() | |
class DummyChannel: | |
requests = [] | |
zombie = DummyChannel() | |
zombie.last_activity = 0 | |
zombie.running_tasks = False | |
inst.active_channels[100] = zombie | |
inst.maintenance(10000) | |
self.assertEqual(zombie.will_close, True) | |
def test_backward_compatibility(self): | |
from waitress.adjustments import Adjustments | |
from waitress.server import TcpWSGIServer, WSGIServer | |
self.assertTrue(WSGIServer is TcpWSGIServer) | |
self.inst = WSGIServer(None, _start=False, port=1234) | |
# Ensure the adjustment was actually applied. | |
self.assertNotEqual(Adjustments.port, 1234) | |
self.assertEqual(self.inst.adj.port, 1234) | |
def test_create_with_one_tcp_socket(self): | |
from waitress.server import TcpWSGIServer | |
sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)] | |
sockets[0].bind(("127.0.0.1", 0)) | |
inst = self._makeWithSockets(_start=False, sockets=sockets) | |
self.assertTrue(isinstance(inst, TcpWSGIServer)) | |
def test_create_with_multiple_tcp_sockets(self): | |
from waitress.server import MultiSocketServer | |
sockets = [ | |
socket.socket(socket.AF_INET, socket.SOCK_STREAM), | |
socket.socket(socket.AF_INET, socket.SOCK_STREAM), | |
] | |
sockets[0].bind(("127.0.0.1", 0)) | |
sockets[1].bind(("127.0.0.1", 0)) | |
inst = self._makeWithSockets(_start=False, sockets=sockets) | |
self.assertTrue(isinstance(inst, MultiSocketServer)) | |
self.assertEqual(len(inst.effective_listen), 2) | |
def test_create_with_one_socket_should_not_bind_socket(self): | |
innersock = DummySock() | |
sockets = [DummySock(acceptresult=(innersock, None))] | |
sockets[0].bind(("127.0.0.1", 80)) | |
sockets[0].bind_called = False | |
inst = self._makeWithSockets(_start=False, sockets=sockets) | |
self.assertEqual(inst.socket.bound, ("127.0.0.1", 80)) | |
self.assertFalse(inst.socket.bind_called) | |
def test_create_with_one_socket_handle_accept_noerror(self): | |
innersock = DummySock() | |
sockets = [DummySock(acceptresult=(innersock, None))] | |
sockets[0].bind(("127.0.0.1", 80)) | |
inst = self._makeWithSockets(sockets=sockets) | |
L = [] | |
inst.channel_class = lambda *arg, **kw: L.append(arg) | |
inst.adj = DummyAdj | |
inst.handle_accept() | |
self.assertEqual(sockets[0].accepted, True) | |
self.assertEqual(innersock.opts, [("level", "optname", "value")]) | |
self.assertEqual(L, [(inst, innersock, None, inst.adj)]) | |
class DummySock(socket.socket): | |
accepted = False | |
blocking = False | |
family = socket.AF_INET | |
type = socket.SOCK_STREAM | |
proto = 0 | |
def __init__(self, toraise=None, acceptresult=(None, None)): | |
self.toraise = toraise | |
self.acceptresult = acceptresult | |
self.bound = None | |
self.opts = [] | |
self.bind_called = False | |
def bind(self, addr): | |
self.bind_called = True | |
self.bound = addr | |
def accept(self): | |
if self.toraise: | |
raise self.toraise | |
self.accepted = True | |
return self.acceptresult | |
def setblocking(self, x): | |
self.blocking = True | |
def fileno(self): | |
return 10 | |
def getpeername(self): | |
return "127.0.0.1" | |
def setsockopt(self, *arg): | |
self.opts.append(arg) | |
def getsockopt(self, *arg): | |
return 1 | |
def listen(self, num): | |
self.listened = num | |
def getsockname(self): | |
return self.bound | |
def close(self): | |
pass | |
class DummyTaskDispatcher: | |
def __init__(self): | |
self.tasks = [] | |
def add_task(self, task): | |
self.tasks.append(task) | |
def shutdown(self): | |
self.was_shutdown = True | |
class DummyTask: | |
serviced = False | |
start_response_called = False | |
wrote_header = False | |
status = "200 OK" | |
def __init__(self): | |
self.response_headers = {} | |
self.written = "" | |
def service(self): # pragma: no cover | |
self.serviced = True | |
class DummyAdj: | |
connection_limit = 1 | |
log_socket_errors = True | |
socket_options = [("level", "optname", "value")] | |
cleanup_interval = 900 | |
channel_timeout = 300 | |
class DummyAsyncore: | |
def loop(self, timeout=30.0, use_poll=False, map=None, count=None): | |
raise SystemExit | |
class DummyTrigger: | |
def pull_trigger(self): | |
self.pulled = True | |
def close(self): | |
pass | |
class DummyLogger: | |
def __init__(self): | |
self.logged = [] | |
def warning(self, msg, **kw): | |
self.logged.append(msg) | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment