Skip to content

Instantly share code, notes, and snippets.

@borgle
Last active July 4, 2016 10:53
Show Gist options
  • Save borgle/e53497d9cb88c23c8ff70063196ef836 to your computer and use it in GitHub Desktop.
Save borgle/e53497d9cb88c23c8ff70063196ef836 to your computer and use it in GitHub Desktop.
A simple web server based on WSGI, running on gunicorn or gevent. Example request: http://localhost/DEMO/Hello?method=Say&other=otherparameter
#!/usr/bin/env python
# coding:utf-8
import imghdr
import json

import requests
''' This file lives inside the DEMO package folder. '''
def Say(parameters, start_response):
    """Demo handler: reply with a fixed JSON greeting.

    Args:
        parameters: parsed request parameters (not used by this handler).
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list holding the JSON body as a byte string.
    """
    body = json.dumps({'content': 'how are you?'})
    # On Python 3 json.dumps() returns text while WSGI bodies must be bytes;
    # on Python 2 the result is already a byte string and is left untouched.
    if not isinstance(body, bytes):
        body = body.encode('utf-8')
    start_response('200 OK', [('Content-Type', "application/json"),
                              ('content-length', str(len(body)))])
    # Return a list: a bare string would be iterated (and written out) one
    # character at a time by the WSGI server.
    return [body]
def Remote(parameters, start_response):
    """Fetch a remote image on the server side and relay it to the client.

    Args:
        parameters: parsed request parameters; ``img`` holds the image URL.
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list with the response body: the downloaded bytes, or
        a short error message when ``img`` is missing.
    """
    url = parameters.get('img')
    if not url:
        # Answer with a client error instead of failing on a KeyError.
        body = b'missing img parameter'
        start_response('400 Bad Request',
                       [('Content-Type', 'text/plain'),
                        ('content-length', str(len(body)))])
        return [body]

    # NOTE(security): the URL comes straight from the client, so this handler
    # fetches anything the server can reach (SSRF). Restrict the allowed
    # schemes/hosts before exposing it to untrusted users.
    # The timeout keeps a stalled upstream from blocking the worker forever.
    r = requests.get(url, timeout=10)
    try:
        data = r.content
    finally:
        # Release the connection even if reading the body fails.
        r.close()

    imgtype = imghdr.what('', h=data)
    if imgtype:
        content_type = 'image/{}'.format(imgtype)
    else:
        # imghdr returns None for unrecognised data; avoid "image/None".
        content_type = 'application/octet-stream'
    start_response('200 OK', [('Content-Type', content_type),
                              ('content-length', str(len(data)))])
    return [data]
#!/usr/bin/env python
# coding:utf-8
__version__ = '1.0.0'
try:
import gevent
import gevent.queue
import gevent.monkey
gevent.monkey.patch_all()
except ImportError:
pass
import sys
import logging
import multiprocessing
from urllib import unquote as urlunquote
def application(environ, start_response):
    """WSGI entry point: dispatch ``/<package>/<module>?method=<name>``.

    The two path segments select a module to import and the ``method``
    request parameter names the function inside it, which is then called as
    ``function(parameters, start_response)`` and must return the WSGI body.

    Args:
        environ: WSGI environment dictionary.
        start_response: WSGI ``start_response`` callable.

    Returns:
        An iterable of body strings.
    """
    try:
        return _dispatch(environ, start_response)
    except Exception:
        # Log with traceback and answer with a real 500 instead of raising
        # StopIteration at the server. Passing exc_info lets the server
        # re-raise if the failing handler had already sent its headers.
        logging.exception('request handling failed')
        return _text_response(start_response, '500 Internal Server Error',
                              'Internal Server Error', sys.exc_info())


def _text_response(start_response, status, body, exc_info=None):
    """Start a text/plain response and return its body iterable."""
    headers = [('Content-Type', "text/plain"),
               ('content-length', str(len(body)))]
    if exc_info is None:
        start_response(status, headers)
    else:
        start_response(status, headers, exc_info)
    return [body]


def _dispatch(environ, start_response):
    """Route one request to the handler selected by path and ``method``."""
    # PATH_INFO may be omitted from environ when it is empty.
    path_parts = environ.get('PATH_INFO', '').split('/')
    logging.debug(path_parts)

    # Checked before the segment-count test: '/favicon.ico' has only two
    # parts and would otherwise never reach this branch.
    if len(path_parts) > 1 and path_parts[1] == 'favicon.ico':
        start_response('200 OK', [('Content-Type', "image/png")])
        return []

    # Test the length first so short paths cannot raise IndexError.
    if len(path_parts) != 3 or not path_parts[1] or not path_parts[2]:
        return _text_response(start_response, '200 OK',
                              'hello, this is a private service.')

    request_method = environ['REQUEST_METHOD']
    if request_method == 'GET':
        parameters = parse_request_data(environ.get('QUERY_STRING', ''))
    elif request_method == 'POST':
        data = environ['wsgi.input'].read()
        logging.debug(data)
        parameters = parse_request_data(data)
        logging.debug(parameters)
    else:
        # A WSGI status line needs a reason phrase, not just the code.
        return _text_response(start_response, '403 Forbidden',
                              'Access denied.')

    if 'method' not in parameters:
        # Status 200 is kept here so existing clients see the same reply.
        return _text_response(start_response, '200 OK',
                              'not supported method.')

    # NOTE(security): the module path is taken from the URL, so any importable
    # "<package>.<module>" can be loaded by a client. Consider checking it
    # against an explicit whitelist of handler modules.
    module = __import__('{}.{}'.format(path_parts[1], path_parts[2]),
                        {}, {}, ['models'])
    logging.debug(module)

    method_name = parameters['method']
    handler = None
    # Never expose private/dunder attributes of the imported module.
    if not method_name.startswith('_'):
        handler = getattr(module, method_name, None)
    if not callable(handler):
        return _text_response(start_response, '404 Not Found',
                              'not supported method.')
    return handler(parameters, start_response)
def parse_request_data(qs):
    """Parse a URL-encoded query string or form body into a dict.

    Fields are separated by ``&`` or ``;``. ``+`` is turned into a space
    before percent-decoding, a field without ``=`` gets an empty value, and
    when a key is repeated the last occurrence wins.
    """
    parsed = {}
    normalized = qs.replace(';', '&')
    for field in normalized.split('&'):
        if field:
            # partition() yields an empty value when there is no '='.
            raw_key, _, raw_value = field.partition('=')
            name = urlunquote(raw_key.replace('+', ' '))
            parsed[name] = urlunquote(raw_value.replace('+', ' '))
    return parsed
def number_of_workers():
    """Return the worker count to start: twice the CPU count, plus one."""
    cores = multiprocessing.cpu_count()
    return 2 * cores + 1
def run_wsgi_app(address, app):
    """Serve *app* on *address*, using gunicorn when it is installed.

    Falls back to gevent's built-in WSGI server when gunicorn cannot be
    imported.

    Args:
        address: ``(host, port)`` pair to listen on.
        app: WSGI application callable.
    """
    try:
        # Only the import is guarded: an ImportError raised later while the
        # server is running must not silently start a second server.
        from gunicorn.app.wsgiapp import WSGIApplication
    except ImportError:
        # gevent.wsgi is a deprecated alias of gevent.pywsgi and is missing
        # from newer gevent releases.
        from gevent.pywsgi import WSGIServer
        WSGIServer(address, app).serve_forever()
        return

    class GunicornApplication(WSGIApplication):
        # No __init__ override here: the base initialiser has to run because
        # it is what loads the configuration that run() relies on.

        def init(self, parser, opts, args):
            # Settings handed to gunicorn in place of command-line options.
            return {'bind': '%s:%d' % (address[0], int(address[1])),
                    'workers': number_of_workers(),
                    'worker_class': 'gevent'}

        def load(self):
            return app

    GunicornApplication().run()
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s - - %(asctime)s %(message)s',
        datefmt='[%b %d %H:%M:%S]')
    # An optional first argument of the form "host:port" overrides the
    # default listen address.
    if len(sys.argv) >= 2:
        host, _, port = sys.argv[1].rpartition(':')
    else:
        host, port = '127.0.0.1', 8080
    logging.info('local python application serving at %s:%s', host, port)
    listen_address = (host, int(port))
    run_wsgi_app(listen_address, application)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment