From 675d2fad8f9b0626d9c835b4fb1a1ae6a59375e4 Mon Sep 17 00:00:00 2001 From: "DESKTOP-1JS6RSM\\Admin" Date: Thu, 26 Feb 2026 10:20:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0influxdb=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iti/.env.dev | 5 +- iti/applications/service/iot/influxdb_mgr.py | 252 ++++++++++++++++--- 2 files changed, 222 insertions(+), 35 deletions(-) diff --git a/iti/.env.dev b/iti/.env.dev index a0a4741..8101f44 100644 --- a/iti/.env.dev +++ b/iti/.env.dev @@ -20,4 +20,7 @@ ALIYUN_OSS_BUCKET=maintaince-dev INFLUXDB_URL="http://124.223.195.237:8086" INFLUXDB_TOKEN="HdHOox3RqEjJ--Ma9_dFcf-Iv8wu2u0FyD_sV4MT4EIoQoT7h4eZLR_n_yGgmiSLAGiIaUgaH6x-cILNGV8W4g==" INFLUXDB_ORG="noface" -INFLUXDB_BUCKET="yh-iot" \ No newline at end of file +INFLUXDB_BUCKET="yh-iot" +INFLUXDB_MAX_RETRIES=3 +INFLUXDB_RETRY_DELAY=1.0 +INFLUXDB_MAX_RETRY_DELAY=30.0 \ No newline at end of file diff --git a/iti/applications/service/iot/influxdb_mgr.py b/iti/applications/service/iot/influxdb_mgr.py index ed2b54d..6923cf3 100644 --- a/iti/applications/service/iot/influxdb_mgr.py +++ b/iti/applications/service/iot/influxdb_mgr.py @@ -1,7 +1,7 @@ -from influxdb_client import InfluxDBClient +from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS -import logging, os -from typing import List, Dict, Any +import logging, os, time +from typing import List, Dict, Any, Optional logger = logging.getLogger(__name__) @@ -22,7 +22,7 @@ class InfluxDBMgr: cls._instance = super().__new__(cls) return cls._instance - def init_app(self, app): + def init_app(self, app) -> None: """ 初始化 """ @@ -31,48 +31,232 @@ class InfluxDBMgr: return logger.info("influxdb初始化...........") - url = os.getenv("INFLUXDB_URL", "") - token = os.getenv("INFLUXDB_TOKEN", "") - org = os.getenv("INFLUXDB_ORG", "") + + # 增强配置验证 + self.url = os.getenv("INFLUXDB_URL", "") + self.token = os.getenv("INFLUXDB_TOKEN", "") + self.org = os.getenv("INFLUXDB_ORG", "") self.bucket = os.getenv("INFLUXDB_BUCKET", "") - if len(url) == 0: - logger.error("influxdb未配置,初始化失败") + + # 更严格的配置检查 + missing_configs = [] + if not self.url: + missing_configs.append("INFLUXDB_URL") + if not self.token: + missing_configs.append("INFLUXDB_TOKEN") + if not self.org: + missing_configs.append("INFLUXDB_ORG") + if not self.bucket: + missing_configs.append("INFLUXDB_BUCKET") + + if missing_configs: + logger.error(f"influxdb配置不完整,缺少: {', '.join(missing_configs)}") + self._initialized = False + return + + # 配置重连参数 + self.max_retries = int(os.getenv("INFLUXDB_MAX_RETRIES", "3")) + self.retry_delay = float(os.getenv("INFLUXDB_RETRY_DELAY", "1")) + self.max_retry_delay = float(os.getenv("INFLUXDB_MAX_RETRY_DELAY", "30")) + + # 初始化API实例缓存 + self._query_api = None + self._write_api = None + + # 建立连接 + if not self._connect(): + logger.error("influxdb初始化失败") self._initialized = False return - self.client = InfluxDBClient(url=url, token=token, org=org) + + logger.info("influxdb初始化成功") + + def _connect(self) -> bool: + """ + 建立连接 + """ + try: + self.client = InfluxDBClient( + url=self.url, + token=self.token, + org=self.org, + # timeout=10 # 添加连接超时设置 + ) + self._initialized = True + logger.info("InfluxDB连接成功") + return True + except Exception as e: + logger.error(f"InfluxDB连接失败: {e}") + self._initialized = False + return False + + def _check_connection(self) -> bool: + """ + 检查连接状态 + """ + if not self._initialized or not hasattr(self, 'client') or not self.client: + return False + + try: + # 尝试执行一个简单的查询来检查连接状态 + if not hasattr(self, '_query_api') or not self._query_api: + self._query_api = self.client.query_api() + + test_query = f'from(bucket: "{self.bucket}") |> range(start: -1m) |> limit(n: 1)' + self._query_api.query(test_query) + return True + except Exception as e: + logger.warning(f"InfluxDB连接检查失败: {e}") + return False - def query_table(self, query_measurement: str, query_filed: str)-> List[Dict[str, Any]]: - query_api = self.client.query_api() - flux_query = f''' - from(bucket: "{self.bucket}") - |> range(start: -15m) - |> filter(fn: (r) => r._measurement == "{query_measurement}") - |> filter(fn: (r) => r._field == "{query_filed}") - |> yield(name: "mean") - ''' + def _reconnect(self) -> bool: + """ + 重新连接 + """ + logger.info("尝试重新连接InfluxDB...") + for attempt in range(self.max_retries): + try: + # 使用更平滑的指数退避算法 + delay = min(self.retry_delay * (1.5 ** attempt), self.max_retry_delay) + logger.info(f"第{attempt + 1}次重连尝试,等待{delay:.2f}秒...") + time.sleep(delay) + + self.close_client() + if self._connect() and self._check_connection(): + logger.info("InfluxDB重连成功") + return True + + except Exception as e: + logger.error(f"第{attempt + 1}次重连失败: {e}") + + logger.error(f"经过{self.max_retries}次尝试后,InfluxDB重连失败") + return False + + def _ensure_connection(self) -> bool: + """ + 确保连接有效 + """ + if not self._check_connection(): + return self._reconnect() + return True + + def query_table(self, query_measurement: str, query_field: str)-> List[Dict[str, Any]]: + """ + 查询数据表,带自动重连功能 + """ + if not self._ensure_connection(): + logger.error("无法建立InfluxDB连接,查询失败") + return [] + + try: + if not hasattr(self, '_query_api') or not self._query_api: + self._query_api = self.client.query_api() + + flux_query = f''' + from(bucket: "{self.bucket}") + |> range(start: -15m) + |> filter(fn: (r) => r._measurement == "{query_measurement}") + |> filter(fn: (r) => r._field == "{query_field}") + |> yield(name: "mean") + ''' - query_tables = query_api.query(flux_query) + query_tables = self._query_api.query(flux_query) - result_records = [] + result_records = [] + + for _table in query_tables: + for record in _table.records: + dict = {} + # record 包含时间、measurement、field、value 和 tags + dict["time"] = record.get_time() + dict["measurement"] = record.get_measurement() + dict["field"] = record.get_field() + dict["value"] = record.get_value() + # dict["tags"] = record.values # 包含所有 tags 和系统字段 + # logger.info(f"时间: {time}, measurement: {measurement}, field: {field}, 值: {value}, tags: {tags}") + result_records.append(dict) + + return result_records + + except Exception as e: + logger.error(f"查询InfluxDB失败: {e}") + # 查询失败时也尝试重连 + if not self._reconnect(): + logger.error("查询失败且重连失败,返回空结果") + return [] + + def write_data(self, measurement: str, fields: Dict[str, Any], tags: Optional[Dict[str, str]] = None, timestamp: Optional[int] = None) -> bool: + """ + 写入数据到InfluxDB,带自动重连功能 + """ + if not self._ensure_connection(): + logger.error("无法建立InfluxDB连接,写入失败") + return False - for _table in query_tables: - for record in _table.records: - dict = {} - # record 包含时间、measurement、field、value 和 tags - dict["time"] = record.get_time() - dict["measurement"] = record.get_measurement() - dict["field"] = record.get_field() - dict["value"] = record.get_value() - # dict["tags"] = record.values # 包含所有 tags 和系统字段 - # logger.info(f"时间: {time}, measurement: {measurement}, field: {field}, 值: {value}, tags: {tags}") - result_records.append(dict) + try: + if not hasattr(self, '_write_api') or not self._write_api: + self._write_api = self.client.write_api(write_options=SYNCHRONOUS) + + point = Point(measurement) + + # 添加标签 + if tags: + for tag_key, tag_value in tags.items(): + point = point.tag(tag_key, tag_value) + + # 添加字段 + for field_key, field_value in fields.items(): + point = point.field(field_key, field_value) + + # 添加时间戳 + if timestamp: + point = point.time(timestamp) + + self._write_api.write(bucket=self.bucket, record=point) + logger.debug(f"数据成功写入InfluxDB: {measurement}") + return True + + except Exception as e: + logger.error(f"写入InfluxDB失败: {e}") + # 写入失败时也尝试重连 + if not self._reconnect(): + logger.error("写入失败且重连失败") + return False + + def write_batch_data(self, points: List[Point]) -> bool: + """ + 批量写入数据到InfluxDB,带自动重连功能 + """ + if not self._ensure_connection(): + logger.error("无法建立InfluxDB连接,批量写入失败") + return False - return result_records + try: + if not hasattr(self, '_write_api') or not self._write_api: + self._write_api = self.client.write_api(write_options=SYNCHRONOUS) + + self._write_api.write(bucket=self.bucket, record=points) + logger.debug(f"批量数据成功写入InfluxDB: {len(points)}条记录") + return True + + except Exception as e: + logger.error(f"批量写入InfluxDB失败: {e}") + # 写入失败时也尝试重连 + if not self._reconnect(): + logger.error("批量写入失败且重连失败") + return False - def close_client(self): + def close_client(self) -> None: """ 关闭客户端连接,释放资源 """ + # 关闭API实例 + if hasattr(self, '_query_api'): + delattr(self, '_query_api') + if hasattr(self, '_write_api'): + delattr(self, '_write_api') + + # 关闭客户端连接 if hasattr(self, 'client') and self.client: self.client.close() logger.info("InfluxDB 客户端已关闭")