You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

329 lines
11 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
ERP ODBC管理器
提供基于pyodbc的连接池和查询方法
"""
import pyodbc
from queue import Queue, Empty, Full
from contextlib import contextmanager
from typing import Optional, List, Dict, Any
from threading import Lock
import logging
import time
logger = logging.getLogger(__name__)
class PyODBCConnectionPool:
"""纯pyodbc连接池实现"""
def __init__(self, dsn: str, pool_size: int = 5, max_overflow: int = 10,
pool_timeout: int = 30, pool_recycle: int = 3600, lazy_init: bool = False):
"""
初始化连接池
Args:
dsn: ODBC数据源名称
pool_size: 连接池大小
max_overflow: 最大溢出连接数
pool_timeout: 获取连接超时(秒)
pool_recycle: 连接回收时间(秒)
lazy_init: 是否延迟初始化(不预创建连接)
"""
self.dsn = dsn
self.pool_size = pool_size
self.max_overflow = max_overflow
self.pool_timeout = pool_timeout
self.pool_recycle = pool_recycle
self._is_available = False
# 连接池队列
self._pool = Queue(maxsize=pool_size)
# 溢出连接计数
self._overflow_count = 0
self._overflow_lock = Lock()
# 预创建连接(如果不是延迟初始化)
if not lazy_init:
try:
for _ in range(pool_size):
conn = self._create_connection()
self._pool.put((conn, time.time()))
self._is_available = True
logger.info(f"ODBC连接池初始化成功: DSN={dsn}, 池大小={pool_size}")
except Exception as e:
logger.error(f"ODBC连接池初始化失败: {e}")
self._is_available = False
else:
logger.info(f"ODBC连接池延迟初始化: DSN={dsn}, 池大小={pool_size}")
def _create_connection(self):
"""创建新连接"""
try:
conn = pyodbc.connect(f'DSN={self.dsn}')
conn.autocommit = True
logger.debug(f"创建新ODBC连接: DSN={self.dsn}")
return conn
except Exception as e:
logger.error(f"创建ODBC连接失败: {e}")
raise
def _is_connection_valid(self, conn, created_time: float) -> bool:
"""检查连接是否有效"""
# 检查是否超过回收时间
if time.time() - created_time > self.pool_recycle:
return False
# 检查连接是否可用
try:
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.close()
return True
except:
return False
def is_available(self) -> bool:
"""检查连接池是否可用"""
return self._is_available
def test_connection(self) -> bool:
"""测试连接是否可用"""
try:
conn = self._create_connection()
conn.close()
self._is_available = True
return True
except Exception as e:
logger.error(f"ODBC连接测试失败: {e}")
self._is_available = False
return False
@contextmanager
def get_connection(self):
"""获取连接(上下文管理器)"""
# 检查连接池是否可用
if not self._is_available:
raise Exception("ODBC连接池不可用请检查数据源配置")
conn = None
created_time = None
is_overflow = False
try:
# 尝试从池中获取连接
try:
conn, created_time = self._pool.get(timeout=self.pool_timeout)
# 检查连接是否有效
if not self._is_connection_valid(conn, created_time):
logger.debug("连接已过期,重新创建")
try:
conn.close()
except:
pass
conn = self._create_connection()
created_time = time.time()
except Empty:
# 池中无可用连接,尝试创建溢出连接
with self._overflow_lock:
if self._overflow_count < self.max_overflow:
self._overflow_count += 1
is_overflow = True
conn = self._create_connection()
created_time = time.time()
logger.debug(f"创建溢出连接,当前溢出数: {self._overflow_count}")
else:
raise Exception(f"连接池已满,无法获取连接(超时{self.pool_timeout}秒)")
yield conn
finally:
# 归还连接
if conn:
if is_overflow:
# 溢出连接直接关闭
try:
conn.close()
except:
pass
with self._overflow_lock:
self._overflow_count -= 1
logger.debug(f"关闭溢出连接,当前溢出数: {self._overflow_count}")
else:
# 归还到池中
try:
self._pool.put((conn, created_time), block=False)
except Full:
# 池已满,关闭连接
try:
conn.close()
except:
pass
def close_all(self):
"""关闭所有连接"""
while not self._pool.empty():
try:
conn, _ = self._pool.get_nowait()
conn.close()
except:
pass
logger.info("所有ODBC连接已关闭")
class ERPODBCManager:
"""ERP ODBC管理器 - 提供连接池和查询方法"""
_instance = None
_pool = None
_initialized = False
def __new__(cls):
"""单例模式"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def init_app(self, app):
"""初始化在app启动时调用"""
if self._initialized:
logger.warning("ERP ODBC管理器已初始化跳过重复初始化")
return
config = app.config.get("ERP_ODBC_CONFIG", {})
# 检查是否启用ODBC
if not config.get("enabled", True):
logger.info("ERP ODBC功能已禁用配置项 enabled=False")
self._pool = None
self._initialized = True
return
try:
# 创建连接池(延迟初始化)
self._pool = PyODBCConnectionPool(
dsn=config.get("dsn", "YHC-test"),
pool_size=config.get("pool_size", 5),
max_overflow=config.get("max_overflow", 10),
pool_timeout=config.get("pool_timeout", 30),
pool_recycle=config.get("pool_recycle", 3600),
lazy_init=True, # 延迟初始化,不预创建连接
)
# 测试连接
if self._pool.test_connection():
logger.info("ERP ODBC管理器初始化成功连接测试通过")
else:
logger.warning("ERP ODBC管理器初始化完成但连接测试失败ODBC功能将不可用")
self._initialized = True
except Exception as e:
logger.error(f"ERP ODBC管理器初始化失败: {e}", exc_info=True)
logger.warning("应用将继续启动但ODBC功能将不可用")
self._pool = None
self._initialized = True
def is_available(self) -> bool:
"""检查ODBC连接是否可用"""
return self._pool is not None and self._pool.is_available()
@contextmanager
def get_connection(self):
"""获取连接(上下文管理器)"""
if not self.is_available():
raise Exception("ODBC连接不可用请检查数据源配置或联系管理员")
with self._pool.get_connection() as conn:
yield conn
def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""
执行查询并返回结果
Args:
sql: SQL语句
params: 参数元组
Returns:
查询结果列表(字典格式)
Raises:
Exception: 当ODBC连接不可用或查询失败时
"""
if not self.is_available():
raise Exception("ODBC连接不可用无法执行查询")
with self.get_connection() as conn:
cursor = conn.cursor()
try:
logger.debug(f"执行SQL: {sql}")
logger.debug(f"参数: {params}")
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
# 获取列名
columns = [column[0] for column in cursor.description]
# 转换为字典列表
results = []
for row in cursor.fetchall():
results.append(dict(zip(columns, row)))
logger.debug(f"查询成功,返回 {len(results)} 条记录")
logger.debug(results)
return results
except Exception as e:
logger.error(f"SQL执行失败: {e}", exc_info=True)
raise
finally:
cursor.close()
def execute_non_query(self, sql: str, params: Optional[tuple] = None) -> int:
"""
执行非查询SQLINSERT/UPDATE/DELETE
Returns:
影响的行数
Raises:
Exception: 当ODBC连接不可用或执行失败时
"""
if not self.is_available():
raise Exception("ODBC连接不可用无法执行操作")
with self.get_connection() as conn:
cursor = conn.cursor()
try:
logger.debug(f"执行SQL: {sql[:200]}...")
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
rowcount = cursor.rowcount
conn.commit()
logger.info(f"执行成功,影响 {rowcount}")
return rowcount
except Exception as e:
conn.rollback()
logger.error(f"SQL执行失败: {e}", exc_info=True)
raise
finally:
cursor.close()
# 全局实例
erp_odbc = ERPODBCManager()