import logging
import os
import time
from typing import Any, Dict, List, Optional

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

logger = logging.getLogger(__name__)


class InfluxDBMgr:
    """Singleton manager for one InfluxDB client connection.

    Connection settings are read from the ``INFLUXDB_URL``, ``INFLUXDB_TOKEN``,
    ``INFLUXDB_ORG`` and ``INFLUXDB_BUCKET`` environment variables by
    :meth:`init_app`. Query and write helpers probe the connection before use
    and try to reconnect (with exponential back-off) when the probe fails.

    All public helpers report failure through their return value (``[]`` or
    ``False``) and a log message instead of raising.
    """

    _instance = None
    _initialized = False

    # Class-level defaults keep every method safe to call on a manager whose
    # init_app() was never run or bailed out early on incomplete configuration.
    url = ""
    token = ""
    org = ""
    bucket = ""
    max_retries = 3
    retry_delay = 1.0
    max_retry_delay = 30.0
    client = None
    _query_api = None
    _write_api = None

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _read_env_number(name: str, default, cast):
        """Read env var *name* through *cast*, falling back to *default*.

        The fallback is used when the variable is unset or when *cast* rejects
        its value with ``ValueError`` (a warning is logged in that case).
        """
        raw = os.getenv(name)
        if raw is None:
            return default
        try:
            return cast(raw)
        except ValueError:
            logger.warning(f"环境变量{name}的值无效: {raw!r},使用默认值{default}")
            return default

    @staticmethod
    def _escape_flux_string(value: str) -> str:
        """Escape *value* for embedding inside a double-quoted Flux string.

        Backslashes are escaped first so the backslashes introduced for the
        quotes are not escaped a second time.
        """
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    def _is_configured(self) -> bool:
        """Return True when url, token, org and bucket are all non-empty."""
        return bool(self.url and self.token and self.org and self.bucket)

    def _get_query_api(self):
        """Return the cached query API, creating it from the client if needed."""
        if not self._query_api:
            self._query_api = self.client.query_api()
        return self._query_api

    def _get_write_api(self):
        """Return the cached synchronous write API, creating it if needed."""
        if not self._write_api:
            self._write_api = self.client.write_api(write_options=SYNCHRONOUS)
        return self._write_api

    def init_app(self, app) -> None:
        """Load configuration from the environment and create the client.

        Does nothing (besides a warning) when the manager is already
        initialized. On incomplete configuration or connection failure the
        manager stays uninitialized and an error is logged.

        Args:
            app: Accepted for interface compatibility; not used here.
        """
        if self._initialized:
            logger.warning("influxdb连接已初始化,跳过重复初始化")
            return

        logger.info("influxdb初始化...........")

        self.url = os.getenv("INFLUXDB_URL", "")
        self.token = os.getenv("INFLUXDB_TOKEN", "")
        self.org = os.getenv("INFLUXDB_ORG", "")
        self.bucket = os.getenv("INFLUXDB_BUCKET", "")

        # Report every missing variable at once, in a fixed order.
        required = (
            ("INFLUXDB_URL", self.url),
            ("INFLUXDB_TOKEN", self.token),
            ("INFLUXDB_ORG", self.org),
            ("INFLUXDB_BUCKET", self.bucket),
        )
        missing_configs = [name for name, value in required if not value]
        if missing_configs:
            logger.error(f"influxdb配置不完整,缺少: {', '.join(missing_configs)}")
            self._initialized = False
            return

        # Reconnect tuning; malformed values fall back to the defaults.
        self.max_retries = self._read_env_number("INFLUXDB_MAX_RETRIES", 3, int)
        self.retry_delay = self._read_env_number("INFLUXDB_RETRY_DELAY", 1.0, float)
        self.max_retry_delay = self._read_env_number(
            "INFLUXDB_MAX_RETRY_DELAY", 30.0, float
        )

        # Start with empty API caches; they are created lazily on first use.
        self._query_api = None
        self._write_api = None

        if not self._connect():
            logger.error("influxdb初始化失败")
            self._initialized = False
            return

        logger.info("influxdb初始化成功")

    def _connect(self) -> bool:
        """Create the client object and mark the manager as initialized.

        No explicit timeout is passed, so the client library's default applies.
        NOTE(review): constructing the client presumably does not contact the
        server; reachability is verified by _check_connection() — confirm.

        Returns:
            True when the client was constructed, False when construction raised.
        """
        try:
            self.client = InfluxDBClient(
                url=self.url,
                token=self.token,
                org=self.org,
            )
            self._initialized = True
            logger.info("InfluxDB连接成功")
            return True
        except Exception as e:
            logger.error(f"InfluxDB连接失败: {e}")
            self._initialized = False
            return False

    def _check_connection(self) -> bool:
        """Probe the connection by running a tiny query against the bucket.

        Returns:
            True when the probe query succeeds; False when the manager is not
            initialized, has no client, or the probe raises.
        """
        if not self._initialized or not self.client:
            return False

        try:
            test_query = f'from(bucket: "{self.bucket}") |> range(start: -1m) |> limit(n: 1)'
            self._get_query_api().query(test_query)
            return True
        except Exception as e:
            logger.warning(f"InfluxDB连接检查失败: {e}")
            return False

    def _reconnect(self) -> bool:
        """Close and re-create the client, retrying with exponential back-off.

        Each attempt first sleeps ``retry_delay * 1.5 ** attempt`` seconds
        (capped at ``max_retry_delay``), then closes the old client, connects
        and probes the new connection.

        Returns:
            True as soon as one attempt succeeds; False when the manager has
            no complete configuration or every attempt failed.
        """
        # Without a complete configuration no attempt can succeed, so do not
        # waste time sleeping through the back-off schedule.
        if not self._is_configured():
            logger.error("influxdb配置不完整,无法重连")
            return False

        logger.info("尝试重新连接InfluxDB...")

        for attempt in range(self.max_retries):
            try:
                delay = min(self.retry_delay * (1.5 ** attempt), self.max_retry_delay)
                logger.info(f"第{attempt + 1}次重连尝试,等待{delay:.2f}秒...")
                time.sleep(delay)

                self.close_client()
                if self._connect() and self._check_connection():
                    logger.info("InfluxDB重连成功")
                    return True
            except Exception as e:
                logger.error(f"第{attempt + 1}次重连失败: {e}")

        logger.error(f"经过{self.max_retries}次尝试后,InfluxDB重连失败")
        return False

    def _ensure_connection(self) -> bool:
        """Return True when the connection is usable, reconnecting if needed."""
        if self._check_connection():
            return True
        return self._reconnect()

    def query_table(self, query_measurement: str, query_field: str) -> List[Dict[str, Any]]:
        """Query the last 15 minutes of one field of one measurement.

        Args:
            query_measurement: Value matched against ``_measurement``.
            query_field: Value matched against ``_field``.

        Returns:
            One dict per record with the keys ``time``, ``measurement``,
            ``field`` and ``value``. An empty list is returned when no
            connection could be established or the query failed.
        """
        if not self._ensure_connection():
            logger.error("无法建立InfluxDB连接,查询失败")
            return []

        try:
            # Escape caller-supplied values so quotes or backslashes cannot
            # break out of the Flux string literals.
            measurement = self._escape_flux_string(query_measurement)
            field = self._escape_flux_string(query_field)
            flux_query = f'''
                from(bucket: "{self.bucket}")
                |> range(start: -15m)
                |> filter(fn: (r) => r._measurement == "{measurement}")
                |> filter(fn: (r) => r._field == "{field}")
                |> yield(name: "mean")
            '''
            query_tables = self._get_query_api().query(flux_query)

            # Flatten every table's records into plain dictionaries.
            return [
                {
                    "time": record.get_time(),
                    "measurement": record.get_measurement(),
                    "field": record.get_field(),
                    "value": record.get_value(),
                }
                for table in query_tables
                for record in table.records
            ]
        except Exception as e:
            logger.error(f"查询InfluxDB失败: {e}")
            # Try to restore the connection for later calls; this call still
            # reports failure with an empty result.
            if not self._reconnect():
                logger.error("查询失败且重连失败,返回空结果")
            return []

    def write_data(self, measurement: str, fields: Dict[str, Any],
                   tags: Optional[Dict[str, str]] = None,
                   timestamp: Optional[int] = None) -> bool:
        """Write a single point to the configured bucket.

        Args:
            measurement: Measurement name of the point.
            fields: Field names mapped to field values.
            tags: Optional tag names mapped to tag values.
            timestamp: Optional point time; omitted only when ``None``, so a
                timestamp of 0 is written as given.

        Returns:
            True when the write succeeded, False otherwise.
        """
        if not self._ensure_connection():
            logger.error("无法建立InfluxDB连接,写入失败")
            return False

        try:
            write_api = self._get_write_api()

            point = Point(measurement)
            for tag_key, tag_value in (tags or {}).items():
                point = point.tag(tag_key, tag_value)
            for field_key, field_value in fields.items():
                point = point.field(field_key, field_value)
            if timestamp is not None:
                point = point.time(timestamp)

            write_api.write(bucket=self.bucket, record=point)
            logger.debug(f"数据成功写入InfluxDB: {measurement}")
            return True
        except Exception as e:
            logger.error(f"写入InfluxDB失败: {e}")
            # Try to restore the connection for later calls; this write is
            # still reported as failed.
            if not self._reconnect():
                logger.error("写入失败且重连失败")
            return False

    def write_batch_data(self, points: List[Point]) -> bool:
        """Write a list of points to the configured bucket in one call.

        Args:
            points: The points to write.

        Returns:
            True when the write succeeded, False otherwise.
        """
        if not self._ensure_connection():
            logger.error("无法建立InfluxDB连接,批量写入失败")
            return False

        try:
            self._get_write_api().write(bucket=self.bucket, record=points)
            logger.debug(f"批量数据成功写入InfluxDB: {len(points)}条记录")
            return True
        except Exception as e:
            logger.error(f"批量写入InfluxDB失败: {e}")
            # Try to restore the connection for later calls; this write is
            # still reported as failed.
            if not self._reconnect():
                logger.error("批量写入失败且重连失败")
            return False

    def close_client(self) -> None:
        """Close the cached write API and the client, then reset the state.

        Closing is best-effort: errors raised by the underlying ``close()``
        calls are logged and swallowed so that a broken connection can always
        be discarded (for example before a reconnect attempt).
        """
        write_api = self._write_api
        client = self.client

        # Drop the references first so a failing close() cannot leave stale
        # objects behind.
        self._query_api = None
        self._write_api = None
        self.client = None
        self._initialized = False

        if write_api is not None:
            try:
                write_api.close()
            except Exception as e:
                logger.warning(f"关闭InfluxDB写入API失败: {e}")

        if client:
            try:
                client.close()
                logger.info("InfluxDB 客户端已关闭")
            except Exception as e:
                logger.warning(f"关闭InfluxDB客户端失败: {e}")

    def __del__(self):
        """Best-effort cleanup when the instance is garbage-collected."""
        # During interpreter shutdown modules used by close_client() may
        # already be torn down; never let a finalizer raise.
        try:
            self.close_client()
        except Exception:
            pass


iot_influxdb = InfluxDBMgr()