diff --git a/hatch.toml b/hatch.toml index 849b290..bf8fe1b 100644 --- a/hatch.toml +++ b/hatch.toml @@ -2,6 +2,7 @@ [envs.default] type = "virtual" dependencies = [ + "rocketmq-client-python", "simpleeval", "influxdb-client", "flask>=3.1.0", diff --git a/iti/.env.dev b/iti/.env.dev index 8101f44..3cbe203 100644 --- a/iti/.env.dev +++ b/iti/.env.dev @@ -23,4 +23,13 @@ INFLUXDB_ORG="noface" INFLUXDB_BUCKET="yh-iot" INFLUXDB_MAX_RETRIES=3 INFLUXDB_RETRY_DELAY=1.0 -INFLUXDB_MAX_RETRY_DELAY=30.0 \ No newline at end of file +INFLUXDB_MAX_RETRY_DELAY=30.0 + +# ============================================ +# rocketmq 配置 +# ============================================ +ROCKETMQ_NAMESRV_ADDR="124.223.195.237:9876" +ROCKETMQ_PRODUCER_GROUP="iot-collect-group" +ROCKETMQ_CONSUMER_GROUP="iot-collect-group" +ROCKETMQ_TOPIC="iot-collect-topic" +ROCKETMQ_TAGS="*" diff --git a/iti/applications/models/iot/iot_device.py b/iti/applications/models/iot/iot_device.py index 51cbd39..6ed1fc6 100644 --- a/iti/applications/models/iot/iot_device.py +++ b/iti/applications/models/iot/iot_device.py @@ -18,7 +18,7 @@ class IotDevice(db.Model, TimeModelMixin): workshop_id = db.Column(db.Integer, nullable=False, default=0, unique=True, comment="车间ID") device_name = db.Column(db.String(255), nullable=False, unique=True, comment="设备名称") device_number = db.Column(db.String(20), nullable=False, comment="设备编号") - description = db.Column(db.Text, nullable=True, comment="设备描述") + description = db.Column(db.Text, nullable=False, comment="设备描述") brand_name = db.Column(db.String(255), nullable=False, comment="品牌名称") specification_model = db.Column(db.String(255), nullable=False, comment="规格型号") status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:已停机 1:生产中 2:维修中") diff --git a/iti/applications/models/iot/iot_endpoint.py b/iti/applications/models/iot/iot_endpoint.py index 518d81b..7f5e40b 100644 --- a/iti/applications/models/iot/iot_endpoint.py +++ b/iti/applications/models/iot/iot_endpoint.py @@ -18,7 +18,7 @@ class IotEndpoint(db.Model, TimeModelMixin): ) endpoint_name = db.Column(db.String(255), nullable=False, unique=True, comment="采集端名称") endpoint_number = db.Column(db.String(20), nullable=False, comment="采集端编号") - description = db.Column(db.Text, nullable=True, comment="采集端描述") + description = db.Column(db.Text, nullable=False, comment="采集端描述") ip = db.Column(db.String(255), nullable=False, comment="采集端IP") port = db.Column(db.String(255), nullable=False, comment="采集端端口") opc_url_temp = db.Column(db.String(1024), nullable=False, comment="OPC URL模板") diff --git a/iti/applications/models/iot/iot_node.py b/iti/applications/models/iot/iot_node.py index 1351023..49a14e5 100644 --- a/iti/applications/models/iot/iot_node.py +++ b/iti/applications/models/iot/iot_node.py @@ -22,7 +22,7 @@ class IotNode(db.Model, TimeModelMixin): node_number = db.Column(db.String(20), nullable=False, unique=True, comment="节点编号") title = db.Column(db.String(255), nullable=False, unique=True, comment="节点ID") mark = db.Column(db.String(255), nullable=False, comment="采集标识") - mark_type = db.Column(db.Integer, nullable=True, comment="采集类型 1:只读 2:只写 3:读写") + mark_type = db.Column(db.Integer, nullable=False, comment="采集类型 1:只读 2:只写 3:读写") tag_label = db.Column(db.String(255), nullable=False, comment="变量别名,用于数据存储标记") data_type = db.Column(db.String(255), nullable=False, comment="值类型 text: 文本 int: 整型 float: 浮点型 boolean:布尔型") status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:禁用 1:启用") diff --git a/iti/applications/models/iot/iot_workshop.py b/iti/applications/models/iot/iot_workshop.py index b681c31..abf051e 100644 --- a/iti/applications/models/iot/iot_workshop.py +++ b/iti/applications/models/iot/iot_workshop.py @@ -17,7 +17,7 @@ class IotWorkshop(db.Model, TimeModelMixin, RemarkModelMixin): ) workshop_name = db.Column(db.String(255), nullable=False, unique=True, comment="车间名称") workshop_number = db.Column(db.String(50), nullable=False, comment="车间编号") - total_area = db.Column(db.String(50), nullable=True, comment="总面积(单位:平方米)") + total_area = db.Column(db.String(50), nullable=False, comment="总面积(单位:平方米)") director_name = db.Column(db.String(30), nullable=False, comment="负责人姓名") director_phone = db.Column(db.String(15), nullable=False, comment="负责人电话") status = db.Column(db.Integer, nullable=False, default=0, comment="状态 0:已停用 1:生产中") diff --git a/iti/applications/routes/iot/node_ctl.py b/iti/applications/routes/iot/node_ctl.py index 21020f2..b21a5e8 100644 --- a/iti/applications/routes/iot/node_ctl.py +++ b/iti/applications/routes/iot/node_ctl.py @@ -95,6 +95,17 @@ def add_node(json_data: dict): if not device: raise BizException("设备信息不存在") + # 判断节点编号是否唯一 + if json_data.get("node_number") is not None: + if db.session.scalar( + select( + exists().where( + IotNode.node_number == json_data.get("node_number"), IotNode.id != id + ) + ) + ): + raise BizException("同编号节点已存在") + node.workshop_id = device.workshop_id node.status = 0 db.session.add(node) @@ -112,6 +123,17 @@ def update_node(id: int, json_data: dict): 更新采集节点信息 """ + # 判断节点编号是否唯一 + if json_data.get("node_number") is not None: + if db.session.scalar( + select( + exists().where( + IotNode.node_number == json_data.get("node_number"), IotNode.id != id + ) + ) + ): + raise BizException("同编号节点已存在") + node = db.session.scalar( select(IotNode) .options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint)) diff --git a/iti/applications/routes/iot/schemas/device.py b/iti/applications/routes/iot/schemas/device.py index fde744b..7abc055 100644 --- a/iti/applications/routes/iot/schemas/device.py +++ b/iti/applications/routes/iot/schemas/device.py @@ -61,17 +61,17 @@ class DeviceAddRequest(BaseSchema): description = fields.String( required=False, metadata={"example": "设备描述", "description": "设备描述"}, - load_default=None, + load_default="", ) brand_name = fields.String( required=False, metadata={"example": "品牌名称", "description": "品牌名称"}, - load_default=None, + load_default="", ) specification_model = fields.String( required=False, metadata={"example": "规格型号", "description": "规格型号"}, - load_default=None, + load_default="", ) @@ -98,17 +98,17 @@ class DeviceUpdateRequest(BaseSchema): description = fields.String( required=False, metadata={"example": "设备描述", "description": "设备描述"}, - load_default=None, + load_default="", ) brand_name = fields.String( required=False, metadata={"example": "品牌名称", "description": "品牌名称"}, - load_default=None, + load_default="", ) specification_model = fields.String( required=False, metadata={"example": "规格型号", "description": "规格型号"}, - load_default=None, + load_default="", ) status = fields.Integer( required=False, diff --git a/iti/applications/routes/iot/schemas/endpoint.py b/iti/applications/routes/iot/schemas/endpoint.py index 8eb6720..fbff601 100644 --- a/iti/applications/routes/iot/schemas/endpoint.py +++ b/iti/applications/routes/iot/schemas/endpoint.py @@ -47,7 +47,7 @@ class EndpointAddRequest(BaseSchema): description = fields.String( required=False, metadata={"example": "采集端描述", "description": "采集端描述"}, - load_default=None, + load_default="", ) ip = fields.String( required=True, @@ -64,12 +64,12 @@ class EndpointAddRequest(BaseSchema): brand_name = fields.String( required=False, metadata={"example": "品牌名称", "description": "品牌名称"}, - load_default=None, + load_default="", ) specification_model = fields.String( required=False, metadata={"example": "规格型号", "description": "规格型号"}, - load_default=None, + load_default="", ) @@ -89,7 +89,7 @@ class EndpointUpdateRequest(BaseSchema): description = fields.String( required=False, metadata={"example": "采集端描述", "description": "采集端描述"}, - load_default=None, + load_default="", ) ip = fields.String( required=True, @@ -106,12 +106,12 @@ class EndpointUpdateRequest(BaseSchema): brand_name = fields.String( required=False, metadata={"example": "品牌名称", "description": "品牌名称"}, - load_default=None, + load_default="", ) specification_model = fields.String( required=False, metadata={"example": "规格型号", "description": "规格型号"}, - load_default=None, + load_default="", ) status = fields.Integer( required=False, diff --git a/iti/applications/routes/iot/schemas/node.py b/iti/applications/routes/iot/schemas/node.py index 53f8021..62e5fb3 100644 --- a/iti/applications/routes/iot/schemas/node.py +++ b/iti/applications/routes/iot/schemas/node.py @@ -73,7 +73,7 @@ class NodeAddRequest(BaseSchema): mark = fields.String( required=False, metadata={"example": "采集标识", "description": "采集标识"}, - load_default=None, + load_default="", ) mark_type = fields.Integer( required=False, @@ -113,7 +113,7 @@ class NodeUpdateRequest(BaseSchema): mark = fields.String( required=False, metadata={"example": "采集标识", "description": "采集标识"}, - load_default=None, + load_default="", ) mark_type = fields.Integer( required=False, diff --git a/iti/applications/routes/iot/schemas/workshop.py b/iti/applications/routes/iot/schemas/workshop.py index 5881718..8ac4ec7 100644 --- a/iti/applications/routes/iot/schemas/workshop.py +++ b/iti/applications/routes/iot/schemas/workshop.py @@ -49,7 +49,7 @@ class WorkshopAddRequest(BaseSchema): total_area = fields.String( required=False, metadata={"example": "1234", "description": "总面积(单位:平方米)"}, - load_default=None, + load_default="", ) director_name = fields.String( required=False, @@ -64,7 +64,7 @@ class WorkshopAddRequest(BaseSchema): remark = fields.String( required=False, metadata={"example": "备注", "description": "备注"}, - load_default=None, + load_default="", ) @@ -87,7 +87,7 @@ class WorkshopUpdateRequest(BaseSchema): total_area = fields.String( required=False, metadata={"example": "1234", "description": "总面积(单位:平方米)"}, - load_default=None, + load_default="", ) director_name = fields.String( required=False, @@ -107,5 +107,5 @@ class WorkshopUpdateRequest(BaseSchema): remark = fields.String( required=False, metadata={"example": "备注", "description": "备注"}, - load_default=None, + load_default="", ) \ No newline at end of file diff --git a/iti/applications/service/iot/__init__.py b/iti/applications/service/iot/__init__.py index 9a09315..1181581 100644 --- a/iti/applications/service/iot/__init__.py +++ b/iti/applications/service/iot/__init__.py @@ -1,5 +1,5 @@ from .influxdb_mgr import iot_influxdb - +from .rocketmq_mgr import iot_rocketmq def init_iot(app) -> None: @@ -9,4 +9,10 @@ def init_iot(app) -> None: try: iot_influxdb.init_app(app) except Exception as e: - logger.error(f"初始化influxdb_mgr失败: {e}", exc_info=True) \ No newline at end of file + logger.error(f"初始化influxdb_mgr失败: {e}", exc_info=True) + + try: + iot_rocketmq.init_app(app) + iot_rocketmq.start_consumer("iot-collect-topic", "*") + except Exception as e: + logger.error(f"初始化rocketmq_mgr失败: {e}", exc_info=True) diff --git a/iti/applications/service/iot/rocketmq_mgr.py b/iti/applications/service/iot/rocketmq_mgr.py new file mode 100644 index 0000000..d4930d6 --- /dev/null +++ b/iti/applications/service/iot/rocketmq_mgr.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +import json +import logging +import os +from dataclasses import dataclass +from typing import Any, Callable, Optional, Union +from iti.applications.service.iot.alert import add_endpoint_alert_log, add_node_alert_log + +try: + from rocketmq.client import ConsumeStatus, Message, Producer, PushConsumer +except Exception: + ConsumeStatus = None + Message = None + Producer = None + PushConsumer = None + +logger = logging.getLogger(__name__) + +RocketMQBody = Union[str, bytes, bytearray, dict, list] +ConsumeCallback = Callable[[Any], Any] + + +@dataclass(frozen=True) +class RocketMQConfig: + namesrv_addr: str + producer_group: str + consumer_group: str + default_topic: str + default_tags: str + charset: str + access_key: str + secret_key: str + security_token: str + instance_name: str + + @staticmethod + def from_env() -> "RocketMQConfig": + return RocketMQConfig( + namesrv_addr=os.getenv("ROCKETMQ_NAMESRV_ADDR", "127.0.0.1:9876").strip(), + producer_group=os.getenv("ROCKETMQ_PRODUCER_GROUP", "iot-collect-group").strip(), + consumer_group=os.getenv("ROCKETMQ_CONSUMER_GROUP", "iot-collect-group").strip(), + default_topic=os.getenv("ROCKETMQ_TOPIC", "iot-collect-topic").strip(), + default_tags=os.getenv("ROCKETMQ_TAGS", "*").strip() or "*", + charset=os.getenv("ROCKETMQ_CHARSET", "utf-8").strip() or "utf-8", + access_key=os.getenv("ROCKETMQ_ACCESS_KEY", "").strip(), + secret_key=os.getenv("ROCKETMQ_SECRET_KEY", "").strip(), + security_token=os.getenv("ROCKETMQ_SECURITY_TOKEN", "").strip(), + instance_name=os.getenv("ROCKETMQ_INSTANCE_NAME", "").strip(), + ) + + +class RocketMQMgr: + _instance = None + _initialized = False + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def init_app(self, app) -> None: + if self._initialized: + logger.warning("rocketmq连接已初始化,跳过重复初始化") + return + + if Producer is None or PushConsumer is None or Message is None: + logger.error( + "rocketmq客户端库不可用,请安装 rocketmq-client-python(并确保本机可加载 librocketmq)" + ) + self._initialized = False + return + + cfg = RocketMQConfig.from_env() + + if not cfg.namesrv_addr: + logger.error("rocketmq配置不完整,缺少: ROCKETMQ_NAMESRV_ADDR") + self._initialized = False + return + + self.cfg = cfg + self._app = app + self.producer: Optional[Any] = None + self.consumer: Optional[Any] = None + self._producer_started = False + self._consumer_started = False + + self._initialized = True + logger.info("rocketmq初始化成功") + + def _ensure_initialized(self) -> bool: + if not self._initialized: + logger.error("rocketmq未初始化,请先调用 iot_rocketmq.init_app(app)") + return False + return True + + def _apply_credentials_if_supported(self, client: Any) -> None: + if not self.cfg.access_key or not self.cfg.secret_key: + return + if hasattr(client, "set_session_credentials"): + try: + client.set_session_credentials( + self.cfg.access_key, self.cfg.secret_key, self.cfg.security_token + ) + except Exception as e: + logger.warning(f"rocketmq设置ACL凭证失败: {e}") + + def start_producer(self) -> bool: + if not self._ensure_initialized(): + return False + if self._producer_started and self.producer: + return True + + try: + producer = Producer(self.cfg.producer_group) + producer.set_name_server_address(self.cfg.namesrv_addr) + if self.cfg.instance_name and hasattr(producer, "set_instance_name"): + producer.set_instance_name(self.cfg.instance_name) + self._apply_credentials_if_supported(producer) + producer.start() + self.producer = producer + self._producer_started = True + logger.info("rocketmq producer启动成功") + return True + except Exception as e: + logger.error(f"rocketmq producer启动失败: {e}", exc_info=True) + self.producer = None + self._producer_started = False + return False + + def shutdown_producer(self) -> None: + if not self.producer: + return + try: + self.producer.shutdown() + except Exception as e: + logger.warning(f"rocketmq producer关闭失败: {e}") + finally: + self.producer = None + self._producer_started = False + + def _encode_body(self, body: RocketMQBody) -> bytes: + if isinstance(body, bytes): + return body + if isinstance(body, bytearray): + return bytes(body) + if isinstance(body, str): + return body.encode(self.cfg.charset, errors="replace") + return json.dumps(body, ensure_ascii=False).encode(self.cfg.charset, errors="replace") + + def send_sync( + self, + body: RocketMQBody, + topic: Optional[str] = None, + tags: Optional[str] = None, + keys: Optional[str] = None, + ) -> Any: + if not self._ensure_initialized(): + raise RuntimeError("rocketmq未初始化") + if not self.start_producer() or not self.producer: + raise RuntimeError("rocketmq producer未启动") + + real_topic = (topic or self.cfg.default_topic).strip() + if not real_topic: + logger.error("topic不能为空(可通过参数传入或设置ROCKETMQ_TOPIC)") + raise ValueError("topic不能为空(可通过参数传入或设置ROCKETMQ_TOPIC)") + + msg = Message(real_topic) + if tags: + if hasattr(msg, "set_tags"): + msg.set_tags(tags) + if keys: + if hasattr(msg, "set_keys"): + msg.set_keys(keys) + msg.set_body(self._encode_body(body)) + return self.producer.send_sync(msg) + + def start_consumer( + self, + topic: Optional[str] = None, + tags: Optional[str] = None, + callback: Optional[ConsumeCallback] = None, + ) -> bool: + if not self._ensure_initialized(): + return False + if self._consumer_started and self.consumer: + return True + + real_topic = (topic or self.cfg.default_topic).strip() + if not real_topic: + logger.error("consumer topic不能为空(可通过参数传入或设置ROCKETMQ_TOPIC)") + return False + + selector_expression = (tags or self.cfg.default_tags).strip() or "*" + consume_cb = callback or self._default_consume_callback + + try: + consumer = PushConsumer(self.cfg.consumer_group) + consumer.set_name_server_address(self.cfg.namesrv_addr) + if self.cfg.instance_name and hasattr(consumer, "set_instance_name"): + consumer.set_instance_name(self.cfg.instance_name) + self._apply_credentials_if_supported(consumer) + + try: + consumer.subscribe(real_topic, consume_cb, selector_expression) + except TypeError: + consumer.subscribe(real_topic, selector_expression, consume_cb) + + consumer.start() + self.consumer = consumer + self._consumer_started = True + logger.info("rocketmq consumer启动成功") + return True + except Exception as e: + logger.error(f"rocketmq consumer启动失败: {e}", exc_info=True) + self.consumer = None + self._consumer_started = False + return False + + def shutdown_consumer(self) -> None: + if not self.consumer: + return + try: + self.consumer.shutdown() + except Exception as e: + logger.warning(f"rocketmq consumer关闭失败: {e}") + finally: + self.consumer = None + self._consumer_started = False + + def _default_consume_callback(self, msg: Any) -> Any: + topic = getattr(msg, "topic", "") + tags = getattr(msg, "tags", "") + keys = getattr(msg, "keys", "") + body_text = getattr(msg, "body", b"").decode(self.cfg.charset, errors="replace") + + # logger.info( + # f"[RocketMQ] topic={topic} tags={tags} keys={keys} body={body_text}" + # ) + try: + body_dict = json.loads(body_text) + except Exception: + logger.error(f"[RocketMQ] 解析body失败: {body_text}") + return False + + app = getattr(self, "_app", None) + if app is None: + logger.error("[RocketMQ] 未绑定Flask app,无法创建application context") + return False + + with app.app_context(): + if tags == b"netData" or tags == "netData": + add_endpoint_alert_log(body_dict["endpointId"]) + + elif tags == b"collectData" or tags == "collectData": + for node_data in body_dict["nodeDataList"]: + node_id = int(node_data["nodeId"]) + alert_value = node_data["alertValue"] + add_node_alert_log(node_id, alert_value) + + if ConsumeStatus is not None and hasattr(ConsumeStatus, "CONSUME_SUCCESS"): + return ConsumeStatus.CONSUME_SUCCESS + return True + + def shutdown(self) -> None: + self.shutdown_consumer() + self.shutdown_producer() + + def __del__(self): + try: + self.shutdown() + except Exception: + pass + + +iot_rocketmq = RocketMQMgr()