Compare commits

..

No commits in common. 'iot' and 'main' have entirely different histories.
iot ... main

@ -2,9 +2,6 @@
[envs.default]
type = "virtual"
dependencies = [
"rocketmq-client-python",
"simpleeval",
"influxdb-client",
"flask>=3.1.0",
"apiflask>=2.4.0",
"flask-cors>=6.0.0",

@ -1,7 +1,7 @@
FLASK_ENV=dev
SECRET_KEY=iti-flask
JWT_SECRET_KEY=iti-flask
DATABASE_URL=mysql+pymysql://root:chen222262@124.223.195.237:13306/iti-flask?charset=utf8mb4
DATABASE_URL=mysql+pymysql://root:root@127.0.0.1:3307/iti-flask?charset=utf8mb4
# 前端相关
FRONTEND_ENABLED=False # 是否启用前端渲染
FRONTEND_PATH=dist # 前端文件所在位置,若static则无需填写
@ -12,24 +12,4 @@ FRONTEND_PATH=dist # 前端文件所在位置,若static则无需填写
ALIYUN_OSS_ACCESS_KEY_ID=LTAI5t9cymUAWHVEo36yygaT
ALIYUN_OSS_ACCESS_KEY_SECRET=FaaUsxadRYyshbYeAV8ypZNYVOx3tE
ALIYUN_OSS_ENDPOINT=oss-cn-chengdu.aliyuncs.com
ALIYUN_OSS_BUCKET=maintaince-dev
# ============================================
# influxdb 配置
# ============================================
INFLUXDB_URL="http://124.223.195.237:8086"
INFLUXDB_TOKEN="HdHOox3RqEjJ--Ma9_dFcf-Iv8wu2u0FyD_sV4MT4EIoQoT7h4eZLR_n_yGgmiSLAGiIaUgaH6x-cILNGV8W4g=="
INFLUXDB_ORG="noface"
INFLUXDB_BUCKET="yh-iot"
INFLUXDB_MAX_RETRIES=3
INFLUXDB_RETRY_DELAY=1.0
INFLUXDB_MAX_RETRY_DELAY=30.0
# ============================================
# rocketmq 配置
# ============================================
ROCKETMQ_NAMESRV_ADDR="124.223.195.237:9876"
ROCKETMQ_PRODUCER_GROUP="iot-collect-group"
ROCKETMQ_CONSUMER_GROUP="iot-collect-group"
ROCKETMQ_TOPIC="iot-collect-topic"
ROCKETMQ_TAGS="*"
ALIYUN_OSS_BUCKET=maintaince-dev

@ -10,10 +10,3 @@ from .sys.sys_dept import SysDept, SysDeptSchema
from .sys.sys_rel_user_dept import sys_user_dept
from .sys.sys_menu import SysMenu, SysMenuSchema, SysMenuMetaSchema
from .sys.sys_file import SysFile, SysFileSchema, SysFileDirectory, SysFileDirectorySchema
from .iot.iot_workshop import IotWorkshop, IotWorkshopSchema
from .iot.iot_device import IotDevice, IotDeviceSchema
from .iot.iot_endpoint import IotEndpoint, IotEndpointSchema
from .iot.iot_node import IotNode, IotNodeSchema
from .iot.iot_alert_rule import IotAlertRule, IotAlertRuleSchema
from .iot.iot_alert_log import IotAlertLog, IotAlertLogSchema
from .iot.iot_alert_push import IotAlertPush, IotAlertPushSchema

@ -1,40 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested
class IotAlertLog(db.Model, TimeModelMixin):
"""
告警日志表
"""
__tablename__ = "iot_alert_log"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
alert_tag = db.Column(db.String(255), nullable=False, comment="告警标签")
alert_target_name = db.Column(db.String(255), nullable=False, comment="告警对象名称")
alert_content = db.Column(db.String(2048), nullable=False, comment="告警文本")
alert_level = db.Column(db.Integer, nullable=False, comment="告警级别 0-预警,1-一般,2-紧急,3-严重")
status = db.Column(db.Integer, nullable=False, default=1, comment="状态 1-告警中,0-已恢复")
trigger_count = db.Column(db.Integer, nullable=False, default=0, comment="触发次数")
class IotAlertLogSchema(BaseSchema):
"""
告警日志表响应结构
"""
class Meta:
name = "IotAlertLog"
id = Integer()
alert_tag = String()
alert_target_name = String()
alert_content = String()
alert_level = Integer()
status = Integer()
trigger_count = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")

@ -1,32 +0,0 @@
from ast import List
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested, List
class IotAlertPush(TimeModelMixin, db.Model):
"""
告警推送表
"""
__tablename__ = "iot_alert_push"
id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment="标识")
target_name = db.Column(db.String(255), nullable=False, comment="接收对象名称")
push_url = db.Column(db.String(2048), nullable=False, comment="告警推送URL")
alert_level = db.Column(db.String(255), nullable=False, comment="告警等级")
status = db.Column(db.Integer, nullable=False, default=1, comment="状态 1-启用,0-禁用")
class IotAlertPushSchema(BaseSchema):
"""
告警推送表响应结构
"""
class Meta:
name = "IotAlertPush"
id = Integer()
target_name = String()
push_url = String()
alert_level = List(Integer())
status = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")

@ -1,42 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested
class IotAlertRule(db.Model, TimeModelMixin):
"""
告警规则表
"""
__tablename__ = "iot_alert_rule"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
rule_name = db.Column(db.String(255), nullable=False, comment="告警规则名称")
node_id = db.Column(db.Integer, nullable=False, comment="采集节点ID")
trigger_count = db.Column(db.Integer, nullable=False, comment="阈值触发次数,超过次数后告警")
alert_rule = db.Column(db.String(255), nullable=False, comment="告警触发表达式")
alert_text = db.Column(db.String(2048), nullable=False, comment="告警文本")
alert_level = db.Column(db.Integer, nullable=False, comment="告警级别 0-预警,1-一般,2-紧急,3-严重")
status = db.Column(db.Integer, nullable=False, default=1, comment="状态 1-启用,0-禁用")
class IotAlertRuleSchema(BaseSchema):
"""
告警规则表响应结构
"""
class Meta:
name = "IotAlertRule"
id = Integer()
rule_name = String()
node_id = Integer()
trigger_count = Integer()
alert_rule = String()
alert_text = String()
alert_level = Integer()
status = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")

@ -1,60 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested
class IotDevice(db.Model, TimeModelMixin):
"""
设备信息表
"""
__tablename__ = "iot_device"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
workshop_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="车间ID")
device_name = db.Column(db.String(255), nullable=False, unique=True, comment="设备名称")
device_number = db.Column(db.String(20), nullable=False, comment="设备编号")
description = db.Column(db.Text, nullable=False, comment="设备描述")
brand_name = db.Column(db.String(255), nullable=False, comment="品牌名称")
specification_model = db.Column(db.String(255), nullable=False, comment="规格型号")
status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:已停机 1:生产中 2:维修中")
#关系
workshop = db.relationship(
"IotWorkshop",
primaryjoin="foreign(IotDevice.workshop_id) == IotWorkshop.id",
)
class IotDeviceSchema(BaseSchema):
"""
设备信息表响应结构
"""
class Meta:
name = "IotDevice"
id = Integer()
workshop_id = Integer()
device_name = String()
device_number = String()
description = String()
brand_name = String()
specification_model = String()
status = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")
#关系
workshop = Nested("IotWorkshopSimpleSchema")
class IotDeviceSimpleSchema(BaseSchema):
"""
设备信息表联合查询响应结构
"""
class Meta:
name = "IotDevice"
device_name = String()
device_number = String()

@ -1,61 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested
class IotEndpoint(db.Model, TimeModelMixin):
"""
采集端信息表
"""
__tablename__ = "iot_endpoint"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
endpoint_name = db.Column(db.String(255), nullable=False, unique=True, comment="采集端名称")
endpoint_number = db.Column(db.String(20), nullable=False, comment="采集端编号")
description = db.Column(db.Text, nullable=False, comment="采集端描述")
ip = db.Column(db.String(255), nullable=False, comment="采集端IP")
port = db.Column(db.String(255), nullable=False, comment="采集端端口")
opc_url_temp = db.Column(db.String(1024), nullable=False, comment="OPC URL模板")
brand_name = db.Column(db.String(255), nullable=False, comment="品牌名称")
specification_model = db.Column(db.String(255), nullable=False, comment="规格型号")
is_online = db.Column(db.Integer, nullable=False, default=0, comment="在线状态 0:离线 1:在线")
status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:停用 1:运行中 2:维修中")
class IotEndpointSchema(BaseSchema):
"""
采集端信息表响应结构
"""
class Meta:
name = "IotEndpoint"
id = Integer()
endpoint_name = String()
endpoint_number = String()
description = String()
ip = String()
port = String()
opc_url_temp = String()
brand_name = String()
specification_model = String()
is_online = Integer()
status = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")
class IotEndpointSimpleSchema(BaseSchema):
"""
采集端信息表联合查询响应结构
"""
class Meta:
name = "IotEndpoint"
endpoint_name = String()
endpoint_number = String()

@ -1,67 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime, Nested
class IotNode(db.Model, TimeModelMixin):
"""
节点信息表
"""
__tablename__ = "iot_node"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
workshop_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="车间ID")
device_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="设备ID")
endpoint_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="采集端ID")
node_number = db.Column(db.String(20), nullable=False, unique=True, comment="节点编号")
title = db.Column(db.String(255), nullable=False, unique=True, comment="节点ID")
mark = db.Column(db.String(255), nullable=False, comment="采集标识")
mark_type = db.Column(db.Integer, nullable=False, comment="采集类型 1:只读 2:只写 3:读写")
tag_label = db.Column(db.String(255), nullable=False, comment="变量别名,用于数据存储标记")
data_type = db.Column(db.String(255), nullable=False, comment="值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型")
status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:禁用 1:启用")
#关系
workshop = db.relationship(
"IotWorkshop",
primaryjoin="foreign(IotNode.workshop_id) == IotWorkshop.id",
)
device = db.relationship(
"IotDevice",
primaryjoin="foreign(IotNode.device_id) == IotDevice.id",
)
endpoint = db.relationship(
"IotEndpoint",
primaryjoin="foreign(IotNode.endpoint_id) == IotEndpoint.id",
)
class IotNodeSchema(BaseSchema):
"""
节点信息表响应结构
"""
class Meta:
name = "IotNode"
id = Integer()
workshop_id = Integer()
device_id = Integer()
endpoint_id = Integer()
node_number = String()
title = String()
mark = String()
mark_type = Integer()
tag_label = String()
data_type = String()
status = Integer()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")
#关系
workshop = Nested("IotWorkshopSimpleSchema")
device = Nested("IotDeviceSimpleSchema")
endpoint = Nested("IotEndpointSimpleSchema")

@ -1,51 +0,0 @@
from iti.applications.extensions import db
from iti.applications.common.crud import TimeModelMixin, RemarkModelMixin
from iti.applications.common.utils import BaseSchema
from apiflask.fields import String, Integer, DateTime
class IotWorkshop(db.Model, TimeModelMixin, RemarkModelMixin):
"""
车间信息表
"""
__tablename__ = "iot_workshop"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
comment="标识",
)
workshop_name = db.Column(db.String(255), nullable=False, unique=True, comment="车间名称")
workshop_number = db.Column(db.String(50), nullable=False, comment="车间编号")
total_area = db.Column(db.String(50), nullable=False, comment="总面积(单位:平方米)")
director_name = db.Column(db.String(30), nullable=False, comment="负责人姓名")
director_phone = db.Column(db.String(15), nullable=False, comment="负责人电话")
status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:已停用 1:生产中")
class IotWorkshopSchema(BaseSchema):
"""
车间信息表响应结构
"""
class Meta:
name = "IotWorkshop"
id = Integer()
workshop_name = String()
workshop_number = String()
total_area = String()
director_name = String()
director_phone = String()
status = Integer()
remark = String()
created_at = DateTime(format="%Y-%m-%d %H:%M:%S")
updated_at = DateTime(format="%Y-%m-%d %H:%M:%S")
class IotWorkshopSimpleSchema(BaseSchema):
"""
车间信息表联合查询响应结构
"""
class Meta:
name = "IotWorkshop"
workshop_name = String()
workshop_number = String()

@ -1,6 +1,5 @@
from iti.applications.extensions import broadcast_execute
from iti.applications.routes.common import register_common_bp
from iti.applications.routes.iot import register_iot_bp
from iti.applications.routes.sys import register_sys_bp
from iti.applications.routes.index import bp as index_bp
from iti.applications.routes.front import bp as frontend_bp
@ -18,8 +17,5 @@ def init_routes(app):
# 系统API蓝图注册
register_sys_bp(app)
# 物联网API注册
register_iot_bp(app)
# 插件初始化
broadcast_execute(app, "event_init")

@ -1,22 +0,0 @@
from apiflask import APIBlueprint
from .workshop_ctl import bp as workshop_bp
from .device_ctl import bp as device_bp
from .endpoint_ctl import bp as endpoint_bp
from .node_ctl import bp as node_bp
from .alert_rule_ctl import bp as alert_rule_bp
from .alert_log_ctl import bp as alert_log_bp
from .alert_push_ctl import bp as alert_push_bp
iot_bp = APIBlueprint("iot", __name__, url_prefix="/iot")
def register_iot_bp(app):
iot_bp.register_blueprint(workshop_bp)
iot_bp.register_blueprint(device_bp)
iot_bp.register_blueprint(endpoint_bp)
iot_bp.register_blueprint(node_bp)
iot_bp.register_blueprint(alert_rule_bp)
iot_bp.register_blueprint(alert_log_bp)
iot_bp.register_blueprint(alert_push_bp)
app.register_blueprint(iot_bp)

@ -1,119 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.service.iot.alert import (
add_endpoint_alert_log,
add_node_alert_log
)
from iti.applications.extensions import db
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotAlertLog,
IotAlertLogSchema,
)
from .schemas.alert_log import (
AlertLogQuery,
AlertLogAddRequest,
AlertLogUpdateRequest,
NodeAlertLogAddRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_alert_log", __name__, url_prefix="/alertLog", tag="告警中心")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertLog:list")
@bp.input(AlertLogQuery.Schema(partial=True), location="query")
@bp.output(IotAlertLogSchema(many=True))
def list_alert_log(query_data: AlertLogQuery):
"""
获取告警日志列表
"""
return success(get_list_or_page(query_data))
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertLog:list")
@bp.input(AlertLogQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotAlertLogSchema))
def page_alert_log(query_data: AlertLogQuery):
"""
获取告警日志分页列表
"""
return success(get_list_or_page(query_data))
@bp.post("/add/<int:endpoint_id>")
# @jwt_required()
# @bp.doc(security="JWT")
# @permission("iot:alertLog:add")
def add_alert_log(endpoint_id: int):
"""
添加采集端网络不可达告警日志
"""
result = add_endpoint_alert_log(endpoint_id)
if len(result) > 0:
raise BizException(result)
return success()
@bp.post("/addNodeValue")
# @jwt_required()
# @bp.doc(security="JWT")
# @permission("iot:alertLog:add")
@bp.input(NodeAlertLogAddRequest, location="json")
def add_node_alert(json_data: dict):
"""
添加节点值异常告警日志
"""
for node_data in json_data["node_data_list"]:
node_id = node_data["node_id"]
alert_value = node_data["alert_value"]
result = add_node_alert_log(node_id, alert_value)
if len(result) > 0:
raise BizException(result)
return success();
@bp.post("/recover/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertLog:update")
def recover_alert_log(id: int):
"""
恢复告警日志
"""
alert_log = db.session.scalar(select(IotAlertLog).filter_by(id=id))
if not alert_log:
raise BizException("告警日志不存在")
alert_log.trigger_count =0
alert_log.status = 0
db.session.commit()
return success()
def get_list_or_page(query_data: AlertLogQuery):
"""
获取告警日志列表
"""
query = select(IotAlertLog).order_by(IotAlertLog.created_at.desc())
if query_data.alert_tag is not None:
query = query.filter(IotAlertLog.alert_tag == query_data.alert_tag)
if query_data.status is not None:
query = query.filter(IotAlertLog.status == query_data.status)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()

@ -1,132 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotAlertPush,
IotAlertPushSchema,
)
from .schemas.alert_push import (
AlertPushQuery,
AlertPushAddRequest,
AlertPushUpdateRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_alert_push", __name__, url_prefix="/alertPush", tag="消息通知")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertPush:list")
@bp.input(AlertPushQuery.Schema(partial=True), location="query")
@bp.output(IotAlertPushSchema(many=True))
def list_alert_push(query_data: AlertPushQuery):
"""
获取消息通知列表
"""
r = get_list_or_page(query_data)
for item in r.items:
item.alert_level = list(map(int, item.alert_level.split(",")))
return success(r)
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertPush:list")
@bp.input(AlertPushQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotAlertPushSchema))
def page_alert_push(query_data: AlertPushQuery):
"""
获取消息通知分页列表
"""
r = get_list_or_page(query_data)
for item in r.items:
item.alert_level = list(map(int, item.alert_level.split(",")))
return success(r)
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertPush:add")
@bp.input(AlertPushAddRequest,location="json")
def add_alert_push(json_data: dict):
"""
添加消息通知
"""
if "alert_level" in json_data:
json_data["alert_level"] = ",".join(map(str, json_data["alert_level"]))
alert_push = IotAlertPush(**json_data)
alert_push.status = 0
db.session.add(alert_push)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertPush:update")
@bp.input(AlertPushUpdateRequest(partial=True), location="json")
def update_alert_push(id: int, json_data: dict):
"""
更新消息通知
"""
alert_push = db.session.scalar(select(IotAlertPush).where(IotAlertPush.id == id))
if not alert_push:
raise BizException("消息通知不存在")
for key, value in json_data.items():
if key == "alert_level":
value = ",".join(map(str, value))
if value is not None:
setattr(alert_push, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertPush:delete")
def delete_alert_push(id: int):
"""
删除消息通知
"""
alert_push = db.session.scalar(select(IotAlertPush).where(IotAlertPush.id == id))
if not alert_push:
raise BizException("消息通知不存在")
db.session.delete(alert_push)
db.session.commit()
return success()
def get_list_or_page(query_data: AlertPushQuery):
"""
获取消息通知列表
"""
query = select(IotAlertPush).order_by(IotAlertPush.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotAlertPush.target_name.like(f"%{kw}%")
)
if query_data.status is not None:
query = query.filter(IotAlertPush.status == query_data.status)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()

@ -1,111 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotAlertRule,
IotAlertRuleSchema,
)
from .schemas.alert_rule import (
AlertRuleQuery,
AlertRuleAddRequest,
AlertRuleUpdateRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_alert_rule", __name__, url_prefix="/alertRule", tag="告警规则")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertRule:list")
@bp.input(AlertRuleQuery.Schema(partial=True), location="query")
@bp.output(IotAlertRuleSchema(many=True))
def get_alert_rule_list(query_data: AlertRuleQuery):
"""
获取告警规则列表
"""
return get_list_or_page(query_data)
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertRule:list")
@bp.input(AlertRuleQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotAlertRuleSchema))
def get_alert_rule_page(query_data: AlertRuleQuery):
"""
获取告警规则分页列表
"""
return page(get_list_or_page(query_data))
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertRule:add")
@bp.input(AlertRuleAddRequest, location="json")
def add_alert_rule(json_data: dict):
"""
添加告警规则
"""
alert_rule = IotAlertRule(**json_data)
db.session.add(alert_rule)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertRule:update")
@bp.input(AlertRuleUpdateRequest(partial=True), location="json")
def update_alert_rule(id: int, json_data: dict):
"""
更新告警规则
"""
alert_rule = db.session.scalar(select(IotAlertRule).where(IotAlertRule.id == id))
if not alert_rule:
raise BizException("告警规则不存在")
for key, value in json_data.items():
setattr(alert_rule, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:alertRule:delete")
def delete_alert_rule(id: int):
"""
删除告警规则
"""
alert_rule = db.session.scalar(select(IotAlertRule).where(IotAlertRule.id == id))
if not alert_rule:
raise BizException("告警规则不存在")
db.session.delete(alert_rule)
db.session.commit()
return success()
def get_list_or_page(query_data: AlertRuleQuery):
"""
获取告警规则列表
"""
query = select(IotAlertRule).order_by(IotAlertRule.created_at.desc())
if query_data.node_id:
query = query.filter(IotAlertRule.node_id == query_data.node_id)
if query_data.status is not None:
query = query.filter(IotAlertRule.status == query_data.status)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()

@ -1,185 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotDevice,
IotDeviceSchema,
)
from .schemas.device import (
DeviceQuery,
DeviceAddRequest,
DeviceUpdateRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_device", __name__, url_prefix="/device", tag="设备管理")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:list")
@bp.input(DeviceQuery.Schema(partial=True), location="query")
@bp.output(IotDeviceSchema(many=True))
def list_device(query_data: DeviceQuery):
"""
获取设备列表
"""
return success(get_list(query_data))
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:list")
@bp.input(DeviceQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotDeviceSchema(many=True)))
def page_device(query_data: DeviceQuery):
"""
分页获取设备列表
"""
return page(get_page(query_data))
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:add")
@bp.input(DeviceAddRequest, location="json")
def add_device(json_data: dict):
"""
添加设备信息
"""
# 判断设备编号是否唯一
if json_data.get("device_number") is not None:
if db.session.scalar(
select(
exists().where(
IotDevice.device_number == json_data.get("device_number")
)
)
):
raise BizException("同编号设备已存在")
device = IotDevice(**json_data)
device.status = 0
db.session.add(device)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:update")
@bp.input(DeviceUpdateRequest(partial=True), location="json")
def update_device(id: int, json_data: dict):
"""
更新设备信息
"""
# 判断设备编号是否唯一
if json_data.get("device_number") is not None:
if db.session.scalar(
select(
exists().where(
IotDevice.device_number == json_data.get("device_number"), IotDevice.id != id
)
)
):
raise BizException("同编号设备已存在")
device = db.session.scalar(select(IotDevice).filter_by(id=id))
if not device:
raise BizException("设备信息不存在")
for key, value in json_data.items():
if value is not None:
setattr(device, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:delete")
def delete_device(id: int):
"""
删除设备信息
"""
device = db.session.scalar(select(IotDevice).filter_by(id=id))
if not device:
raise BizException("设备不存在")
# 删除设备
db.session.delete(device)
db.session.commit()
return success()
@bp.get("/count")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:device:list")
def count_device():
"""
统计设备数量
"""
countData = {}
deviceReady = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=1).first().number
deviceUnready = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=0).first().number
deviceFix = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=2).first().number
countData["ready"] = deviceReady
countData["unReady"] = deviceUnready
countData["fix"] = deviceFix
countData["total"] = deviceReady + deviceUnready + deviceFix
return success(countData)
def get_page(query_data: DeviceQuery):
"""
获取设备信息分页
"""
query = select(IotDevice).order_by(IotDevice.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotDevice.device_name.like(f"%{kw}%")
| IotDevice.device_number.like(f"%{kw}%")
)
if query_data.workshop_id:
query = query.filter(IotDevice.workshop_id == query_data.workshop_id)
if query_data.status is not None:
query = query.filter(IotDevice.status == query_data.status)
return db.paginate(query, page=query_data.page, per_page=query_data.size)
def get_list(query_data: DeviceQuery):
"""
获取设备列表
"""
query = select(IotDevice).options(noload(IotDevice.workshop)).order_by(IotDevice.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotDevice.device_name.like(f"%{kw}%")
| IotDevice.device_number.like(f"%{kw}%")
)
if query_data.workshop_id:
query = query.filter(IotDevice.workshop_id == query_data.workshop_id)
if query_data.status is not None:
query = query.filter(IotDevice.status == query_data.status)
return db.session.scalars(query).all()

@ -1,171 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotDevice,
IotEndpoint,
IotEndpointSchema,
)
from .schemas.endpoint import (
EndpointQuery,
EndpointAddRequest,
EndpointUpdateRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_endpoint", __name__, url_prefix="/endpoint", tag="采集端管理")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:list")
@bp.input(EndpointQuery.Schema(partial=True), location="query")
@bp.output(IotEndpointSchema(many=True))
def list_endpoint(query_data: EndpointQuery):
"""
获取采集端列表
"""
return success(get_list_or_page(query_data))
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:list")
@bp.input(EndpointQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotEndpointSchema(many=True)))
def page_endpoint(query_data: EndpointQuery):
"""
分页获取采集端列表
"""
return page(get_list_or_page(query_data))
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:add")
@bp.input(EndpointAddRequest, location="json")
def add_endpoint(json_data: dict):
"""
添加采集端信息
"""
# 判断采集端编号是否唯一
if json_data.get("endpoint_number") is not None:
if db.session.scalar(
select(
exists().where(
IotEndpoint.endpoint_number == json_data.get("endpoint_number")
)
)
):
raise BizException("同编号采集端已存在")
endpoint = IotEndpoint(**json_data)
endpoint.status = 0
db.session.add(endpoint)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:update")
@bp.input(EndpointUpdateRequest(partial=True), location="json")
def update_endpoint(id: int, json_data: dict):
"""
更新采集端信息
"""
# 判断采集端编号是否唯一
if json_data.get("endpoint_number") is not None:
if db.session.scalar(
select(
exists().where(
IotEndpoint.endpoint_number == json_data.get("endpoint_number"), IotEndpoint.id != id
)
)
):
raise BizException("同编号采集端已存在")
endpoint = db.session.scalar(
select(IotEndpoint).filter_by(id=id))
if not endpoint:
raise BizException("采集端信息不存在")
for key, value in json_data.items():
if value is not None:
setattr(endpoint, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:delete")
def delete_endpoint(id: int):
"""
删除采集端信息
"""
endpoint = db.session.scalar(
select(IotEndpoint)
.filter_by(id=id))
if not endpoint:
raise BizException("采集端不存在")
# 删除
db.session.delete(endpoint)
db.session.commit()
return success()
@bp.get("/count")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:endpoint:list")
def count_endpoint():
"""
统计采集端数量
"""
countData = {}
endpointReady = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=1).first().number
endpointUnready = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=0).first().number
endpointFix = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=2).first().number
countData["ready"] = endpointReady
countData["unReady"] = endpointUnready
countData["fix"] = endpointFix
countData["total"] = endpointReady + endpointUnready + endpointFix
return success(countData)
def get_list_or_page(query_data: EndpointQuery):
"""
获取采集端信息列表
"""
query = select(IotEndpoint).order_by(IotEndpoint.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotEndpoint.endpoint_name.like(f"%{kw}%")
| IotEndpoint.endpoint_number.like(f"%{kw}%")
)
if query_data.status is not None:
query = query.filter(IotEndpoint.status == query_data.status)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()

@ -1,245 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotDevice,
IotEndpoint,
IotNode,
IotNodeSchema,
)
from .schemas.node import (
NodeQuery,
NodeAddRequest,
NodeUpdateRequest,
)
from iti.applications.service.iot import (
iot_influxdb,
)
from iti.applications.service.iot.alert import delete_node_alert_rule
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from sqlalchemy.orm import noload
from iti.applications.common import permission
bp = APIBlueprint("iot_node", __name__, url_prefix="/node", tag="采集节点管理")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:list")
@bp.input(NodeQuery.Schema(partial=True), location="query")
@bp.output(IotNodeSchema(many=True))
def list_node(query_data: NodeQuery):
"""
获取采集节点列表
"""
return success(get_list(query_data))
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:list")
@bp.input(NodeQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotNodeSchema(many=True)))
def page_node(query_data: NodeQuery):
"""
分页获取采集节点列表
"""
return page(get_page(query_data))
@bp.get("/count")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:list")
def count_node():
"""
统计采集节点数量
"""
countData = {}
nodeReady = db.session.query(func.count(IotNode.id).label('number')).filter_by(status=1).first().number
nodeUnready = db.session.query(func.count(IotNode.id).label('number')).filter_by(status=0).first().number
countData["ready"] = nodeReady
countData["unReady"] = nodeUnready
countData["total"] = nodeReady + nodeUnready
return success(countData)
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:add")
@bp.input(NodeAddRequest, location="json")
def add_node(json_data: dict):
"""
添加采集节点信息
"""
node = IotNode(**json_data)
endpoint = db.session.scalar(
select(IotEndpoint)
.filter_by(id=node.endpoint_id))
if not endpoint:
raise BizException("采集端信息不存在")
device = db.session.scalar(
select(IotDevice)
.options(noload(IotDevice.workshop))
.filter_by(id=node.device_id))
if not device:
raise BizException("设备信息不存在")
# 判断节点编号是否唯一
if json_data.get("node_number") is not None:
if db.session.scalar(
select(
exists().where(
IotNode.node_number == json_data.get("node_number"), IotNode.id != id
)
)
):
raise BizException("同编号节点已存在")
node.workshop_id = device.workshop_id
node.status = 0
db.session.add(node)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:update")
@bp.input(NodeUpdateRequest(partial=True), location="json")
def update_node(id: int, json_data: dict):
"""
更新采集节点信息
"""
# 判断节点编号是否唯一
if json_data.get("node_number") is not None:
if db.session.scalar(
select(
exists().where(
IotNode.node_number == json_data.get("node_number"), IotNode.id != id
)
)
):
raise BizException("同编号节点已存在")
node = db.session.scalar(
select(IotNode)
.options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint))
.filter_by(id=id))
if not node:
raise BizException("节点信息不存在")
for key, value in json_data.items():
if value is not None:
setattr(node, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:delete")
def delete_node(id: int):
"""
删除采集节点信息
"""
node = db.session.scalar(
select(IotNode)
.options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint))
.filter_by(id=id))
if not node:
raise BizException("采集节点不存在")
# 删除节点告警规则
delete_node_alert_rule(node)
# 删除
db.session.delete(node)
db.session.commit()
return success()
@bp.get("/monitoring/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:node:list")
def get_node_alert_data(id: int):
"""
获取监控数据
"""
node = db.session.scalar(
select(IotNode)
.options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint))
.filter_by(id=id))
if not node:
raise BizException("采集节点不存在")
measurement = f"ep{node.endpoint_id}_nd{node.id}"
resultData = {}
resultData["nodeNumber"] = node.node_number
resultData["title"] = node.title
resultData["dataType"] = node.data_type
resultData["tagLabel"] = node.tag_label
resultData["mark"] = node.mark
resultData["monitoringData"] = iot_influxdb.query_table(measurement, node.tag_label)
return success(resultData)
def get_page(query_data: NodeQuery):
"""
获取采集节点分页
"""
query = select(IotNode).order_by(IotNode.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotNode.node_number.like(f"%{kw}%")
)
if query_data.endpoint_id:
query = query.filter(IotNode.endpoint_id == query_data.endpoint_id)
if query_data.device_id:
query = query.filter(IotNode.device_id == query_data.device_id)
elif query_data.workshop_id:
query = query.filter(IotNode.workshop_id == query_data.workshop_id)
if query_data.status is not None:
query = query.filter(IotNode.status == query_data.status)
return db.paginate(query, page=query_data.page, per_page=query_data.size)
def get_list(query_data: NodeQuery):
"""
获取采集节点列表
"""
query = select(IotNode).options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint)).order_by(IotNode.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotNode.node_number.like(f"%{kw}%")
)
if query_data.endpoint_id:
query = query.filter(IotNode.endpoint_id == query_data.endpoint_id)
if query_data.device_id:
query = query.filter(IotNode.device_id == query_data.device_id)
elif query_data.workshop_id:
query = query.filter(IotNode.workshop_id == query_data.workshop_id)
if query_data.status is not None:
query = query.filter(IotNode.status == query_data.status)
return db.session.scalars(query).all()

@ -1,163 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class AlertLogQuery(Pagination):
"""
告警日志信息查询请求
"""
alert_tag: str = field(
default=None,
metadata={
"required": False,
"metadata": {"example": "ep1-nd0", "description": "告警标签"},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema
class AlertLogAddRequest(BaseSchema):
"""
告警日志信息添加请求
"""
alert_tag = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警标签",
},
)
alert_target_name = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警对象名称",
},
)
trigger_count = fields.Integer(
required=True,
metadata={
"example": 1,
"description": "触发次数",
},
)
status = fields.Integer(
required=True,
metadata={
"example": 0,
"description": "状态",
},
)
alert_content = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警内容",
},
)
alert_level = fields.Integer(
required=True,
metadata={
"example": 0,
"description": "告警级别",
},
)
class AlertLogUpdateRequest(BaseSchema):
"""
告警日志信息更新请求
"""
alert_tag = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警标签",
},
)
alert_target_name = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警对象名称",
},
)
trigger_count = fields.Integer(
required=True,
metadata={
"example": 1,
"description": "触发次数",
},
)
status = fields.Integer(
required=True,
metadata={
"example": 0,
"description": "状态",
},
)
alert_content = fields.String(
required=True,
metadata={
"example": "ep1-nd0",
"description": "告警内容",
},
)
alert_level = fields.Integer(
required=True,
metadata={
"example": 0,
"description": "告警级别",
},
)
class NodeCollectData(BaseSchema):
"""
节点收集数据
"""
node_id = fields.Integer(
required=True,
metadata={
"example": 1,
"description": "节点ID",
},
)
alert_value = fields.String(
required=True,
metadata={
"example": "-2.1",
"description": "告警值",
},
)
class NodeAlertLogAddRequest(BaseSchema):
"""
节点值异常告警日志信息添加请求
"""
node_data_list = fields.List(
fields.Nested(NodeCollectData),
required=True,
metadata={
"example": [
{
"node_id": 1,
"alert_value": "-2.1",
},
{
"node_id": 2,
"alert_value": "100",
},
],
"description": "节点收集数据列表",
},
)

@ -1,92 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class AlertPushQuery(Pagination):
"""
告警推送信息查询请求
"""
keyword: Optional[str] = field(
default=None,
metadata={
"required": False,
"metadata": {
"description": "关键字 [消息对象名称] 模糊查询"
},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema
class AlertPushAddRequest(BaseSchema):
"""
告警推送信息添加请求
"""
target_name = fields.String(
required=True,
metadata={
"example": "ERP",
"description": "告警对象名称",
},
)
push_url = fields.String(
required=True,
metadata={
"example": "https://www.baidu.com",
"description": "告警推送URL",
},
)
alert_level = fields.List(
fields.Integer(),
required=False,
metadata={"example": 1, "description": "告警级别"},
load_default=1,
)
status = fields.Integer(
required=False,
metadata={
"example": 1,
"description": "状态 0-禁用,1-启用",
},
load_default=1,
)
class AlertPushUpdateRequest(BaseSchema):
"""
告警推送信息更新请求
"""
target_name = fields.String(
required=True,
metadata={
"example": "ERP",
"description": "告警对象名称",
},
)
push_url = fields.String(
required=True,
metadata={
"example": "https://www.baidu.com",
"description": "告警推送URL",
},
)
alert_level = fields.List(
fields.Integer(),
required=False,
metadata={"example": 1, "description": "告警级别"},
load_default=1,
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态 0-禁用,1-启用"},
load_default=1,
)

@ -1,100 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class AlertRuleQuery(Pagination):
"""
告警规则信息查询请求
"""
node_id: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 1, "description": "节点ID"},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema
class AlertRuleAddRequest(BaseSchema):
"""
告警规则信息添加请求
"""
node_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "节点ID"},
)
rule_name = fields.String(
required=True,
metadata={"example": "告警规则1", "description": "告警规则名称"},
)
alert_rule = fields.String(
required=True,
metadata={"example": "x<4", "description": "告警规则表达式"},
)
alert_text = fields.String(
required=False,
metadata={"example": "节点{{node.title}}告警内容", "description": "告警内容文本模板"},
)
alert_level = fields.Integer(
required=False,
metadata={"example": 0, "description": "告警级别 0-预警,1-一般,2-紧急,3-严重"},
load_default=0,
)
trigger_count = fields.Integer(
required=False,
metadata={"example": 1, "description": "阈值触发次数,超过次数后告警"},
load_default=1,
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态 1-启用,0-禁用"},
load_default=1,
)
class AlertRuleUpdateRequest(BaseSchema):
"""
告警规则信息更新请求
"""
node_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "节点ID"},
)
rule_name = fields.String(
required=False,
metadata={"example": "告警规则1", "description": "告警规则名称"},
)
alert_rule = fields.String(
required=False,
metadata={"example": "x<4", "description": "告警规则表达式"},
)
alert_text = fields.String(
required=False,
metadata={"example": "节点{{node.title}}告警内容", "description": "告警内容文本模板"},
)
alert_level = fields.Integer(
required=False,
metadata={"example": 0, "description": "告警级别 0-预警,1-一般,2-紧急,3-严重"},
)
trigger_count = fields.Integer(
required=False,
metadata={"example": 1, "description": "阈值触发次数,超过次数后告警"},
load_default=1,
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态 1-启用,0-禁用"},
load_default=1,
)

@ -1,117 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class DeviceQuery(Pagination):
"""
设备信息查询请求
"""
keyword: Optional[str] = field(
default=None,
metadata={
"required": False,
"metadata": {
"description": "关键字 [设备名称|设备编号] 模糊查询"
},
},
)
workshop_id: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 1, "description": "车间ID"},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema # For the type check
class DeviceAddRequest(BaseSchema):
"""
新增设备信息
"""
workshop_id = fields.Integer(
required=False,
metadata={"example": 1, "description": "车间ID"},
load_default=None,
)
device_name = fields.String(
required=False,
metadata={"example": "设备名称", "descriptrion": "设备名称"},
load_default=None,
)
device_number = fields.String(
required=False,
metadata={"example": "设备编号", "description": "设备编号"},
load_default=None,
)
description = fields.String(
required=False,
metadata={"example": "设备描述", "description": "设备描述"},
load_default="",
)
brand_name = fields.String(
required=False,
metadata={"example": "品牌名称", "description": "品牌名称"},
load_default="",
)
specification_model = fields.String(
required=False,
metadata={"example": "规格型号", "description": "规格型号"},
load_default="",
)
class DeviceUpdateRequest(BaseSchema):
"""
更新设备信息
"""
workshop_id = fields.Integer(
required=False,
metadata={"example": 1, "description": "车间ID"},
load_default=None,
)
device_name = fields.String(
required=False,
metadata={"example": "设备名称", "descriptrion": "设备名称"},
load_default=None,
)
device_number = fields.String(
required=False,
metadata={"example": "设备编号", "description": "设备编号"},
load_default=None,
)
description = fields.String(
required=False,
metadata={"example": "设备描述", "description": "设备描述"},
load_default="",
)
brand_name = fields.String(
required=False,
metadata={"example": "品牌名称", "description": "品牌名称"},
load_default="",
)
specification_model = fields.String(
required=False,
metadata={"example": "规格型号", "description": "规格型号"},
load_default="",
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态(0-已停机,1-生产中,2-维修中)"},
load_default=None,
)

@ -1,120 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class EndpointQuery(Pagination):
"""
采集端信息查询请求
"""
keyword: Optional[str] = field(
default=None,
metadata={
"required": False,
"metadata": {
"description": "关键字 [采集端名称|采集端编号|采集端IP] 模糊查询"
},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema # For the type check
class EndpointAddRequest(BaseSchema):
"""
采集端新增信息
"""
endpoint_name = fields.String(
required=True,
metadata={"example": "采集端名称", "description": "采集端名称"},
)
endpoint_number = fields.String(
required=True,
metadata={"example": "采集端编号", "description": "采集端编号"},
)
description = fields.String(
required=False,
metadata={"example": "采集端描述", "description": "采集端描述"},
load_default="",
)
ip = fields.String(
required=True,
metadata={"example": "采集端IP", "description": "采集端IP"},
)
port = fields.String(
required=True,
metadata={"example": "采集端端口", "description": "采集端端口"},
)
opc_url_temp = fields.String(
required=True,
metadata={"example": "{_ip_}:{_port_}", "description": "OPC URL模板"},
)
brand_name = fields.String(
required=False,
metadata={"example": "品牌名称", "description": "品牌名称"},
load_default="",
)
specification_model = fields.String(
required=False,
metadata={"example": "规格型号", "description": "规格型号"},
load_default="",
)
class EndpointUpdateRequest(BaseSchema):
"""
更新采集端信息
"""
endpoint_name = fields.String(
required=True,
metadata={"example": "采集端名称", "descriptrion": "采集端名称"},
)
endpoint_number = fields.String(
required=True,
metadata={"example": "采集端编号", "description": "采集端编号"},
)
description = fields.String(
required=False,
metadata={"example": "采集端描述", "description": "采集端描述"},
load_default="",
)
ip = fields.String(
required=True,
metadata={"example": "采集端IP", "description": "采集端IP"},
)
port = fields.String(
required=True,
metadata={"example": "采集端端口", "description": "采集端端口"},
)
opc_url_temp = fields.String(
required=True,
metadata={"example": "{_ip_}:{_port_}", "description": "OPC URL模板"},
)
brand_name = fields.String(
required=False,
metadata={"example": "品牌名称", "description": "品牌名称"},
load_default="",
)
specification_model = fields.String(
required=False,
metadata={"example": "规格型号", "description": "规格型号"},
load_default="",
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态(0-已停用,1-生产中,2-维修中)"},
load_default=None,
)

@ -1,135 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class NodeQuery(Pagination):
"""
节点信息查询请求
"""
keyword: Optional[str] = field(
default=None,
metadata={
"required": False,
"metadata": {"example": "tt_01", "description": "查询关键字[节点编号]"},
},
)
workshop_id: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 1, "description": "车间ID"},
},
)
device_id: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 1, "description": "设备ID"},
},
)
endpoint_id: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 1, "description": "采集端ID"},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema # For the type check
class NodeAddRequest(BaseSchema):
"""
节点新增信息
"""
device_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "设备ID"},
)
endpoint_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "采集端ID"},
)
node_number = fields.String(
required=True,
metadata={"example": "tt_01", "description": "节点编号"},
)
title = fields.String(
required=True,
metadata={"example": "节点ID", "description": "节点ID"},
)
mark = fields.String(
required=False,
metadata={"example": "采集标识", "description": "采集标识"},
load_default="",
)
mark_type = fields.Integer(
required=False,
metadata={"example": "读写标识", "description": "采集类型 1:只读 2:只写 3:读写"},
load_default=1,
)
tag_label = fields.String(
required=True,
metadata={"example": "变量别名", "description": "变量别名"},
)
data_type = fields.String(
required=True,
metadata={"example": "text", "description": "值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型"},
)
class NodeUpdateRequest(BaseSchema):
"""
节点更新信息
"""
device_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "设备ID"},
)
endpoint_id = fields.Integer(
required=True,
metadata={"example": 1, "description": "采集端ID"},
)
node_number = fields.String(
required=True,
metadata={"example": "tt_01", "description": "节点编号"},
)
title = fields.String(
required=True,
metadata={"example": "节点ID", "description": "节点ID"},
)
mark = fields.String(
required=False,
metadata={"example": "采集标识", "description": "采集标识"},
load_default="",
)
mark_type = fields.Integer(
required=False,
metadata={"example": "读写标识", "description": "采集类型 1:只读 2:只写 3:读写"},
load_default=1,
)
tag_label = fields.String(
required=True,
metadata={"example": "变量别名", "description": "变量别名"},
)
data_type = fields.String(
required=True,
metadata={"example": "text", "description": "值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型"},
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态(0-禁用,1-启用)"},
load_default=None,
)

@ -1,111 +0,0 @@
from dataclasses import field
from marshmallow_dataclass import dataclass
from marshmallow import validates_schema, ValidationError
from iti.applications.common.utils.schema import BaseSchema, Pagination
from typing import ClassVar, Optional
from apiflask import fields
@dataclass(base_schema=BaseSchema)
class WorkshopQuery(Pagination):
"""
车间信息查询请求
"""
keyword: Optional[str] = field(
default=None,
metadata={
"required": False,
"metadata": {
"description": "关键字 [车间名称|车间编号|负责人姓名] 模糊查询"
},
},
)
status: int = field(
default=None,
metadata={
"required": False,
"metadata": {"example": 0, "description": "状态"},
},
)
Schema: ClassVar[BaseSchema] = BaseSchema # For the type check
class WorkshopAddRequest(BaseSchema):
"""
新增车间信息
"""
workshop_name = fields.String(
required=False,
metadata={"example": "车间名称", "description": "车间名称"},
load_default=None,
)
workshop_number = fields.String(
required=False,
metadata={"example": "车间编号", "description": "车间编号"},
load_default=None,
)
total_area = fields.String(
required=False,
metadata={"example": "1234", "description": "总面积(单位:平方米)"},
load_default="",
)
director_name = fields.String(
required=False,
metadata={"example": "负责人姓名", "description": "负责人姓名"},
load_default=None,
)
director_phone = fields.String(
required=False,
metadata={"example": "负责人电话", "description": "负责人电话"},
load_default=None,
)
remark = fields.String(
required=False,
metadata={"example": "备注", "description": "备注"},
load_default="",
)
class WorkshopUpdateRequest(BaseSchema):
"""
更新车间信息
"""
workshop_name = fields.String(
required=False,
metadata={"example": "车间名称", "description": "车间名称"},
load_default=None,
)
workshop_number = fields.String(
required=False,
metadata={"example": "车间编号", "description": "车间编号"},
load_default=None,
)
total_area = fields.String(
required=False,
metadata={"example": "1234", "description": "总面积(单位:平方米)"},
load_default="",
)
director_name = fields.String(
required=False,
metadata={"example": "负责人姓名", "description": "负责人姓名"},
load_default=None,
)
director_phone = fields.String(
required=False,
metadata={"example": "负责人电话", "description": "负责人电话"},
load_default=None,
)
status = fields.Integer(
required=False,
metadata={"example": 1, "description": "状态(0-已停用,1-生产中)"},
load_default=None,
)
remark = fields.String(
required=False,
metadata={"example": "备注", "description": "备注"},
load_default="",
)

@ -1,164 +0,0 @@
from apiflask import APIBlueprint
from iti.applications.extensions import db, sys_log
from iti.applications.common.utils import success, page_schema, page
from iti.applications.models import (
IotWorkshop,
IotWorkshopSchema,
)
from .schemas.workshop import (
WorkshopQuery,
WorkshopAddRequest,
WorkshopUpdateRequest,
)
from iti.applications.common import ModelFilter
from iti.applications.common.exceptions.biz_exp import BizException
from flask_jwt_extended import jwt_required
from sqlalchemy import select, delete, exists
from sqlalchemy.sql.functions import func
from iti.applications.common import permission
bp = APIBlueprint("iot_workshop", __name__, url_prefix="/workshop", tag="车间管理")
@bp.get("/list")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:list")
@bp.input(WorkshopQuery.Schema(partial=True), location="query")
@bp.output(IotWorkshopSchema(many=True))
def list_workshop(query_data: WorkshopQuery):
"""
获取车间列表
"""
return success(get_list_or_page(query_data))
@bp.get("/page")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:list")
@bp.input(WorkshopQuery.Schema(partial=True), location="query")
@bp.output(page_schema(IotWorkshopSchema(many=True)))
def page_workshop(query_data: WorkshopQuery):
"""
分页获取车间列表
"""
return page(get_list_or_page(query_data))
@bp.post("/add")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:add")
@bp.input(WorkshopAddRequest, location="json")
def add_workshop(json_data: dict):
"""
添加车间信息
"""
# 判断车间编号是否唯一
if json_data.get("workshop_number") is not None:
if db.session.scalar(
select(
exists().where(
IotWorkshop.workshop_number == json_data.get("workshop_number")
)
)
):
raise BizException("同编号车间已存在")
workshop = IotWorkshop(**json_data)
workshop.status = 0
db.session.add(workshop)
db.session.commit()
return success()
@bp.put("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:update")
@bp.input(WorkshopUpdateRequest(partial=True), location="json")
def update_workshop(id: int, json_data: dict):
"""
更新车间信息
"""
# 判断车间编号是否唯一
if json_data.get("workshop_number") is not None:
if db.session.scalar(
select(
exists().where(
IotWorkshop.workshop_number == json_data.get("workshop_number"), IotWorkshop.id != id
)
)
):
raise BizException("同编号车间已存在")
workshop = db.session.scalar(select(IotWorkshop).filter_by(id=id))
if not workshop:
raise BizException("车间信息不存在")
for key, value in json_data.items():
if value is not None:
setattr(workshop, key, value)
db.session.commit()
return success()
@bp.delete("/<int:id>")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:delete")
def delete_workshop(id: int):
"""
删除车间信息
"""
workshop = db.session.scalar(select(IotWorkshop).filter_by(id=id))
if not workshop:
raise BizException("车间信息不存在")
# 删除
db.session.delete(workshop)
db.session.commit()
return success()
@bp.get("/count")
@jwt_required()
@bp.doc(security="JWT")
@permission("iot:workshop:list")
def count_workshop():
"""
统计车间数量
"""
countData = {}
workshopReady = db.session.query(func.count(IotWorkshop.id).label('number')).filter_by(status=1).first().number
workshopUnready = db.session.query(func.count(IotWorkshop.id).label('number')).filter_by(status=0).first().number
countData["ready"] = workshopReady
countData["unReady"] = workshopUnready
countData["total"] = workshopReady + workshopUnready
return success(countData)
def get_list_or_page(query_data: WorkshopQuery):
"""
获取车间信息列表或分页
"""
query = select(IotWorkshop).order_by(IotWorkshop.created_at.desc())
if query_data.keyword:
kw = ModelFilter.escape_like(query_data.keyword)
query = query.filter(
IotWorkshop.workshop_name.like(f"%{kw}%")
| IotWorkshop.workshop_number.like(f"%{kw}%")
| IotWorkshop.director_name.like(f"%{kw}%")
)
if query_data.status is not None:
query = query.filter(IotWorkshop.status == query_data.status)
if query_data.page and query_data.size:
return db.paginate(query, page=query_data.page, per_page=query_data.size)
else:
return db.session.scalars(query).all()

@ -7,7 +7,4 @@ def init_services(app) -> None:
# 初始化文件目录
from iti.applications.service.sys.sys_file_directory import init_app as init_file_directory
init_file_directory(app)
from iti.applications.service.iot import init_iot as init_iot_app
init_iot_app(app)
init_file_directory(app)

@ -1,18 +0,0 @@
from .influxdb_mgr import iot_influxdb
from .rocketmq_mgr import iot_rocketmq
def init_iot(app) -> None:
import logging
logger = logging.getLogger(__name__)
try:
iot_influxdb.init_app(app)
except Exception as e:
logger.error(f"初始化influxdb_mgr失败: {e}", exc_info=True)
try:
iot_rocketmq.init_app(app)
iot_rocketmq.start_consumer("iot-collect-topic", "*")
except Exception as e:
logger.error(f"初始化rocketmq_mgr失败: {e}", exc_info=True)

@ -1,169 +0,0 @@
from sqlalchemy.orm import noload
from sqlalchemy.sql._typing import ColumnExpressionArgument
from typing import List, Dict, Any, Optional, Set
from iti.applications.extensions import db
from flask import current_app
from iti.applications.models import (
IotAlertLog,
IotAlertRule,
IotAlertPush,
IotNode,
IotEndpoint,
)
from sqlalchemy import select, distinct, desc
from simpleeval import simple_eval
def delete_node_alert_rule(node: IotNode):
"""
删除节点告警规则
Args:
node: 节点对象
"""
db.session.query(IotAlertRule).filter_by(node_id=node.id).delete()
db.session.commit()
def add_endpoint_alert_log(endpoint_id: int):
"""
添加采集端告警日志
Args:
endpoint_id: 采集端ID
"""
endpoint = db.session.scalar(select(IotEndpoint).filter_by(id=endpoint_id))
if not endpoint:
return "采集端不存在"
alert_tag = f"ep{endpoint.id}_nd0"
alert_log = db.session.scalar(select(IotAlertLog).filter_by(alert_tag=alert_tag))
if alert_log:
alert_log.trigger_count += 1
if alert_log.trigger_count >= 3:
alert_log.status = 1
push_alert(alert_log)
else:
dict_data = dict(
alert_tag=alert_tag,
alert_target_name=f"{endpoint.endpoint_name}({endpoint.endpoint_number})",
alert_level=1,
status=0,
trigger_count=1,
alert_content=f"采集端 {endpoint.endpoint_name}({endpoint.endpoint_number}) 网络不可达",
)
alert_log = IotAlertLog(**dict_data)
db.session.add(alert_log)
db.session.commit()
return ""
def add_node_alert_log(node_id: int, alert_value: str):
"""
添加节点值异常告警日志
Args:
node_id: 节点ID
alert_value: 告警值
"""
node = db.session.scalar(select(IotNode).filter_by(id=node_id))
if not node:
return "节点不存在"
alert_rule_list = db.session.scalars(select(IotAlertRule).filter_by(node_id=node_id).order_by(desc(IotAlertRule.alert_level))).all()
if len(alert_rule_list) > 0:
for alert_rule in alert_rule_list:
if alert_rule.status == 1:
# 根据data_type转换alert_value
value_data = get_value(node.data_type, alert_value)
# 检查告警值是否满足触发规则
if is_alert_trigger(alert_rule.alert_rule, value_data):
alert_tag = f"ep{node.endpoint_id}_nd{node_id}"
alert_log = db.session.scalar(select(IotAlertLog).filter_by(alert_tag=alert_tag))
if alert_log:
alert_log.trigger_count += 1
alert_log.alert_content = complete_alert_text(node, alert_rule, alert_value)
if alert_log.trigger_count >= alert_rule.trigger_count:
alert_log.status = 1
push_alert(alert_log)
else:
dict_data = dict(
alert_tag=alert_tag,
alert_target_name=f"采集节点({node.node_number})",
alert_level=alert_rule.alert_level,
status=0,
trigger_count=1,
alert_content=complete_alert_text(node, alert_rule, alert_value),
)
alert_log = IotAlertLog(**dict_data)
db.session.add(alert_log)
db.session.commit()
break
return ""
def get_value(data_type: str, alert_value: str):
"""
获取告警值
Args:
data_type: 数据类型
alert_value: 告警值
"""
if data_type == "int":
return int(alert_value)
elif data_type == "float":
return float(alert_value)
elif data_type == "double":
return float(alert_value)
else:
return alert_value
def is_alert_trigger(alert_rule_text: str, value_data: Any):
"""
检查告警值是否满足触发规则
Args:
alert_rule_text: 告警规则文本
value_data: 告警值
"""
# 解析告警规则文本,判断是否满足触发规则
try:
result = simple_eval_expression(alert_rule_text, {'x': value_data})
return result
except (NameError, SyntaxError, TypeError) as e:
current_app.logger.error(f"评估表达式 {alert_rule_text} 时出错: {e}")
return False
def push_alert(alert_log: IotAlertLog):
"""
推送告警消息
Args:
alert_log: 告警日志
"""
level = f"{alert_log.alert_level}"
alert_push_list = db.session.scalars(select(IotAlertPush).filter(IotAlertPush.alert_level.contains(level)).filter_by(status=1)).all()
if alert_push_list:
for alert_push in alert_push_list:
# TODO 调用地址发送告警消息
current_app.logger.info(f"推送告警消息到 {alert_push.push_url},内容:{alert_log.alert_content}")
def complete_alert_text(node: IotNode, alert_rule: IotAlertRule, alert_value: str):
"""
完善告警内容文本
Args:
node: 节点
alert_rule: 告警规则
alert_value: 告警值
"""
alert_text_temp = alert_rule.alert_text or ""
alert_text_temp = alert_text_temp.replace("{_workshopName_}", node.workshop.workshop_name+"("+node.workshop.workshop_number+")")
alert_text_temp = alert_text_temp.replace("{_deviceName_}", node.device.device_name+"("+node.device.device_number+")")
alert_text_temp = alert_text_temp.replace("{_endpointName_}", node.endpoint.endpoint_name+"("+node.endpoint.endpoint_number+")")
alert_text_temp = alert_text_temp.replace("{_mark_}", node.mark)
alert_text_temp = alert_text_temp.replace("{_nodeNumber_}", node.node_number)
alert_text_temp = alert_text_temp.replace("{_alertValue_}", alert_value)
return alert_text_temp
def simple_eval_expression(expr, variables):
"""
简单地评估包含变量的字符串表达式
:param expr: 字符串表达式 "x > 5 and y < 10"
:param variables: 变量字典 {'x': 7, 'y': 3}
:return: 表达式结果
"""
return simple_eval(expr, names=variables)

@ -1,270 +0,0 @@
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import logging, os, time
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class InfluxDBMgr:
"""
influxdb链接管理
"""
_instance = None
_initialized = False
def __new__(cls):
"""
单例模式
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def init_app(self, app) -> None:
"""
初始化
"""
if self._initialized:
logger.warning("influxdb连接已初始化跳过重复初始化")
return
logger.info("influxdb初始化...........")
# 增强配置验证
self.url = os.getenv("INFLUXDB_URL", "")
self.token = os.getenv("INFLUXDB_TOKEN", "")
self.org = os.getenv("INFLUXDB_ORG", "")
self.bucket = os.getenv("INFLUXDB_BUCKET", "")
# 更严格的配置检查
missing_configs = []
if not self.url:
missing_configs.append("INFLUXDB_URL")
if not self.token:
missing_configs.append("INFLUXDB_TOKEN")
if not self.org:
missing_configs.append("INFLUXDB_ORG")
if not self.bucket:
missing_configs.append("INFLUXDB_BUCKET")
if missing_configs:
logger.error(f"influxdb配置不完整缺少: {', '.join(missing_configs)}")
self._initialized = False
return
# 配置重连参数
self.max_retries = int(os.getenv("INFLUXDB_MAX_RETRIES", "3"))
self.retry_delay = float(os.getenv("INFLUXDB_RETRY_DELAY", "1"))
self.max_retry_delay = float(os.getenv("INFLUXDB_MAX_RETRY_DELAY", "30"))
# 初始化API实例缓存
self._query_api = None
self._write_api = None
# 建立连接
if not self._connect():
logger.error("influxdb初始化失败")
self._initialized = False
return
logger.info("influxdb初始化成功")
def _connect(self) -> bool:
"""
建立连接
"""
try:
self.client = InfluxDBClient(
url=self.url,
token=self.token,
org=self.org,
# timeout=10 # 添加连接超时设置
)
self._initialized = True
logger.info("InfluxDB连接成功")
return True
except Exception as e:
logger.error(f"InfluxDB连接失败: {e}")
self._initialized = False
return False
def _check_connection(self) -> bool:
"""
检查连接状态
"""
if not self._initialized or not hasattr(self, 'client') or not self.client:
return False
try:
# 尝试执行一个简单的查询来检查连接状态
if not hasattr(self, '_query_api') or not self._query_api:
self._query_api = self.client.query_api()
test_query = f'from(bucket: "{self.bucket}") |> range(start: -5s) |> limit(n: 1)'
self._query_api.query(test_query)
return True
except Exception as e:
logger.warning(f"InfluxDB连接检查失败: {e}")
return False
def _reconnect(self) -> bool:
"""
重新连接
"""
logger.info("尝试重新连接InfluxDB...")
for attempt in range(self.max_retries):
try:
# 使用更平滑的指数退避算法
delay = min(self.retry_delay * (1.5 ** attempt), self.max_retry_delay)
logger.info(f"{attempt + 1}次重连尝试,等待{delay:.2f}秒...")
time.sleep(delay)
self.close_client()
if self._connect() and self._check_connection():
logger.info("InfluxDB重连成功")
return True
except Exception as e:
logger.error(f"{attempt + 1}次重连失败: {e}")
logger.error(f"经过{self.max_retries}次尝试后InfluxDB重连失败")
return False
def _ensure_connection(self) -> bool:
"""
确保连接有效
"""
if not self._check_connection():
return self._reconnect()
return True
def query_table(self, query_measurement: str, query_field: str, rangeStr: str = "-5m")-> List[Dict[str, Any]]:
"""
查询数据表带自动重连功能
"""
if not self._ensure_connection():
logger.error("无法建立InfluxDB连接查询失败")
return []
try:
if not hasattr(self, '_query_api') or not self._query_api:
self._query_api = self.client.query_api()
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {rangeStr})
|> filter(fn: (r) => r._measurement == "{query_measurement}")
|> filter(fn: (r) => r._field == "{query_field}")
|> yield(name: "mean")
'''
query_tables = self._query_api.query(flux_query)
result_records = []
for _table in query_tables:
for record in _table.records:
dict = {}
# record 包含时间、measurement、field、value 和 tags
dict["time"] = record.get_time()
dict["measurement"] = record.get_measurement()
dict["field"] = record.get_field()
dict["value"] = record.get_value()
# dict["tags"] = record.values # 包含所有 tags 和系统字段
# logger.info(f"时间: {time}, measurement: {measurement}, field: {field}, 值: {value}, tags: {tags}")
result_records.append(dict)
return result_records
except Exception as e:
logger.error(f"查询InfluxDB失败: {e}")
# 查询失败时也尝试重连
if not self._reconnect():
logger.error("查询失败且重连失败,返回空结果")
return []
def write_data(self, measurement: str, fields: Dict[str, Any], tags: Optional[Dict[str, str]] = None, timestamp: Optional[int] = None) -> bool:
"""
写入数据到InfluxDB带自动重连功能
"""
if not self._ensure_connection():
logger.error("无法建立InfluxDB连接写入失败")
return False
try:
if not hasattr(self, '_write_api') or not self._write_api:
self._write_api = self.client.write_api(write_options=SYNCHRONOUS)
point = Point(measurement)
# 添加标签
if tags:
for tag_key, tag_value in tags.items():
point = point.tag(tag_key, tag_value)
# 添加字段
for field_key, field_value in fields.items():
point = point.field(field_key, field_value)
# 添加时间戳
if timestamp:
point = point.time(timestamp)
self._write_api.write(bucket=self.bucket, record=point)
logger.debug(f"数据成功写入InfluxDB: {measurement}")
return True
except Exception as e:
logger.error(f"写入InfluxDB失败: {e}")
# 写入失败时也尝试重连
if not self._reconnect():
logger.error("写入失败且重连失败")
return False
def write_batch_data(self, points: List[Point]) -> bool:
"""
批量写入数据到InfluxDB带自动重连功能
"""
if not self._ensure_connection():
logger.error("无法建立InfluxDB连接批量写入失败")
return False
try:
if not hasattr(self, '_write_api') or not self._write_api:
self._write_api = self.client.write_api(write_options=SYNCHRONOUS)
self._write_api.write(bucket=self.bucket, record=points)
logger.debug(f"批量数据成功写入InfluxDB: {len(points)}条记录")
return True
except Exception as e:
logger.error(f"批量写入InfluxDB失败: {e}")
# 写入失败时也尝试重连
if not self._reconnect():
logger.error("批量写入失败且重连失败")
return False
def close_client(self) -> None:
"""
关闭客户端连接释放资源
"""
# 关闭API实例
if hasattr(self, '_query_api'):
delattr(self, '_query_api')
if hasattr(self, '_write_api'):
delattr(self, '_write_api')
# 关闭客户端连接
if hasattr(self, 'client') and self.client:
self.client.close()
logger.info("InfluxDB 客户端已关闭")
def __del__(self):
"""
析构时自动关闭连接有用?
"""
self.close_client()
iot_influxdb = InfluxDBMgr()

@ -1,276 +0,0 @@
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union
from iti.applications.service.iot.alert import add_endpoint_alert_log, add_node_alert_log
try:
from rocketmq.client import ConsumeStatus, Message, Producer, PushConsumer
except Exception:
ConsumeStatus = None
Message = None
Producer = None
PushConsumer = None
logger = logging.getLogger(__name__)
RocketMQBody = Union[str, bytes, bytearray, dict, list]
ConsumeCallback = Callable[[Any], Any]
@dataclass(frozen=True)
class RocketMQConfig:
namesrv_addr: str
producer_group: str
consumer_group: str
default_topic: str
default_tags: str
charset: str
access_key: str
secret_key: str
security_token: str
instance_name: str
@staticmethod
def from_env() -> "RocketMQConfig":
return RocketMQConfig(
namesrv_addr=os.getenv("ROCKETMQ_NAMESRV_ADDR", "127.0.0.1:9876").strip(),
producer_group=os.getenv("ROCKETMQ_PRODUCER_GROUP", "iot-collect-group").strip(),
consumer_group=os.getenv("ROCKETMQ_CONSUMER_GROUP", "iot-collect-group").strip(),
default_topic=os.getenv("ROCKETMQ_TOPIC", "iot-collect-topic").strip(),
default_tags=os.getenv("ROCKETMQ_TAGS", "*").strip() or "*",
charset=os.getenv("ROCKETMQ_CHARSET", "utf-8").strip() or "utf-8",
access_key=os.getenv("ROCKETMQ_ACCESS_KEY", "").strip(),
secret_key=os.getenv("ROCKETMQ_SECRET_KEY", "").strip(),
security_token=os.getenv("ROCKETMQ_SECURITY_TOKEN", "").strip(),
instance_name=os.getenv("ROCKETMQ_INSTANCE_NAME", "").strip(),
)
class RocketMQMgr:
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def init_app(self, app) -> None:
if self._initialized:
logger.warning("rocketmq连接已初始化跳过重复初始化")
return
if Producer is None or PushConsumer is None or Message is None:
logger.error(
"rocketmq客户端库不可用请安装 rocketmq-client-python并确保本机可加载 librocketmq"
)
self._initialized = False
return
cfg = RocketMQConfig.from_env()
if not cfg.namesrv_addr:
logger.error("rocketmq配置不完整缺少: ROCKETMQ_NAMESRV_ADDR")
self._initialized = False
return
self.cfg = cfg
self._app = app
self.producer: Optional[Any] = None
self.consumer: Optional[Any] = None
self._producer_started = False
self._consumer_started = False
self._initialized = True
logger.info("rocketmq初始化成功")
def _ensure_initialized(self) -> bool:
if not self._initialized:
logger.error("rocketmq未初始化请先调用 iot_rocketmq.init_app(app)")
return False
return True
def _apply_credentials_if_supported(self, client: Any) -> None:
if not self.cfg.access_key or not self.cfg.secret_key:
return
if hasattr(client, "set_session_credentials"):
try:
client.set_session_credentials(
self.cfg.access_key, self.cfg.secret_key, self.cfg.security_token
)
except Exception as e:
logger.warning(f"rocketmq设置ACL凭证失败: {e}")
def start_producer(self) -> bool:
if not self._ensure_initialized():
return False
if self._producer_started and self.producer:
return True
try:
producer = Producer(self.cfg.producer_group)
producer.set_name_server_address(self.cfg.namesrv_addr)
if self.cfg.instance_name and hasattr(producer, "set_instance_name"):
producer.set_instance_name(self.cfg.instance_name)
self._apply_credentials_if_supported(producer)
producer.start()
self.producer = producer
self._producer_started = True
logger.info("rocketmq producer启动成功")
return True
except Exception as e:
logger.error(f"rocketmq producer启动失败: {e}", exc_info=True)
self.producer = None
self._producer_started = False
return False
def shutdown_producer(self) -> None:
if not self.producer:
return
try:
self.producer.shutdown()
except Exception as e:
logger.warning(f"rocketmq producer关闭失败: {e}")
finally:
self.producer = None
self._producer_started = False
def _encode_body(self, body: RocketMQBody) -> bytes:
if isinstance(body, bytes):
return body
if isinstance(body, bytearray):
return bytes(body)
if isinstance(body, str):
return body.encode(self.cfg.charset, errors="replace")
return json.dumps(body, ensure_ascii=False).encode(self.cfg.charset, errors="replace")
def send_sync(
self,
body: RocketMQBody,
topic: Optional[str] = None,
tags: Optional[str] = None,
keys: Optional[str] = None,
) -> Any:
if not self._ensure_initialized():
raise RuntimeError("rocketmq未初始化")
if not self.start_producer() or not self.producer:
raise RuntimeError("rocketmq producer未启动")
real_topic = (topic or self.cfg.default_topic).strip()
if not real_topic:
logger.error("topic不能为空可通过参数传入或设置ROCKETMQ_TOPIC")
raise ValueError("topic不能为空可通过参数传入或设置ROCKETMQ_TOPIC")
msg = Message(real_topic)
if tags:
if hasattr(msg, "set_tags"):
msg.set_tags(tags)
if keys:
if hasattr(msg, "set_keys"):
msg.set_keys(keys)
msg.set_body(self._encode_body(body))
return self.producer.send_sync(msg)
def start_consumer(
self,
topic: Optional[str] = None,
tags: Optional[str] = None,
callback: Optional[ConsumeCallback] = None,
) -> bool:
if not self._ensure_initialized():
return False
if self._consumer_started and self.consumer:
return True
real_topic = (topic or self.cfg.default_topic).strip()
if not real_topic:
logger.error("consumer topic不能为空可通过参数传入或设置ROCKETMQ_TOPIC")
return False
selector_expression = (tags or self.cfg.default_tags).strip() or "*"
consume_cb = callback or self._default_consume_callback
try:
consumer = PushConsumer(self.cfg.consumer_group)
consumer.set_name_server_address(self.cfg.namesrv_addr)
if self.cfg.instance_name and hasattr(consumer, "set_instance_name"):
consumer.set_instance_name(self.cfg.instance_name)
self._apply_credentials_if_supported(consumer)
try:
consumer.subscribe(real_topic, consume_cb, selector_expression)
except TypeError:
consumer.subscribe(real_topic, selector_expression, consume_cb)
consumer.start()
self.consumer = consumer
self._consumer_started = True
logger.info("rocketmq consumer启动成功")
return True
except Exception as e:
logger.error(f"rocketmq consumer启动失败: {e}", exc_info=True)
self.consumer = None
self._consumer_started = False
return False
def shutdown_consumer(self) -> None:
if not self.consumer:
return
try:
self.consumer.shutdown()
except Exception as e:
logger.warning(f"rocketmq consumer关闭失败: {e}")
finally:
self.consumer = None
self._consumer_started = False
def _default_consume_callback(self, msg: Any) -> Any:
topic = getattr(msg, "topic", "")
tags = getattr(msg, "tags", "")
keys = getattr(msg, "keys", "")
body_text = getattr(msg, "body", b"").decode(self.cfg.charset, errors="replace")
# logger.info(
# f"[RocketMQ] topic={topic} tags={tags} keys={keys} body={body_text}"
# )
try:
body_dict = json.loads(body_text)
except Exception:
logger.error(f"[RocketMQ] 解析body失败: {body_text}")
return False
app = getattr(self, "_app", None)
if app is None:
logger.error("[RocketMQ] 未绑定Flask app无法创建application context")
return False
with app.app_context():
if tags == b"netData" or tags == "netData":
add_endpoint_alert_log(body_dict["endpointId"])
elif tags == b"collectData" or tags == "collectData":
for node_data in body_dict["nodeDataList"]:
node_id = int(node_data["nodeId"])
alert_value = node_data["alertValue"]
add_node_alert_log(node_id, alert_value)
if ConsumeStatus is not None and hasattr(ConsumeStatus, "CONSUME_SUCCESS"):
return ConsumeStatus.CONSUME_SUCCESS
return True
def shutdown(self) -> None:
self.shutdown_consumer()
self.shutdown_producer()
def __del__(self):
try:
self.shutdown()
except Exception:
pass
iot_rocketmq = RocketMQMgr()
Loading…
Cancel
Save