You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iTi-Flask/iti/applications/common/permission.py

460 lines
13 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from typing import Union, List, Callable, Optional
from functools import wraps
from flask_jwt_extended import jwt_required, current_user
from flask import current_app
from iti.applications.common.exceptions.permission import PermissionDeniedException
# ==================== 配置读取工具 ====================
def get_permission_config(key: str, default=None):
"""
获取权限配置项
Args:
key: 配置键名
default: 默认值
Returns:
配置值,如果不存在则返回默认值
"""
try:
permission_config = current_app.config.get("PERMISSION_CONFIG", {})
return permission_config.get(key, default)
except RuntimeError:
# 不在 Flask 应用上下文中,返回默认值
return default
def get_super_admin_role() -> str:
"""获取超级管理员角色代码"""
return get_permission_config("SUPER_ADMIN_ROLE", "SUPER_ADMIN")
def get_default_error_message() -> str:
"""获取默认错误消息"""
return get_permission_config("DEFAULT_ERROR_MESSAGE", "权限不足")
def get_default_error_code() -> int:
"""获取默认错误代码"""
return get_permission_config("DEFAULT_ERROR_CODE", 403)
def get_skip_super_admin_default() -> bool:
"""获取是否跳过超级管理员检查的默认值"""
return get_permission_config("SKIP_SUPER_ADMIN_DEFAULT", True)
class PermissionChecker:
"""
权限检查器基类(策略模式)
"""
def check(self, user, value) -> bool:
"""
检查用户是否满足权限要求
Args:
user: 用户对象
value: 权限值权限码、角色码、部门ID等
Returns:
bool: 是否满足权限要求
"""
raise NotImplementedError("子类必须实现 check 方法")
class PermissionCodeChecker(PermissionChecker):
"""
权限字符检查器
检查用户是否拥有指定的权限码
"""
def check(self, user, permissions) -> bool:
"""
检查用户权限
Args:
user: 用户对象
permissions: 单个权限码或权限码列表
Returns:
bool: 用户是否拥有指定权限
"""
if not user or not hasattr(user, "permissions"):
return False
# 统一转为列表处理
if isinstance(permissions, str):
permissions = [permissions]
user_permissions = set(user.permissions or [])
return any(perm in user_permissions for perm in permissions)
class RoleChecker(PermissionChecker):
"""
角色检查器
检查用户是否拥有指定的角色
"""
def check(self, user, roles: Union[str, List[str]]) -> bool:
"""
检查用户角色
Args:
user: 用户对象
roles: 单个角色码或角色码列表
Returns:
bool: 用户是否拥有指定角色
"""
if not user or not hasattr(user, "roles"):
return False
# 统一转为列表处理
if isinstance(roles, str):
roles = [roles]
user_role_codes = set(role.code for role in user.roles if hasattr(role, "code"))
return any(role in user_role_codes for role in roles)
class FunctionChecker(PermissionChecker):
"""
函数检查器
执行用户提供的自定义验证函数
"""
def check(self, user, func: Callable) -> bool:
"""
执行自定义检查函数
Args:
user: 用户对象
func: 自定义检查函数,接收 user 参数,返回 bool
Returns:
bool: 自定义函数的返回值
"""
if not callable(func):
return False
try:
return bool(func(user))
except Exception:
# 自定义函数执行异常时,视为权限检查失败
return False
class PermissionValidator:
"""
权限验证器
组合多个检查器根据逻辑规则AND/OR进行综合判断
"""
# 检查器类型映射
CHECKER_MAP = {
"permissions": PermissionCodeChecker,
"roles": RoleChecker,
"function": FunctionChecker,
}
def __init__(self, logic: str = "AND", skip_super_admin: bool = True):
"""
初始化验证器
Args:
logic: 逻辑组合方式,"AND""OR"
skip_super_admin: 是否跳过超级管理员的权限检查
"""
self.logic = logic.upper()
self.skip_super_admin = skip_super_admin
self.checkers = [] # [(checker_instance, value), ...]
def add_checker(self, checker_type: str, value):
"""
添加检查器
Args:
checker_type: 检查器类型permissions/roles/function
value: 检查值
"""
if checker_type not in self.CHECKER_MAP:
raise ValueError(f"不支持的检查器类型: {checker_type}")
checker_class = self.CHECKER_MAP[checker_type]
checker_instance = checker_class()
self.checkers.append((checker_instance, value))
def validate(self, user) -> bool:
"""
执行权限验证
Args:
user: 用户对象
Returns:
bool: 是否通过验证
"""
# 用户不存在,验证失败
if not user:
return False
# 超级管理员跳过权限检查
if self.skip_super_admin and self._is_super_admin(user):
return True
# 没有添加任何检查器,默认通过
if not self.checkers:
return True
# 执行所有检查器
results = [checker.check(user, value) for checker, value in self.checkers]
# 根据逻辑规则返回结果
if self.logic == "AND":
return all(results) # 全部通过
elif self.logic == "OR":
return any(results) # 任一通过
else:
raise ValueError(f"不支持的逻辑类型: {self.logic}")
def _is_super_admin(self, user) -> bool:
"""
判断是否为超级管理员
Args:
user: 用户对象
Returns:
bool: 是否为超级管理员
"""
if not hasattr(user, "roles"):
return False
# 从配置中获取超级管理员角色代码
super_admin_role = get_super_admin_role()
# 检查是否有超级管理员角色
return any(
role.code == super_admin_role
for role in user.roles
if hasattr(role, "code")
)
def permission(
permissions: Optional[Union[str, List[str]]] = None,
roles: Optional[Union[str, List[str]]] = None,
func: Optional[Callable] = None,
logic: str = "AND",
error_message: Optional[str] = None,
error_code: Optional[int] = None,
skip_super_admin: Optional[bool] = None,
):
"""
权限检查装饰器
用于路由函数,在函数执行前进行权限验证
Args:
permissions: 权限码(单个或列表)
roles: 角色码(单个或列表)
depts: 部门ID单个或列表
func: 自定义检查函数
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
error_message: 权限不足时的错误消息
error_code: 权限不足时的错误代码
skip_super_admin: 是否跳过超级管理员的权限检查
Returns:
装饰器函数
Examples:
>>> @require_permission(permissions="user:create")
>>> def create_user():
>>> pass
>>> @require_permission(roles=["admin", "manager"], logic="OR")
>>> def manage_system():
>>> pass
>>> @require_permission(
>>> permissions="data:export",
>>> roles="admin",
>>> logic="AND"
>>> )
>>> def export_data():
>>> pass
"""
def decorator(ff):
@jwt_required()
@wraps(ff)
def wrapper(*args, **kwargs):
# 从配置读取默认值(如果参数未提供)
_error_message = (
error_message
if error_message is not None
else get_default_error_message()
)
_error_code = (
error_code if error_code is not None else get_default_error_code()
)
_skip_super_admin = (
skip_super_admin
if skip_super_admin is not None
else get_skip_super_admin_default()
)
# 获取当前用户
user = current_user
# 创建验证器
validator = PermissionValidator(
logic=logic, skip_super_admin=_skip_super_admin
)
# 添加检查器
if permissions is not None:
validator.add_checker("permissions", permissions)
if roles is not None:
validator.add_checker("roles", roles)
if func is not None:
validator.add_checker("function", func)
# 执行验证
if not validator.validate(user):
raise PermissionDeniedException(_error_message, _error_code)
# 权限验证通过,执行原函数
return ff(*args, **kwargs)
return wrapper
return decorator
def check_permission(
user=None,
permissions: Optional[Union[str, List[str]]] = None,
roles: Optional[Union[str, List[str]]] = None,
func: Optional[Callable] = None,
logic: str = "AND",
skip_super_admin: Optional[bool] = None,
) -> bool:
"""
快捷权限检查方法
用于在函数内部进行权限判断,返回布尔值
Args:
user: 用户对象,不传则使用 current_user
permissions: 权限码(单个或列表)
roles: 角色码(单个或列表)
func: 自定义检查函数
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
skip_super_admin: 是否跳过超级管理员的权限检查
Returns:
bool: 是否通过权限检查
Examples:
>>> if check_permission(permissions="user:delete"):
>>> # 执行删除逻辑
>>> pass
>>> if check_permission(roles=["admin", "manager"], logic="OR"):
>>> # 管理员或经理可见
>>> pass
>>> if check_permission(
>>> permissions="data:export",
>>> roles="admin",
>>> logic="AND"
>>> ):
>>> # 导出数据
>>> pass
"""
# 从配置读取默认值(如果参数未提供)
_skip_super_admin = (
skip_super_admin
if skip_super_admin is not None
else get_skip_super_admin_default()
)
# 如果未传入用户,使用当前用户
if user is None:
user = current_user
# 创建验证器
validator = PermissionValidator(logic=logic, skip_super_admin=_skip_super_admin)
# 添加检查器
if permissions is not None:
validator.add_checker("permissions", permissions)
if roles is not None:
validator.add_checker("roles", roles)
if func is not None:
validator.add_checker("function", func)
# 执行验证并返回结果
return validator.validate(user)
def check_permission_or_raise(
user=None,
permissions: Optional[Union[str, List[str]]] = None,
roles: Optional[Union[str, List[str]]] = None,
func: Optional[Callable] = None,
logic: str = "AND",
error_message: Optional[str] = None,
error_code: Optional[int] = None,
skip_super_admin: Optional[bool] = None,
):
"""
快捷权限检查方法(失败时抛出异常)
用于在函数内部进行权限判断,不通过时直接抛出异常
Args:
user: 用户对象,不传则使用 current_user
permissions: 权限码(单个或列表)
roles: 角色码(单个或列表)
func: 自定义检查函数
logic: 逻辑组合方式,"AND"(全部满足)或 "OR"(满足任一)
error_message: 权限不足时的错误消息
error_code: 权限不足时的错误代码
skip_super_admin: 是否跳过超级管理员的权限检查
Raises:
PermissionDeniedException: 权限不足时抛出
Examples:
>>> check_permission_or_raise(permissions="user:delete")
>>> # 继续执行删除逻辑
"""
# 从配置读取默认值(如果参数未提供)
_error_message = (
error_message if error_message is not None else get_default_error_message()
)
_error_code = error_code if error_code is not None else get_default_error_code()
_skip_super_admin = (
skip_super_admin
if skip_super_admin is not None
else get_skip_super_admin_default()
)
if not check_permission(
user=user,
permissions=permissions,
roles=roles,
func=func,
logic=logic,
skip_super_admin=_skip_super_admin,
):
raise PermissionDeniedException(_error_message, _error_code)