Created
February 1, 2021 21:53
-
-
Save devilholk/4ad5a7470d8df0a406e8181ab330884b to your computer and use it in GitHub Desktop.
This is an example/test for using gunicorn with uvicorn to serve http and websockets using asynchronous handlers. Take this code with a wheelbarrow of NaCl.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
if not 'gunicorn' in sys.argv[0]: | |
import pathlib | |
name = pathlib.Path(sys.argv[0]).stem | |
print('# To test this script:') | |
print(f'gunicorn -w1 -k uvicorn.workers.UvicornWorker {name}:app') | |
sys.exit(1) | |
async def handle_lifespan(scope, receive, send): | |
while True: | |
message = await receive() | |
if message['type'] == 'lifespan.startup': | |
... # Do some startup here! | |
await send(type = 'lifespan.startup.complete') | |
elif message['type'] == 'lifespan.shutdown': | |
... # Do some shutdown here! | |
await send(type = lifespan.shutdown.complete) | |
return | |
async def handle_http(scope, receive, send): | |
await send( | |
type ='http.response.start', | |
status =200, | |
headers =[ | |
[b'content-type', b'text/html; charset=utf-8'], | |
], | |
) | |
await send( | |
type = 'http.response.body', | |
body = br'''<!doctype HTML> | |
<html> | |
<head> | |
<title>Testing stuff</title> | |
<style> | |
html, body { | |
background-color: #CCC; | |
} | |
body > div { | |
border-radius: 5px; | |
border: 1px solid blue; | |
background-color: white; | |
padding: 5px; | |
margin-bottom: 5px; | |
} | |
</style> | |
<script> | |
function runtest() { | |
function add_msg(m) { | |
d = document.createElement('div'); | |
d.appendChild(document.createTextNode(m)); | |
document.body.appendChild(d); | |
} | |
add_msg('Performing test'); | |
ws = new WebSocket(`ws://${location.host}${location.pathname}${location.search}${location.hash}`); | |
ws.onopen = function(m) { | |
outmsg = 'Yo world!'; | |
m.target.binaryType = "arraybuffer"; | |
add_msg(`Connection established to ${ws.url}`); | |
add_msg(`Sending message: ${outmsg}`); | |
m.target.send(outmsg); | |
} | |
ws.onclose = function(m) { | |
add_msg(`Connection closed`); | |
} | |
ws.onmessage = function(m) { | |
outmsg = 'Terminate plz'; | |
msg = new TextDecoder().decode(m.data); | |
add_msg(`Received message: ${msg}`); | |
add_msg(`Sending message: ${outmsg}`); | |
m.target.send(outmsg); | |
}; | |
} | |
</script> | |
</head> | |
<body onload="runtest();"> | |
</body> | |
</html> | |
''', | |
) | |
async def handle_websocket(scope, receive, send): | |
print(scope) | |
await send(type = "websocket.accept") | |
await send(type = "websocket.send", bytes = b"123") | |
print('Waitin for websock') | |
while True: | |
message = await receive() | |
if message['type'] == 'websocket.connect': | |
print('CON', message) | |
elif message['type'] == 'websocket.receive': | |
msg_text = message['text']; | |
lmsg_text = msg_text.lower() | |
if 'terminate' in lmsg_text and 'plz' in lmsg_text: | |
await send(type = "websocket.close") | |
return | |
print('REC', message) | |
elif message['type'] == 'websocket.disconnect': | |
print('DISC', message) | |
return | |
await send(type = "websocket.send", bytes = b"Moarh") | |
scope_handlers = dict( | |
lifespan = handle_lifespan, | |
websocket = handle_websocket, | |
http = handle_http, | |
) | |
async def app(scope, receive, send): | |
scope_type = scope['type'] | |
await scope_handlers[scope_type](scope, receive, lambda **k: send(k)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment