Last active
July 4, 2016 10:53
-
-
Save borgle/e53497d9cb88c23c8ff70063196ef836 to your computer and use it in GitHub Desktop.
A simple web server based on WSGI, served by gunicorn or gevent. Example request: http://localhost/DEMO/Hello?method=Say&other=otherparameter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding:utf-8 | |
import imghdr
import json

import requests
''' this file is in package DEMO folder ''' | |
def Say(parameters, start_response):
    """Reply with a fixed JSON greeting.

    Args:
        parameters: Decoded request parameters (not used by this handler).
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list holding the JSON document
        ``{"content": "how are you?"}``.
    """
    body = json.dumps({
        'content': 'how are you?',
    })
    start_response('200 OK', [('Content-Type', "application/json")])
    # A WSGI body is iterated by the server; a bare string would be sent one
    # character at a time, so wrap it in a list.
    return [body]
def Remote(parameters, start_response):
    """Fetch the image named by ``parameters['img']`` and relay its bytes.

    Args:
        parameters: Decoded request parameters; ``img`` holds the URL to fetch.
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list with the downloaded bytes, or a short plain-text
        message with status 400 when ``img`` is missing or empty.
    """
    url = parameters.get('img')
    if not url:
        start_response('400 Bad Request', [('Content-Type', "text/plain")])
        return ['missing img parameter.']

    # NOTE(security): the URL comes straight from the request, so this handler
    # fetches whatever the server can reach (SSRF). Restrict the allowed
    # schemes/hosts before exposing it outside a trusted network.
    fetch_timeout = 10  # seconds; without it a stalled upstream blocks forever
    r = requests.get(url, timeout=fetch_timeout)
    try:
        data = r.content
    finally:
        # Release the connection even if reading the body fails.
        r.close()

    # imghdr.what() returns None for unrecognised data; fall back to a generic
    # type instead of advertising the bogus "image/None".
    imgtype = imghdr.what('', h=data)
    if imgtype:
        content_type = 'image/{}'.format(imgtype)
    else:
        content_type = 'application/octet-stream'
    start_response('200 OK', [('Content-Type', content_type),
                              ('content-length', str(len(data)))])
    return [data]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding:utf-8 | |
__version__ = '1.0.0' | |
try: | |
import gevent | |
import gevent.queue | |
import gevent.monkey | |
gevent.monkey.patch_all() | |
except ImportError: | |
pass | |
import sys | |
import logging | |
import multiprocessing | |
from urllib import unquote as urlunquote | |
def _is_public_name(name):
    """Return True for a non-empty name of alphanumerics/underscores that does not start with '_'."""
    return bool(name) and not name.startswith('_') and name.replace('_', '').isalnum()


def application(environ, start_response):
    """WSGI entry point: dispatch ``/<package>/<module>?method=<name>``.

    The two path segments select the module ``<package>.<module>`` and the
    ``method`` parameter selects a callable in it, which is invoked as
    ``callable(parameters, start_response)`` and whose return value becomes
    the response body.

    Args:
        environ: WSGI environment dictionary.
        start_response: WSGI ``start_response`` callable.

    Returns:
        An iterable of body strings. Paths that do not have exactly two
        segments get a fixed plain-text notice, HTTP methods other than
        GET/POST and invalid handler names get 403, and unexpected errors
        are logged and answered with 500.
    """
    try:
        path_parts = environ.get('PATH_INFO', '').split('/')
        logging.debug(path_parts)

        # Must run before the segment-count check: '/favicon.ico' splits into
        # only two parts and would otherwise never reach this branch.
        if len(path_parts) > 1 and path_parts[1] == 'favicon.ico':
            start_response('200 OK', [('Content-Type', "image/png")])
            return []

        # Length is tested first so an empty PATH_INFO cannot raise IndexError.
        if len(path_parts) != 3 or not path_parts[1]:
            data = 'hello, this is a private service.'
            start_response('200 OK', [('Content-Type', "text/plain"),
                                      ('content-length', str(len(data)))])
            return [data]

        request_method = environ['REQUEST_METHOD']
        if request_method == 'GET':
            # QUERY_STRING may be absent from the environment altogether.
            parameters = parse_request_data(environ.get('QUERY_STRING', ''))
        elif request_method == 'POST':
            data = environ['wsgi.input'].read()
            logging.debug(data)
            parameters = parse_request_data(data)
            logging.debug(parameters)
        else:
            # The status line needs a reason phrase, not just the code.
            start_response('403 Forbidden', [('Content-Type', "text/plain")])
            return ['Access denied.']

        if 'method' not in parameters:
            start_response('200 OK', [('Content-Type', "text/plain")])
            return ['not supported method.']

        package_name, module_name = path_parts[1], path_parts[2]
        method_name = parameters['method']
        # NOTE(security): these names come from the request. The check below
        # only rejects private/odd names; any importable ``package.module`` is
        # still reachable, so consider an explicit allow-list of packages.
        requested = (package_name, module_name, method_name)
        if not all(_is_public_name(name) for name in requested):
            start_response('403 Forbidden', [('Content-Type', "text/plain")])
            return ['Access denied.']

        module = __import__('{}.{}'.format(package_name, module_name), {}, {}, ['models'])
        logging.debug(module)
        method = getattr(module, method_name, None)
        if not callable(method):
            start_response('403 Forbidden', [('Content-Type', "text/plain")])
            return ['Access denied.']
        return method(parameters, start_response)
    except Exception:
        # Log with traceback and answer 500 rather than raising StopIteration
        # at the server. exc_info lets the server replace headers a handler
        # already set, or re-raise if output has started.
        logging.exception('unhandled error while serving %s', environ.get('PATH_INFO'))
        start_response('500 Internal Server Error',
                       [('Content-Type', "text/plain")], sys.exc_info())
        return ['Internal server error.']
def parse_request_data(qs):
    """Decode a query string or form body into a flat dict.

    Pairs are separated by ``&`` or ``;``; empty pairs are skipped. Each pair
    is split on its first ``=`` (a pair without ``=`` gets an empty value),
    ``+`` is turned into a space and percent-escapes are decoded. When a key
    repeats, the last occurrence wins.
    """
    def _decode(text):
        return urlunquote(text.replace('+', ' '))

    fields = (
        item.partition('=')
        for item in qs.replace(';', '&').split('&')
        if item
    )
    return dict((_decode(name), _decode(value)) for name, _, value in fields)
def number_of_workers():
    """Return the worker count to use: twice the CPU count, plus one."""
    cpu_total = multiprocessing.cpu_count()
    return 2 * cpu_total + 1
def run_wsgi_app(address, app):
    """Serve *app* on *address*, preferring gunicorn and falling back to gevent.

    Args:
        address: ``(host, port)`` pair to bind to.
        app: WSGI callable to serve.

    When gunicorn can be imported, the app runs under gunicorn bound to
    ``host:port`` with ``number_of_workers()`` workers of class ``gevent``.
    Otherwise gevent's own ``WSGIServer`` serves it in the calling process.
    """
    # Only the import is guarded, so an ImportError raised while the server is
    # running is not mistaken for "gunicorn is not installed".
    try:
        from gunicorn.app.base import BaseApplication
    except ImportError:
        # gevent.wsgi was a deprecated alias that newer gevent releases
        # dropped; gevent.pywsgi provides the same WSGIServer.
        from gevent.pywsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(BaseApplication):
        """Embed *app* in gunicorn with settings supplied from code.

        The base ``__init__`` is left in place on purpose: it creates the
        config object and calls ``load_config``.
        """

        def load_config(self):
            settings = {
                'bind': '%s:%d' % (address[0], int(address[1])),
                'workers': number_of_workers(),
                'worker_class': 'gevent',
            }
            for key, value in settings.items():
                self.cfg.set(key, value)

        def load(self):
            return app

    GunicornApplication().run()
if __name__ == '__main__':
    # Script entry point: configure logging, pick the listen address from the
    # optional "host:port" argument (default 127.0.0.1:8080) and start serving.
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s - - %(asctime)s %(message)s',
        datefmt='[%b %d %H:%M:%S]',
    )
    if len(sys.argv) >= 2:
        # Split on the last ':' so everything before it is the host.
        host, _, port = sys.argv[1].rpartition(':')
    else:
        host, port = '127.0.0.1', 8080
    logging.info('local python application serving at %s:%s', host, port)
    listen_address = (host, int(port))
    run_wsgi_app(listen_address, application)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment