from sqlalchemy.orm import noload
from sqlalchemy.sql._typing import ColumnExpressionArgument
from typing import List, Dict, Any, Optional, Set
from iti.applications.extensions import db
from flask import current_app
from iti.applications.models import (
    IotAlertLog,
    IotAlertRule,
    IotAlertPush,
    IotEndpoint,
)
from sqlalchemy import select, distinct


def add_endpoint_alert_log(endpoint_id: int):
    """Record an "unreachable" alert occurrence for a collection endpoint.

    The endpoint is looked up by primary key. Its alert log row is keyed by
    the tag ``ep<endpoint.id>_nd0``:

    * If no log row with that tag exists, a new one is inserted with
      ``alert_level=1``, ``status=0`` and ``trigger_count=1``.
    * Otherwise ``trigger_count`` is incremented; once it reaches 3 or more,
      ``status`` is set to 1 and the alert is handed to ``push_alert``.

    The session is committed in both cases.

    Args:
        endpoint_id: ID of the collection endpoint.

    Returns:
        An empty string on success, or an error message string when the
        endpoint does not exist.
    """
    endpoint_stmt = select(IotEndpoint).filter_by(id=endpoint_id)
    endpoint = db.session.scalar(endpoint_stmt)
    if not endpoint:
        return "采集端不存在"

    alert_tag = f"ep{endpoint.id}_nd0"
    log_stmt = select(IotAlertLog).filter_by(alert_tag=alert_tag)
    alert_log = db.session.scalar(log_stmt)

    if not alert_log:
        # First occurrence for this endpoint: start a fresh log entry.
        alert_log = IotAlertLog(
            alert_tag=alert_tag,
            alert_target_name=f"{endpoint.endpoint_name}({endpoint.endpoint_number})",
            alert_level=1,
            status=0,
            trigger_count=1,
            alert_content=f"采集端 {endpoint.endpoint_name}({endpoint.endpoint_number}) 网络不可达",
        )
        db.session.add(alert_log)
    else:
        # Repeat occurrence: bump the counter and push once the threshold
        # of three triggers has been reached.
        alert_log.trigger_count += 1
        if alert_log.trigger_count >= 3:
            alert_log.status = 1
            push_alert(alert_log)

    db.session.commit()
    return ""


def push_alert(alert_log: IotAlertLog):
    """Push an alert message to every matching push configuration.

    Push configurations are selected by checking that their ``alert_level``
    column contains the string form of the log's alert level. Delivery is
    not implemented yet; each target is only written to the application log.

    Args:
        alert_log: The alert log entry whose content should be pushed.
    """
    level = f"{alert_log.alert_level}"
    push_stmt = select(IotAlertPush).filter(IotAlertPush.alert_level.contains(level))
    push_targets = db.session.scalars(push_stmt).all()

    for target in push_targets:
        # TODO: call the push URL to actually deliver the alert message.
        current_app.logger.info(f"推送告警消息到 {target.push_url},内容:{alert_log.alert_content}")