diff --git a/hatch.toml b/hatch.toml index 7ad664c..849b290 100644 --- a/hatch.toml +++ b/hatch.toml @@ -2,6 +2,7 @@ [envs.default] type = "virtual" dependencies = [ + "simpleeval", "influxdb-client", "flask>=3.1.0", "apiflask>=2.4.0", diff --git a/iti/applications/models/iot/iot_alert_push.py b/iti/applications/models/iot/iot_alert_push.py index fbac2ee..d33990d 100644 --- a/iti/applications/models/iot/iot_alert_push.py +++ b/iti/applications/models/iot/iot_alert_push.py @@ -1,7 +1,8 @@ +from ast import List from iti.applications.extensions import db from iti.applications.common.crud import TimeModelMixin from iti.applications.common.utils import BaseSchema -from apiflask.fields import String, Integer, DateTime, Nested +from apiflask.fields import String, Integer, DateTime, Nested, List class IotAlertPush(TimeModelMixin, db.Model): """ @@ -25,7 +26,7 @@ class IotAlertPushSchema(BaseSchema): id = Integer() target_name = String() push_url = String() - alert_level = String() + alert_level = List(Integer()) status = Integer() created_at = DateTime(format="%Y-%m-%d %H:%M:%S") updated_at = DateTime(format="%Y-%m-%d %H:%M:%S") \ No newline at end of file diff --git a/iti/applications/models/iot/iot_node.py b/iti/applications/models/iot/iot_node.py index 087d8c0..1351023 100644 --- a/iti/applications/models/iot/iot_node.py +++ b/iti/applications/models/iot/iot_node.py @@ -19,16 +19,12 @@ class IotNode(db.Model, TimeModelMixin): workshop_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="车间ID") device_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="设备ID") endpoint_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="采集端ID") + node_number = db.Column(db.String(20), nullable=False, unique=True, comment="节点编号") title = db.Column(db.String(255), nullable=False, unique=True, comment="节点ID") mark = db.Column(db.String(255), nullable=False, comment="采集标识") mark_type = db.Column(db.Integer, nullable=True, comment="采集类型 1:只读 2:只写 3:读写") tag_label = db.Column(db.String(255), nullable=False, comment="变量别名,用于数据存储标记") data_type = db.Column(db.String(255), nullable=False, comment="值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型") - is_warning = db.Column(db.Integer, nullable=False, comment="预警类型 0:无预警 1:预警") - warning_effective_config = db.Column(db.String(255), nullable=False, comment="预警触发表达式") - is_calling = db.Column(db.Integer, nullable=False, comment="报警类型 0:无报警 1:报警") - calling_effective_config = db.Column(db.String(255), nullable=False, comment="报警触发表达式") - method_content = db.Column(db.String(255), nullable=False, comment="方法节点") status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:禁用 1:启用") #关系 workshop = db.relationship( @@ -56,16 +52,12 @@ class IotNodeSchema(BaseSchema): workshop_id = Integer() device_id = Integer() endpoint_id = Integer() + node_number = String() title = String() mark = String() mark_type = Integer() tag_label = String() data_type = String() - is_warning = Integer() - warning_effective_config = String() - is_calling = Integer() - calling_effective_config = String() - method_content = String() status = Integer() created_at = DateTime(format="%Y-%m-%d %H:%M:%S") updated_at = DateTime(format="%Y-%m-%d %H:%M:%S") diff --git a/iti/applications/routes/iot/alert_log_ctl.py b/iti/applications/routes/iot/alert_log_ctl.py index 612556b..75c9031 100644 --- a/iti/applications/routes/iot/alert_log_ctl.py +++ b/iti/applications/routes/iot/alert_log_ctl.py @@ -1,5 +1,8 @@ from apiflask import APIBlueprint -from iti.applications.service.iot.alert import add_endpoint_alert_log +from iti.applications.service.iot.alert import ( + add_endpoint_alert_log, + add_node_alert_log +) from iti.applications.extensions import db from iti.applications.common.utils import success, page_schema, page from iti.applications.models import ( @@ -10,6 +13,7 @@ from .schemas.alert_log import ( AlertLogQuery, AlertLogAddRequest, AlertLogUpdateRequest, + NodeAlertLogAddRequest, ) from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException @@ -63,6 +67,24 @@ def add_alert_log(endpoint_id: int): return success() +@bp.post("/addNodeValue") +# @jwt_required() +# @bp.doc(security="JWT") +# @permission("iot:alertLog:add") +@bp.input(NodeAlertLogAddRequest, location="json") +def add_node_alert(json_data: dict): + """ + 添加节点值异常告警日志 + """ + node_id = json_data["node_id"] + alert_value = json_data["alert_value"] + result = add_node_alert_log(node_id, alert_value) + if len(result) > 0: + raise BizException(result) + + return success(); + + @bp.post("/recover/") @jwt_required() @bp.doc(security="JWT") diff --git a/iti/applications/routes/iot/alert_push_ctl.py b/iti/applications/routes/iot/alert_push_ctl.py index e0a82e0..e75f629 100644 --- a/iti/applications/routes/iot/alert_push_ctl.py +++ b/iti/applications/routes/iot/alert_push_ctl.py @@ -30,8 +30,11 @@ def list_alert_push(query_data: AlertPushQuery): """ 获取消息通知列表 """ - - return success(get_list_or_page(query_data)) + + r = get_list_or_page(query_data) + for item in r.items: + item.alert_level = list(map(int, item.alert_level.split(","))) + return success(r) @bp.get("/page") @jwt_required() @@ -43,8 +46,11 @@ def page_alert_push(query_data: AlertPushQuery): """ 获取消息通知分页列表 """ - - return success(get_list_or_page(query_data)) + + r = get_list_or_page(query_data) + for item in r.items: + item.alert_level = list(map(int, item.alert_level.split(","))) + return success(r) @bp.post("/add") @@ -57,12 +63,55 @@ def add_alert_push(json_data: dict): 添加消息通知 """ + if "alert_level" in json_data: + json_data["alert_level"] = ",".join(map(str, json_data["alert_level"])) alert_push = IotAlertPush(**json_data) alert_push.status = 0 db.session.add(alert_push) db.session.commit() return success() +@bp.put("/") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertPush:update") +@bp.input(AlertPushUpdateRequest(partial=True), location="json") +def update_alert_push(id: int, json_data: dict): + """ + 更新消息通知 + """ + + alert_push = db.session.scalar(select(IotAlertPush).where(IotAlertPush.id == id)) + if not alert_push: + raise BizException("消息通知不存在") + + for key, value in json_data.items(): + if key == "alert_level": + value = ",".join(map(str, value)) + if value is not None: + setattr(alert_push, key, value) + + db.session.commit() + return success() + + +@bp.delete("/") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertPush:delete") +def delete_alert_push(id: int): + """ + 删除消息通知 + """ + + alert_push = db.session.scalar(select(IotAlertPush).where(IotAlertPush.id == id)) + if not alert_push: + raise BizException("消息通知不存在") + + db.session.delete(alert_push) + db.session.commit() + return success() + def get_list_or_page(query_data: AlertPushQuery): """ diff --git a/iti/applications/routes/iot/alert_rule_ctl.py b/iti/applications/routes/iot/alert_rule_ctl.py index 4ddb1ab..3ec1fc7 100644 --- a/iti/applications/routes/iot/alert_rule_ctl.py +++ b/iti/applications/routes/iot/alert_rule_ctl.py @@ -19,3 +19,93 @@ from sqlalchemy.orm import noload from iti.applications.common import permission bp = APIBlueprint("iot_alert_rule", __name__, url_prefix="/alertRule", tag="告警规则") + +@bp.get("/list") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertRule:list") +@bp.input(AlertRuleQuery.Schema(partial=True), location="query") +@bp.output(IotAlertRuleSchema(many=True)) +def get_alert_rule_list(query_data: AlertRuleQuery): + """ + 获取告警规则列表 + """ + + return get_list_or_page(query_data) + +@bp.get("/page") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertRule:list") +@bp.input(AlertRuleQuery.Schema(partial=True), location="query") +@bp.output(page_schema(IotAlertRuleSchema)) +def get_alert_rule_page(query_data: AlertRuleQuery): + """ + 获取告警规则分页列表 + """ + return page(get_list_or_page(query_data)) + +@bp.post("/add") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertRule:add") +@bp.input(AlertRuleAddRequest, location="json") +def add_alert_rule(json_data: dict): + """ + 添加告警规则 + """ + alert_rule = IotAlertRule(**json_data) + db.session.add(alert_rule) + db.session.commit() + return success() + +@bp.put("/") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertRule:update") +@bp.input(AlertRuleUpdateRequest(partial=True), location="json") +def update_alert_rule(id: int, json_data: dict): + """ + 更新告警规则 + """ + alert_rule = db.session.scalar(select(IotAlertRule).where(IotAlertRule.id == id)) + if not alert_rule: + raise BizException("告警规则不存在") + + for key, value in json_data.items(): + setattr(alert_rule, key, value) + + db.session.commit() + return success() + +@bp.delete("/") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:alertRule:delete") +def delete_alert_rule(id: int): + """ + 删除告警规则 + """ + alert_rule = db.session.scalar(select(IotAlertRule).where(IotAlertRule.id == id)) + if not alert_rule: + raise BizException("告警规则不存在") + + db.session.delete(alert_rule) + db.session.commit() + return success() + + + +def get_list_or_page(query_data: AlertRuleQuery): + """ + 获取告警规则列表 + """ + query = select(IotAlertRule).order_by(IotAlertRule.created_at.desc()) + if query_data.node_id: + query = query.filter(IotAlertRule.node_id == query_data.node_id) + if query_data.status is not None: + query = query.filter(IotAlertRule.status == query_data.status) + if query_data.page and query_data.size: + return db.paginate(query, page=query_data.page, per_page=query_data.size) + else: + return db.session.scalars(query).all() \ No newline at end of file diff --git a/iti/applications/routes/iot/node_ctl.py b/iti/applications/routes/iot/node_ctl.py index 37be1ed..21020f2 100644 --- a/iti/applications/routes/iot/node_ctl.py +++ b/iti/applications/routes/iot/node_ctl.py @@ -2,6 +2,7 @@ from apiflask import APIBlueprint from iti.applications.extensions import db, sys_log from iti.applications.common.utils import success, page_schema, page from iti.applications.models import ( + IotDevice, IotEndpoint, IotNode, IotNodeSchema, @@ -12,8 +13,9 @@ from .schemas.node import ( NodeUpdateRequest, ) from iti.applications.service.iot import ( - iot_influxdb + iot_influxdb, ) +from iti.applications.service.iot.alert import delete_node_alert_rule from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required @@ -83,12 +85,17 @@ def add_node(json_data: dict): node = IotNode(**json_data) endpoint = db.session.scalar( select(IotEndpoint) - .options(noload(IotEndpoint.workshop), noload(IotEndpoint.device)) .filter_by(id=node.endpoint_id)) if not endpoint: raise BizException("采集端信息不存在") - node.device_id = endpoint.device_id - node.workshop_id = endpoint.workshop_id + device = db.session.scalar( + select(IotDevice) + .options(noload(IotDevice.workshop)) + .filter_by(id=node.device_id)) + if not device: + raise BizException("设备信息不存在") + + node.workshop_id = device.workshop_id node.status = 0 db.session.add(node) db.session.commit() @@ -135,6 +142,9 @@ def delete_node(id: int): if not node: raise BizException("采集节点不存在") + # 删除节点告警规则 + delete_node_alert_rule(node) + # 删除 db.session.delete(node) db.session.commit() @@ -159,6 +169,7 @@ def get_node_alert_data(id: int): measurement = f"ep{node.endpoint_id}_nd{node.id}" resultData = {} + resultData["nodeNumber"] = node.node_number resultData["title"] = node.title resultData["dataType"] = node.data_type resultData["tagLabel"] = node.tag_label @@ -174,6 +185,11 @@ def get_page(query_data: NodeQuery): 获取采集节点分页 """ query = select(IotNode).order_by(IotNode.created_at.desc()) + if query_data.keyword: + kw = ModelFilter.escape_like(query_data.keyword) + query = query.filter( + IotNode.node_number.like(f"%{kw}%") + ) if query_data.endpoint_id: query = query.filter(IotNode.endpoint_id == query_data.endpoint_id) if query_data.device_id: @@ -191,6 +207,11 @@ def get_list(query_data: NodeQuery): 获取采集节点列表 """ query = select(IotNode).options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint)).order_by(IotNode.created_at.desc()) + if query_data.keyword: + kw = ModelFilter.escape_like(query_data.keyword) + query = query.filter( + IotNode.node_number.like(f"%{kw}%") + ) if query_data.endpoint_id: query = query.filter(IotNode.endpoint_id == query_data.endpoint_id) if query_data.device_id: diff --git a/iti/applications/routes/iot/schemas/alert_log.py b/iti/applications/routes/iot/schemas/alert_log.py index 6802738..09d1ff7 100644 --- a/iti/applications/routes/iot/schemas/alert_log.py +++ b/iti/applications/routes/iot/schemas/alert_log.py @@ -120,3 +120,23 @@ class AlertLogUpdateRequest(BaseSchema): "description": "告警级别", }, ) + + +class NodeAlertLogAddRequest(BaseSchema): + """ + 节点值异常告警日志信息添加请求 + """ + node_id = fields.Integer( + required=True, + metadata={ + "example": 1, + "description": "节点ID", + }, + ) + alert_value = fields.String( + required=True, + metadata={ + "example": "-2.1", + "description": "告警值", + }, + ) \ No newline at end of file diff --git a/iti/applications/routes/iot/schemas/alert_push.py b/iti/applications/routes/iot/schemas/alert_push.py index 2537b4d..b35dd23 100644 --- a/iti/applications/routes/iot/schemas/alert_push.py +++ b/iti/applications/routes/iot/schemas/alert_push.py @@ -46,6 +46,12 @@ class AlertPushAddRequest(BaseSchema): "description": "告警推送URL", }, ) + alert_level = fields.List( + fields.Integer(), + required=False, + metadata={"example": 1, "description": "告警级别"}, + load_default=1, + ) status = fields.Integer( required=False, metadata={ @@ -73,6 +79,12 @@ class AlertPushUpdateRequest(BaseSchema): "description": "告警推送URL", }, ) + alert_level = fields.List( + fields.Integer(), + required=False, + metadata={"example": 1, "description": "告警级别"}, + load_default=1, + ) status = fields.Integer( required=False, metadata={"example": 1, "description": "状态 0-禁用,1-启用"}, diff --git a/iti/applications/routes/iot/schemas/node.py b/iti/applications/routes/iot/schemas/node.py index e62f5d1..53f8021 100644 --- a/iti/applications/routes/iot/schemas/node.py +++ b/iti/applications/routes/iot/schemas/node.py @@ -12,6 +12,13 @@ class NodeQuery(Pagination): 节点信息查询请求 """ + keyword: Optional[str] = field( + default=None, + metadata={ + "required": False, + "metadata": {"example": "tt_01", "description": "查询关键字[节点编号]"}, + }, + ) workshop_id: int = field( default=None, metadata={ @@ -47,14 +54,21 @@ class NodeAddRequest(BaseSchema): """ 节点新增信息 """ - + device_id = fields.Integer( + required=True, + metadata={"example": 1, "description": "设备ID"}, + ) endpoint_id = fields.Integer( required=True, metadata={"example": 1, "description": "采集端ID"}, ) + node_number = fields.String( + required=True, + metadata={"example": "tt_01", "description": "节点编号"}, + ) title = fields.String( required=True, - metadata={"example": "节点ID", "descriptrion": "节点ID"}, + metadata={"example": "节点ID", "description": "节点ID"}, ) mark = fields.String( required=False, @@ -74,45 +88,27 @@ class NodeAddRequest(BaseSchema): required=True, metadata={"example": "text", "description": "值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型"}, ) - is_warning = fields.Integer( - required=False, - metadata={"example": "是否预警", "description": "预警类型"}, - load_default=0, - ) - warning_effective_config = fields.String( - required=False, - metadata={"example": "预警触发表达式", "description": "预警触发表达式"}, - load_default=None, - ) - is_calling = fields.Integer( - required=False, - metadata={"example": "是否报警", "description": "报警类型"}, - load_default=0, - ) - calling_effective_config = fields.String( - required=False, - metadata={"example": "报警触发表达式", "description": "报警触发表达式"}, - load_default=None, - ) - method_content = fields.String( - required=False, - metadata={"example": "方法节点", "description": "方法节点"}, - load_default=None, - ) class NodeUpdateRequest(BaseSchema): """ 节点更新信息 """ - + device_id = fields.Integer( + required=True, + metadata={"example": 1, "description": "设备ID"}, + ) endpoint_id = fields.Integer( required=True, metadata={"example": 1, "description": "采集端ID"}, ) + node_number = fields.String( + required=True, + metadata={"example": "tt_01", "description": "节点编号"}, + ) title = fields.String( required=True, - metadata={"example": "节点ID", "descriptrion": "节点ID"}, + metadata={"example": "节点ID", "description": "节点ID"}, ) mark = fields.String( required=False, @@ -132,31 +128,6 @@ class NodeUpdateRequest(BaseSchema): required=True, metadata={"example": "text", "description": "值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型"}, ) - is_warning = fields.Integer( - required=False, - metadata={"example": "是否预警", "description": "预警类型"}, - load_default=0, - ) - warning_effective_config = fields.String( - required=False, - metadata={"example": "预警触发表达式", "description": "预警触发表达式"}, - load_default=None, - ) - is_calling = fields.Integer( - required=False, - metadata={"example": "是否报警", "description": "报警类型"}, - load_default=0, - ) - calling_effective_config = fields.String( - required=False, - metadata={"example": "报警触发表达式", "description": "报警触发表达式"}, - load_default=None, - ) - method_content = fields.String( - required=False, - metadata={"example": "方法节点", "description": "方法节点"}, - load_default=None, - ) status = fields.Integer( required=False, metadata={"example": 1, "description": "状态(0-禁用,1-启用)"}, diff --git a/iti/applications/service/iot/alert.py b/iti/applications/service/iot/alert.py index 5177787..ee94132 100644 --- a/iti/applications/service/iot/alert.py +++ b/iti/applications/service/iot/alert.py @@ -7,9 +7,20 @@ from iti.applications.models import ( IotAlertLog, IotAlertRule, IotAlertPush, + IotNode, IotEndpoint, ) from sqlalchemy import select, distinct +from simpleeval import simple_eval + +def delete_node_alert_rule(node: IotNode): + """ + 删除节点告警规则 + Args: + node: 节点对象 + """ + db.session.query(IotAlertRule).filter_by(node_id=node.id).delete() + db.session.commit() def add_endpoint_alert_log(endpoint_id: int): """ @@ -42,6 +53,81 @@ def add_endpoint_alert_log(endpoint_id: int): db.session.commit() return "" +def add_node_alert_log(node_id: int, alert_value: str): + """ + 添加节点值异常告警日志 + Args: + node_id: 节点ID + alert_value: 告警值 + """ + node = db.session.scalar(select(IotNode).filter_by(id=node_id)) + if not node: + return "节点不存在" + + alert_rule_list = db.session.scalars(select(IotAlertRule).filter_by(node_id=node_id)).all() + if len(alert_rule_list) > 0: + for alert_rule in alert_rule_list: + if alert_rule.status == 1: + # 根据data_type转换alert_value + value_data = get_value(node.data_type, alert_value) + # 检查告警值是否满足触发规则 + if is_alert_trigger(alert_rule.alert_rule, value_data): + alert_tag = f"ep{node.endpoint_id}_nd{node_id}" + alert_log = db.session.scalar(select(IotAlertLog).filter_by(alert_tag=alert_tag)) + if alert_log: + alert_log.trigger_count += 1 + alert_log.alert_content = complete_alert_text(node, alert_rule, alert_value) + if alert_log.trigger_count >= alert_rule.trigger_count: + alert_log.status = 1 + push_alert(alert_log) + else: + dict_data = dict( + alert_tag=alert_tag, + alert_target_name=f"采集节点({node.node_number})", + alert_level=alert_rule.alert_level, + status=0, + trigger_count=1, + alert_content=complete_alert_text(node, alert_rule, alert_value), + ) + alert_log = IotAlertLog(**dict_data) + db.session.add(alert_log) + db.session.commit() + else: + current_app.logger.info(f"节点{node.node_number}值{alert_value}未触发告警规则{alert_rule.alert_rule}") + + return "" + +def get_value(data_type: str, alert_value: str): + """ + 获取告警值 + Args: + data_type: 数据类型 + alert_value: 告警值 + """ + if data_type == "int": + return int(alert_value) + elif data_type == "float": + return float(alert_value) + elif data_type == "double": + return float(alert_value) + else: + return alert_value + + +def is_alert_trigger(alert_rule_text: str, value_data: Any): + """ + 检查告警值是否满足触发规则 + Args: + alert_rule_text: 告警规则文本 + value_data: 告警值 + """ + # 解析告警规则文本,判断是否满足触发规则 + try: + result = simple_eval_expression(alert_rule_text, {'x': value_data}) + return result + except (NameError, SyntaxError, TypeError) as e: + current_app.logger.error(f"评估表达式 {alert_rule_text} 时出错: {e}") + return False def push_alert(alert_log: IotAlertLog): """ @@ -51,8 +137,34 @@ def push_alert(alert_log: IotAlertLog): """ level = f"{alert_log.alert_level}" - alert_push_list = db.session.scalars(select(IotAlertPush).filter(IotAlertPush.alert_level.contains(level))).all() + alert_push_list = db.session.scalars(select(IotAlertPush).filter(IotAlertPush.alert_level.contains(level)).filter_by(status=1)).all() if alert_push_list: for alert_push in alert_push_list: # TODO 调用地址发送告警消息 - current_app.logger.info(f"推送告警消息到 {alert_push.push_url},内容:{alert_log.alert_content}") \ No newline at end of file + current_app.logger.info(f"推送告警消息到 {alert_push.push_url},内容:{alert_log.alert_content}") + +def complete_alert_text(node: IotNode, alert_rule: IotAlertRule, alert_value: str): + """ + 完善告警内容文本 + Args: + node: 节点 + alert_rule: 告警规则 + alert_value: 告警值 + """ + alert_text_temp = alert_rule.alert_text or "" + alert_text_temp = alert_text_temp.replace("{_workshopName_}", node.workshop.workshop_name+"("+node.workshop.workshop_number+")") + alert_text_temp = alert_text_temp.replace("{_deviceName_}", node.device.device_name+"("+node.device.device_number+")") + alert_text_temp = alert_text_temp.replace("{_endpointName_}", node.endpoint.endpoint_name+"("+node.endpoint.endpoint_number+")") + alert_text_temp = alert_text_temp.replace("{_mark_}", node.mark) + alert_text_temp = alert_text_temp.replace("{_nodeNumber_}", node.node_number) + alert_text_temp = alert_text_temp.replace("{_alertValue_}", alert_value) + return alert_text_temp + +def simple_eval_expression(expr, variables): + """ + 简单地评估包含变量的字符串表达式 + :param expr: 字符串表达式,如 "x > 5 and y < 10" + :param variables: 变量字典,如 {'x': 7, 'y': 3} + :return: 表达式结果 + """ + return simple_eval(expr, names=variables) \ No newline at end of file