You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
3.4 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from sqlalchemy.orm import noload
from sqlalchemy.sql._typing import ColumnExpressionArgument
from typing import List, Dict, Any, Optional, Set
from iti.applications.extensions import db
from iti.applications.models import SysMenu
from iti.applications.models import sys_role_menu, sys_user_role
from iti.applications.common.enums import StatusEnum, MenuTypeEnum
from iti.applications.common.utils import (
build_tree_from_list,
)
from sqlalchemy import select, distinct
def get_user_menu_ids(user_id: str) -> Set[str]:
    """
    Collect the IDs of every menu reachable from a user through role links.

    Args:
        user_id: ID of the user whose menus are looked up.

    Returns:
        Set of menu IDs attached to any role assigned to the user. The set is
        empty when the query returns no rows.
    """
    # sys_user_role supplies (user_id, role_id) pairs and sys_role_menu
    # supplies (role_id, menu_id) pairs; matching them on role_id gives the
    # menus granted to the user.
    same_role = sys_role_menu.c.role_id == sys_user_role.c.role_id
    stmt = (
        select(distinct(sys_role_menu.c.menu_id))
        .join(sys_user_role, same_role)
        .filter(sys_user_role.c.user_id == user_id)
    )
    rows = db.session.scalars(stmt).all()
    return set(rows)
def get_menu_tree(
    parent_id: Optional[str] = None,
    include_disabled: bool = True,
    type_filter: Optional[List[MenuTypeEnum]] = None,
    status_filter: Optional[List[StatusEnum]] = None,
    user_menu_ids: Optional[Set[str]] = None,
) -> List[Dict[str, Any]]:
    """
    Query menus below a starting point and hand them to the tree builder.

    Args:
        parent_id: When None, the recursion starts from menus whose parent_id
            is NULL. Otherwise it starts from menus whose parent_id equals
            this value (the direct children) and walks downwards.
            NOTE(review): the earlier docstring stated that the parent menu
            itself is part of the result, but the anchor condition used here
            does not select the parent row. Confirm which is intended.
        include_disabled: When False, only rows whose status is
            StatusEnum.ENABLED are kept.
        type_filter: Menu types to keep; None or an empty list means no
            restriction.
        status_filter: Statuses to keep; None or an empty list means no
            restriction.
        user_menu_ids: Menu IDs the user owns; when not None (an empty set
            included) rows are restricted to these IDs.

    Returns:
        Whatever build_tree_from_list produces for the matched SysMenu rows,
        which are ordered by (parent_id, sort).

    Note:
        All filters are applied to the outer SELECT, after the hierarchy has
        been expanded. A row is therefore dropped on its own; its descendants
        are still returned if they pass the filters themselves. How
        build_tree_from_list treats such rows is not visible here.
    """
    # Pick the rows the recursive CTE starts from.
    anchor = (
        SysMenu.parent_id.is_(None)
        if parent_id is None
        else SysMenu.parent_id == parent_id
    )
    stmt = build_descendants_cte(anchor)

    # Gather the optional restrictions in the same order they were originally
    # chained so the generated WHERE clause is unchanged.
    conditions = []
    if not include_disabled:
        conditions.append(SysMenu.status == StatusEnum.ENABLED)
    if type_filter:
        conditions.append(SysMenu.type.in_(type_filter))
    if status_filter:
        conditions.append(SysMenu.status.in_(status_filter))
    if user_menu_ids is not None:
        conditions.append(SysMenu.id.in_(user_menu_ids))
    if conditions:
        stmt = stmt.filter(*conditions)

    # Skip loading the parent/children relationships on the returned rows;
    # the tree builder receives the flat list.
    stmt = stmt.order_by(SysMenu.parent_id, SysMenu.sort).options(
        noload(SysMenu.parent),
        noload(SysMenu.children),
    )

    menus = db.session.scalars(stmt).all()
    return build_tree_from_list(menus)
def build_descendants_cte(*criterion: ColumnExpressionArgument[bool]):
    """
    Build a SELECT of the SysMenu rows matching the criteria and everything below them.

    Args:
        *criterion: Conditions choosing the starting rows (the anchor member
            of the recursive CTE).

    Returns:
        An unexecuted select(SysMenu) joined to the recursive CTE on id, so
        callers can add further filters, ordering and options.
    """
    columns = (SysMenu.id, SysMenu.parent_id)

    # Anchor member: the rows selected directly by the caller's criteria.
    anchor = select(*columns).filter(*criterion).cte("hierarchy", recursive=True)

    # Recursive member: menus whose parent is already part of the hierarchy.
    children = select(*columns).join(anchor, anchor.c.id == SysMenu.parent_id)

    # NOTE(review): UNION ALL keeps duplicates, so a menu reached more than
    # once (e.g. the criteria match both a node and one of its ancestors)
    # yields repeated rows after the join below.
    descendants = anchor.union_all(children)
    return select(SysMenu).join(descendants, SysMenu.id == descendants.c.id)
def build_ancestors_cte(*criterion: ColumnExpressionArgument[bool]):
    """
    Build a SELECT of the SysMenu rows matching the criteria and everything above them.

    Args:
        *criterion: Conditions choosing the starting rows (the anchor member
            of the recursive CTE).

    Returns:
        An unexecuted select(SysMenu) joined to the recursive CTE on id, so
        callers can add further filters, ordering and options.
    """
    columns = (SysMenu.id, SysMenu.parent_id)

    # Anchor member: the rows selected directly by the caller's criteria.
    anchor = select(*columns).filter(*criterion).cte("hierarchy", recursive=True)

    # Recursive member: the menu that a row already in the hierarchy points
    # to through its parent_id.
    parents = select(*columns).join(anchor, anchor.c.parent_id == SysMenu.id)

    # NOTE(review): UNION ALL keeps duplicates, so when several starting rows
    # share an ancestor that ancestor appears once per path and the join
    # below returns it repeatedly.
    ancestors = anchor.union_all(parents)
    return select(SysMenu).join(ancestors, SysMenu.id == ancestors.c.id)