Created
October 6, 2014 23:17
-
-
Save 1st1/8dd0a2d4aa1ffd895c52 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import threading
import unittest
from urllib.error import HTTPError
from urllib.request import urlopen, URLError

from aiohttp.server import ServerHttpProtocol
class Test(unittest.TestCase):
    """Check the response of a bare ``ServerHttpProtocol`` server.

    The server runs on its own event loop in a background thread while the
    test thread issues a blocking ``urlopen`` request against it.
    """

    # Address the throw-away server binds to.
    HOST = '127.0.0.1'
    PORT = 8880

    def setUp(self):
        """Turn on debug logging for the duration of the run."""
        import logging
        logging.basicConfig(level=logging.DEBUG)

    def _run_server(self):
        """Start the server thread and block until it is listening.

        Fails the test if the server has not signalled readiness within
        three seconds. On success a cleanup is registered so the server is
        shut down even when a later assertion fails.
        """
        started = threading.Event()
        # Daemon thread: a server that refuses to stop must not keep the
        # interpreter alive after the test run has finished.
        self.srv = threading.Thread(target=self._server, args=(started,),
                                    daemon=True)
        self.srv.start()
        if not started.wait(timeout=3):
            self.fail("server did not start")
        self.addCleanup(self._shutdown)

    def _server(self, started):
        """Thread target: serve on a private event loop until stopped.

        ``started`` is set once the listening socket is bound.
        """
        loop = self.server_loop = asyncio.new_event_loop()
        try:
            self.server = loop.run_until_complete(
                loop.create_server(lambda: ServerHttpProtocol(loop=loop),
                                   self.HOST,
                                   self.PORT))
            started.set()
            loop.run_forever()
            # run_forever() returns after _stop_server() stopped the loop;
            # wait_closed() is a coroutine, so it has to be driven by the
            # loop rather than merely called.
            loop.run_until_complete(self.server.wait_closed())
        finally:
            loop.close()

    def _stop_server(self):
        """Close the listening socket and stop the loop.

        Must run on the server loop's thread (schedule it with
        ``call_soon_threadsafe``); ``_server`` finishes the shutdown once
        ``run_forever`` returns.
        """
        self.server.close()
        self.server_loop.stop()

    def _shutdown(self):
        """Ask the server thread to stop and wait for it to exit."""
        self.server_loop.call_soon_threadsafe(self._stop_server)
        self.srv.join(timeout=3)

    def test_version(self):
        """A request to the bare protocol is answered with HTTP 404."""
        self._run_server()
        # Only HTTPError carries ``code``; a plain URLError (for example a
        # refused connection) should surface as itself, not as an
        # AttributeError on ``.code``.
        with self.assertRaises(HTTPError) as cm:
            urlopen('http://%s:%d/' % (self.HOST, self.PORT))
        self.assertEqual(cm.exception.code, 404)
        # HTTPError wraps the open response; release its socket.
        cm.exception.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment