Skip to content

Instantly share code, notes, and snippets.

@hustlzp
Last active September 7, 2017 09:03
Show Gist options
  • Save hustlzp/011be36d7bd950f21c17 to your computer and use it in GitHub Desktop.
Save hustlzp/011be36d7bd950f21c17 to your computer and use it in GitHub Desktop.
Flask permission control
# 用于view的装饰器
from ..utils.permissions import VisitorPermission
@bp.route('/signin', methods=['GET', 'POST'])
@VisitorPermission()
def signin():
"""登陆"""
form = SigninForm()
if form.validate_on_submit():
signin_user(form.user)
return redirect(url_for('site.index'))
return render_template('account/signin.html', form=form)
# 用于view代码中
from ..utils.permissions import QuestionAdminPermission
@bp.route('/question/<int:uid>/delete')
def delete_question(uid):
"""删除问题"""
question = Question.query.get_or_404(uid)
permission = QuestionAdminPermission(uid)
if not permission.check():
return permission.deny()
db.session.delete(question)
db.session.commit()
return redirect(url_for('site.index'))
# 用于Jinja2中
def register_jinja(app):
"""注册Jinja2全局变量"""
from .utils import permissions
# inject vars into template context
@app.context_processor
def inject_vars():
return dict(
g_permissions=permissions,
)
"""
{% if g_permissions.QuestionAdminPermission(question.id).check() %}
<a class="btn btn-default confirm" data-confirm="确认删除此问题?"
href="{{ url_for('qa.delete_question', uid=question.id) }}">
<span class="fa fa-trash-o"></span> 删除
</a>
{% endif %}
"""
# coding: utf-8
from functools import wraps
class Permission(object):
def __call__(self, func):
"""提供view装饰器能力"""
@wraps(func)
def decorator(*args, **kwargs):
if not self.check():
return self.deny()
return func(*args, **kwargs)
return decorator
def check(self):
"""运行规则"""
result, self.deny = self.rule.run()
return result
def show(self):
"""显示self.rule的规则结构,调试用"""
self.rule.show()
class Rule(object):
def __init__(self):
self.rules_list = [[(self.check, self.deny)]]
# 若子类实现了base方法,则将其返回的rule实例的rules_list串联到self.rules_list上游
base_rule = self.base()
if base_rule:
self.rules_list = Rule._and(base_rule.rules_list, self.rules_list)
def __and__(self, other):
"""逻辑与操作(&)
将other.rules_list串联到self.rules_list的下游,
并返回当前实例。"""
self.rules_list = Rule._and(self.rules_list, other.rules_list)
return self
def __or__(self, other):
"""逻辑或操作(|)
将self.rules_list与other.rules_list并联起来,
并返回当前实例"""
for rule in other.rules_list:
self.rules_list.append(rule)
return self
def show(self):
"""显示rules_list的结构"""
for rule in self.rules_list:
result = ", ".join([check.__repr__() for check, deny in rule])
print(result)
def base(self):
"""提供rule规则继承能力(串联)"""
return None
def run(self):
"""运行rules_list。
若某一通道check通过,则返回成功状态
若所有通道都无法check通过,则返回失败状态(包含最后运行失败的rule的deny方法)"""
failed_result = None
for rule in self.rules_list:
for check, deny in rule:
if not check():
failed_result = (False, deny)
break
else:
return (True, None)
return failed_result
def check(self):
"""当前rule的测试方法,强制子类overload"""
raise NotImplementedError()
def deny(self):
"""当前rule测试失败后需要执行的方法,强制子类overload"""
raise NotImplementedError()
@staticmethod
def _and(rules_list_pre, rules_list_pro):
"""将rule_list_pre串联到rule_list_pro上游"""
return [rule_pre + rule_pro
for rule_pre in rules_list_pre
for rule_pro in rules_list_pro]
# coding: utf-8
from .flask_permission import Permission
from .rules import VisitorRule, UserRule, AdminRule, QuestionOwnerRule
class VisitorPermission(Permission):
"""游客许可"""
def __init__(self):
self.rule = VisitorRule()
class UserPermission(Permission):
"""注册用户许可"""
def __init__(self):
self.rule = UserRule()
class AdminPermission(Permission):
"""管理员许可"""
def __init__(self):
self.rule = AdminRule()
class QuestionAdminPermission(Permission):
"""Question管理许可
Question拥有者/管理员均有该许可
"""
def __init__(self, question_id):
self.rule = AdminRule() | QuestionOwnerRule(question_id)
# coding: utf-8
from flask import redirect, url_for, abort, flash, g
from ..models import Question
from .flask_permission import Rule
class VisitorRule(Rule):
"""游客"""
def check(self):
return not g.user
def deny(self):
return redirect(url_for('site.index'))
class UserRule(Rule):
"""注册用户"""
def check(self):
return g.user
def deny(self):
flash('此操作需要登录账户')
return redirect(url_for('account.signin'))
class AdminRule(Rule):
"""管理员"""
def base(self):
return UserRule()
def check(self):
return g.user.is_admin
def deny(self):
abort(403)
class QuestionOwnerRule(Rule):
"""问题拥有者"""
def __init__(self, question_id):
self.question_id = question_id
Rule.__init__(self)
def base(self):
return UserRule()
def check(self):
question = Question.query.filter(Question.id == self.question_id).first()
return question and question.user_id == g.user.id
def deny(self):
abort(403)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment