|
|
from typing import Union, List, Callable, Optional
|
|
|
from functools import wraps
|
|
|
from flask_jwt_extended import jwt_required, current_user
|
|
|
from flask import current_app
|
|
|
from iti.applications.common.exceptions.permission import PermissionDeniedException
|
|
|
|
|
|
|
|
|
# ==================== 配置读取工具 ====================
|
|
|
|
|
|
|
|
|
def get_permission_config(key: str, default=None):
|
|
|
"""
|
|
|
获取权限配置项
|
|
|
|
|
|
Args:
|
|
|
key: 配置键名
|
|
|
default: 默认值
|
|
|
|
|
|
Returns:
|
|
|
配置值,如果不存在则返回默认值
|
|
|
"""
|
|
|
try:
|
|
|
permission_config = current_app.config.get("PERMISSION_CONFIG", {})
|
|
|
return permission_config.get(key, default)
|
|
|
except RuntimeError:
|
|
|
# 不在 Flask 应用上下文中,返回默认值
|
|
|
return default
|
|
|
|
|
|
|
|
|
def get_super_admin_role() -> str:
|
|
|
"""获取超级管理员角色代码"""
|
|
|
return get_permission_config("SUPER_ADMIN_ROLE", "ADMIN")
|
|
|
|
|
|
|
|
|
def get_default_error_message() -> str:
|
|
|
"""获取默认错误消息"""
|
|
|
return get_permission_config("DEFAULT_ERROR_MESSAGE", "权限不足")
|
|
|
|
|
|
|
|
|
def get_default_error_code() -> int:
|
|
|
"""获取默认错误代码"""
|
|
|
return get_permission_config("DEFAULT_ERROR_CODE", 403)
|
|
|
|
|
|
|
|
|
def get_skip_super_admin_default() -> bool:
|
|
|
"""获取是否跳过超级管理员检查的默认值"""
|
|
|
return get_permission_config("SKIP_SUPER_ADMIN_DEFAULT", True)
|
|
|
|
|
|
|
|
|
class PermissionChecker:
|
|
|
"""
|
|
|
权限检查器基类(策略模式)
|
|
|
"""
|
|
|
|
|
|
def check(self, user, value) -> bool:
|
|
|
"""
|
|
|
检查用户是否满足权限要求
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
value: 权限值(权限码、角色码、部门ID等)
|
|
|
|
|
|
Returns:
|
|
|
bool: 是否满足权限要求
|
|
|
"""
|
|
|
raise NotImplementedError("子类必须实现 check 方法")
|
|
|
|
|
|
|
|
|
class PermissionCodeChecker(PermissionChecker):
|
|
|
"""
|
|
|
权限字符检查器
|
|
|
检查用户是否拥有指定的权限码
|
|
|
"""
|
|
|
|
|
|
def check(self, user, permissions) -> bool:
|
|
|
"""
|
|
|
检查用户权限
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
permissions: 单个权限码或权限码列表
|
|
|
|
|
|
Returns:
|
|
|
bool: 用户是否拥有指定权限
|
|
|
"""
|
|
|
if not user or not hasattr(user, "permissions"):
|
|
|
return False
|
|
|
|
|
|
# 统一转为列表处理
|
|
|
if isinstance(permissions, str):
|
|
|
permissions = [permissions]
|
|
|
|
|
|
user_permissions = set(user.permissions or [])
|
|
|
return any(perm in user_permissions for perm in permissions)
|
|
|
|
|
|
|
|
|
class RoleChecker(PermissionChecker):
|
|
|
"""
|
|
|
角色检查器
|
|
|
检查用户是否拥有指定的角色
|
|
|
"""
|
|
|
|
|
|
def check(self, user, roles: Union[str, List[str]]) -> bool:
|
|
|
"""
|
|
|
检查用户角色
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
roles: 单个角色码或角色码列表
|
|
|
|
|
|
Returns:
|
|
|
bool: 用户是否拥有指定角色
|
|
|
"""
|
|
|
if not user or not hasattr(user, "roles"):
|
|
|
return False
|
|
|
|
|
|
# 统一转为列表处理
|
|
|
if isinstance(roles, str):
|
|
|
roles = [roles]
|
|
|
|
|
|
user_role_codes = set(role.code for role in user.roles if hasattr(role, "code"))
|
|
|
return any(role in user_role_codes for role in roles)
|
|
|
|
|
|
|
|
|
class FunctionChecker(PermissionChecker):
|
|
|
"""
|
|
|
函数检查器
|
|
|
执行用户提供的自定义验证函数
|
|
|
"""
|
|
|
|
|
|
def check(self, user, func: Callable) -> bool:
|
|
|
"""
|
|
|
执行自定义检查函数
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
func: 自定义检查函数,接收 user 参数,返回 bool
|
|
|
|
|
|
Returns:
|
|
|
bool: 自定义函数的返回值
|
|
|
"""
|
|
|
if not callable(func):
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
return bool(func(user))
|
|
|
except Exception:
|
|
|
# 自定义函数执行异常时,视为权限检查失败
|
|
|
return False
|
|
|
|
|
|
|
|
|
class PermissionValidator:
|
|
|
"""
|
|
|
权限验证器
|
|
|
组合多个检查器,根据逻辑规则(AND/OR)进行综合判断
|
|
|
"""
|
|
|
|
|
|
# 检查器类型映射
|
|
|
CHECKER_MAP = {
|
|
|
"permissions": PermissionCodeChecker,
|
|
|
"roles": RoleChecker,
|
|
|
"function": FunctionChecker,
|
|
|
}
|
|
|
|
|
|
def __init__(self, logic: str = "AND", skip_super_admin: bool = True):
|
|
|
"""
|
|
|
初始化验证器
|
|
|
|
|
|
Args:
|
|
|
logic: 逻辑组合方式,"AND" 或 "OR"
|
|
|
skip_super_admin: 是否跳过超级管理员的权限检查
|
|
|
"""
|
|
|
self.logic = logic.upper()
|
|
|
self.skip_super_admin = skip_super_admin
|
|
|
self.checkers = [] # [(checker_instance, value), ...]
|
|
|
|
|
|
def add_checker(self, checker_type: str, value):
|
|
|
"""
|
|
|
添加检查器
|
|
|
|
|
|
Args:
|
|
|
checker_type: 检查器类型(permissions/roles/function)
|
|
|
value: 检查值
|
|
|
"""
|
|
|
if checker_type not in self.CHECKER_MAP:
|
|
|
raise ValueError(f"不支持的检查器类型: {checker_type}")
|
|
|
|
|
|
checker_class = self.CHECKER_MAP[checker_type]
|
|
|
checker_instance = checker_class()
|
|
|
self.checkers.append((checker_instance, value))
|
|
|
|
|
|
def validate(self, user) -> bool:
|
|
|
"""
|
|
|
执行权限验证
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
|
|
|
Returns:
|
|
|
bool: 是否通过验证
|
|
|
"""
|
|
|
# 用户不存在,验证失败
|
|
|
if not user:
|
|
|
return False
|
|
|
|
|
|
# 超级管理员跳过权限检查
|
|
|
if self.skip_super_admin and self._is_super_admin(user):
|
|
|
return True
|
|
|
|
|
|
# 没有添加任何检查器,默认通过
|
|
|
if not self.checkers:
|
|
|
return True
|
|
|
|
|
|
# 执行所有检查器
|
|
|
results = [checker.check(user, value) for checker, value in self.checkers]
|
|
|
|
|
|
# 根据逻辑规则返回结果
|
|
|
if self.logic == "AND":
|
|
|
return all(results) # 全部通过
|
|
|
elif self.logic == "OR":
|
|
|
return any(results) # 任一通过
|
|
|
else:
|
|
|
raise ValueError(f"不支持的逻辑类型: {self.logic}")
|
|
|
|
|
|
def _is_super_admin(self, user) -> bool:
|
|
|
"""
|
|
|
判断是否为超级管理员
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象
|
|
|
|
|
|
Returns:
|
|
|
bool: 是否为超级管理员
|
|
|
"""
|
|
|
if not hasattr(user, "roles"):
|
|
|
return False
|
|
|
|
|
|
# 从配置中获取超级管理员角色代码
|
|
|
super_admin_role = get_super_admin_role()
|
|
|
|
|
|
# 检查是否有超级管理员角色
|
|
|
return any(
|
|
|
role.code == super_admin_role
|
|
|
for role in user.roles
|
|
|
if hasattr(role, "code")
|
|
|
)
|
|
|
|
|
|
|
|
|
def permission(
|
|
|
permissions: Optional[Union[str, List[str]]] = None,
|
|
|
roles: Optional[Union[str, List[str]]] = None,
|
|
|
func: Optional[Callable] = None,
|
|
|
logic: str = "AND",
|
|
|
error_message: Optional[str] = None,
|
|
|
error_code: Optional[int] = None,
|
|
|
skip_super_admin: Optional[bool] = None,
|
|
|
):
|
|
|
"""
|
|
|
权限检查装饰器
|
|
|
|
|
|
用于路由函数,在函数执行前进行权限验证
|
|
|
|
|
|
Args:
|
|
|
permissions: 权限码(单个或列表)
|
|
|
roles: 角色码(单个或列表)
|
|
|
depts: 部门ID(单个或列表)
|
|
|
func: 自定义检查函数
|
|
|
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
|
|
|
error_message: 权限不足时的错误消息
|
|
|
error_code: 权限不足时的错误代码
|
|
|
skip_super_admin: 是否跳过超级管理员的权限检查
|
|
|
|
|
|
Returns:
|
|
|
装饰器函数
|
|
|
|
|
|
Examples:
|
|
|
>>> @require_permission(permissions="user:create")
|
|
|
>>> def create_user():
|
|
|
>>> pass
|
|
|
|
|
|
>>> @require_permission(roles=["admin", "manager"], logic="OR")
|
|
|
>>> def manage_system():
|
|
|
>>> pass
|
|
|
|
|
|
>>> @require_permission(
|
|
|
>>> permissions="data:export",
|
|
|
>>> roles="admin",
|
|
|
>>> logic="AND"
|
|
|
>>> )
|
|
|
>>> def export_data():
|
|
|
>>> pass
|
|
|
"""
|
|
|
|
|
|
def decorator(ff):
|
|
|
@jwt_required()
|
|
|
@wraps(ff)
|
|
|
def wrapper(*args, **kwargs):
|
|
|
# 从配置读取默认值(如果参数未提供)
|
|
|
_error_message = (
|
|
|
error_message
|
|
|
if error_message is not None
|
|
|
else get_default_error_message()
|
|
|
)
|
|
|
_error_code = (
|
|
|
error_code if error_code is not None else get_default_error_code()
|
|
|
)
|
|
|
_skip_super_admin = (
|
|
|
skip_super_admin
|
|
|
if skip_super_admin is not None
|
|
|
else get_skip_super_admin_default()
|
|
|
)
|
|
|
|
|
|
# 获取当前用户
|
|
|
user = current_user
|
|
|
|
|
|
# 创建验证器
|
|
|
validator = PermissionValidator(
|
|
|
logic=logic, skip_super_admin=_skip_super_admin
|
|
|
)
|
|
|
|
|
|
# 添加检查器
|
|
|
if permissions is not None:
|
|
|
validator.add_checker("permissions", permissions)
|
|
|
if roles is not None:
|
|
|
validator.add_checker("roles", roles)
|
|
|
if func is not None:
|
|
|
validator.add_checker("function", func)
|
|
|
|
|
|
# 执行验证
|
|
|
if not validator.validate(user):
|
|
|
raise PermissionDeniedException(_error_message, _error_code)
|
|
|
|
|
|
# 权限验证通过,执行原函数
|
|
|
return ff(*args, **kwargs)
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
def check_permission(
|
|
|
user=None,
|
|
|
permissions: Optional[Union[str, List[str]]] = None,
|
|
|
roles: Optional[Union[str, List[str]]] = None,
|
|
|
func: Optional[Callable] = None,
|
|
|
logic: str = "AND",
|
|
|
skip_super_admin: Optional[bool] = None,
|
|
|
) -> bool:
|
|
|
"""
|
|
|
快捷权限检查方法
|
|
|
|
|
|
用于在函数内部进行权限判断,返回布尔值
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象,不传则使用 current_user
|
|
|
permissions: 权限码(单个或列表)
|
|
|
roles: 角色码(单个或列表)
|
|
|
func: 自定义检查函数
|
|
|
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
|
|
|
skip_super_admin: 是否跳过超级管理员的权限检查
|
|
|
|
|
|
Returns:
|
|
|
bool: 是否通过权限检查
|
|
|
|
|
|
Examples:
|
|
|
>>> if check_permission(permissions="user:delete"):
|
|
|
>>> # 执行删除逻辑
|
|
|
>>> pass
|
|
|
|
|
|
>>> if check_permission(roles=["admin", "manager"], logic="OR"):
|
|
|
>>> # 管理员或经理可见
|
|
|
>>> pass
|
|
|
|
|
|
>>> if check_permission(
|
|
|
>>> permissions="data:export",
|
|
|
>>> roles="admin",
|
|
|
>>> logic="AND"
|
|
|
>>> ):
|
|
|
>>> # 导出数据
|
|
|
>>> pass
|
|
|
"""
|
|
|
# 从配置读取默认值(如果参数未提供)
|
|
|
_skip_super_admin = (
|
|
|
skip_super_admin
|
|
|
if skip_super_admin is not None
|
|
|
else get_skip_super_admin_default()
|
|
|
)
|
|
|
|
|
|
# 如果未传入用户,使用当前用户
|
|
|
if user is None:
|
|
|
user = current_user
|
|
|
|
|
|
# 创建验证器
|
|
|
validator = PermissionValidator(logic=logic, skip_super_admin=_skip_super_admin)
|
|
|
|
|
|
# 添加检查器
|
|
|
if permissions is not None:
|
|
|
validator.add_checker("permissions", permissions)
|
|
|
if roles is not None:
|
|
|
validator.add_checker("roles", roles)
|
|
|
if func is not None:
|
|
|
validator.add_checker("function", func)
|
|
|
|
|
|
# 执行验证并返回结果
|
|
|
return validator.validate(user)
|
|
|
|
|
|
|
|
|
def check_permission_or_raise(
|
|
|
user=None,
|
|
|
permissions: Optional[Union[str, List[str]]] = None,
|
|
|
roles: Optional[Union[str, List[str]]] = None,
|
|
|
func: Optional[Callable] = None,
|
|
|
logic: str = "AND",
|
|
|
error_message: Optional[str] = None,
|
|
|
error_code: Optional[int] = None,
|
|
|
skip_super_admin: Optional[bool] = None,
|
|
|
):
|
|
|
"""
|
|
|
快捷权限检查方法(失败时抛出异常)
|
|
|
|
|
|
用于在函数内部进行权限判断,不通过时直接抛出异常
|
|
|
|
|
|
Args:
|
|
|
user: 用户对象,不传则使用 current_user
|
|
|
permissions: 权限码(单个或列表)
|
|
|
roles: 角色码(单个或列表)
|
|
|
func: 自定义检查函数
|
|
|
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
|
|
|
error_message: 权限不足时的错误消息
|
|
|
error_code: 权限不足时的错误代码
|
|
|
skip_super_admin: 是否跳过超级管理员的权限检查
|
|
|
|
|
|
Raises:
|
|
|
PermissionDeniedException: 权限不足时抛出
|
|
|
|
|
|
Examples:
|
|
|
>>> check_permission_or_raise(permissions="user:delete")
|
|
|
>>> # 继续执行删除逻辑
|
|
|
"""
|
|
|
# 从配置读取默认值(如果参数未提供)
|
|
|
_error_message = (
|
|
|
error_message if error_message is not None else get_default_error_message()
|
|
|
)
|
|
|
_error_code = error_code if error_code is not None else get_default_error_code()
|
|
|
_skip_super_admin = (
|
|
|
skip_super_admin
|
|
|
if skip_super_admin is not None
|
|
|
else get_skip_super_admin_default()
|
|
|
)
|
|
|
|
|
|
if not check_permission(
|
|
|
user=user,
|
|
|
permissions=permissions,
|
|
|
roles=roles,
|
|
|
func=func,
|
|
|
logic=logic,
|
|
|
skip_super_admin=_skip_super_admin,
|
|
|
):
|
|
|
raise PermissionDeniedException(_error_message, _error_code)
|