import logging
import os
from typing import Any, Dict, List

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

logger = logging.getLogger(__name__)


class InfluxDBMgr:
    """Singleton manager for the InfluxDB client connection.

    Every ``InfluxDBMgr()`` call returns the same instance. The connection
    itself is created lazily by :meth:`init_app` from the environment
    variables ``INFLUXDB_URL``, ``INFLUXDB_TOKEN``, ``INFLUXDB_ORG`` and
    ``INFLUXDB_BUCKET``.
    """

    _instance = None
    _initialized = False

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def init_app(self, app):
        """Create the InfluxDB client from environment configuration.

        Args:
            app: Application object supplied by the caller. It is accepted
                for interface compatibility and is not used here.

        Returns:
            None. If ``INFLUXDB_URL`` is empty an error is logged and no
            client is created. If a client was already created the call is
            skipped with a warning.
        """
        if self._initialized:
            logger.warning("influxdb连接已初始化,跳过重复初始化")
            return

        logger.info("influxdb初始化...........")
        url = os.getenv("INFLUXDB_URL", "")
        token = os.getenv("INFLUXDB_TOKEN", "")
        org = os.getenv("INFLUXDB_ORG", "")
        self.bucket = os.getenv("INFLUXDB_BUCKET", "")

        if not url:
            logger.error("influxdb未配置,初始化失败")
            self._initialized = False
            return

        self.client = InfluxDBClient(url=url, token=token, org=org)
        # Mark success so the guard at the top actually prevents a second
        # client from being created (and the first one from being leaked).
        self._initialized = True

    def query_table(self, query_measurement: str, query_filed: str) -> List[Dict[str, Any]]:
        """Query the last 15 minutes of one field of one measurement.

        Args:
            query_measurement: Value matched against ``r._measurement``.
            query_filed: Value matched against ``r._field``.

        Returns:
            One dict per returned record with the keys ``"time"``,
            ``"measurement"``, ``"field"`` and ``"value"``.

        Raises:
            AttributeError: If called before :meth:`init_app` has created
                the client.
        """
        query_api = self.client.query_api()
        # NOTE(review): both arguments are interpolated into the Flux text
        # without escaping, so callers must pass trusted names only.
        # Adjacent literals are used to keep the query text on one line.
        flux_query = (
            f' from(bucket: "{self.bucket}")'
            ' |> range(start: -15m)'
            f' |> filter(fn: (r) => r._measurement == "{query_measurement}")'
            f' |> filter(fn: (r) => r._field == "{query_filed}")'
            ' |> yield(name: "mean") '
        )
        query_tables = query_api.query(flux_query)

        # Each record exposes time, measurement, field and value; tags
        # (available through record.values) are deliberately not exported.
        return [
            {
                "time": record.get_time(),
                "measurement": record.get_measurement(),
                "field": record.get_field(),
                "value": record.get_value(),
            }
            for table in query_tables
            for record in table.records
        ]

    def close_client(self):
        """Close the client connection and release its resources.

        Safe to call when no client exists or when it was already closed.
        After closing, :meth:`init_app` may be called again.
        """
        client = getattr(self, "client", None)
        if not client:
            return
        client.close()
        # Drop the reference so a later call (for example from __del__)
        # does not close the same client twice.
        self.client = None
        self._initialized = False
        logger.info("InfluxDB 客户端已关闭")

    def __del__(self):
        """Close the connection when the instance is garbage collected.

        This is best effort only: Python does not guarantee when, or
        whether, ``__del__`` runs, so call :meth:`close_client` explicitly
        for deterministic cleanup.
        """
        self.close_client()


iot_influxdb = InfluxDBMgr()