From 194dfcf2a3a6e19c8078426b166b3cbccfd7d8c0 Mon Sep 17 00:00:00 2001 From: "DESKTOP-1JS6RSM\\Admin" Date: Mon, 23 Feb 2026 10:11:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0influxdb=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hatch.toml | 1 + iti/.env.dev | 12 ++- iti/applications/routes/iot/device_ctl.py | 20 +++++ iti/applications/routes/iot/endpoint_ctl.py | 21 +++++ iti/applications/routes/iot/node_ctl.py | 31 +++++++ iti/applications/routes/iot/workshop_ctl.py | 19 +++++ iti/applications/service/__init__.py | 5 +- iti/applications/service/iot/__init__.py | 12 +++ iti/applications/service/iot/influxdb_mgr.py | 86 ++++++++++++++++++++ 9 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 iti/applications/service/iot/__init__.py create mode 100644 iti/applications/service/iot/influxdb_mgr.py diff --git a/hatch.toml b/hatch.toml index 000fc17..7ad664c 100644 --- a/hatch.toml +++ b/hatch.toml @@ -2,6 +2,7 @@ [envs.default] type = "virtual" dependencies = [ + "influxdb-client", "flask>=3.1.0", "apiflask>=2.4.0", "flask-cors>=6.0.0", diff --git a/iti/.env.dev b/iti/.env.dev index 06fef2a..a0a4741 100644 --- a/iti/.env.dev +++ b/iti/.env.dev @@ -1,7 +1,7 @@ FLASK_ENV=dev SECRET_KEY=iti-flask JWT_SECRET_KEY=iti-flask -DATABASE_URL=mysql+pymysql://root:root@127.0.0.1:3307/iti-flask?charset=utf8mb4 +DATABASE_URL=mysql+pymysql://root:chen222262@124.223.195.237:13306/iti-flask?charset=utf8mb4 # 前端相关 FRONTEND_ENABLED=False # 是否启用前端渲染 FRONTEND_PATH=dist # 前端文件所在位置,若static则无需填写 @@ -12,4 +12,12 @@ FRONTEND_PATH=dist # 前端文件所在位置,若static则无需填写 ALIYUN_OSS_ACCESS_KEY_ID=LTAI5t9cymUAWHVEo36yygaT ALIYUN_OSS_ACCESS_KEY_SECRET=FaaUsxadRYyshbYeAV8ypZNYVOx3tE ALIYUN_OSS_ENDPOINT=oss-cn-chengdu.aliyuncs.com -ALIYUN_OSS_BUCKET=maintaince-dev \ No newline at end of file +ALIYUN_OSS_BUCKET=maintaince-dev + +# ============================================ +# influxdb 配置 +# ============================================ +INFLUXDB_URL="http://124.223.195.237:8086" +INFLUXDB_TOKEN="HdHOox3RqEjJ--Ma9_dFcf-Iv8wu2u0FyD_sV4MT4EIoQoT7h4eZLR_n_yGgmiSLAGiIaUgaH6x-cILNGV8W4g==" +INFLUXDB_ORG="noface" +INFLUXDB_BUCKET="yh-iot" \ No newline at end of file diff --git a/iti/applications/routes/iot/device_ctl.py b/iti/applications/routes/iot/device_ctl.py index fd9e461..6714f3b 100644 --- a/iti/applications/routes/iot/device_ctl.py +++ b/iti/applications/routes/iot/device_ctl.py @@ -14,6 +14,7 @@ from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required from sqlalchemy import select, delete, exists +from sqlalchemy.sql.functions import func from sqlalchemy.orm import noload from iti.applications.common import permission @@ -104,6 +105,25 @@ def delete_device(id: int): db.session.commit() return success() +@bp.get("/count") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:device:list") +def count_device(): + """ + 统计设备数量 + """ + + countData = {} + deviceReady = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=1).first().number + deviceUnready = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=0).first().number + deviceFix = db.session.query(func.count(IotDevice.id).label('number')).filter_by(status=2).first().number + countData["ready"] = deviceReady + countData["unReady"] = deviceUnready + countData["fix"] = deviceFix + countData["total"] = deviceReady + deviceUnready + deviceFix + + return success(countData) def get_page(query_data: DeviceQuery): """ diff --git a/iti/applications/routes/iot/endpoint_ctl.py b/iti/applications/routes/iot/endpoint_ctl.py index 2c99b44..d04dd0b 100644 --- a/iti/applications/routes/iot/endpoint_ctl.py +++ b/iti/applications/routes/iot/endpoint_ctl.py @@ -15,6 +15,7 @@ from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required from sqlalchemy import select, delete, exists +from sqlalchemy.sql.functions import func from sqlalchemy.orm import noload from iti.applications.common import permission @@ -115,6 +116,26 @@ def delete_endpoint(id: int): db.session.commit() return success() +@bp.get("/count") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:endpoint:list") +def count_endpoint(): + """ + 统计采集端数量 + """ + + countData = {} + endpointReady = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=1).first().number + endpointUnready = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=0).first().number + endpointFix = db.session.query(func.count(IotEndpoint.id).label('number')).filter_by(status=2).first().number + countData["ready"] = endpointReady + countData["unReady"] = endpointUnready + countData["fix"] = endpointFix + countData["total"] = endpointReady + endpointUnready + endpointFix + + return success(countData) + def get_page(query_data: EndpointQuery): """ 获取采集端信息分页 diff --git a/iti/applications/routes/iot/node_ctl.py b/iti/applications/routes/iot/node_ctl.py index aaf3818..ff76372 100644 --- a/iti/applications/routes/iot/node_ctl.py +++ b/iti/applications/routes/iot/node_ctl.py @@ -11,6 +11,9 @@ from .schemas.node import ( NodeAddRequest, NodeUpdateRequest, ) +from iti.applications.service.iot import ( + iot_influxdb +) from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required @@ -119,6 +122,34 @@ def delete_node(id: int): db.session.commit() return success() +@bp.get("/monitoring/") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:node:list") +def get_node_alert_data(id: int): + """ + 获取监控数据 + """ + + node = db.session.scalar( + select(IotNode) + .options(noload(IotNode.workshop), noload(IotNode.device), noload(IotNode.endpoint)) + .filter_by(id=id)) + if not node: + raise BizException("采集节点不存在") + + measurement = f"ep{node.endpoint_id}_nd{node.id}" + + resultData = {} + resultData["title"] = node.title + resultData["dataType"] = node.data_type + resultData["tagLabel"] = node.tag_label + resultData["mark"] = node.mark + + resultData["monitoringData"] = iot_influxdb.query_table(measurement, node.tag_label) + + return success(resultData) + def get_page(query_data: NodeQuery): """ diff --git a/iti/applications/routes/iot/workshop_ctl.py b/iti/applications/routes/iot/workshop_ctl.py index 1d83003..67d156a 100644 --- a/iti/applications/routes/iot/workshop_ctl.py +++ b/iti/applications/routes/iot/workshop_ctl.py @@ -14,6 +14,7 @@ from iti.applications.common import ModelFilter from iti.applications.common.exceptions.biz_exp import BizException from flask_jwt_extended import jwt_required from sqlalchemy import select +from sqlalchemy.sql.functions import func from iti.applications.common import permission bp = APIBlueprint("iot_workshop", __name__, url_prefix="/workshop", tag="车间管理") @@ -102,6 +103,24 @@ def delete_workshop(id: int): db.session.commit() return success() +@bp.get("/count") +@jwt_required() +@bp.doc(security="JWT") +@permission("iot:workshop:list") +def count_workshop(): + """ + 统计车间数量 + """ + + countData = {} + workshopReady = db.session.query(func.count(IotWorkshop.id).label('number')).filter_by(status=1).first().number + workshopUnready = db.session.query(func.count(IotWorkshop.id).label('number')).filter_by(status=0).first().number + countData["ready"] = workshopReady + countData["unReady"] = workshopUnready + countData["total"] = workshopReady + workshopUnready + + return success(countData) + def get_list_or_page(query_data: WorkshopQuery): """ diff --git a/iti/applications/service/__init__.py b/iti/applications/service/__init__.py index 01a915e..b6be7bb 100644 --- a/iti/applications/service/__init__.py +++ b/iti/applications/service/__init__.py @@ -7,4 +7,7 @@ def init_services(app) -> None: # 初始化文件目录 from iti.applications.service.sys.sys_file_directory import init_app as init_file_directory - init_file_directory(app) \ No newline at end of file + init_file_directory(app) + + from iti.applications.service.iot import init_iot as init_iot_app + init_iot_app(app) \ No newline at end of file diff --git a/iti/applications/service/iot/__init__.py b/iti/applications/service/iot/__init__.py new file mode 100644 index 0000000..9a09315 --- /dev/null +++ b/iti/applications/service/iot/__init__.py @@ -0,0 +1,12 @@ +from .influxdb_mgr import iot_influxdb + + + +def init_iot(app) -> None: + import logging + logger = logging.getLogger(__name__) + + try: + iot_influxdb.init_app(app) + except Exception as e: + logger.error(f"初始化influxdb_mgr失败: {e}", exc_info=True) \ No newline at end of file diff --git a/iti/applications/service/iot/influxdb_mgr.py b/iti/applications/service/iot/influxdb_mgr.py new file mode 100644 index 0000000..ed2b54d --- /dev/null +++ b/iti/applications/service/iot/influxdb_mgr.py @@ -0,0 +1,86 @@ +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS +import logging, os +from typing import List, Dict, Any + +logger = logging.getLogger(__name__) + + +class InfluxDBMgr: + """ + influxdb链接管理 + """ + + _instance = None + _initialized = False + + def __new__(cls): + """ + 单例模式 + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def init_app(self, app): + """ + 初始化 + """ + if self._initialized: + logger.warning("influxdb连接已初始化,跳过重复初始化") + return + + logger.info("influxdb初始化...........") + url = os.getenv("INFLUXDB_URL", "") + token = os.getenv("INFLUXDB_TOKEN", "") + org = os.getenv("INFLUXDB_ORG", "") + self.bucket = os.getenv("INFLUXDB_BUCKET", "") + if len(url) == 0: + logger.error("influxdb未配置,初始化失败") + self._initialized = False + return + self.client = InfluxDBClient(url=url, token=token, org=org) + + def query_table(self, query_measurement: str, query_filed: str)-> List[Dict[str, Any]]: + query_api = self.client.query_api() + flux_query = f''' + from(bucket: "{self.bucket}") + |> range(start: -15m) + |> filter(fn: (r) => r._measurement == "{query_measurement}") + |> filter(fn: (r) => r._field == "{query_filed}") + |> yield(name: "mean") + ''' + + query_tables = query_api.query(flux_query) + + result_records = [] + + for _table in query_tables: + for record in _table.records: + dict = {} + # record 包含时间、measurement、field、value 和 tags + dict["time"] = record.get_time() + dict["measurement"] = record.get_measurement() + dict["field"] = record.get_field() + dict["value"] = record.get_value() + # dict["tags"] = record.values # 包含所有 tags 和系统字段 + # logger.info(f"时间: {time}, measurement: {measurement}, field: {field}, 值: {value}, tags: {tags}") + result_records.append(dict) + + return result_records + + def close_client(self): + """ + 关闭客户端连接,释放资源 + """ + if hasattr(self, 'client') and self.client: + self.client.close() + logger.info("InfluxDB 客户端已关闭") + + def __del__(self): + """ + 析构时自动关闭连接,有用? + """ + self.close_client() + +iot_influxdb = InfluxDBMgr() \ No newline at end of file