Create a gist now

Instantly share code, notes, and snippets.

Flask 实现简单的 RESTful API
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Author: beginman.cn
:Mail: pythonsuper@gmail.com
:Created Time: 2017-02-07 16:16:36
:Last modified: 2017-02-07 16:16:36
:Copyright: (c) 2016 by beginman.
:License: MIT, see LICENSE for more details.
"""
import os
from flask import Flask, abort, request, jsonify, g, url_for
from flask_httpauth import HTTPBasicAuth
from flask_sqlalchemy import SQLAlchemy
from passlib.apps import custom_app_context as pwd_context
from itsdangerous import (TimedJSONWebSignatureSerializer as Serializer,
BadSignature, SignatureExpired)
# Application setup. The signing key can be overridden through the
# SECRET_KEY environment variable so a deployment does not have to rely on
# the value committed to source control; the literal is only the fallback.
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', '!@#abcd*(,.x1209xm$%')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config.setdefault('SQLALCHEMY_TRACK_MODIFICATIONS', True)

# extensions
db = SQLAlchemy(app)
auth = HTTPBasicAuth()
class User(db.Model):
    """Account row in the ``users`` table.

    Only a hash of the password is stored. Auth tokens are signed with the
    application's ``SECRET_KEY``.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(32), index=True)
    password_hash = db.Column(db.String(64))

    def hash_password(self, password):
        """Hash ``password`` with passlib and keep the result on this row."""
        hashed = pwd_context.encrypt(password)
        self.password_hash = hashed

    def verify_password(self, password):
        """Return passlib's verdict on ``password`` against the stored hash."""
        stored = self.password_hash
        return pwd_context.verify(password, stored)

    def generate_auth_token(self, expiration=600):
        """Return a signed token carrying this user's id.

        ``expiration`` is handed to the serializer as ``expires_in``.
        """
        signer = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        payload = {'id': self.id}
        return signer.dumps(payload)

    @staticmethod
    def verify_auth_token(token):
        """Return the user looked up from ``token``, or None.

        None is returned when the token has expired or its signature is
        invalid; otherwise the result of the id lookup is returned.
        """
        signer = Serializer(app.config['SECRET_KEY'])
        try:
            payload = signer.loads(token)
        except (SignatureExpired, BadSignature):
            # Expired and tampered tokens are treated alike: no user.
            return None
        return User.query.get(payload['id'])
@auth.verify_password
def verify_password(username_or_token, password):
    """Authenticate a request by token or by username and password.

    The first credential is tried as an auth token. If that yields no user
    it is treated as a username and ``password`` is checked against that
    account. On success the user is stored on ``g.user`` and True is
    returned; otherwise False.
    """
    account = User.verify_auth_token(username_or_token)
    if account:
        g.user = account
        return True

    # Not a usable token: fall back to a username + password check.
    account = User.query.filter_by(username=username_or_token).first()
    if account and account.verify_password(password):
        g.user = account
        return True
    return False
@app.route('/api/users', methods=['POST'])
def new_user():
    """Register a user from a JSON body with ``username`` and ``password``.

    Responds 400 when the body is not a JSON object, when either field is
    missing, or when the username is already taken. On success responds 201
    with the username and a ``Location`` header pointing at the new user.
    """
    # request.json is None for a missing or non-JSON body, which made the
    # .get() calls raise AttributeError (HTTP 500). Treat every malformed
    # body as a client error instead.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        abort(400)

    username = payload.get('username')
    password = payload.get('password')
    if username is None or password is None:
        abort(400)  # missing arguments
    if User.query.filter_by(username=username).first() is not None:
        abort(400)  # exists user

    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return (jsonify({'username': username}),
            201,
            {'Location': url_for('get_user', id=user.id, _external=True)})
@app.route('/api/users/<int:id>')
def get_user(id):
    """Return the username of the user with primary key ``id``.

    Responds 404 when no such user exists.
    """
    user = User.query.get(id)
    if not user:
        # An unknown id is a missing resource, not a malformed request.
        abort(404)
    return jsonify({"username": user.username})
@app.route('/api/token')
@auth.login_required
def get_auth_token():
    """Issue a token for the authenticated user and report its duration."""
    lifetime = 600
    raw_token = g.user.generate_auth_token(lifetime)
    body = {'token': raw_token.decode('ascii'), 'duration': lifetime}
    return jsonify(body)
@app.route('/api/resource')
@auth.login_required
def get_resource():
    """Return a greeting addressed to the authenticated user."""
    greeting = 'Hello, %s!' % g.user.username
    return jsonify({'data': greeting})
if __name__ == '__main__':
    # create_all() only creates tables that are missing, so run it on every
    # start instead of guessing from a file check: 'db.sqlite' was tested
    # relative to the working directory, which need not be where the
    # database file lives, and an existing file may still lack the tables.
    # The app context keeps this working on Flask-SQLAlchemy releases that
    # require one.
    with app.app_context():
        db.create_all()
    app.run(debug=True)
"""
获取Token:$ curl -u miguel:python -i -X GET http://127.0.0.1:5000/api/token
通过Token访问:$ curl -u HJ5uRINh2JSohKYDRT472wGOvjc:unused -i -X GET http://127.0.0.1:5000/api/resource
"""
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Author: beginman.cn
:Mail: pythonsuper@gmail.com
:Created Time: 2017-02-07 12:15:58
:Last modified: 2017-02-07 14:54:45
:Copyright: (c) 2016 by beginman.
:License: MIT, see LICENSE for more details.
"""
from flask import Flask, abort, make_response, jsonify
from flask_restful import Api, Resource, reqparse, fields, marshal
from flask_httpauth import HTTPBasicAuth
# The task_fields structure serves as the template for the marshal function.
# fields.Url is a special field that generates a URL; the argument it takes
# is the endpoint name.
task_fields = {
'title': fields.String,
'description': fields.String,
'done': fields.Boolean,
'uri': fields.Url('task')
}
# Flask application, its Flask-RESTful Api wrapper, and the HTTP Basic auth helper.
app = Flask(__name__)
api = Api(app)
auth = HTTPBasicAuth()
@auth.get_password
def get_password(username):
    """Callback Flask-HTTPAuth uses to obtain the password of a given user.

    User: admin, Password: python. Any other username yields None.
    """
    return 'python' if username == 'admin' else None
@auth.error_handler
def unauthorized():
    """Build the response sent to the client for an unauthorized request.

    When a request receives a 401, web browsers pop up an ugly login dialog;
    client applications should handle login themselves, so the 401 is
    changed to a 403.
    """
    body = jsonify({'error': 'Unauthorized access'})
    return make_response(body, 403)
# In-memory task list that the resources below read and modify. Each task
# is a dict with 'id', 'title', 'description' and 'done' keys.
tasks = [
{
'id': 1,
'title': u'Buy groceries',
'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
'done': False
},
{
'id': 2,
'title': u'Learn Python',
'description': u'Need to find a good Python tutorial on the web',
'done': False
}
]
class TaskListAPI(Resource):
    """Collection resource: list every task or append a new one."""

    decorators = [auth.login_required]

    def __init__(self):
        """Set up the parser for POST bodies.

        By default RequestParser looks for arguments in request.values, so
        the optional ``location`` argument is set to say the arguments
        arrive as JSON.
        """
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('title', type=str, required=True,
                                   help='No task title provided',
                                   location='json')
        self.reqparse.add_argument('description', type=str, default='',
                                   location='json')
        super(TaskListAPI, self).__init__()

    def get(self):
        """Return all tasks marshalled through ``task_fields``."""
        return {'tasks': marshal(tasks, task_fields)}

    def post(self):
        """Append a task built from the parsed body; return the full list."""
        args = self.reqparse.parse_args()
        # tasks[-1] raises IndexError once every task has been deleted, so
        # numbering restarts at 1 for an empty list.
        args['id'] = tasks[-1]['id'] + 1 if tasks else 1
        args['done'] = False
        tasks.append(args)
        return self.get()
class TaskAPI(Resource):
    """Single-task resource: fetch, update or delete one task by id."""

    decorators = [auth.login_required]

    def __init__(self):
        """Set up the parser for PUT bodies; none of the fields is required."""
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('title', type=str, location='json')
        self.reqparse.add_argument('description', type=str, location='json')
        self.reqparse.add_argument('done', type=bool, location='json')
        super(TaskAPI, self).__init__()

    @staticmethod
    def _find_or_404(task_id):
        """Return the list of tasks whose id is ``task_id``; abort 404 if empty."""
        # A list comprehension rather than filter(): on Python 3 filter()
        # returns an iterator, so len() and indexing on it fail.
        matches = [t for t in tasks if t['id'] == task_id]
        if not matches:
            abort(404)
        return matches

    def get(self, id):
        """Return the matching task(s) under the ``task`` key."""
        return {'task': self._find_or_404(id)}

    def put(self, id):
        """Copy every supplied (non-None) argument onto the task."""
        task = self._find_or_404(id)[0]
        args = self.reqparse.parse_args()
        # items() exists on Python 2 and 3; iteritems() is Python 2 only.
        for key, value in args.items():
            if value is not None:
                task[key] = value
        return {'status': 'ok'}

    def delete(self, id):
        """Remove the task from the in-memory list."""
        tasks.remove(self._find_or_404(id)[0])
        return {'result': True}, 200
# Register the collection resource first, then the single-task resource.
for resource_cls, rule, endpoint_name in (
        (TaskListAPI, '/todo/api/v1.0/tasks', 'tasks'),
        (TaskAPI, '/todo/api/v1.0/tasks/<int:id>', 'task'),
):
    api.add_resource(resource_cls, rule, endpoint=endpoint_name)

if __name__ == "__main__":
    # Start the Flask server with debug enabled when run as a script.
    app.run(debug=True)
Flask==0.12
Flask-HTTPAuth
Flask-RESTful
Flask-SQLAlchemy
SQLAlchemy
passlib
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Author: beginman.cn
:Mail: pythonsuper@gmail.com
:Created Time: 2017-02-07 12:15:58
:Last modified: 2017-02-07 12:21:34
:Copyright: (c) 2016 by beginman.
:License: MIT, see LICENSE for more details.
"""
from flask import Flask, jsonify, abort, make_response, request, url_for
from flask_httpauth import HTTPBasicAuth
# The Flask application and the HTTP Basic auth helper used by the
# login-protected routes below.
app = Flask(__name__)
auth = HTTPBasicAuth()
@auth.get_password
def get_password(username):
    """Callback Flask-HTTPAuth uses to obtain the password of a given user.

    User: admin, Password: python. Any other username yields None.
    """
    return 'python' if username == 'admin' else None
@auth.error_handler
def unauthorized():
    """Build the response sent to the client for an unauthorized request.

    When a request receives a 401, web browsers pop up an ugly login dialog;
    client applications should handle login themselves, so the 401 is
    changed to a 403.
    """
    body = jsonify({'error': 'Unauthorized access'})
    return make_response(body, 403)
# In-memory task list that the routes below read and modify. Each task is a
# dict with 'id', 'title', 'description' and 'done' keys.
tasks = [
{
'id': 1,
'title': u'Buy groceries',
'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
'done': False
},
{
'id': 2,
'title': u'Learn Python',
'description': u'Need to find a good Python tutorial on the web',
'done': False
}
]
def make_public_task(task):
    """Return a "public" copy of ``task`` for clients.

    Every field is copied as-is except ``id``, which is replaced by a
    ``uri`` entry holding the full URL of the task.
    """
    public = {}
    for key in task:
        if key != 'id':
            public[key] = task[key]
            continue
        # Swap the internal id for the URL of the get_task endpoint.
        public['uri'] = url_for('get_task', task_id=task['id'],
                                _external=True)
    return public
@app.errorhandler(404)
def not_fount(error):
    """Return a JSON error body with status 404."""
    body = jsonify({'error': 'Not found'})
    return make_response(body, 404)
@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def index():
    """Return every task in its public form (``uri`` instead of ``id``)."""
    # Build a real list: on Python 3 map() returns an iterator, which
    # jsonify cannot serialize.
    return jsonify({'tasks': [make_public_task(task) for task in tasks]})
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    """Return the task(s) whose id equals ``task_id``; 404 if there is none.

    The ``task`` value in the response is the list of matches.
    """
    # A list comprehension rather than filter(): on Python 3 filter()
    # returns an iterator, so len() on it raised TypeError.
    matches = [t for t in tasks if t['id'] == task_id]
    if not matches:
        abort(404)
    # NOTE(review): this returns a list under 'task' while update_task
    # returns a single object; kept as-is so the response shape is unchanged.
    return jsonify({'task': matches})
@app.route('/todo/api/v1.0/tasks', methods=['POST'])
@auth.login_required
def create_task():
    """Create a task from a JSON body and return it.

    The body must be a JSON object containing ``title``; ``description`` is
    optional and defaults to an empty string. Responds 400 otherwise.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or 'title' not in payload:
        abort(400)

    task = {
        # tasks[-1] raises IndexError once the list is empty; restart at 1.
        'id': tasks[-1]['id'] + 1 if tasks else 1,
        'title': payload['title'],
        'description': payload.get('description', ''),
        'done': False,
    }
    tasks.append(task)
    # The status code belongs next to the response object: jsonify() treats
    # extra positional arguments as data, not as a status.
    return jsonify({'task': task}), 200
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['PUT'])
@auth.login_required
def update_task(task_id):
    """Update ``title``, ``description`` and/or ``done`` of one task.

    Responds 404 for an unknown id, and 400 when the body is empty, is not a
    JSON object, or a supplied field has the wrong type. Returns the
    updated task.
    """
    matches = [t for t in tasks if t['id'] == task_id]
    if not matches:
        abort(404)
    task = matches[0]

    payload = request.get_json(silent=True)
    if not payload or not isinstance(payload, dict):
        abort(400)

    # type(u'') is unicode on Python 2 and str on Python 3, so the check
    # does not depend on the Python 2-only name `unicode`.
    text_type = type(u'')
    expected = {
        'title': text_type,
        'description': text_type,
        'done': bool,
    }
    # Validate every supplied field before touching the task, so a bad
    # field cannot leave it half-updated.
    for key, kind in expected.items():
        if key in payload and not isinstance(payload[key], kind):
            abort(400)
    for key in expected:
        if key in payload:
            task[key] = payload[key]
    return jsonify({'task': task})
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['DELETE'])
@auth.login_required
def delete_task(task_id):
    """Remove the task with id ``task_id``; 404 if it does not exist."""
    # A list comprehension rather than filter(): on Python 3 filter()
    # returns an iterator, so len() and indexing on it fail.
    matches = [t for t in tasks if t['id'] == task_id]
    if not matches:
        abort(404)
    tasks.remove(matches[0])
    return jsonify({'result': True})
if __name__ == "__main__":
    # Start the Flask server with debug enabled when run as a script.
    app.run(debug=True)
"""
# 获取所有
$ curl -i http://127.0.0.1:5000/todo/api/v1.0/tasks
# 获取指定
$ curl -i http://127.0.0.1:5000/todo/api/v1.0/tasks/2
# 更新
$ curl -u admin:python -i -H "Content-Type: application/json" -X PUT -d '{"done": true}' http://127.0.0.1:5000/todo/api/v1.0/tasks/2
# 删除
$ curl -u admin:python -i -X DELETE http://127.0.0.1:5000/todo/api/v1.0/tasks/2
# 新增
$ curl -u admin:python -i -H "Content-Type: application/json" -X POST -d '{"title":"Read a book"}' http://127.0.0.1:5000/todo/api/v1.0/tasks
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment