from apiflask import APIBlueprint from iti.applications.extensions import db, sys_log from iti.applications.common.utils import success, page_schema, page from iti.applications.models import ( IotEndpoint, IotEndpointSchema, ) from .schemas.endpoint import ( EndpointQuery, EndpointAddRequest, EndpointUpdateRequest, ) from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required, current_user from sqlalchemy import select, delete, exists from sqlalchemy.orm import noload from iti.applications.common import permission bp = APIBlueprint("iot_endpoint", __name__, url_prefix="/endpoint", tag="采集端管理") @bp.get("/list") @jwt_required() @bp.doc(security="JWT") @permission("iot:endpoint:list") @bp.input(EndpointQuery.Schema(partial=True), location="query") @bp.output(IotEndpointSchema(many=True)) def list_endpoint(query_data: EndpointQuery): """ 获取采集端列表 """ return success(get_list_or_page(query_data)) @bp.get("/page") @jwt_required() @bp.doc(security="JWT") @permission("iot:endpoint:list") @bp.input(EndpointQuery.Schema(partial=True), location="query") @bp.output(page_schema(IotEndpointSchema(many=True))) def page_endpoint(query_data: EndpointQuery): """ 分页获取采集端列表 """ return page(get_list_or_page(query_data)) @bp.post("/add") @jwt_required() @bp.doc(security="JWT") @permission("iot:endpoint:add") @bp.input(EndpointAddRequest, location="json") def add_endpoint(json_data: dict): """ 添加采集端信息 """ endpoint = IotEndpoint(**json_data) endpoint.status = 0 db.session.add(endpoint) db.session.commit() return success() @bp.put("/") @jwt_required() @bp.doc(security="JWT") @permission("iot:endpoint:update") @bp.input(EndpointUpdateRequest(partial=True), location="json") def update_endpoint(id: int, json_data: dict): """ 更新采集端信息 """ endpoint = db.session.scalar(select(IotEndpoint).filter_by(id=id)) if not endpoint: raise BizException("采集端信息不存在") for key, value in json_data.items(): if value is not None: setattr(endpoint, key, value) db.session.commit() return success() @bp.delete("/") @jwt_required() @bp.doc(security="JWT") @permission("iot:endpoint:delete") def delete_endpoint(id: int): """ 删除采集端信息 """ endpoint = db.session.scalar(select(IotEndpoint).filter_by(id=id)) if not endpoint: raise BizException("采集端不存在") # 删除 db.session.delete(endpoint) db.session.commit() return success() def get_list_or_page(query_data: EndpointQuery): """ 获取采集端信息列表或分页 """ query = select(IotEndpoint).order_by(IotEndpoint.created_at.desc()) if query_data.keyword: kw = ModelFilter.escape_like(query_data.keyword) query = query.filter( IotEndpoint.endpoint_name.like(f"%{kw}%") | IotEndpoint.endpoint_number.like(f"%{kw}%") ) if query_data.device_id: query = query.filter(IotEndpoint.device_id == query_data.device_id) if query_data.status: query = query.filter(IotEndpoint.status == query_data.status) if query_data.page and query_data.size: return db.paginate(query, page=query_data.page, per_page=query_data.size) else: return db.session.scalars(query).all()