You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

332 lines
10 KiB
Python

from apiflask import APIBlueprint
from flask_jwt_extended import jwt_required
from iti.applications.common.utils.schema import condition_schema
from iti.applications.models import (
SysDictType,
SysDictTypeSchema,
SysDictData,
SysDictDataSchema,
)
from .schemas.dict import (
SysDictTypeQuery,
SysDictTypeCreateRequest,
SysDictTypeUpdateRequest,
SysDictDataQuery,
SysDictDataCreateRequest,
SysDictDataUpdateRequest,
SysDictDataBatchDeleteRequest,
)
from iti.applications.common.utils import page_schema, page, success, pagination_fields
from iti.applications.common import ModelFilter
from iti.applications.extensions import db
from iti.applications.common.exceptions.biz_exp import BizException
from sqlalchemy import select, delete
from iti.applications.common import permission
bp = APIBlueprint("sys_dict", __name__, url_prefix="/dict", tag="系统.字典管理")
@bp.get("/type/page")
@jwt_required()
@permission("system:dict:list")
@bp.input(SysDictTypeQuery.Schema, location="query")
@bp.output(
page_schema(condition_schema(SysDictTypeSchema, {"withDataList": ["data_list"]}))
)
def page_sys_dict_type(query_data: SysDictTypeQuery):
"""
分页查询字典类型
"""
query = select(SysDictType).order_by(SysDictType.sort.asc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
SysDictType.type_name.like(f"%{kw}%")
| SysDictType.type_code.like(f"%{kw}%")
)
else:
if query_data.type_name:
query = query.filter(
SysDictType.type_name.like(
f"%{ModelFilter.escape_like(query_data.type_name)}%"
)
)
if query_data.type_code:
query = query.filter(
SysDictType.type_code.like(
f"%{ModelFilter.escape_like(query_data.type_code)}%"
)
)
if query_data.status:
query = query.filter(SysDictType.status == query_data.status)
if query_data.createdAt and len(query_data.createdAt) >= 2:
query = query.filter(
SysDictType.created_at.between(
query_data.createdAt[0], query_data.createdAt[1]
)
)
if query_data.updatedAt and len(query_data.updatedAt) >= 2:
query = query.filter(
SysDictType.updated_at.between(
query_data.updatedAt[0], query_data.updatedAt[1]
)
)
pagination = db.paginate(query, page=query_data.page, per_page=query_data.size)
return page(pagination)
@bp.get("/type")
@jwt_required()
@permission("system:dict:list")
@bp.input(SysDictTypeQuery.Schema(exclude=pagination_fields), location="query")
@bp.output(condition_schema(SysDictTypeSchema, {"withDataList": ["data_list"]}))
def get_sys_dict_type(query_data: SysDictTypeQuery):
"""
查询字典类型
"""
query = select(SysDictType).order_by(SysDictType.sort.asc()).limit(1)
if query_data.type_code:
query = query.filter(SysDictType.type_code == query_data.type_code)
if query_data.status:
query = query.filter(SysDictType.status == query_data.status)
if query_data.createdAt and len(query_data.createdAt) >= 2:
query = query.filter(
SysDictType.created_at.between(
query_data.createdAt[0], query_data.createdAt[1]
)
)
if query_data.updatedAt and len(query_data.updatedAt) >= 2:
query = query.filter(
SysDictType.updated_at.between(
query_data.updatedAt[0], query_data.updatedAt[1]
)
)
return success(db.session.execute(query).one_or_none())
@bp.post("/type")
@jwt_required()
@permission("system:dict:create")
@bp.input(SysDictTypeCreateRequest, location="json")
def create_sys_dict_type(json_data: dict):
"""
创建字典类型
"""
sys_dict_type = SysDictType(**json_data)
db.session.add(sys_dict_type)
db.session.commit()
return success()
@bp.put("/type/<string:id>")
@jwt_required()
@permission("system:dict:edit")
@bp.input(SysDictTypeUpdateRequest(partial=True), location="json")
def update_sys_dict_type(id: str, json_data: dict):
"""
更新字典类型
"""
sys_dict_type = db.session.scalar(select(SysDictType).filter_by(id=id))
if not sys_dict_type:
raise BizException(message="字典类型不存在")
for key, value in json_data.items():
if value is None:
continue
setattr(sys_dict_type, key, value)
db.session.commit()
return success()
@bp.delete("/type/<string:id>")
@jwt_required()
@permission("system:dict:delete")
def delete_sys_dict_type(id: str):
"""
删除字典类型
"""
sys_dict_type = db.session.scalar(select(SysDictType).filter_by(id=id))
if not sys_dict_type:
raise BizException(message="字典类型不存在")
try:
# 要先删除字典数据(级联)
db.session.execute(
delete(SysDictData).filter_by(type_code=sys_dict_type.type_code)
)
db.session.delete(sys_dict_type)
db.session.commit()
except Exception as e:
db.session.rollback()
raise BizException(message="删除字典类型失败", data=str(e))
return success()
########################## 字典数据管理 #######################
@bp.get("/data/page")
@jwt_required()
@permission("system:dict:list")
@bp.input(SysDictDataQuery.Schema, location="query")
@bp.output(page_schema(SysDictDataSchema(exclude=["type"])))
def page_sys_dict_data(query_data: SysDictDataQuery):
"""
分页查询字典数据
"""
return page(get_list_or_page(query_data))
@bp.get("/data/list")
@jwt_required()
@permission("system:dict:list")
@bp.input(SysDictDataQuery.Schema(exclude=pagination_fields), location="query")
@bp.output(SysDictDataSchema(exclude=["type"], many=True))
def list_sys_dict_data(query_data: SysDictDataQuery):
"""
查询字典数据列表
"""
return success(get_list_or_page(query_data))
def get_list_or_page(query_data: SysDictDataQuery):
"""
获取字典数据列表或分页
"""
query = (
select(SysDictData)
.filter_by(type_code=query_data.type_code)
.order_by(SysDictData.sort.asc())
)
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
SysDictData.label.like(f"%{kw}%") | SysDictData.code.like(f"%{kw}%")
)
else:
if query_data.code:
query = query.filter(
SysDictData.code.like(f"%{ModelFilter.escape_like(query_data.code)}%")
)
if query_data.label:
query = query.filter(
SysDictData.label.like(f"%{ModelFilter.escape_like(query_data.label)}%")
)
if query_data.status:
query = query.filter(SysDictData.status == query_data.status)
if query_data.createdAt and len(query_data.createdAt) >= 2:
query = query.filter(
SysDictData.created_at.between(
query_data.createdAt[0], query_data.createdAt[1]
)
)
if query_data.updatedAt and len(query_data.updatedAt) >= 2:
query = query.filter(
SysDictData.updated_at.between(
query_data.updatedAt[0], query_data.updatedAt[1]
)
)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()
@bp.get("/data/<string:id>")
@jwt_required()
@permission("system:dict:list")
@bp.output(SysDictDataSchema(exclude=["type"]))
def get_sys_dict_data(id: str):
"""
查询字典数据
"""
return success(db.session.scalar(select(SysDictData).filter_by(id=id)))
@bp.get("/data")
@jwt_required()
@permission("system:dict:list")
@bp.input(
SysDictDataQuery.Schema(
exclude=pagination_fields + ["keyword", "label", "createdAt", "updatedAt"]
),
location="query",
)
@bp.output(SysDictDataSchema(exclude=["type"]))
def get_sys_dict_data_by_query(query_data: SysDictDataQuery):
"""
查询字典数据
"""
query = select(SysDictData).filter_by(type_code=query_data.type_code)
if query_data.code:
query = query.filter(SysDictData.code == query_data.code)
if query_data.status:
query = query.filter(SysDictData.status == query_data.status)
return success(db.session.scalar(query))
@bp.post("/data")
@jwt_required()
@permission("system:dict:create")
@bp.input(SysDictDataCreateRequest, location="json")
def create_sys_dict_data(json_data: dict):
"""
创建字典数据
"""
sys_dict_data = SysDictData(**json_data)
db.session.add(sys_dict_data)
db.session.commit()
return success()
@bp.put("/data/<string:id>")
@jwt_required()
@permission("system:dict:edit")
@bp.input(SysDictDataUpdateRequest(partial=True), location="json")
def update_sys_dict_data(id: str, json_data: dict):
"""
更新字典数据
"""
sys_dict_data = db.session.scalar(select(SysDictData).filter_by(id=id))
if not sys_dict_data:
raise BizException(message="字典数据不存在")
for key, value in json_data.items():
if value is None:
continue
setattr(sys_dict_data, key, value)
db.session.commit()
return success()
@bp.delete("/data/<string:id>")
@jwt_required()
@permission("system:dict:delete")
def delete_sys_dict_data(id: str):
"""
删除字典数据
"""
sys_dict_data = db.session.scalar(select(SysDictData).filter_by(id=id))
if not sys_dict_data:
raise BizException(message="字典数据不存在")
db.session.delete(sys_dict_data)
db.session.commit()
return success()
@bp.delete("/data/batch")
@jwt_required()
@permission("system:dict:delete")
@bp.input(SysDictDataBatchDeleteRequest.Schema, location="query")
def batch_delete_sys_dict_data(query_data: SysDictDataBatchDeleteRequest):
"""
批量删除字典数据
"""
if query_data.ids and len(query_data.ids) > 0:
db.session.execute(delete(SysDictData).filter_by(id.in_(query_data.ids)))
if query_data.type_code:
db.session.execute(
delete(SysDictData).filter_by(type_code=query_data.type_code)
)
db.session.commit()
return success()