Last active
February 8, 2017 08:56
-
-
Save limboinf/a31c9f51dc3547774dcde01d87a81323 to your computer and use it in GitHub Desktop.
Flask 实现简单的Restful APi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
:Author: beginman.cn | |
:Mail: pythonsuper@gmail.com | |
:Created Time: 2017-02-07 16:16:36 | |
:Last modified: 2017-02-07 16:16:36 | |
:Copyright: (c) 2016 by beginman. | |
:License: MIT, see LICENSE for more details. | |
""" | |
import os | |
from flask import Flask, abort, request, jsonify, g, url_for | |
from flask_httpauth import HTTPBasicAuth | |
from flask_sqlalchemy import SQLAlchemy | |
from passlib.apps import custom_app_context as pwd_context | |
from itsdangerous import (TimedJSONWebSignatureSerializer as Serializer, | |
BadSignature, SignatureExpired) | |
app = Flask(__name__) | |
app.config['SECRET_KEY'] = '!@#abcd*(,.x1209xm$%' | |
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite' | |
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True | |
app.config.setdefault('SQLALCHEMY_TRACK_MODIFICATIONS', True) | |
# extensions | |
db = SQLAlchemy(app) | |
auth = HTTPBasicAuth() | |
class User(db.Model): | |
__tablename__ = 'users' | |
id = db.Column(db.Integer, primary_key=True) | |
username = db.Column(db.String(32), index=True) | |
password_hash = db.Column(db.String(64)) | |
def hash_password(self, password): | |
self.password_hash = pwd_context.encrypt(password) | |
def verify_password(self, password): | |
return pwd_context.verify(password, self.password_hash) | |
def generate_auth_token(self, expiration=600): | |
s = Serializer(app.config['SECRET_KEY'], expires_in=expiration) | |
return s.dumps({'id': self.id}) | |
@staticmethod | |
def verify_auth_token(token): | |
s = Serializer(app.config['SECRET_KEY']) | |
try: | |
data = s.loads(token) | |
except SignatureExpired: | |
return None | |
except BadSignature: | |
return None | |
user = User.query.get(data['id']) | |
return user | |
@auth.verify_password | |
def verify_password(username_or_token, password): | |
user = User.verify_auth_token(username_or_token) | |
if not user: | |
user = User.query.filter_by(username=username_or_token).first() | |
if not user or not user.verify_password(password): | |
return False | |
g.user = user | |
return True | |
@app.route('/api/users', methods=['POST']) | |
def new_user(): | |
username = request.json.get('username') | |
password = request.json.get('password') | |
if username is None or password is None: | |
abort(400) | |
if User.query.filter_by(username=username).first() is not None: | |
abort(400) # exists user | |
user = User(username=username) | |
user.hash_password(password) | |
db.session.add(user) | |
db.session.commit() | |
return (jsonify({'username': username}), | |
201, | |
{'Location': url_for('get_user', id=user.id, _external=True)}) | |
@app.route('/api/users/<int:id>') | |
def get_user(id): | |
user = User.query.get(id) | |
if not user: | |
abort(400) | |
return jsonify({"username": user.username}) | |
@app.route('/api/token') | |
@auth.login_required | |
def get_auth_token(): | |
token = g.user.generate_auth_token(600) | |
return jsonify({'token': token.decode('ascii'), 'duration': 600}) | |
@app.route('/api/resource') | |
@auth.login_required | |
def get_resource(): | |
return jsonify({'data': 'Hello, %s!' % g.user.username}) | |
if __name__ == '__main__': | |
if not os.path.exists('db.sqlite'): | |
db.create_all() | |
app.run(debug=True) | |
""" | |
获取Token:$ curl -u miguel:python -i -X GET http://127.0.0.1:5000/api/token | |
通过TOken访问:$ curl -u HJ5uRINh2JSohKYDRT472wGOvjc:unused -i -X GET http://127.0.0.1:5000/api/resource | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
:Author: beginman.cn | |
:Mail: pythonsuper@gmail.com | |
:Created Time: 2017-02-07 12:15:58 | |
:Last modified: 2017-02-07 14:54:45 | |
:Copyright: (c) 2016 by beginman. | |
:License: MIT, see LICENSE for more details. | |
""" | |
from flask import Flask, abort, make_response, jsonify | |
from flask_restful import Api, Resource, reqparse, fields, marshal | |
from flask_httpauth import HTTPBasicAuth | |
# task_fields 结构用于作为 marshal 函数的模板。fields.Uri 是一个用于生成一个 | |
# URL 的特定的参数。 它需要的参数是 endpoint。 | |
task_fields = { | |
'title': fields.String, | |
'description': fields.String, | |
'done': fields.Boolean, | |
'uri': fields.Url('task') | |
} | |
app = Flask(__name__) | |
api = Api(app) | |
auth = HTTPBasicAuth() | |
@auth.get_password | |
def get_password(username): | |
""" | |
回调函数,Flask-HTTPAuth使用它来获取给定用户的密码 | |
User: admin, Password: python | |
""" | |
if username == 'admin': | |
return 'python' | |
return None | |
@auth.error_handler | |
def unauthorized(): | |
""" | |
error_handler 回调函数是用于给客户端发送未授权错误代码 | |
当请求收到一个 401 | |
的错误,网页浏览都会跳出一个丑陋的登录框,客户端应用程序应该自己处理登录 | |
所以把401错误改成403 | |
""" | |
return make_response(jsonify({'error': 'Unauthorized access'}), 403) | |
tasks = [ | |
{ | |
'id': 1, | |
'title': u'Buy groceries', | |
'description': u'Milk, Cheese, Pizza, Fruit, Tylenol', | |
'done': False | |
}, | |
{ | |
'id': 2, | |
'title': u'Learn Python', | |
'description': u'Need to find a good Python tutorial on the web', | |
'done': False | |
} | |
] | |
class TaskListAPI(Resource): | |
decorators = [auth.login_required] | |
def __init__(self): | |
""" | |
RequestParser 类默认情况下在 request.values | |
中查找参数,因此 location 可选参数必须被设置以表明请求过来的参数是 | |
request.json 格式的。 | |
""" | |
self.reqparse = reqparse.RequestParser() | |
self.reqparse.add_argument('title', type=str, required=True, | |
help='No task title provided', | |
location='json') | |
self.reqparse.add_argument('description', type=str, default='', | |
location='json') | |
super(TaskListAPI, self).__init__() | |
def get(self): | |
return {'tasks': marshal(tasks, task_fields)} | |
def post(self): | |
args = self.reqparse.parse_args() | |
args['id'] = tasks[-1]['id'] + 1 | |
args['done'] = False | |
tasks.append(args) | |
return self.get() | |
class TaskAPI(Resource): | |
decorators = [auth.login_required] | |
def __init__(self): | |
self.reqparse = reqparse.RequestParser() | |
self.reqparse.add_argument('title', type=str, location='json') | |
self.reqparse.add_argument('description', type=str, location='json') | |
self.reqparse.add_argument('done', type=bool, location='json') | |
super(TaskAPI, self).__init__() | |
def get(self, id): | |
task = filter(lambda t: t['id'] == id, tasks) | |
if len(task) == 0: | |
abort(404) | |
return {'task': task} | |
def put(self, id): | |
task = filter(lambda t: t['id'] == id, tasks) | |
if len(task) == 0: | |
abort(404) | |
task = task[0] | |
args = self.reqparse.parse_args() | |
for k, v in args.iteritems(): | |
if v is not None: | |
task[k] = v | |
return {'status': 'ok'} | |
def delete(self, id): | |
task = filter(lambda t: t['id'] == id, tasks) | |
if len(task) == 0: | |
abort(404) | |
tasks.remove(task[0]) | |
return {'result': True}, 200 | |
api.add_resource(TaskListAPI, '/todo/api/v1.0/tasks', endpoint='tasks') | |
api.add_resource(TaskAPI, '/todo/api/v1.0/tasks/<int:id>', endpoint='task') | |
if __name__ == "__main__": | |
app.run(debug=True) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Flask==0.12 | |
Flask-HTTPAuth | |
SQLAlchemy | |
passlib |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
:Author: beginman.cn | |
:Mail: pythonsuper@gmail.com | |
:Created Time: 2017-02-07 12:15:58 | |
:Last modified: 2017-02-07 12:21:34 | |
:Copyright: (c) 2016 by beginman. | |
:License: MIT, see LICENSE for more details. | |
""" | |
from flask import Flask, jsonify, abort, make_response, request, url_for | |
from flask_httpauth import HTTPBasicAuth | |
auth = HTTPBasicAuth() | |
app = Flask(__name__) | |
@auth.get_password | |
def get_password(username): | |
""" | |
回调函数,Flask-HTTPAuth使用它来获取给定用户的密码 | |
User: admin, Password: python | |
""" | |
if username == 'admin': | |
return 'python' | |
return None | |
@auth.error_handler | |
def unauthorized(): | |
""" | |
error_handler 回调函数是用于给客户端发送未授权错误代码 | |
当请求收到一个 401 | |
的错误,网页浏览都会跳出一个丑陋的登录框,客户端应用程序应该自己处理登录 | |
所以把401错误改成403 | |
""" | |
return make_response(jsonify({'error': 'Unauthorized access'}), 403) | |
tasks = [ | |
{ | |
'id': 1, | |
'title': u'Buy groceries', | |
'description': u'Milk, Cheese, Pizza, Fruit, Tylenol', | |
'done': False | |
}, | |
{ | |
'id': 2, | |
'title': u'Learn Python', | |
'description': u'Need to find a good Python tutorial on the web', | |
'done': False | |
} | |
] | |
def make_public_task(task): | |
""" | |
辅助函数返回控制这些任务的完整URL给客户端使用 | |
该函数生成一个"公共"版本任务 | |
""" | |
new_tasks = {} | |
for field in task: | |
if field == 'id': | |
new_tasks['uri'] = url_for('get_task', task_id=task['id'], | |
_external=True) | |
else: | |
new_tasks[field] = task[field] | |
return new_tasks | |
@app.errorhandler(404) | |
def not_fount(error): | |
return make_response(jsonify({'error': 'Not found'}), 404) | |
@app.route('/todo/api/v1.0/tasks', methods=['GET']) | |
def index(): | |
return jsonify({'tasks': map(make_public_task, tasks)}) | |
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET']) | |
def get_task(task_id): | |
task = filter(lambda t: t['id'] == task_id, tasks) | |
if len(task) == 0: | |
abort(404) | |
return jsonify({'task': task}) | |
@app.route('/todo/api/v1.0/tasks', methods=['POST']) | |
@auth.login_required | |
def create_task(): | |
if not request.json or 'title' not in request.json: | |
abort(400) | |
task = { | |
'id': tasks[-1]['id'] + 1, | |
'title': request.json['title'], | |
'description': request.json.get('description', ''), | |
'done': False | |
} | |
tasks.append(task) | |
return jsonify({'task': task}, 200) | |
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['PUT']) | |
@auth.login_required | |
def update_task(task_id): | |
task = filter(lambda t: t['id'] == task_id, tasks) | |
if len(task) == 0: | |
abort(404) | |
print(task) | |
if not request.json: | |
abort(400) | |
fields = { | |
'title': unicode, | |
'description': unicode, | |
'done': bool | |
} | |
for k, v in fields.items(): | |
if k in request.json and type(request.json[k]) != v: | |
abort(400) | |
task[0][k] = request.json.get(k, task[0][k]) | |
return jsonify({'task': task[0]}) | |
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['DELETE']) | |
@auth.login_required | |
def delete_task(task_id): | |
task = filter(lambda t: t['id'] == task_id, tasks) | |
if len(task) == 0: | |
abort(404) | |
tasks.remove(task[0]) | |
return jsonify({'result': True}) | |
if __name__ == "__main__": | |
app.run(debug=True) | |
""" | |
# 获取所有 | |
$ curl -i http://127.0.0.1:5000/todo/api/v1.0/tasks | |
# 获取指定 | |
$ curl -i http://127.0.0.1:5000/todo/api/v1.0/tasks/2 | |
# 更新 | |
$ curl -u admin:python -i -H "Content-Type: application/json" -X PUT -d '{"done": true}' http://127.0.0.1:5000/todo/api/v1.0/tasks/2 | |
# 删除 | |
$ curl -i -X DELETE http://127.0.0.1:5000/todo/api/v1.0/tasks/2 | |
# 新增 | |
$ curl -i -H "Content-Type: application/json" -X POST -d '{"title":"Read a book"}' http://127.0.0.1:5000/todo/api/v1.0/tasks | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment