权限管理是一个很常见的功能模块,本文基于RBAC模型针对于多用户,多角色,多权限的场景,介绍一种Flask权限管理方案。html
Flask系列文章:python
本文将在开发初探的代码基础上进行重构。git
在本文所述场景中,具体的权限管理是:权限和角色关联,给用户添加角色,用户即拥有角色的权限,也就是基于角色的权限控制。固然,若须要基于用户的权限控制也是能够的,只须要修改下相关数据结构便可。github
具体的权限验证采用了位运算,将权限值用十六进制表示,每一个角色拥有一个权限总值,当判断该角色是否有特定权限时:json
In [1]: permission = 0X02 In [2]: permissions = 0X0D In [3]: print((permissions & permission) == permission) False In [4]: permissions = 0X07 In [5]: print((permissions & permission) == permission) True
返回值为True表示拥有该权限,False为没有该权限,原理与位运算的原理有关。flask
0x07 = 0x01 + 0x02 + 0x04安全
转换为二进制数值能够看作是:0111 = 0001 + 0010 + 0100session
按照位运算,运算符&(按位与)相应位都为1,则该位为1,不然为0,那么权限总值和权限值执行按位与运算,结果恒为权限值时才能得出拥有该权限。数据结构
首先,针对以上场景,咱们建立数据表。app
建立用户表,保存用户信息和对应的角色:
class User(db.Model): """ 用户表 """ __tablename__ = "user" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(128), unique=True) email = db.Column(db.String(128)) password = db.Column(db.String(128)) role_id = db.Column(db.Integer) def __init__(self, name, email, password): self.name = name self.email = email self.password = bcrypt_sha256.encrypt(str(password))
建立权限类,赋予每种操做权限值,这里举例用户管理和更新权限:
class Permissions: """ 权限类 """ USER_MANAGE = 0X01 UPDATE_PERMISSION = 0x02
须要建立角色表结构,咱们暂定两种角色:普通用户和管理员,并初始化角色和权限。
class Role(db.Model): """ 角色表 """ __tablename__ = "role" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(128), unique=True, commit="角色名") permissions = db.Column(db.Integer, commit="权限总值") @staticmethod def init_role(): role_name_list = ['user', 'admin'] roles_permission_map = { 'user': [Permissions.USER_MANAGE], 'admin': [Permissions.USER_MANAGE, Permissions.UPDATE_PERMISSION] } try: for role_name in role_name_list: role = Role.query.filter_by(name=role_name).first() if not role: role = Role(name=role_name) role.reset_permissions() for permission in roles_permission_map[role_name]: role.add_permission(permission) db.session.add(role) db.session.commit() except: db.session.rollback() db.session.close() def reset_permissions(self): self.permissions = 0 def has_permission(self, permission): return self.permissions & permission == permission def add_permission(self, permission): if not self.has_permission(permission): self.permissions += permission
随着应用更新,权限值会不断增长,角色对应的权限值随之增大,为了保证每次更新同步到表,能够在flask应用初始化时添加:
Role.init_role()
这样,咱们就赋予了每一个角色其拥有的权限值。
重启应用,能够看到role表:
前期数据准备稳当了,接下来就是鉴权。
为了保证访问的安全性,须要对接口和权限进行关联绑定,我尝试过两种方案:
1. 装饰器
封装装饰器,对接口视图函数进行装饰,装饰器传入权限值做为参数,在装饰器中根据用户角色的权限和权限值进行对比,判断该用户是否有该接口的访问权限。
刚开始我是用这种方式的,小型应用接口很少的场景下使用还好,但随着应用越来越复杂,赋权操做就有点繁琐。
2. 接口赋权
这是我在装饰器以后想到的一种方式,在大型应用接口比较多的状况下比较推荐,并且这种方式耦合度低,易于扩展。
具体操做:首先,将接口地址和权限关联,接口比较多的话,推荐用蓝图,基本上保证一个蓝图中的接口是一个权限,这样操做会简单一些,而后,在应用初始化时将接口地址和权限入库,这样能够保证每次重启应用后数据都是最新的,最后,当用户登陆时,会根据用户角色和请求的地址判断其是否有权限访问。
以上两种方式,今天以装饰器鉴权举例说明。
首先,建立鉴权装饰器:
from functools import wraps from flask import session, abort from app.models import db, Users, Role Permission_code = [0X01, 0X02] def permission_can(current_user, permission): """ 检测用户是否有特定权限 :param current_user :param permission :return: """ role_id = current_user.role_id role = db.session.query(Role).filter_by(id=role_id).first() return (role.permissions & permission) == permission def permission_required(permission): """ 权限认证装饰器 :param permission: :return: """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): try: current_user = Users.query.filter_by(id=session.get('user_id')).first() if not current_user and permission_can(current_user, permission): abort(403) return f(*args, **kwargs) except: abort(403) return decorated_function return decorator
其中,用到了flask session,获取当前登陆用户的user_id,根据当前用户的角色判断其是否拥有该权限permission。
而后在视图函数上添加该装饰器,就能够鉴权了。举例用户管理功能:
@user.route('/user-manage', methods=['POST', 'GET']) @permission_required(Permissions.USER_MANAGE) def user_manage(): """ 用户管理 :return: """ if request.method == 'POST': # 处理... ret_data = dict(code=0, ret_msg='user manage') else: # 数据处理 ... ret_data = dict(code=0, ret_msg='user list') return jsonify(ret_data)
最后,分别构造请求,访问接口测试:
import requests session = requests.Session() # login login_url = 'http://0.0.0.0:9001/login' login_data = dict(user='test', pwd='pwd') login_request = session.post(login_url, json=login_data) print(login_request.json()) # user_manage user_manage_url = 'http://0.0.0.0:9001/user-manage' login_request = session.post(user_manage_url) print(login_request.json()) # permission_manege permission_manage_url = 'http://0.0.0.0:9001/permission-manage' login_request = session.post(permission_manage_url) print(login_request.json())
具体代码见 my github
以上。