|
|
|
|
@ -0,0 +1,86 @@
|
|
|
|
|
from influxdb_client import InfluxDBClient
|
|
|
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
|
|
|
import logging, os
|
|
|
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InfluxDBMgr:
|
|
|
|
|
"""
|
|
|
|
|
influxdb链接管理
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
_instance = None
|
|
|
|
|
_initialized = False
|
|
|
|
|
|
|
|
|
|
def __new__(cls):
|
|
|
|
|
"""
|
|
|
|
|
单例模式
|
|
|
|
|
"""
|
|
|
|
|
if cls._instance is None:
|
|
|
|
|
cls._instance = super().__new__(cls)
|
|
|
|
|
return cls._instance
|
|
|
|
|
|
|
|
|
|
def init_app(self, app):
|
|
|
|
|
"""
|
|
|
|
|
初始化
|
|
|
|
|
"""
|
|
|
|
|
if self._initialized:
|
|
|
|
|
logger.warning("influxdb连接已初始化,跳过重复初始化")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
logger.info("influxdb初始化...........")
|
|
|
|
|
url = os.getenv("INFLUXDB_URL", "")
|
|
|
|
|
token = os.getenv("INFLUXDB_TOKEN", "")
|
|
|
|
|
org = os.getenv("INFLUXDB_ORG", "")
|
|
|
|
|
self.bucket = os.getenv("INFLUXDB_BUCKET", "")
|
|
|
|
|
if len(url) == 0:
|
|
|
|
|
logger.error("influxdb未配置,初始化失败")
|
|
|
|
|
self._initialized = False
|
|
|
|
|
return
|
|
|
|
|
self.client = InfluxDBClient(url=url, token=token, org=org)
|
|
|
|
|
|
|
|
|
|
def query_table(self, query_measurement: str, query_filed: str)-> List[Dict[str, Any]]:
|
|
|
|
|
query_api = self.client.query_api()
|
|
|
|
|
flux_query = f'''
|
|
|
|
|
from(bucket: "{self.bucket}")
|
|
|
|
|
|> range(start: -15m)
|
|
|
|
|
|> filter(fn: (r) => r._measurement == "{query_measurement}")
|
|
|
|
|
|> filter(fn: (r) => r._field == "{query_filed}")
|
|
|
|
|
|> yield(name: "mean")
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
query_tables = query_api.query(flux_query)
|
|
|
|
|
|
|
|
|
|
result_records = []
|
|
|
|
|
|
|
|
|
|
for _table in query_tables:
|
|
|
|
|
for record in _table.records:
|
|
|
|
|
dict = {}
|
|
|
|
|
# record 包含时间、measurement、field、value 和 tags
|
|
|
|
|
dict["time"] = record.get_time()
|
|
|
|
|
dict["measurement"] = record.get_measurement()
|
|
|
|
|
dict["field"] = record.get_field()
|
|
|
|
|
dict["value"] = record.get_value()
|
|
|
|
|
# dict["tags"] = record.values # 包含所有 tags 和系统字段
|
|
|
|
|
# logger.info(f"时间: {time}, measurement: {measurement}, field: {field}, 值: {value}, tags: {tags}")
|
|
|
|
|
result_records.append(dict)
|
|
|
|
|
|
|
|
|
|
return result_records
|
|
|
|
|
|
|
|
|
|
def close_client(self):
|
|
|
|
|
"""
|
|
|
|
|
关闭客户端连接,释放资源
|
|
|
|
|
"""
|
|
|
|
|
if hasattr(self, 'client') and self.client:
|
|
|
|
|
self.client.close()
|
|
|
|
|
logger.info("InfluxDB 客户端已关闭")
|
|
|
|
|
|
|
|
|
|
def __del__(self):
|
|
|
|
|
"""
|
|
|
|
|
析构时自动关闭连接,有用?
|
|
|
|
|
"""
|
|
|
|
|
self.close_client()
|
|
|
|
|
|
|
|
|
|
iot_influxdb = InfluxDBMgr()
|