Skip to content

Instantly share code, notes, and snippets.

@kelciour
Last active August 26, 2021 02:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kelciour/8186c66d891f20866fa24053bc074aca to your computer and use it in GitHub Desktop.
Save kelciour/8186c66d891f20866fa24053bc074aca to your computer and use it in GitHub Desktop.
waitress
import errno
import socket
import unittest
dummy_app = object()
class TestWSGIServer(unittest.TestCase):
def _makeOne(
self,
application=dummy_app,
host="127.0.0.1",
port=0,
_dispatcher=None,
adj=None,
map=None,
_start=True,
_sock=None,
_server=None,
):
from waitress.server import create_server
self.inst = create_server(
application,
host=host,
port=port,
map=map,
_dispatcher=_dispatcher,
_start=_start,
_sock=_sock,
)
return self.inst
def _makeOneWithMap(
self, adj=None, _start=True, host="127.0.0.1", port=0, app=dummy_app
):
sock = DummySock()
task_dispatcher = DummyTaskDispatcher()
map = {}
return self._makeOne(
app,
host=host,
port=port,
map=map,
_sock=sock,
_dispatcher=task_dispatcher,
_start=_start,
)
def _makeOneWithMulti(
self, adj=None, _start=True, app=dummy_app, listen="127.0.0.1:0 127.0.0.1:0"
):
sock = DummySock()
task_dispatcher = DummyTaskDispatcher()
map = {}
from waitress.server import create_server
self.inst = create_server(
app,
listen=listen,
map=map,
_dispatcher=task_dispatcher,
_start=_start,
_sock=sock,
)
return self.inst
def _makeWithSockets(
self,
application=dummy_app,
_dispatcher=None,
map=None,
_start=True,
_sock=None,
_server=None,
sockets=None,
):
from waitress.server import create_server
_sockets = []
if sockets is not None:
_sockets = sockets
self.inst = create_server(
application,
map=map,
_dispatcher=_dispatcher,
_start=_start,
_sock=_sock,
sockets=_sockets,
)
return self.inst
def tearDown(self):
if self.inst is not None:
self.inst.close()
def test_ctor_app_is_None(self):
self.inst = None
self.assertRaises(ValueError, self._makeOneWithMap, app=None)
def test_ctor_start_true(self):
inst = self._makeOneWithMap(_start=True)
self.assertEqual(inst.accepting, True)
self.assertEqual(inst.socket.listened, 1024)
def test_ctor_makes_dispatcher(self):
inst = self._makeOne(_start=False, map={})
self.assertEqual(
inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher"
)
def test_ctor_start_false(self):
inst = self._makeOneWithMap(_start=False)
self.assertEqual(inst.accepting, False)
def test_get_server_multi(self):
inst = self._makeOneWithMulti()
self.assertEqual(inst.__class__.__name__, "MultiSocketServer")
def test_run(self):
inst = self._makeOneWithMap(_start=False)
inst.asyncore = DummyAsyncore()
inst.task_dispatcher = DummyTaskDispatcher()
inst.run()
self.assertTrue(inst.task_dispatcher.was_shutdown)
def test_run_base_server(self):
inst = self._makeOneWithMulti(_start=False)
inst.asyncore = DummyAsyncore()
inst.task_dispatcher = DummyTaskDispatcher()
inst.run()
self.assertTrue(inst.task_dispatcher.was_shutdown)
def test_pull_trigger(self):
inst = self._makeOneWithMap(_start=False)
inst.trigger.close()
inst.trigger = DummyTrigger()
inst.pull_trigger()
self.assertEqual(inst.trigger.pulled, True)
def test_add_task(self):
task = DummyTask()
inst = self._makeOneWithMap()
inst.add_task(task)
self.assertEqual(inst.task_dispatcher.tasks, [task])
self.assertFalse(task.serviced)
def test_readable_not_accepting(self):
inst = self._makeOneWithMap()
inst.accepting = False
self.assertFalse(inst.readable())
def test_readable_maplen_gt_connection_limit(self):
inst = self._makeOneWithMap()
inst.accepting = True
inst.adj = DummyAdj
inst._map = {"a": 1, "b": 2}
self.assertFalse(inst.readable())
self.assertTrue(inst.in_connection_overflow)
def test_readable_maplen_lt_connection_limit(self):
inst = self._makeOneWithMap()
inst.accepting = True
inst.adj = DummyAdj
inst._map = {}
self.assertTrue(inst.readable())
self.assertFalse(inst.in_connection_overflow)
def test_readable_maplen_toggles_connection_overflow(self):
inst = self._makeOneWithMap()
inst.accepting = True
inst.adj = DummyAdj
inst._map = {"a": 1, "b": 2}
self.assertFalse(inst.in_connection_overflow)
self.assertFalse(inst.readable())
self.assertTrue(inst.in_connection_overflow)
inst._map = {}
self.assertTrue(inst.readable())
self.assertFalse(inst.in_connection_overflow)
def test_readable_maintenance_false(self):
import time
inst = self._makeOneWithMap()
then = time.time() + 1000
inst.next_channel_cleanup = then
L = []
inst.maintenance = lambda t: L.append(t)
inst.readable()
self.assertEqual(L, [])
self.assertEqual(inst.next_channel_cleanup, then)
def test_readable_maintenance_true(self):
inst = self._makeOneWithMap()
inst.next_channel_cleanup = 0
L = []
inst.maintenance = lambda t: L.append(t)
inst.readable()
self.assertEqual(len(L), 1)
self.assertNotEqual(inst.next_channel_cleanup, 0)
def test_writable(self):
inst = self._makeOneWithMap()
self.assertFalse(inst.writable())
def test_handle_read(self):
inst = self._makeOneWithMap()
self.assertEqual(inst.handle_read(), None)
def test_handle_connect(self):
inst = self._makeOneWithMap()
self.assertEqual(inst.handle_connect(), None)
def test_handle_accept_wouldblock_socket_error(self):
inst = self._makeOneWithMap()
ewouldblock = socket.error(errno.EWOULDBLOCK)
inst.socket = DummySock(toraise=ewouldblock)
inst.handle_accept()
self.assertEqual(inst.socket.accepted, False)
def test_handle_accept_other_socket_error(self):
inst = self._makeOneWithMap()
eaborted = socket.error(errno.ECONNABORTED)
inst.socket = DummySock(toraise=eaborted)
inst.adj = DummyAdj
def foo():
raise OSError
inst.accept = foo
inst.logger = DummyLogger()
inst.handle_accept()
self.assertEqual(inst.socket.accepted, False)
self.assertEqual(len(inst.logger.logged), 1)
def test_handle_accept_noerror(self):
inst = self._makeOneWithMap()
innersock = DummySock()
inst.socket = DummySock(acceptresult=(innersock, None))
inst.adj = DummyAdj
L = []
inst.channel_class = lambda *arg, **kw: L.append(arg)
inst.handle_accept()
self.assertEqual(inst.socket.accepted, True)
self.assertEqual(innersock.opts, [("level", "optname", "value")])
self.assertEqual(L, [(inst, innersock, None, inst.adj)])
def test_maintenance(self):
inst = self._makeOneWithMap()
class DummyChannel:
requests = []
zombie = DummyChannel()
zombie.last_activity = 0
zombie.running_tasks = False
inst.active_channels[100] = zombie
inst.maintenance(10000)
self.assertEqual(zombie.will_close, True)
def test_backward_compatibility(self):
from waitress.adjustments import Adjustments
from waitress.server import TcpWSGIServer, WSGIServer
self.assertTrue(WSGIServer is TcpWSGIServer)
self.inst = WSGIServer(None, _start=False, port=1234)
# Ensure the adjustment was actually applied.
self.assertNotEqual(Adjustments.port, 1234)
self.assertEqual(self.inst.adj.port, 1234)
def test_create_with_one_tcp_socket(self):
from waitress.server import TcpWSGIServer
sockets = [socket.socket(socket.AF_INET, socket.SOCK_STREAM)]
sockets[0].bind(("127.0.0.1", 0))
inst = self._makeWithSockets(_start=False, sockets=sockets)
self.assertTrue(isinstance(inst, TcpWSGIServer))
def test_create_with_multiple_tcp_sockets(self):
from waitress.server import MultiSocketServer
sockets = [
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
sockets[0].bind(("127.0.0.1", 0))
sockets[1].bind(("127.0.0.1", 0))
inst = self._makeWithSockets(_start=False, sockets=sockets)
self.assertTrue(isinstance(inst, MultiSocketServer))
self.assertEqual(len(inst.effective_listen), 2)
def test_create_with_one_socket_should_not_bind_socket(self):
innersock = DummySock()
sockets = [DummySock(acceptresult=(innersock, None))]
sockets[0].bind(("127.0.0.1", 80))
sockets[0].bind_called = False
inst = self._makeWithSockets(_start=False, sockets=sockets)
self.assertEqual(inst.socket.bound, ("127.0.0.1", 80))
self.assertFalse(inst.socket.bind_called)
def test_create_with_one_socket_handle_accept_noerror(self):
innersock = DummySock()
sockets = [DummySock(acceptresult=(innersock, None))]
sockets[0].bind(("127.0.0.1", 80))
inst = self._makeWithSockets(sockets=sockets)
L = []
inst.channel_class = lambda *arg, **kw: L.append(arg)
inst.adj = DummyAdj
inst.handle_accept()
self.assertEqual(sockets[0].accepted, True)
self.assertEqual(innersock.opts, [("level", "optname", "value")])
self.assertEqual(L, [(inst, innersock, None, inst.adj)])
class DummySock(socket.socket):
accepted = False
blocking = False
family = socket.AF_INET
type = socket.SOCK_STREAM
proto = 0
def __init__(self, toraise=None, acceptresult=(None, None)):
self.toraise = toraise
self.acceptresult = acceptresult
self.bound = None
self.opts = []
self.bind_called = False
def bind(self, addr):
self.bind_called = True
self.bound = addr
def accept(self):
if self.toraise:
raise self.toraise
self.accepted = True
return self.acceptresult
def setblocking(self, x):
self.blocking = True
def fileno(self):
return 10
def getpeername(self):
return "127.0.0.1"
def setsockopt(self, *arg):
self.opts.append(arg)
def getsockopt(self, *arg):
return 1
def listen(self, num):
self.listened = num
def getsockname(self):
return self.bound
def close(self):
pass
class DummyTaskDispatcher:
def __init__(self):
self.tasks = []
def add_task(self, task):
self.tasks.append(task)
def shutdown(self):
self.was_shutdown = True
class DummyTask:
serviced = False
start_response_called = False
wrote_header = False
status = "200 OK"
def __init__(self):
self.response_headers = {}
self.written = ""
def service(self): # pragma: no cover
self.serviced = True
class DummyAdj:
connection_limit = 1
log_socket_errors = True
socket_options = [("level", "optname", "value")]
cleanup_interval = 900
channel_timeout = 300
class DummyAsyncore:
def loop(self, timeout=30.0, use_poll=False, map=None, count=None):
raise SystemExit
class DummyTrigger:
def pull_trigger(self):
self.pulled = True
def close(self):
pass
class DummyLogger:
def __init__(self):
self.logged = []
def warning(self, msg, **kw):
self.logged.append(msg)
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment