Skip to content

Instantly share code, notes, and snippets.

@mcchae
Last active December 8, 2019 16:12
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mcchae/39febe806c3065064ad7 to your computer and use it in GitHub Desktop.
Save mcchae/39febe806c3065064ad7 to your computer and use it in GitHub Desktop.
Flask REST API sample
#!/usr/bin/env python
#coding=utf8
##########################################################################################
import os
import logging
import logging.handlers
import traceback
from flask import Flask
from flask.ext.restful import reqparse, abort, Api, Resource
from flask import request
##########################################################################################
def getLogger(logname, logdir, logsize=500*1024, logbackup_count=4):
if not os.path.exists(logdir):
os.makedirs(logdir)
logfile='%s/%s.log' % (logdir, logname)
loglevel = logging.INFO
logger = logging.getLogger(logname)
logger.setLevel(loglevel)
if logger.handlers is not None and len(logger.handlers) >= 0:
for handler in logger.handlers:
logger.removeHandler(handler)
logger.handlers = []
loghandler = logging.handlers.RotatingFileHandler(
logfile, maxBytes=logsize, backupCount=logbackup_count)
formatter = logging.Formatter('%(asctime)s-%(name)s-%(levelname)s-%(message)s')
loghandler.setFormatter(formatter)
logger.addHandler(loghandler)
return logger
##########################################################################################
logger = getLogger('restapi', '/tmp/restapi')
##########################################################################################
app = Flask(__name__)
api = Api(app)
from OpenSSL import SSL
context = SSL.Context(SSL.SSLv23_METHOD)
context.use_privatekey_file('future.key')
context.use_certificate_file('future.crt')
##########################################################################################
class ZoneInfo(Resource):
def get(self):
logger.info("ZoneInfo.get start...")
try:
stmt = "select zone,ou,pool,in_ip,ex_ip FROM tm_zone"
dlist = [
{'id':1, 'zone':'서울'},
{'id':2, 'zone':'대전'},
{'id':3, 'zone':'부산'},
]
rd = {
'result':dlist
}
return rd
except Exception, e:
logger.error('ZoneInfo.get: Error: %s: %s' % (e, traceback.format_exc()))
rd = {
'result':[]
}
return rd
finally:
rdstr = str(rd)
if len(rdstr) > 1024:
rdstr = rdstr[:1024] + '...'
logger.info("ZoneInfo.get return: %s" % rdstr)
##########################################################################################
class UserList(Resource):
def get(self, tm_id):
logger.info("UserList.get start... tm_id=<%s>" % tm_id)
try:
tm_id = tm_id.encode('utf-8')
dlist = [
{'id':11, 'userid':'kdkim'},
{'id':22, 'userid':'kdhong'},
]
rd = {
'result':dlist
}
return rd
except Exception, e:
logger.error('UserList.get: Error: %s: %s' % (e, traceback.format_exc()))
rd = {
'result':[]
}
return rd
finally:
rdstr = str(rd)
if len(rdstr) > 1024:
rdstr = rdstr[:1024] + '...'
logger.info("UserList.get return: %s" % rdstr)
##########################################################################################
# UserList : get list of TMES master rows which is matching pool_tm_id
class UserAuth(Resource):
def post(self):
rd = { 'result': False }
logger.info("UserAuth.get start...")
if not (request.form.has_key('userid') and request.form.has_key('passwd')):
logger.error('UserAuth.get: Error: invalid POST form of request (userid, passwd)')
return rd
userid = request.form['userid'].encode('utf-8')
passwd = request.form['passwd'].encode('utf-8')
try:
# authenticate
authenticated = userid != 'invalid'
rd = {
'result': authenticated,
}
return rd
except Exception, e:
logger.error('UserAuth.get: Error: %s: %s' % (e, traceback.format_exc()))
rd = { 'result': False }
return rd
finally:
rdstr = str(rd)
if len(rdstr) > 1024:
rdstr = rdstr[:1024] + '...'
logger.info("UserAuth.get return: %s" % rdstr)
##########################################################################################
## Actually setup the Api resource routing here
##########################################################################################
api.add_resource(UserAuth, '/userAuth')
api.add_resource(ZoneInfo, '/zoneInfo')
api.add_resource(UserList, '/userList/<string:tm_id>')
##########################################################################################
if __name__ == '__main__':
logger.info("Start RestAPI : listen %s:%s" % ('127.0.0.1', 8443))
app.run(
host='127.0.0.1',
port=8443,
debug = True,
ssl_context=context,
)
__author__ = 'future'
##########################################################################################
import sys
import json
import httplib
import urllib
##########################################################################################
def userAuth(host, port, userid, credential):
params = urllib.urlencode({'userid': userid, 'passwd': credential})
headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"}
conn = httplib.HTTPSConnection(host=host, port=port)
conn.request("POST", "/userAuth", params, headers)
response = conn.getresponse()
# print response.status, response.reason
data = response.read()
conn.close()
rdict = json.loads(data)
return rdict
##########################################################################################
def zoneInfo(host, port):
getstr='/zoneInfo'
import httplib
conn = httplib.HTTPSConnection(host=host, port=port)
conn.request("GET", getstr)
response = conn.getresponse()
#print response.status, response.reason
if response.status != 200:
sys.stderr.write("Error: %s" % response.reason)
return 0
data = response.read()
rdict = json.loads(data)
return rdict
##########################################################################################
def userList(host, port, tm_id):
getstr='/userList/%s' % tm_id
import httplib
conn = httplib.HTTPSConnection(host=host, port=port)
conn.request("GET", getstr)
response = conn.getresponse()
#print response.status, response.reason
if response.status != 200:
sys.stderr.write("Error: %s" % response.reason)
return 0
data = response.read()
rdict = json.loads(data)
return rdict
##########################################################################################
def test():
import pprint
host = '127.0.0.1'
port = 8443
# userAuth
print ">>> Invalid userAuth('invalid', '1234')"
print userAuth(host, port, 'invalid', '1234')
print ">>> Valid userAuth('nkkang', '2134')"
print userAuth(host, port, 'nkkang', '2134')
print ">>> Valid userAuth('kdhong', '2134')"
print userAuth(host, port, 'kdhong', '2134')
# zoneInfo
print ">>> zoneInfo()"
pprint.pprint(zoneInfo(host, port))
# userList
print ">>> userList('1')"
print userList(host, port, 1)
print ">>> userList('2')"
##########################################################################################
if __name__ == '__main__':
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment