Skip to content

Instantly share code, notes, and snippets.

@adontz
Created February 22, 2019 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adontz/a6ad0246758ef5362841c86c88ee3c01 to your computer and use it in GitHub Desktop.
Save adontz/a6ad0246758ef5362841c86c88ee3c01 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from datetime import datetime
from httptools import HttpRequestParser
from jinja2 import Environment
from jinja2 import FileSystemLoader
from os.path import dirname
import asyncio
# Response head sent before every rendered page. The single %d placeholder is
# filled with the byte length of the body via the bytes % operator.
#
# HTTP/1.1 requires each header line to end in CRLF and the header section to
# be terminated by an empty line; both are spelled out explicitly here so the
# framing does not depend on how this source file's line endings are stored.
HEAD_TEMPLATE = (
    b'HTTP/1.1 200 OK\r\n'
    b'Date: Mon, 13 Jan 2019 12:28:53 GMT\r\n'
    b'Server: Apache/2.2.14 (Win32)\r\n'
    b'Last-Modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n'
    b'Content-Length: %d\r\n'
    b'Content-Type: text/html\r\n'
    # "close" is the connection option that tells the client the server will
    # close the connection after this response.
    b'Connection: close\r\n'
    b'\r\n'
)
# Shared Jinja2 environment: templates are loaded from the directory that
# contains this file, async rendering is enabled, and automatic template
# reloading is switched off.
environment = Environment(
    loader=FileSystemLoader(dirname(__file__)),
    enable_async=True,
    auto_reload=False,
)
class Request:
    """State of one HTTP request while it is being parsed.

    The fields start out empty and are filled in by the parser callbacks:
    ``url`` and ``headers`` hold the raw bytes reported by the parser, and
    body chunks are pushed onto ``_body_queue`` as they arrive.
    """

    def __init__(self):
        # Raw request target; replaced when the parser reports the URL.
        self.url = b''
        # Header name -> header value, both as raw bytes.
        self.headers = {}
        # Plain flag mirrored by the event below once the header section
        # has been parsed completely.
        self._headers_complete = False
        self._headers_complete_event = asyncio.Event()
        # Body chunks in arrival order; an empty bytes object is used as
        # the end-of-body marker.
        self._body_queue = asyncio.Queue()
class RequestParserCallback:
    """Parser event sink that records what it is told on a request object.

    An instance is handed to the HTTP parser; each ``on_*`` method stores the
    reported piece of the message on ``self.request``.
    """

    def __init__(self, request):
        # Flipped to True once the parser reports the end of the message.
        self.is_body_complete = False
        self.request = request

    def on_message_begin(self):
        """Start of a message: nothing to prepare."""

    def on_url(self, url: bytes):
        """Store the raw request target."""
        self.request.url = url

    def on_header(self, name: bytes, value: bytes):
        """Store one header; a repeated name replaces the earlier value."""
        headers = self.request.headers
        headers[name] = value

    def on_headers_complete(self):
        """Mark the header section as fully parsed."""
        self.request._headers_complete = True

    def on_body(self, body: bytes):
        """Queue a body chunk, skipping empty ones."""
        # An empty chunk is reserved as the end-of-body marker, so it must
        # never be queued from here.
        if not body:
            return
        self.request._body_queue.put_nowait(body)

    def on_message_complete(self):
        """Queue the end-of-body marker and flag the message as complete."""
        queue = self.request._body_queue
        queue.put_nowait(b'')
        self.is_body_complete = True
class TestProtocol(asyncio.Protocol):
    """Per-connection protocol that answers one request with a rendered page.

    Incoming bytes are fed to an HTTP request parser. A background task waits
    until the request body has been fully received, renders ``test.html`` and
    writes the response, then schedules the connection to be closed.
    """

    def __init__(self):
        self.request = Request()
        self.request_parser_callback = RequestParserCallback(self.request)
        self.request_parser = HttpRequestParser(self.request_parser_callback)
        # Set in connection_made() and cleared in connection_lost().
        self.transport = None
        # Strong reference to the responder task. The event loop only keeps
        # weak references to tasks, so without this the task could be
        # garbage-collected before it finishes.
        self._writer_task = None

    def pause_writing(self):
        """Log that the transport asked us to stop writing."""
        print('pause_writing')

    def resume_writing(self):
        """Log that the transport is ready for writes again."""
        print('resume_writing')

    def _close_task(self):
        """Close the transport if the connection is still open."""
        if self.transport:
            self.transport.close()

    async def _write_task(self):
        """Wait for the complete request, then send the rendered response."""
        # Drain body chunks until the end-of-body marker (empty bytes) shows
        # up or the parser has already flagged the message as complete.
        while not self.request_parser_callback.is_body_complete:
            data = await self.request._body_queue.get()
            if not data:
                break

        transport = self.transport
        if transport is None:
            # The connection went away while we were waiting.
            return

        template = environment.get_template('test.html')
        body_text = await template.render_async({'system': 'uv-loop.', 'address': transport.get_extra_info('peername'), 'datetime': datetime.now()})
        body_data = body_text.encode('utf-8')
        head_data = HEAD_TEMPLATE % len(body_data)

        # Rendering awaited, so the connection may have been lost meanwhile.
        if self.transport is None or transport.is_closing():
            return

        transport.write(head_data)
        transport.write(body_data)
        asyncio.get_running_loop().call_soon(self._close_task)

    def connection_made(self, transport):
        """Remember the transport and start the responder task."""
        self.transport = transport
        self._writer_task = asyncio.get_running_loop().create_task(self._write_task())

    def connection_lost(self, exc):
        """Forget the transport and stop the responder task if still running."""
        self.transport = None
        task = self._writer_task
        self._writer_task = None
        if task is not None:
            # Without this, a peer that disconnects before sending a complete
            # request would leave the task waiting on the body queue forever.
            # Cancelling an already finished task is a no-op.
            task.cancel()

    def data_received(self, data):
        """Feed received bytes to the parser and publish header completion."""
        self.request_parser.feed_data(data)
        if self.request._headers_complete:
            self.request._headers_complete_event.set()
if __name__ == '__main__':
    import uvloop

    # Run the server on a uvloop event loop, listening on all interfaces,
    # port 7777.
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(False)
    coro = loop.create_server(TestProtocol, '0.0.0.0', 7777, ssl=None, reuse_address=True, reuse_port=True, backlog=1024)
    srv = loop.run_until_complete(coro)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        pass
    finally:
        # Release the listening socket and the loop's resources instead of
        # leaving them to interpreter teardown.
        srv.close()
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment