from iti_system.models import SysDept
from iti_system.routes.schemas.dept import SysDeptQuery
from iti.applications.extensions import db
from iti.applications.common.utils import build_tree_from_list
from sqlalchemy.orm import joinedload, noload
from sqlalchemy.sql._typing import ColumnExpressionArgument


def get_dept_tree_or_page(query_data: SysDeptQuery, with_leader: bool = False):
    """Return departments as a paginated list (by leader) or as a tree.

    Args:
        query_data: Query parameters. The fields read here are ``leader_id``,
            ``page``, ``size``, ``parent_id``, ``name``, ``status``,
            ``startTime`` and ``endTime``.
        with_leader: When true, eagerly load ``SysDept.leader`` with a joined
            load on the tree query. Not applied to the leader-paginated
            branch.

    Returns:
        If ``query_data.leader_id`` is truthy, the result of ``paginate()``
        over that leader's departments ordered by ``SysDept.sort``.
        Otherwise, whatever ``build_tree_from_list`` returns for the matching
        departments (presumably a nested tree structure -- confirm against
        that helper).
    """
    # When querying by leader, return that leader's departments as a
    # paginated flat list rather than a tree.
    if query_data.leader_id:
        return (
            SysDept.query.filter(SysDept.leader_id == query_data.leader_id)
            .order_by(SysDept.sort)
            .paginate(page=query_data.page, per_page=query_data.size)
        )

    # Seed the recursion with the direct children of the requested parent.
    # When parent_id is None, SQLAlchemy renders the comparison as
    # "parent_id IS NULL", so the seed rows are the root departments and the
    # result is the full tree.
    cte_query = build_descendants_cte(SysDept.parent_id == query_data.parent_id)

    if query_data.name:
        cte_query = cte_query.filter(SysDept.name.like(f"%{query_data.name}%"))
    # NOTE(review): this truthiness check skips the filter for falsy statuses
    # (e.g. 0). If 0 is a valid status value, this should probably be
    # ``is not None`` -- confirm against SysDeptQuery.
    if query_data.status:
        cte_query = cte_query.filter(SysDept.status == query_data.status)
    if query_data.startTime:
        cte_query = cte_query.filter(SysDept.created_at >= query_data.startTime)
    if query_data.endTime:
        cte_query = cte_query.filter(SysDept.created_at <= query_data.endTime)

    # Turn off loading of the parent/children relationships for this query;
    # presumably build_tree_from_list assembles the hierarchy itself -- verify.
    cte_query = cte_query.order_by(SysDept.parent_id, SysDept.sort).options(
        noload(SysDept.parent), noload(SysDept.children)
    )
    if with_leader:
        cte_query = cte_query.options(joinedload(SysDept.leader))

    return build_tree_from_list(cte_query.all())


def _build_hierarchy_query(
    *criterion: ColumnExpressionArgument[bool], walk_up: bool
):
    """Build a ``SysDept`` query backed by a recursive CTE over the hierarchy.

    Args:
        *criterion: Filter expressions selecting the seed rows of the
            recursion.
        walk_up: If true, each recursive step moves from a row to its parent
            (ancestors); otherwise from a row to its children (descendants).

    Returns:
        A query over ``SysDept`` joined to the CTE on ``id``, i.e. the seed
        rows plus every row reachable from them in the chosen direction.
    """
    hierarchy = (
        db.session.query(SysDept.id, SysDept.parent_id)
        .filter(*criterion)
        .cte("hierarchy", recursive=True)
    )

    if walk_up:
        # Next level: the department whose id is the current row's parent_id.
        step_condition = hierarchy.c.parent_id == SysDept.id
    else:
        # Next level: departments whose parent_id is the current row's id.
        step_condition = hierarchy.c.id == SysDept.parent_id

    recursive_step = db.session.query(SysDept.id, SysDept.parent_id).join(
        hierarchy, step_condition
    )
    q = hierarchy.union_all(recursive_step)
    return db.session.query(SysDept).join(q, SysDept.id == q.c.id)


def build_descendants_cte(*criterion: ColumnExpressionArgument[bool]):
    """Build a query for the rows matching ``criterion`` and their descendants.

    Args:
        *criterion: Filter expressions selecting the seed departments.

    Returns:
        A ``SysDept`` query covering the seed rows and all of their
        descendants, found by recursively following ``parent_id`` downwards.
    """
    return _build_hierarchy_query(*criterion, walk_up=False)


def build_ancestors_cte(*criterion: ColumnExpressionArgument[bool]):
    """Build a query for the rows matching ``criterion`` and their ancestors.

    Args:
        *criterion: Filter expressions selecting the seed departments.

    Returns:
        A ``SysDept`` query covering the seed rows and all of their
        ancestors, found by recursively following ``parent_id`` upwards.
    """
    # The recursive step must join each row's parent_id to the parent's id;
    # joining on id == parent_id (as the descendants query does) would walk
    # down the tree instead of up.
    return _build_hierarchy_query(*criterion, walk_up=True)