Skip to content

Instantly share code, notes, and snippets.

@vane
Last active March 3, 2018 09:32
Show Gist options
  • Save vane/d3052a4a053ab83406fdb3dd83c6bbc8 to your computer and use it in GitHub Desktop.
Save vane/d3052a4a053ab83406fdb3dd83c6bbc8 to your computer and use it in GitHub Desktop.
tornado scale wsgi app
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import io
import sys
import bottle
import traceback
import tornado
import tornado.gen
import tornado.log
import tornado.httputil
import tornado.escape
import tornado.wsgi
import tornado.web
import tornado.ioloop
import concurrent.futures.thread
wapp = bottle.Bottle()
instance = tornado.ioloop.IOLoop.instance()
class Count:
count = 0
SCALE = 1
@wapp.get('/test/<param>')
def test(param):
time.sleep(int(param))
return """
Hello World {}
""".format(param)
class WSGIContainer(object):
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
def __call__(self, request):
data = {}
response = []
def start_response(status, response_headers, exc_info=None):
data["status"] = status
data["headers"] = response_headers
return response.append
@tornado.gen.coroutine
def async_resp():
return self.wsgi_application(WSGIContainer.environ(request), start_response)
ft = async_resp()
app_response = ft.result()
try:
response.extend(app_response)
body = b"".join(response)
finally:
if hasattr(app_response, "close"):
app_response.close()
if not data:
raise Exception("WSGI app did not call start_response")
status_code, reason = data["status"].split(' ', 1)
status_code = int(status_code)
headers = data["headers"]
header_set = set(k.lower() for (k, v) in headers)
body = tornado.escape.utf8(body)
if status_code != 304:
if "content-length" not in header_set:
headers.append(("Content-Length", str(len(body))))
if "content-type" not in header_set:
headers.append(("Content-Type", "text/html; charset=UTF-8"))
if "server" not in header_set:
headers.append(("Server", "TornadoServer/%s" % tornado.version))
start_line = tornado.httputil.ResponseStartLine("HTTP/1.1", status_code, reason)
header_obj = tornado.httputil.HTTPHeaders()
for key, value in headers:
header_obj.add(key, value)
request.connection.write_headers(start_line, header_obj, chunk=body)
request.connection.finish()
self._log(status_code, request)
@staticmethod
def environ(request):
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
"""
hostport = request.host.split(":")
if len(hostport) == 2:
host = hostport[0]
port = int(hostport[1])
else:
host = request.host
port = 443 if request.protocol == "https" else 80
environ = {
"REQUEST_METHOD": request.method,
"SCRIPT_NAME": "",
"PATH_INFO": tornado.wsgi.to_wsgi_str(tornado.escape.url_unescape(
request.path, encoding=None, plus=False)),
"QUERY_STRING": request.query,
"REMOTE_ADDR": request.remote_ip,
"SERVER_NAME": host,
"SERVER_PORT": str(port),
"SERVER_PROTOCOL": request.version,
"wsgi.version": (1, 0),
"wsgi.url_scheme": request.protocol,
"wsgi.input": io.BytesIO(tornado.escape.utf8(request.body)),
"wsgi.errors": sys.stderr,
"wsgi.multithread": False,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
if "Content-Type" in request.headers:
environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
if "Content-Length" in request.headers:
environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
for key, value in request.headers.items():
environ["HTTP_" + key.replace("-", "_").upper()] = value
return environ
def _log(self, status_code, request):
if status_code < 400:
log_method = tornado.log.access_log.info
elif status_code < 500:
log_method = tornado.log.access_log.warning
else:
log_method = tornado.log.access_log.error
request_time = 1000.0 * request.request_time()
summary = request.method + " " + request.uri + " (" + \
request.remote_ip + ")"
log_method("%d %s %.2fms", status_code, summary, request_time)
class WSGIHandler(tornado.web.RequestHandler):
executor = concurrent.futures.thread.ThreadPoolExecutor(max_workers=SCALE)
def initialize(self, fallback, threaded=False):
self.fallback = fallback
self.threaded = threaded
self._error = False
def prepare(self):
Count.count += 1
if self.threaded:
print('req <= {}'.format(Count.count))
self.ft = self.executor.submit(self.fallback, self.request)
self.ft.add_done_callback(self.ft_finish)
q = self.executor._work_queue
print("queue size: {} task_handling: {}".format(len(q.queue), q.unfinished_tasks))
def ft_finish(self, ft):
try:
# this have to have timeout because without some browser requests are not finished
ft.result(60)
# this will close cleanup tornado part
if not self._finished:
self.request.finish()
self._finished = True
self.on_finish()
# self._break_cycles()
except Exception as e:
self._error = True
print(traceback.format_exc())
#raise e.__class__, e, ft._traceback
finally:
pass
def on_finish(self):
Count.count += 1
print('req => {}'.format(Count.count))
def post(self, *args, **kwargs):
return self.ft
def get(self, *args, **kwargs):
""" See WSGIHandler.post """
return self.ft
def put(self, *args, **kwargs):
""" See WSGIHandler.post """
return self.ft
def delete(self, *args, **kwargs):
""" See WSGIHandler.post """
return self.ft
def req_finish(self):
if self._error:
print(self.ft._exception)
else:
self._finished = True
def send_error(self, status_code=500, **kwargs):
self.req_finish()
if not self._finished:
self.finish()
def write_error(self, status_code, **kwargs):
tornado.web.RequestHandler.write_error(self, status_code, **kwargs)
if __name__ == '__main__':
fb = WSGIContainer(wapp)
app = tornado.web.Application(
handlers=[('.*', WSGIHandler, dict(fallback=fb, threaded=True))],
default_host=None,
transforms=None)
web = app
web.listen(5002, address="0.0.0.0")
instance.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment