from typing import Union, List, Callable, Optional from functools import wraps from flask_jwt_extended import jwt_required, current_user from flask import current_app from iti.applications.common.exceptions.permission import PermissionDeniedException # ==================== 配置读取工具 ==================== def get_permission_config(key: str, default=None): """ 获取权限配置项 Args: key: 配置键名 default: 默认值 Returns: 配置值,如果不存在则返回默认值 """ try: permission_config = current_app.config.get("PERMISSION_CONFIG", {}) return permission_config.get(key, default) except RuntimeError: # 不在 Flask 应用上下文中,返回默认值 return default def get_super_admin_role() -> str: """获取超级管理员角色代码""" return get_permission_config("SUPER_ADMIN_ROLE", "ADMIN") def get_default_error_message() -> str: """获取默认错误消息""" return get_permission_config("DEFAULT_ERROR_MESSAGE", "权限不足") def get_default_error_code() -> int: """获取默认错误代码""" return get_permission_config("DEFAULT_ERROR_CODE", 403) def get_skip_super_admin_default() -> bool: """获取是否跳过超级管理员检查的默认值""" return get_permission_config("SKIP_SUPER_ADMIN_DEFAULT", True) class PermissionChecker: """ 权限检查器基类(策略模式) """ def check(self, user, value) -> bool: """ 检查用户是否满足权限要求 Args: user: 用户对象 value: 权限值(权限码、角色码、部门ID等) Returns: bool: 是否满足权限要求 """ raise NotImplementedError("子类必须实现 check 方法") class PermissionCodeChecker(PermissionChecker): """ 权限字符检查器 检查用户是否拥有指定的权限码 """ def check(self, user, permissions) -> bool: """ 检查用户权限 Args: user: 用户对象 permissions: 单个权限码或权限码列表 Returns: bool: 用户是否拥有指定权限 """ if not user or not hasattr(user, "permissions"): return False # 统一转为列表处理 if isinstance(permissions, str): permissions = [permissions] user_permissions = set(user.permissions or []) return any(perm in user_permissions for perm in permissions) class RoleChecker(PermissionChecker): """ 角色检查器 检查用户是否拥有指定的角色 """ def check(self, user, roles: Union[str, List[str]]) -> bool: """ 检查用户角色 Args: user: 用户对象 roles: 单个角色码或角色码列表 Returns: bool: 用户是否拥有指定角色 """ if not user or not hasattr(user, "roles"): return False # 统一转为列表处理 if isinstance(roles, str): roles = [roles] user_role_codes = set(role.code for role in user.roles if hasattr(role, "code")) return any(role in user_role_codes for role in roles) class FunctionChecker(PermissionChecker): """ 函数检查器 执行用户提供的自定义验证函数 """ def check(self, user, func: Callable) -> bool: """ 执行自定义检查函数 Args: user: 用户对象 func: 自定义检查函数,接收 user 参数,返回 bool Returns: bool: 自定义函数的返回值 """ if not callable(func): return False try: return bool(func(user)) except Exception: # 自定义函数执行异常时,视为权限检查失败 return False class PermissionValidator: """ 权限验证器 组合多个检查器,根据逻辑规则(AND/OR)进行综合判断 """ # 检查器类型映射 CHECKER_MAP = { "permissions": PermissionCodeChecker, "roles": RoleChecker, "function": FunctionChecker, } def __init__(self, logic: str = "AND", skip_super_admin: bool = True): """ 初始化验证器 Args: logic: 逻辑组合方式,"AND" 或 "OR" skip_super_admin: 是否跳过超级管理员的权限检查 """ self.logic = logic.upper() self.skip_super_admin = skip_super_admin self.checkers = [] # [(checker_instance, value), ...] def add_checker(self, checker_type: str, value): """ 添加检查器 Args: checker_type: 检查器类型(permissions/roles/function) value: 检查值 """ if checker_type not in self.CHECKER_MAP: raise ValueError(f"不支持的检查器类型: {checker_type}") checker_class = self.CHECKER_MAP[checker_type] checker_instance = checker_class() self.checkers.append((checker_instance, value)) def validate(self, user) -> bool: """ 执行权限验证 Args: user: 用户对象 Returns: bool: 是否通过验证 """ # 用户不存在,验证失败 if not user: return False # 超级管理员跳过权限检查 if self.skip_super_admin and self._is_super_admin(user): return True # 没有添加任何检查器,默认通过 if not self.checkers: return True # 执行所有检查器 results = [checker.check(user, value) for checker, value in self.checkers] # 根据逻辑规则返回结果 if self.logic == "AND": return all(results) # 全部通过 elif self.logic == "OR": return any(results) # 任一通过 else: raise ValueError(f"不支持的逻辑类型: {self.logic}") def _is_super_admin(self, user) -> bool: """ 判断是否为超级管理员 Args: user: 用户对象 Returns: bool: 是否为超级管理员 """ if not hasattr(user, "roles"): return False # 从配置中获取超级管理员角色代码 super_admin_role = get_super_admin_role() # 检查是否有超级管理员角色 return any( role.code == super_admin_role for role in user.roles if hasattr(role, "code") ) def permission( permissions: Optional[Union[str, List[str]]] = None, roles: Optional[Union[str, List[str]]] = None, func: Optional[Callable] = None, logic: str = "AND", error_message: Optional[str] = None, error_code: Optional[int] = None, skip_super_admin: Optional[bool] = None, ): """ 权限检查装饰器 用于路由函数,在函数执行前进行权限验证 Args: permissions: 权限码(单个或列表) roles: 角色码(单个或列表) depts: 部门ID(单个或列表) func: 自定义检查函数 logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一) error_message: 权限不足时的错误消息 error_code: 权限不足时的错误代码 skip_super_admin: 是否跳过超级管理员的权限检查 Returns: 装饰器函数 Examples: >>> @require_permission(permissions="user:create") >>> def create_user(): >>> pass >>> @require_permission(roles=["admin", "manager"], logic="OR") >>> def manage_system(): >>> pass >>> @require_permission( >>> permissions="data:export", >>> roles="admin", >>> logic="AND" >>> ) >>> def export_data(): >>> pass """ def decorator(ff): @jwt_required() @wraps(ff) def wrapper(*args, **kwargs): # 从配置读取默认值(如果参数未提供) _error_message = ( error_message if error_message is not None else get_default_error_message() ) _error_code = ( error_code if error_code is not None else get_default_error_code() ) _skip_super_admin = ( skip_super_admin if skip_super_admin is not None else get_skip_super_admin_default() ) # 获取当前用户 user = current_user # 创建验证器 validator = PermissionValidator( logic=logic, skip_super_admin=_skip_super_admin ) # 添加检查器 if permissions is not None: validator.add_checker("permissions", permissions) if roles is not None: validator.add_checker("roles", roles) if func is not None: validator.add_checker("function", func) # 执行验证 if not validator.validate(user): raise PermissionDeniedException(_error_message, _error_code) # 权限验证通过,执行原函数 return ff(*args, **kwargs) return wrapper return decorator def check_permission( user=None, permissions: Optional[Union[str, List[str]]] = None, roles: Optional[Union[str, List[str]]] = None, func: Optional[Callable] = None, logic: str = "AND", skip_super_admin: Optional[bool] = None, ) -> bool: """ 快捷权限检查方法 用于在函数内部进行权限判断,返回布尔值 Args: user: 用户对象,不传则使用 current_user permissions: 权限码(单个或列表) roles: 角色码(单个或列表) func: 自定义检查函数 logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一) skip_super_admin: 是否跳过超级管理员的权限检查 Returns: bool: 是否通过权限检查 Examples: >>> if check_permission(permissions="user:delete"): >>> # 执行删除逻辑 >>> pass >>> if check_permission(roles=["admin", "manager"], logic="OR"): >>> # 管理员或经理可见 >>> pass >>> if check_permission( >>> permissions="data:export", >>> roles="admin", >>> logic="AND" >>> ): >>> # 导出数据 >>> pass """ # 从配置读取默认值(如果参数未提供) _skip_super_admin = ( skip_super_admin if skip_super_admin is not None else get_skip_super_admin_default() ) # 如果未传入用户,使用当前用户 if user is None: user = current_user # 创建验证器 validator = PermissionValidator(logic=logic, skip_super_admin=_skip_super_admin) # 添加检查器 if permissions is not None: validator.add_checker("permissions", permissions) if roles is not None: validator.add_checker("roles", roles) if func is not None: validator.add_checker("function", func) # 执行验证并返回结果 return validator.validate(user) def check_permission_or_raise( user=None, permissions: Optional[Union[str, List[str]]] = None, roles: Optional[Union[str, List[str]]] = None, func: Optional[Callable] = None, logic: str = "AND", error_message: Optional[str] = None, error_code: Optional[int] = None, skip_super_admin: Optional[bool] = None, ): """ 快捷权限检查方法(失败时抛出异常) 用于在函数内部进行权限判断,不通过时直接抛出异常 Args: user: 用户对象,不传则使用 current_user permissions: 权限码(单个或列表) roles: 角色码(单个或列表) func: 自定义检查函数 logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一) error_message: 权限不足时的错误消息 error_code: 权限不足时的错误代码 skip_super_admin: 是否跳过超级管理员的权限检查 Raises: PermissionDeniedException: 权限不足时抛出 Examples: >>> check_permission_or_raise(permissions="user:delete") >>> # 继续执行删除逻辑 """ # 从配置读取默认值(如果参数未提供) _error_message = ( error_message if error_message is not None else get_default_error_message() ) _error_code = error_code if error_code is not None else get_default_error_code() _skip_super_admin = ( skip_super_admin if skip_super_admin is not None else get_skip_super_admin_default() ) if not check_permission( user=user, permissions=permissions, roles=roles, func=func, logic=logic, skip_super_admin=_skip_super_admin, ): raise PermissionDeniedException(_error_message, _error_code)