Skip to content

Instantly share code, notes, and snippets.

@remittor
Last active April 21, 2023 07:30
Show Gist options
  • Save remittor/7e6332616c930a618fdcb8a9a95a2dd4 to your computer and use it in GitHub Desktop.
Save remittor/7e6332616c930a618fdcb8a9a95a2dd4 to your computer and use it in GitHub Desktop.
import os
import sys
import asyncio
import io
import optparse
import logging
import pprint
try:
from ujson import dumps as jsonify
except:
from json import dumps as jsonify
parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
parser.add_option("-p", "--port", dest="port", default=5000, type="int")
parser.add_option("-g", "--gateway", dest="gateway", default="wz", type="string")
parser.add_option("-q", "--quart", dest="quart", action="store_true", default=False)
parser.add_option("-f", "--fastapi", dest="fastapi", action="store_true", default=False)
parser.add_option("-b", "--blacksheep", dest="blacksheep", action="store_true", default=False)
parser.add_option("-s", "--size", dest="size", default="", type="string")
parser.add_option("-#", "--profile", dest="profile", action="store_true", default=False)
parser.add_option("-t", "--threads", dest="threads", default=1, type="int")
parser.add_option("-v", "--verbose", dest="verbose", default=2, type="int")
(opt, args) = parser.parse_args()
async def read_body(receive):
    """Read the complete request body from an ASGI ``receive`` callable.

    Awaits ``receive()`` repeatedly, collecting each message's ``body`` value
    (a missing key counts as empty) until a message arrives whose
    ``more_body`` value is missing or falsy.

    Args:
        receive: awaitable callable returning one ASGI event dict per call.

    Returns:
        All received body chunks concatenated into one ``bytes`` object.
    """
    chunks = []
    more_body = True
    while more_body:
        message = await receive()
        # Debug trace of every received event (kept from the original).
        print(f'read_body: {message}')
        chunks.append(message.get('body', b''))
        more_body = message.get('more_body', False)
    # Join once at the end: repeated ``bytes +=`` copies the accumulated body
    # on every chunk, which is quadratic for large chunked uploads.
    return b''.join(chunks)
# Value for the "Server" response header; empty means "do not send the header".
# Other parts of the script assign it (as str or bytes) before serving.
g_response_server = ''


async def _respond(send, status, headers, *chunks):
    """Send a response-start event, then one body event per chunk.

    Every chunk except the last is flagged with ``more_body: True``; the last
    one carries no ``more_body`` key, which ends the response.
    """
    await send({'type': 'http.response.start', 'status': status, 'headers': headers})
    *leading, last = chunks
    for chunk in leading:
        await send({'type': 'http.response.body', 'body': chunk, 'more_body': True})
    await send({'type': 'http.response.body', 'body': last})


async def asgi_app(scope, receive, send):
    """Minimal raw ASGI HTTP application used as the default ``app``.

    Routes (matched on ``scope['path']`` only, the HTTP method is ignored):

    * ``/plaintext`` and ``/hw`` -- ``text/plain`` body ``Hello, World\\n``
    * ``/hw3``      -- ``text/plain`` body streamed as three chunks
    * ``/json``     -- ``application/json`` body ``{"message": "Hello, World!"}``
    * ``/pingpong`` -- echoes the request body as ``application/octet-stream``
    * anything else -- status 400 with an empty body

    Args:
        scope: ASGI connection scope; must have ``type == 'http'``.
        receive: ASGI receive callable (only used by ``/pingpong``).
        send: ASGI send callable.

    Raises:
        AssertionError: if the scope type is not ``'http'``.
    """
    assert scope['type'] == 'http'
    path = scope['path']
    # Debug aid: pprint.PrettyPrinter(indent=4, sort_dicts=False).pprint(scope)

    headers = []
    if g_response_server:
        # ASGI header names and values must be byte strings.  The global may
        # hold either str or bytes, so normalise it instead of emitting a
        # str pair next to the bytes Content-Type header.
        server = g_response_server
        if isinstance(server, str):
            server = server.encode('latin-1')
        headers.append((b'Server', server))

    if path == '/plaintext' or path == '/hw':
        headers.append((b'Content-Type', b'text/plain'))
        await _respond(send, 200, headers, b'Hello, World\n')
        return

    if path == '/hw3':
        headers.append((b'Content-Type', b'text/plain'))
        await _respond(
            send, 200, headers,
            b'Hello, world 1\n',
            b'Hello, world 2\n',
            b'Hello, world 3\n',
        )
        return

    if path == "/json":
        headers.append((b'Content-Type', b'application/json'))
        body = jsonify({"message": "Hello, World!"}).encode('utf8')
        await _respond(send, 200, headers, body)
        return

    if path == "/pingpong":
        # Consume the whole request before the response is started.
        body = await read_body(receive)
        headers.append((b'Content-Type', b'application/octet-stream'))
        await _respond(send, 200, headers, body)
        return

    # Unknown path: status 400, no body.
    await _respond(send, 400, headers, b'')
    return
# The bare ASGI callable is served unless one of the framework options below
# rebinds ``app``.
app = asgi_app

# Flipped to True by the Quart after-request hook once it has lowered the
# Hypercorn logger levels, so that this is done only once.
_logfix = False

if opt.quart:
    from quart.config import DEFAULT_CONFIG
    # DEFAULT_CONFIG["SERVER_NAME"] = "Quart"

    # The Q_PROFILE environment variable is set before ``import quart`` runs.
    if opt.profile:
        os.environ["Q_PROFILE"] = "1"
    import quart

    app = quart.Quart(__name__)

    @app.after_request
    async def after_request(response):
        """Post-process every Quart response.

        On the first call, raises the "hypercorn.access" and "hypercorn.error"
        loggers to WARN.  When ``g_response_server`` is non-empty it is written
        into the response's ``Server`` header.  Returns the response.
        """
        global g_response_server, _logfix
        if not _logfix:
            for logger_name in ("hypercorn.access", "hypercorn.error"):
                logging.getLogger(logger_name).setLevel(logging.WARN)
            _logfix = True
        if g_response_server:
            response.headers.set('Server', g_response_server)
        return response

    @app.get('/plaintext')
    async def plaintext():
        """GET /plaintext: fixed greeting with content type ``text/plain``."""
        reply = await quart.make_response(b"Hello, World\n")
        reply.content_type = "text/plain"
        return reply

    @app.get('/json')
    async def jsondata():
        """GET /json: greeting message serialised by ``quart.jsonify``."""
        return quart.jsonify(message="Hello, World!")
if opt.fastapi:
    # FastAPI front-end: rebinds ``app`` to a FastAPI application exposing the
    # /plaintext and /json routes.
    import fastapi
    from fastapi.responses import PlainTextResponse, UJSONResponse as JSONResponse

    app = fastapi.FastAPI()

    @app.get("/plaintext")
    async def plaintext():
        """GET /plaintext: fixed greeting as a plain-text response."""
        greeting = b"Hello, world\n"
        return PlainTextResponse(greeting)

    @app.get("/json")
    async def json_serialization():
        """GET /json: greeting message via ``JSONResponse`` (aliased to UJSONResponse)."""
        payload = {"message": "Hello, world!"}
        return JSONResponse(payload)
if opt.blacksheep:
    # BlackSheep front-end: rebinds ``app`` to a BlackSheep Application
    # exposing the /hw, /plaintext and /json routes.
    import blacksheep
    from blacksheep import Application, Content, Header, Response

    app = Application()

    async def middleware(request, handler):
        """Run the next handler, then stamp the ``Server`` response header.

        The header is only written when ``g_response_server`` is non-empty.
        """
        global g_response_server
        reply = await handler(request)
        if g_response_server:
            reply.headers[b"Server"] = g_response_server
        return reply

    @app.router.get('/hw')
    async def hw(request):
        """GET /hw: greeting built from explicit Response/Content objects."""
        payload = Content(b'text/plain', b'Hello, World\n')
        return Response(200, content=payload)

    @app.router.get('/plaintext')
    async def plaintext(request):
        """GET /plaintext: greeting via the ``blacksheep.text`` helper."""
        return blacksheep.text("Hello, World\n")

    @app.route('/json')
    async def jsondata(request):
        """/json: greeting message via the ``blacksheep.json`` helper."""
        # Manual alternative kept for reference:
        #   data = jsonify( {'message': 'Hello, world!'} )
        #   content = Content(b'application/json; charset=utf-8', data)
        #   return Response(200, content=content)
        return blacksheep.json({'message': 'Hello, world!'})

    if opt.gateway == "fw":
        # FastWSGI does not support asgi.lifespan, so the application is
        # started explicitly here, after the Server-header middleware has
        # been registered.
        from blacksheep.utils.aio import get_running_loop

        g_response_server = b'FastWSGI'
        app.middlewares.append(middleware)
        loop = get_running_loop()
        loop.run_until_complete(app.start())
if __name__ == '__main__':
    # Start the server selected by --gateway.  A gateway value that matches
    # none of the branches starts nothing and falls through to the final print.
    if opt.gateway == "fw":
        # FastWSGI
        import fastwsgi

        # The BlackSheep setup assigns its own Server header value; every
        # other app gets it here.
        if not opt.blacksheep:
            g_response_server = 'FastWSGI'
        # fastwsgi.server.tcp_nodelay = 1
        fastwsgi.server.num_workers = opt.threads
        fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose)

    elif opt.gateway == "si":
        # socketify (only the port is passed to listen(); opt.host is not used)
        import socketify

        msg = f"Listening on port http://localhost:{opt.port} now\n"
        socketify.ASGI(app).listen(opt.port, lambda config: print(msg)).run(opt.threads)

    elif opt.gateway == "uc":
        # uvicorn
        import uvicorn

        uvicorn.run(
            app,
            host=opt.host,
            port=opt.port,
            workers=opt.threads,
            loop="uvloop",
            log_level=logging.ERROR,
            access_log=False,
        )

    elif opt.gateway == "hc":
        # hypercorn
        import hypercorn
        from hypercorn.config import Config
        from hypercorn.asyncio import serve

        if not opt.quart:
            # Any non-Quart app: serve it through hypercorn's asyncio API.
            config = Config()
            config.bind = [f"{opt.host}:{opt.port}"]
            config.debug = None
            config.loglevel = "WARN"
            config.accesslog = None
            # config.access_log_format = "%(h)s %(r)s %(s)s %(b)s %(D)s"
            config.errorlog = "-"
            # config.use_reloader = True
            asyncio.run(serve(app, config, mode="asgi", shutdown_trigger=None))
        else:
            # Quart app: use its own run() entry point, with the application
            # logger raised to WARN.
            logging.getLogger(app.name).setLevel(logging.WARN)
            # logging.getLogger(app.name).removeHandler(quart.logging.default_handler)
            # use_reloader=True would restart the server process when files change.
            app.run(opt.host, opt.port, debug=None, use_reloader=False, loop=None)
            # asyncio.run(app.run_task(opt.host, opt.port))

    print("==== ASGI Server close =====")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment