-
-
Save vane/d3052a4a053ab83406fdb3dd83c6bbc8 to your computer and use it in GitHub Desktop.
tornado scale wsgi app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import time | |
import io | |
import sys | |
import bottle | |
import traceback | |
import tornado | |
import tornado.gen | |
import tornado.log | |
import tornado.httputil | |
import tornado.escape | |
import tornado.wsgi | |
import tornado.web | |
import tornado.ioloop | |
import concurrent.futures.thread | |
wapp = bottle.Bottle() | |
instance = tornado.ioloop.IOLoop.instance() | |
class Count: | |
count = 0 | |
SCALE = 1 | |
@wapp.get('/test/<param>') | |
def test(param): | |
time.sleep(int(param)) | |
return """ | |
Hello World {} | |
""".format(param) | |
class WSGIContainer(object): | |
def __init__(self, wsgi_application): | |
self.wsgi_application = wsgi_application | |
def __call__(self, request): | |
data = {} | |
response = [] | |
def start_response(status, response_headers, exc_info=None): | |
data["status"] = status | |
data["headers"] = response_headers | |
return response.append | |
@tornado.gen.coroutine | |
def async_resp(): | |
return self.wsgi_application(WSGIContainer.environ(request), start_response) | |
ft = async_resp() | |
app_response = ft.result() | |
try: | |
response.extend(app_response) | |
body = b"".join(response) | |
finally: | |
if hasattr(app_response, "close"): | |
app_response.close() | |
if not data: | |
raise Exception("WSGI app did not call start_response") | |
status_code, reason = data["status"].split(' ', 1) | |
status_code = int(status_code) | |
headers = data["headers"] | |
header_set = set(k.lower() for (k, v) in headers) | |
body = tornado.escape.utf8(body) | |
if status_code != 304: | |
if "content-length" not in header_set: | |
headers.append(("Content-Length", str(len(body)))) | |
if "content-type" not in header_set: | |
headers.append(("Content-Type", "text/html; charset=UTF-8")) | |
if "server" not in header_set: | |
headers.append(("Server", "TornadoServer/%s" % tornado.version)) | |
start_line = tornado.httputil.ResponseStartLine("HTTP/1.1", status_code, reason) | |
header_obj = tornado.httputil.HTTPHeaders() | |
for key, value in headers: | |
header_obj.add(key, value) | |
request.connection.write_headers(start_line, header_obj, chunk=body) | |
request.connection.finish() | |
self._log(status_code, request) | |
@staticmethod | |
def environ(request): | |
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. | |
""" | |
hostport = request.host.split(":") | |
if len(hostport) == 2: | |
host = hostport[0] | |
port = int(hostport[1]) | |
else: | |
host = request.host | |
port = 443 if request.protocol == "https" else 80 | |
environ = { | |
"REQUEST_METHOD": request.method, | |
"SCRIPT_NAME": "", | |
"PATH_INFO": tornado.wsgi.to_wsgi_str(tornado.escape.url_unescape( | |
request.path, encoding=None, plus=False)), | |
"QUERY_STRING": request.query, | |
"REMOTE_ADDR": request.remote_ip, | |
"SERVER_NAME": host, | |
"SERVER_PORT": str(port), | |
"SERVER_PROTOCOL": request.version, | |
"wsgi.version": (1, 0), | |
"wsgi.url_scheme": request.protocol, | |
"wsgi.input": io.BytesIO(tornado.escape.utf8(request.body)), | |
"wsgi.errors": sys.stderr, | |
"wsgi.multithread": False, | |
"wsgi.multiprocess": True, | |
"wsgi.run_once": False, | |
} | |
if "Content-Type" in request.headers: | |
environ["CONTENT_TYPE"] = request.headers.pop("Content-Type") | |
if "Content-Length" in request.headers: | |
environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length") | |
for key, value in request.headers.items(): | |
environ["HTTP_" + key.replace("-", "_").upper()] = value | |
return environ | |
def _log(self, status_code, request): | |
if status_code < 400: | |
log_method = tornado.log.access_log.info | |
elif status_code < 500: | |
log_method = tornado.log.access_log.warning | |
else: | |
log_method = tornado.log.access_log.error | |
request_time = 1000.0 * request.request_time() | |
summary = request.method + " " + request.uri + " (" + \ | |
request.remote_ip + ")" | |
log_method("%d %s %.2fms", status_code, summary, request_time) | |
class WSGIHandler(tornado.web.RequestHandler): | |
executor = concurrent.futures.thread.ThreadPoolExecutor(max_workers=SCALE) | |
def initialize(self, fallback, threaded=False): | |
self.fallback = fallback | |
self.threaded = threaded | |
self._error = False | |
def prepare(self): | |
Count.count += 1 | |
if self.threaded: | |
print('req <= {}'.format(Count.count)) | |
self.ft = self.executor.submit(self.fallback, self.request) | |
self.ft.add_done_callback(self.ft_finish) | |
q = self.executor._work_queue | |
print("queue size: {} task_handling: {}".format(len(q.queue), q.unfinished_tasks)) | |
def ft_finish(self, ft): | |
try: | |
# this have to have timeout because without some browser requests are not finished | |
ft.result(60) | |
# this will close cleanup tornado part | |
if not self._finished: | |
self.request.finish() | |
self._finished = True | |
self.on_finish() | |
# self._break_cycles() | |
except Exception as e: | |
self._error = True | |
print(traceback.format_exc()) | |
#raise e.__class__, e, ft._traceback | |
finally: | |
pass | |
def on_finish(self): | |
Count.count += 1 | |
print('req => {}'.format(Count.count)) | |
def post(self, *args, **kwargs): | |
return self.ft | |
def get(self, *args, **kwargs): | |
""" See WSGIHandler.post """ | |
return self.ft | |
def put(self, *args, **kwargs): | |
""" See WSGIHandler.post """ | |
return self.ft | |
def delete(self, *args, **kwargs): | |
""" See WSGIHandler.post """ | |
return self.ft | |
def req_finish(self): | |
if self._error: | |
print(self.ft._exception) | |
else: | |
self._finished = True | |
def send_error(self, status_code=500, **kwargs): | |
self.req_finish() | |
if not self._finished: | |
self.finish() | |
def write_error(self, status_code, **kwargs): | |
tornado.web.RequestHandler.write_error(self, status_code, **kwargs) | |
if __name__ == '__main__': | |
fb = WSGIContainer(wapp) | |
app = tornado.web.Application( | |
handlers=[('.*', WSGIHandler, dict(fallback=fb, threaded=True))], | |
default_host=None, | |
transforms=None) | |
web = app | |
web.listen(5002, address="0.0.0.0") | |
instance.start() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment