|
|
"""
|
|
|
ERP ODBC管理器
|
|
|
|
|
|
提供基于pyodbc的连接池和查询方法
|
|
|
"""
|
|
|
|
|
|
import pyodbc
|
|
|
from queue import Queue, Empty, Full
|
|
|
from contextlib import contextmanager
|
|
|
from typing import Optional, List, Dict, Any
|
|
|
from threading import Lock
|
|
|
import logging
|
|
|
import time
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class PyODBCConnectionPool:
|
|
|
"""纯pyodbc连接池实现"""
|
|
|
|
|
|
def __init__(self, dsn: str, pool_size: int = 5, max_overflow: int = 10,
|
|
|
pool_timeout: int = 30, pool_recycle: int = 3600, lazy_init: bool = False):
|
|
|
"""
|
|
|
初始化连接池
|
|
|
|
|
|
Args:
|
|
|
dsn: ODBC数据源名称
|
|
|
pool_size: 连接池大小
|
|
|
max_overflow: 最大溢出连接数
|
|
|
pool_timeout: 获取连接超时(秒)
|
|
|
pool_recycle: 连接回收时间(秒)
|
|
|
lazy_init: 是否延迟初始化(不预创建连接)
|
|
|
"""
|
|
|
self.dsn = dsn
|
|
|
self.pool_size = pool_size
|
|
|
self.max_overflow = max_overflow
|
|
|
self.pool_timeout = pool_timeout
|
|
|
self.pool_recycle = pool_recycle
|
|
|
self._is_available = False
|
|
|
|
|
|
# 连接池队列
|
|
|
self._pool = Queue(maxsize=pool_size)
|
|
|
# 溢出连接计数
|
|
|
self._overflow_count = 0
|
|
|
self._overflow_lock = Lock()
|
|
|
|
|
|
# 预创建连接(如果不是延迟初始化)
|
|
|
if not lazy_init:
|
|
|
try:
|
|
|
for _ in range(pool_size):
|
|
|
conn = self._create_connection()
|
|
|
self._pool.put((conn, time.time()))
|
|
|
self._is_available = True
|
|
|
logger.info(f"ODBC连接池初始化成功: DSN={dsn}, 池大小={pool_size}")
|
|
|
except Exception as e:
|
|
|
logger.error(f"ODBC连接池初始化失败: {e}")
|
|
|
self._is_available = False
|
|
|
else:
|
|
|
logger.info(f"ODBC连接池延迟初始化: DSN={dsn}, 池大小={pool_size}")
|
|
|
|
|
|
def _create_connection(self):
|
|
|
"""创建新连接"""
|
|
|
try:
|
|
|
conn = pyodbc.connect(f'DSN={self.dsn}')
|
|
|
conn.autocommit = True
|
|
|
logger.debug(f"创建新ODBC连接: DSN={self.dsn}")
|
|
|
return conn
|
|
|
except Exception as e:
|
|
|
logger.error(f"创建ODBC连接失败: {e}")
|
|
|
raise
|
|
|
|
|
|
def _is_connection_valid(self, conn, created_time: float) -> bool:
|
|
|
"""检查连接是否有效"""
|
|
|
# 检查是否超过回收时间
|
|
|
if time.time() - created_time > self.pool_recycle:
|
|
|
return False
|
|
|
|
|
|
# 检查连接是否可用
|
|
|
try:
|
|
|
cursor = conn.cursor()
|
|
|
cursor.execute("SELECT 1")
|
|
|
cursor.close()
|
|
|
return True
|
|
|
except:
|
|
|
return False
|
|
|
|
|
|
def is_available(self) -> bool:
|
|
|
"""检查连接池是否可用"""
|
|
|
return self._is_available
|
|
|
|
|
|
def test_connection(self) -> bool:
|
|
|
"""测试连接是否可用"""
|
|
|
try:
|
|
|
conn = self._create_connection()
|
|
|
conn.close()
|
|
|
self._is_available = True
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
logger.error(f"ODBC连接测试失败: {e}")
|
|
|
self._is_available = False
|
|
|
return False
|
|
|
|
|
|
@contextmanager
|
|
|
def get_connection(self):
|
|
|
"""获取连接(上下文管理器)"""
|
|
|
# 检查连接池是否可用
|
|
|
if not self._is_available:
|
|
|
raise Exception("ODBC连接池不可用,请检查数据源配置")
|
|
|
|
|
|
conn = None
|
|
|
created_time = None
|
|
|
is_overflow = False
|
|
|
|
|
|
try:
|
|
|
# 尝试从池中获取连接
|
|
|
try:
|
|
|
conn, created_time = self._pool.get(timeout=self.pool_timeout)
|
|
|
|
|
|
# 检查连接是否有效
|
|
|
if not self._is_connection_valid(conn, created_time):
|
|
|
logger.debug("连接已过期,重新创建")
|
|
|
try:
|
|
|
conn.close()
|
|
|
except:
|
|
|
pass
|
|
|
conn = self._create_connection()
|
|
|
created_time = time.time()
|
|
|
|
|
|
except Empty:
|
|
|
# 池中无可用连接,尝试创建溢出连接
|
|
|
with self._overflow_lock:
|
|
|
if self._overflow_count < self.max_overflow:
|
|
|
self._overflow_count += 1
|
|
|
is_overflow = True
|
|
|
conn = self._create_connection()
|
|
|
created_time = time.time()
|
|
|
logger.debug(f"创建溢出连接,当前溢出数: {self._overflow_count}")
|
|
|
else:
|
|
|
raise Exception(f"连接池已满,无法获取连接(超时{self.pool_timeout}秒)")
|
|
|
|
|
|
yield conn
|
|
|
|
|
|
finally:
|
|
|
# 归还连接
|
|
|
if conn:
|
|
|
if is_overflow:
|
|
|
# 溢出连接直接关闭
|
|
|
try:
|
|
|
conn.close()
|
|
|
except:
|
|
|
pass
|
|
|
with self._overflow_lock:
|
|
|
self._overflow_count -= 1
|
|
|
logger.debug(f"关闭溢出连接,当前溢出数: {self._overflow_count}")
|
|
|
else:
|
|
|
# 归还到池中
|
|
|
try:
|
|
|
self._pool.put((conn, created_time), block=False)
|
|
|
except Full:
|
|
|
# 池已满,关闭连接
|
|
|
try:
|
|
|
conn.close()
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
def close_all(self):
|
|
|
"""关闭所有连接"""
|
|
|
while not self._pool.empty():
|
|
|
try:
|
|
|
conn, _ = self._pool.get_nowait()
|
|
|
conn.close()
|
|
|
except:
|
|
|
pass
|
|
|
logger.info("所有ODBC连接已关闭")
|
|
|
|
|
|
|
|
|
class ERPODBCManager:
|
|
|
"""ERP ODBC管理器 - 提供连接池和查询方法"""
|
|
|
|
|
|
_instance = None
|
|
|
_pool = None
|
|
|
_initialized = False
|
|
|
|
|
|
def __new__(cls):
|
|
|
"""单例模式"""
|
|
|
if cls._instance is None:
|
|
|
cls._instance = super().__new__(cls)
|
|
|
return cls._instance
|
|
|
|
|
|
def init_app(self, app):
|
|
|
"""初始化(在app启动时调用)"""
|
|
|
if self._initialized:
|
|
|
logger.warning("ERP ODBC管理器已初始化,跳过重复初始化")
|
|
|
return
|
|
|
|
|
|
config = app.config.get("ERP_ODBC_CONFIG", {})
|
|
|
|
|
|
# 检查是否启用ODBC
|
|
|
if not config.get("enabled", True):
|
|
|
logger.info("ERP ODBC功能已禁用(配置项 enabled=False)")
|
|
|
self._pool = None
|
|
|
self._initialized = True
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
# 创建连接池(延迟初始化)
|
|
|
self._pool = PyODBCConnectionPool(
|
|
|
dsn=config.get("dsn", "YHC-test"),
|
|
|
pool_size=config.get("pool_size", 5),
|
|
|
max_overflow=config.get("max_overflow", 10),
|
|
|
pool_timeout=config.get("pool_timeout", 30),
|
|
|
pool_recycle=config.get("pool_recycle", 3600),
|
|
|
lazy_init=True, # 延迟初始化,不预创建连接
|
|
|
)
|
|
|
|
|
|
# 测试连接
|
|
|
if self._pool.test_connection():
|
|
|
logger.info("ERP ODBC管理器初始化成功,连接测试通过")
|
|
|
else:
|
|
|
logger.warning("ERP ODBC管理器初始化完成,但连接测试失败,ODBC功能将不可用")
|
|
|
|
|
|
self._initialized = True
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"ERP ODBC管理器初始化失败: {e}", exc_info=True)
|
|
|
logger.warning("应用将继续启动,但ODBC功能将不可用")
|
|
|
self._pool = None
|
|
|
self._initialized = True
|
|
|
|
|
|
def is_available(self) -> bool:
|
|
|
"""检查ODBC连接是否可用"""
|
|
|
return self._pool is not None and self._pool.is_available()
|
|
|
|
|
|
@contextmanager
|
|
|
def get_connection(self):
|
|
|
"""获取连接(上下文管理器)"""
|
|
|
if not self.is_available():
|
|
|
raise Exception("ODBC连接不可用,请检查数据源配置或联系管理员")
|
|
|
|
|
|
with self._pool.get_connection() as conn:
|
|
|
yield conn
|
|
|
|
|
|
def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
执行查询并返回结果
|
|
|
|
|
|
Args:
|
|
|
sql: SQL语句
|
|
|
params: 参数元组
|
|
|
|
|
|
Returns:
|
|
|
查询结果列表(字典格式)
|
|
|
|
|
|
Raises:
|
|
|
Exception: 当ODBC连接不可用或查询失败时
|
|
|
"""
|
|
|
if not self.is_available():
|
|
|
raise Exception("ODBC连接不可用,无法执行查询")
|
|
|
|
|
|
with self.get_connection() as conn:
|
|
|
cursor = conn.cursor()
|
|
|
try:
|
|
|
logger.debug(f"执行SQL: {sql}")
|
|
|
logger.debug(f"参数: {params}")
|
|
|
|
|
|
if params:
|
|
|
cursor.execute(sql, params)
|
|
|
else:
|
|
|
cursor.execute(sql)
|
|
|
|
|
|
# 获取列名
|
|
|
columns = [column[0] for column in cursor.description]
|
|
|
|
|
|
# 转换为字典列表
|
|
|
results = []
|
|
|
for row in cursor.fetchall():
|
|
|
results.append(dict(zip(columns, row)))
|
|
|
|
|
|
logger.debug(f"查询成功,返回 {len(results)} 条记录")
|
|
|
logger.debug(results)
|
|
|
return results
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"SQL执行失败: {e}", exc_info=True)
|
|
|
raise
|
|
|
finally:
|
|
|
cursor.close()
|
|
|
|
|
|
def execute_non_query(self, sql: str, params: Optional[tuple] = None) -> int:
|
|
|
"""
|
|
|
执行非查询SQL(INSERT/UPDATE/DELETE)
|
|
|
|
|
|
Returns:
|
|
|
影响的行数
|
|
|
|
|
|
Raises:
|
|
|
Exception: 当ODBC连接不可用或执行失败时
|
|
|
"""
|
|
|
if not self.is_available():
|
|
|
raise Exception("ODBC连接不可用,无法执行操作")
|
|
|
|
|
|
with self.get_connection() as conn:
|
|
|
cursor = conn.cursor()
|
|
|
try:
|
|
|
logger.debug(f"执行SQL: {sql[:200]}...")
|
|
|
|
|
|
if params:
|
|
|
cursor.execute(sql, params)
|
|
|
else:
|
|
|
cursor.execute(sql)
|
|
|
|
|
|
rowcount = cursor.rowcount
|
|
|
conn.commit()
|
|
|
|
|
|
logger.info(f"执行成功,影响 {rowcount} 行")
|
|
|
return rowcount
|
|
|
|
|
|
except Exception as e:
|
|
|
conn.rollback()
|
|
|
logger.error(f"SQL执行失败: {e}", exc_info=True)
|
|
|
raise
|
|
|
finally:
|
|
|
cursor.close()
|
|
|
|
|
|
|
|
|
# 全局实例
|
|
|
erp_odbc = ERPODBCManager()
|
|
|
|