-
-
Save adontz/a6ad0246758ef5362841c86c88ee3c01 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from datetime import datetime | |
from httptools import HttpRequestParser | |
from jinja2 import Environment | |
from jinja2 import FileSystemLoader | |
from os.path import dirname | |
import asyncio | |
HEAD_TEMPLATE = b'''HTTP/1.1 200 OK | |
Date: Mon, 13 Jan 2019 12:28:53 GMT | |
Server: Apache/2.2.14 (Win32) | |
Last-Modified: Wed, 22 Jul 2009 19:15:56 GMT | |
Content-Length: %d | |
Content-Type: text/html | |
Connection: Closed | |
''' | |
environment = Environment(auto_reload=False, enable_async=True, loader=FileSystemLoader(dirname(__file__))) | |
class Request: | |
def __init__(self): | |
from curio import Event | |
self.url = b'' | |
self.headers = {} | |
self._headers_complete = False | |
self._headers_complete_event = asyncio.Event() | |
self._body_queue = asyncio.Queue() | |
class RequestParserCallback: | |
def __init__(self, request): | |
self.request = request | |
self.is_body_complete = False | |
def on_message_begin(self): | |
pass | |
def on_url(self, url: bytes): | |
self.request.url = url | |
def on_header(self, name: bytes, value: bytes): | |
self.request.headers[name] = value | |
def on_headers_complete(self): | |
self.request._headers_complete = True | |
def on_body(self, body: bytes): | |
if len(body) > 0: | |
self.request._body_queue.put_nowait(body) | |
def on_message_complete(self): | |
self.request._body_queue.put_nowait(b'') | |
self.is_body_complete = True | |
class TestProtocol(asyncio.Protocol): | |
def __init__(self): | |
self.request = Request() | |
self.request_parser_callback = RequestParserCallback(self.request) | |
self.request_parser = HttpRequestParser(self.request_parser_callback) | |
def pause_writing(self): | |
print('pause_writing') | |
def resume_writing(self): | |
print('resume_writing') | |
def _close_task(self): | |
if self.transport: | |
self.transport.close() | |
async def _write_task(self): | |
while not self.request_parser_callback.is_body_complete: | |
data = await self.request._body_queue.get() | |
if not data: | |
break | |
template = environment.get_template('test.html') | |
body_text = await template.render_async({'system': 'uv-loop.', 'address': self.transport.get_extra_info('peername'), 'datetime': datetime.now()}) | |
body_data = body_text.encode('utf-8') | |
head_data = HEAD_TEMPLATE % len(body_data) | |
self.transport.write(head_data) | |
self.transport.write(body_data) | |
asyncio.get_running_loop().call_soon(self._close_task) | |
def connection_made(self, transport): | |
self.transport = transport | |
asyncio.get_running_loop().create_task(self._write_task()) | |
def connection_lost(self, exc): | |
self.transport = None | |
def data_received(self, data): | |
self.request_parser.feed_data(data) | |
if self.request._headers_complete: | |
self.request._headers_complete_event.set() | |
if __name__ == '__main__': | |
import uvloop | |
loop = uvloop.new_event_loop() | |
asyncio.set_event_loop(loop) | |
loop.set_debug(False) | |
coro = loop.create_server(TestProtocol, '0.0.0.0', 7777, ssl=None, reuse_address=True, reuse_port=True, backlog=1024) | |
srv = loop.run_until_complete(coro) | |
loop.run_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment