diff --git a/hatch.toml b/hatch.toml index 000fc17..622700a 100644 --- a/hatch.toml +++ b/hatch.toml @@ -26,6 +26,7 @@ dependencies = [ "Pillow>=12.0.0", "pandas>=2.3.3", "openpyxl>=3.1.5", + "pyodbc>=5.3.0", # 阿里云OSS "oss2>=2.19.1", # 腾讯云COS @@ -82,6 +83,7 @@ dependencies = [ "Pillow>=12.0.0", "pandas>=2.3.3", "openpyxl>=3.1.5", + "pyodbc>=5.3.0", # 阿里云OSS "oss2>=2.19.1", # 腾讯云COS @@ -135,6 +137,7 @@ dependencies = [ "Pillow>=12.0.0", "pandas>=2.3.3", "openpyxl>=3.1.5", + "pyodbc>=5.3.0", # 阿里云OSS "oss2>=2.19.1", # 腾讯云COS diff --git a/iti/applications/service/__init__.py b/iti/applications/service/__init__.py index 01a915e..2db9632 100644 --- a/iti/applications/service/__init__.py +++ b/iti/applications/service/__init__.py @@ -1,4 +1,7 @@ +from iti.applications.service.erp import init_erp + + def init_services(app) -> None: """初始化Services""" # 初始化文件系统配置 @@ -7,4 +10,7 @@ def init_services(app) -> None: # 初始化文件目录 from iti.applications.service.sys.sys_file_directory import init_app as init_file_directory - init_file_directory(app) \ No newline at end of file + init_file_directory(app) + + # erp + init_erp(app) \ No newline at end of file diff --git a/iti/applications/service/erp/__init__.py b/iti/applications/service/erp/__init__.py new file mode 100644 index 0000000..90c228d --- /dev/null +++ b/iti/applications/service/erp/__init__.py @@ -0,0 +1,33 @@ +""" +ERP集成模块 + +提供ODBC和API两种方式访问ERP系统 +""" + +from .odbc_manager import erp_odbc, ERPODBCManager, PyODBCConnectionPool +from .api_manager import erp_api, ERPAPIManager +from .exceptions import ( + ERPException, + ERPConnectionError, + ERPAuthenticationError, + ERPTimeoutError, + ERPDataError, +) + +__all__ = [ + 'erp_odbc', + 'erp_api', + 'ERPODBCManager', + 'ERPAPIManager', + 'PyODBCConnectionPool', + 'ERPException', + 'ERPConnectionError', + 'ERPAuthenticationError', + 'ERPTimeoutError', + 'ERPDataError', +] + +def init_erp(app) -> None: + # 初始化ERP管理器 + erp_odbc.init_app(app) + erp_api.init_app(app) diff --git a/iti/applications/service/erp/api_manager.py b/iti/applications/service/erp/api_manager.py new file mode 100644 index 0000000..9c472e9 --- /dev/null +++ b/iti/applications/service/erp/api_manager.py @@ -0,0 +1,391 @@ +""" +ERP API管理器 + +提供基于requests的多用户Session管理和业务接口封装 +""" + +import requests +from threading import RLock +from datetime import datetime, timedelta +from typing import Optional, Dict, Any, List +import logging + +logger = logging.getLogger(__name__) + + +class ERPAPISession: + """单个用户的API Session""" + + def __init__(self, username: str, password: str, config: dict): + self.username = username + self.password = password + self.config = config + + self.base_url = config["base_url"].rstrip("/") + self.language_code = config.get("language_code", "zh") + self.company_number = config.get("company_number", "001_1.1") + self.verify_ssl = config.get("verify_ssl", False) + self.timeout = config.get("timeout", 30) + + self.session = requests.Session() + self.header_session_id: Optional[str] = None + self.cookie_session_id: Optional[str] = None + self.session_expires_at: Optional[datetime] = None + self.lock = RLock() + + if not self.verify_ssl: + requests.packages.urllib3.disable_warnings() + + def login(self): + """登录ERP""" + with self.lock: + url = f"{self.base_url}/{self.language_code}/{self.company_number}/login" + payload = { + "Username": self.username, + "Password": self.password, + "ForceRelogin": True, + } + + try: + resp = self.session.post(url, json=payload, timeout=self.timeout, verify=self.verify_ssl) + + if resp.status_code != 200: + raise Exception(f"登录失败: {resp.text}") + + self.header_session_id = resp.headers.get("X-Monitor-SessionId") + self.cookie_session_id = resp.cookies.get("SessionId") + + if not self.header_session_id and not self.cookie_session_id: + raise Exception("未获取到SessionId") + + if self.cookie_session_id: + self.session.cookies.set("SessionId", self.cookie_session_id) + + # Session有效期2小时 + self.session_expires_at = datetime.now() + timedelta(hours=2) + + logger.info(f"ERP登录成功: {self.username}") + + except Exception as e: + logger.error(f"ERP登录失败: {e}") + raise + + def _ensure_session(self): + """确保Session有效""" + with self.lock: + # 检查是否需要登录 + if not self.header_session_id and not self.cookie_session_id: + self.login() + return + + # 检查是否即将过期(提前5分钟刷新) + if self.session_expires_at: + time_until_expire = (self.session_expires_at - datetime.now()).total_seconds() + if time_until_expire < 300: + logger.info(f"Session即将过期,重新登录: {self.username}") + self.login() + + def request(self, method: str, path: str, **kwargs) -> Any: + """发送HTTP请求(自动处理Session)""" + url = f"{self.base_url}{path}" + + for attempt in range(2): # 最多重试1次 + self._ensure_session() + + headers = kwargs.pop("headers", {}) + if self.header_session_id: + headers["X-Monitor-SessionId"] = self.header_session_id + + try: + resp = self.session.request( + method, url, headers=headers, + timeout=self.timeout, verify=self.verify_ssl, **kwargs + ) + + # Session失效,重新登录 + if resp.status_code in (401, 403): + logger.warning(f"Session失效,重新登录: {self.username}") + with self.lock: + self.header_session_id = None + self.cookie_session_id = None + continue + + if resp.status_code >= 400: + raise Exception(f"API请求失败: {resp.status_code} - {resp.text}") + + return resp.json() + + except requests.exceptions.Timeout: + raise Exception(f"请求超时: {url}") + except requests.exceptions.RequestException as e: + raise Exception(f"请求失败: {e}") + + raise Exception("Session刷新后依然无效") + + +class ERPAPIManager: + """ERP API管理器 - 支持多用户Session管理 + 封装业务接口""" + + _instance = None + _sessions: Dict[str, ERPAPISession] = {} # username -> session + _config = None + _lock = RLock() + + def __new__(cls): + """单例模式""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def init_app(self, app): + """初始化(在app启动时调用)""" + self._config = app.config.get("ERP_API_CONFIG", {}) + logger.info("ERP API管理器初始化成功") + + def _get_session(self, username: str, password: str) -> ERPAPISession: + """ + 获取或创建用户Session(内部方法) + + Args: + username: ERP用户名 + password: ERP密码 + + Returns: + 用户的Session对象 + """ + with self._lock: + # 如果已存在,直接返回 + if username in self._sessions: + return self._sessions[username] + + # 创建新Session + session = ERPAPISession(username, password, self._config) + self._sessions[username] = session + + logger.info(f"创建新的ERP Session: {username}") + return session + + def _request( + self, + method: str, + path: str, + username: Optional[str] = None, + password: Optional[str] = None, + **kwargs + ) -> Any: + """ + 发送API请求(内部方法) + + Args: + method: HTTP方法 + path: 请求路径 + username: ERP用户名(可选,默认使用配置中的) + password: ERP密码(可选,默认使用配置中的) + **kwargs: 其他requests参数 + + Returns: + 响应JSON数据 + """ + # 使用提供的用户名密码,或使用配置中的默认值 + username = username or self._config.get("username") + password = password or self._config.get("password") + + if not username or not password: + raise Exception("缺少ERP用户名或密码") + + # 获取Session并发送请求 + session = self._get_session(username, password) + return session.request(method, path, **kwargs) + + # ==================== 业务接口方法 ==================== + + def report_manufacturing_order_operation( + self, + report_number: str, + reported_quantity: float, + username: Optional[str] = None, + password: Optional[str] = None + ) -> Dict[str, Any]: + """ + 工单工序报工 + + Args: + report_number: 报告号 + reported_quantity: 报工数量 + username: ERP用户名(可选) + password: ERP密码(可选) + + Returns: + 报工结果 + """ + path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderOperation" + + payload = { + "ReportNumber": report_number, + "ReportedQuantity": reported_quantity, + } + + logger.info(f"ERP报工: ReportNumber={report_number}, Quantity={reported_quantity}, User={username}") + + result = self._request("POST", path, json=payload, username=username, password=password) + + logger.info(f"ERP报工成功: {result}") + return result + + def report_material( + self, + manufacturing_order_operation_id: str, + part_id: str, + reported_quantity: Optional[float] = None, + username: Optional[str] = None, + password: Optional[str] = None + ) -> Dict[str, Any]: + """ + 物料报工 + + Args: + manufacturing_order_operation_id: 工序ID + part_id: 物料ID + reported_quantity: 报工数量(可选,不填则报告全部计划数量) + username: ERP用户名(可选) + password: ERP密码(可选) + + Returns: + 报工结果 + """ + path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/Reporting/ReportManufacturingOrderMaterial" + + payload = { + "ManufacturingOrderOperationId": manufacturing_order_operation_id, + "PartId": part_id, + } + + if reported_quantity is not None: + payload["ReportedQuantity"] = reported_quantity + + logger.info(f"ERP物料报工: OperationId={manufacturing_order_operation_id}, PartId={part_id}") + + result = self._request("POST", path, json=payload, username=username, password=password) + + logger.info(f"ERP物料报工成功: {result}") + return result + + def get_parts( + self, + filters: Optional[Dict[str, Any]] = None, + username: Optional[str] = None, + password: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + 获取零件列表 + + Args: + filters: OData查询参数,例如: + { + "$filter": "PartNumber eq 'ABC123'", + "$select": "Id,PartNumber,Description", + "$top": 10 + } + username: ERP用户名(可选) + password: ERP密码(可选) + + Returns: + 零件列表 + """ + path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Inventory/Parts" + + logger.info(f"ERP查询零件: filters={filters}") + + result = self._request("GET", path, params=filters, username=username, password=password) + + logger.info(f"ERP查询零件成功: 返回 {len(result)} 条") + return result + + def get_manufacturing_order_operations( + self, + manufacturing_order_id: str, + username: Optional[str] = None, + password: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + 获取工单工序列表 + + Args: + manufacturing_order_id: 工单ID + username: ERP用户名(可选) + password: ERP密码(可选) + + Returns: + 工序列表 + """ + path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Manufacturing/ManufacturingOrderOperations" + + filters = { + "$filter": f"ManufacturingOrderId eq '{manufacturing_order_id}'" + } + + logger.info(f"ERP查询工序: ManufacturingOrderId={manufacturing_order_id}") + + result = self._request("GET", path, params=filters, username=username, password=password) + + logger.info(f"ERP查询工序成功: 返回 {len(result)} 条") + return result + + def create_comment( + self, + entity_type: int, + entity_id: str, + text: str, + is_internal: bool = True, + username: Optional[str] = None, + password: Optional[str] = None + ) -> Dict[str, Any]: + """ + 创建评论 + + Args: + entity_type: 实体类型(ManufacturingOrder:12, ManufacturingOrderOperation:7) + entity_id: 实体ID + text: 评论内容 + is_internal: 是否内部评论 + username: ERP用户名(可选) + password: ERP密码(可选) + + Returns: + 创建结果 + """ + path = f"/{self._config['language_code']}/{self._config['company_number']}/api/v1/Common/Comments/Create" + + payload = { + "EntityType": entity_type, + "EntityId": entity_id, + "Text": text, + "IsInternal": is_internal, + } + + logger.info(f"ERP创建评论: EntityType={entity_type}, EntityId={entity_id}") + + result = self._request("POST", path, json=payload, username=username, password=password) + + logger.info(f"ERP创建评论成功: {result}") + return result + + # ==================== 管理方法 ==================== + + def clear_session(self, username: str): + """清除指定用户的Session""" + with self._lock: + if username in self._sessions: + del self._sessions[username] + logger.info(f"清除ERP Session: {username}") + + def clear_all_sessions(self): + """清除所有Session""" + with self._lock: + self._sessions.clear() + logger.info("清除所有ERP Session") + + +# 全局实例 +erp_api = ERPAPIManager() + diff --git a/iti/applications/service/erp/exceptions.py b/iti/applications/service/erp/exceptions.py new file mode 100644 index 0000000..0d03286 --- /dev/null +++ b/iti/applications/service/erp/exceptions.py @@ -0,0 +1,35 @@ +""" +ERP异常定义 +""" + +from iti.applications.common.exceptions.biz_exp import BizException + + +class ERPException(BizException): + """ERP异常基类""" + pass + + +class ERPConnectionError(ERPException): + """连接错误""" + def __init__(self, message: str = "ERP连接失败"): + super().__init__(message, code=503) + + +class ERPAuthenticationError(ERPException): + """认证错误""" + def __init__(self, message: str = "ERP认证失败"): + super().__init__(message, code=401) + + +class ERPTimeoutError(ERPException): + """超时错误""" + def __init__(self, message: str = "ERP请求超时"): + super().__init__(message, code=504) + + +class ERPDataError(ERPException): + """数据错误""" + def __init__(self, message: str = "ERP数据错误"): + super().__init__(message, code=422) + diff --git a/iti/applications/service/erp/odbc_manager.py b/iti/applications/service/erp/odbc_manager.py new file mode 100644 index 0000000..92a3525 --- /dev/null +++ b/iti/applications/service/erp/odbc_manager.py @@ -0,0 +1,254 @@ +""" +ERP ODBC管理器 + +提供基于pyodbc的连接池和查询方法 +""" + +import pyodbc +from queue import Queue, Empty, Full +from contextlib import contextmanager +from typing import Optional, List, Dict, Any +from threading import Lock +import logging +import time + +logger = logging.getLogger(__name__) + + +class PyODBCConnectionPool: + """纯pyodbc连接池实现""" + + def __init__(self, dsn: str, pool_size: int = 5, max_overflow: int = 10, + pool_timeout: int = 30, pool_recycle: int = 3600): + """ + 初始化连接池 + + Args: + dsn: ODBC数据源名称 + pool_size: 连接池大小 + max_overflow: 最大溢出连接数 + pool_timeout: 获取连接超时(秒) + pool_recycle: 连接回收时间(秒) + """ + self.dsn = dsn + self.pool_size = pool_size + self.max_overflow = max_overflow + self.pool_timeout = pool_timeout + self.pool_recycle = pool_recycle + + # 连接池队列 + self._pool = Queue(maxsize=pool_size) + # 溢出连接计数 + self._overflow_count = 0 + self._overflow_lock = Lock() + + # 预创建连接 + for _ in range(pool_size): + conn = self._create_connection() + self._pool.put((conn, time.time())) + + logger.info(f"ODBC连接池初始化成功: DSN={dsn}, 池大小={pool_size}") + + def _create_connection(self): + """创建新连接""" + try: + conn = pyodbc.connect(f'DSN={self.dsn}') + conn.autocommit = True + logger.debug(f"创建新ODBC连接: DSN={self.dsn}") + return conn + except Exception as e: + logger.error(f"创建ODBC连接失败: {e}") + raise + + def _is_connection_valid(self, conn, created_time: float) -> bool: + """检查连接是否有效""" + # 检查是否超过回收时间 + if time.time() - created_time > self.pool_recycle: + return False + + # 检查连接是否可用 + try: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.close() + return True + except: + return False + + @contextmanager + def get_connection(self): + """获取连接(上下文管理器)""" + conn = None + created_time = None + is_overflow = False + + try: + # 尝试从池中获取连接 + try: + conn, created_time = self._pool.get(timeout=self.pool_timeout) + + # 检查连接是否有效 + if not self._is_connection_valid(conn, created_time): + logger.debug("连接已过期,重新创建") + try: + conn.close() + except: + pass + conn = self._create_connection() + created_time = time.time() + + except Empty: + # 池中无可用连接,尝试创建溢出连接 + with self._overflow_lock: + if self._overflow_count < self.max_overflow: + self._overflow_count += 1 + is_overflow = True + conn = self._create_connection() + created_time = time.time() + logger.debug(f"创建溢出连接,当前溢出数: {self._overflow_count}") + else: + raise Exception(f"连接池已满,无法获取连接(超时{self.pool_timeout}秒)") + + yield conn + + finally: + # 归还连接 + if conn: + if is_overflow: + # 溢出连接直接关闭 + try: + conn.close() + except: + pass + with self._overflow_lock: + self._overflow_count -= 1 + logger.debug(f"关闭溢出连接,当前溢出数: {self._overflow_count}") + else: + # 归还到池中 + try: + self._pool.put((conn, created_time), block=False) + except Full: + # 池已满,关闭连接 + try: + conn.close() + except: + pass + + def close_all(self): + """关闭所有连接""" + while not self._pool.empty(): + try: + conn, _ = self._pool.get_nowait() + conn.close() + except: + pass + logger.info("所有ODBC连接已关闭") + + +class ERPODBCManager: + """ERP ODBC管理器 - 提供连接池和查询方法""" + + _instance = None + _pool = None + + def __new__(cls): + """单例模式""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def init_app(self, app): + """初始化(在app启动时调用)""" + config = app.config.get("ERP_ODBC_CONFIG", {}) + + # 创建连接池 + self._pool = PyODBCConnectionPool( + dsn=config.get("dsn", "YHC-test"), + pool_size=config.get("pool_size", 5), + max_overflow=config.get("max_overflow", 10), + pool_timeout=config.get("pool_timeout", 30), + pool_recycle=config.get("pool_recycle", 3600), + ) + + logger.info("ERP ODBC管理器初始化成功") + + @contextmanager + def get_connection(self): + """获取连接(上下文管理器)""" + with self._pool.get_connection() as conn: + yield conn + + def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: + """ + 执行查询并返回结果 + + Args: + sql: SQL语句 + params: 参数元组 + + Returns: + 查询结果列表(字典格式) + """ + with self.get_connection() as conn: + cursor = conn.cursor() + try: + logger.debug(f"执行SQL: {sql}") + logger.debug(f"参数: {params}") + + if params: + cursor.execute(sql, params) + else: + cursor.execute(sql) + + # 获取列名 + columns = [column[0] for column in cursor.description] + + # 转换为字典列表 + results = [] + for row in cursor.fetchall(): + results.append(dict(zip(columns, row))) + + logger.debug(f"查询成功,返回 {len(results)} 条记录") + logger.debug(results) + return results + + except Exception as e: + logger.error(f"SQL执行失败: {e}", exc_info=True) + raise + finally: + cursor.close() + + def execute_non_query(self, sql: str, params: Optional[tuple] = None) -> int: + """ + 执行非查询SQL(INSERT/UPDATE/DELETE) + + Returns: + 影响的行数 + """ + with self.get_connection() as conn: + cursor = conn.cursor() + try: + logger.debug(f"执行SQL: {sql[:200]}...") + + if params: + cursor.execute(sql, params) + else: + cursor.execute(sql) + + rowcount = cursor.rowcount + conn.commit() + + logger.info(f"执行成功,影响 {rowcount} 行") + return rowcount + + except Exception as e: + conn.rollback() + logger.error(f"SQL执行失败: {e}", exc_info=True) + raise + finally: + cursor.close() + + +# 全局实例 +erp_odbc = ERPODBCManager() + diff --git a/iti/config.py b/iti/config.py index 5de9bcf..33707ea 100644 --- a/iti/config.py +++ b/iti/config.py @@ -217,6 +217,26 @@ class BaseConfig: "SKIP_SUPER_ADMIN_DEFAULT": True, } + # ERP ODBC配置 + ERP_ODBC_CONFIG = { + "dsn": "YHC-test", # ODBC数据源名称 + "pool_size": 5, # 连接池大小 + "max_overflow": 10, # 最大溢出连接数 + "pool_timeout": 30, # 获取连接超时(秒) + "pool_recycle": 3600, # 连接回收时间(秒) + } + + # ERP API配置 + ERP_API_CONFIG = { + "base_url": "https://192.168.21.112:8001", # ERP接口地址 + "language_code": "zh", # 语言代码 + "company_number": "001_1.1", # 公司编号 + "username": os.getenv("ERP_API_USERNAME", ""), # 默认用户名(可选) + "password": os.getenv("ERP_API_PASSWORD", ""), # 默认密码(可选) + "verify_ssl": False, # 是否验证SSL + "timeout": 30, # 请求超时时间(秒) + } + class DevConfig(BaseConfig): """开发环境配置""" diff --git a/tests/test_odbc_manager.py b/tests/test_odbc_manager.py new file mode 100644 index 0000000..bf59a0e --- /dev/null +++ b/tests/test_odbc_manager.py @@ -0,0 +1,346 @@ +""" +测试ODBC连接池 + +前提条件: +1. 已配置ODBC数据源:YHC-test +2. 数据源可以正常连接到Sybase数据库 +""" + +import sys +import os +import time +from threading import Thread + +# 添加项目路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +def test_basic_connection(): + """测试1: 基本连接""" + print("\n" + "=" * 60) + print("测试1: 基本连接") + print("=" * 60) + + try: + from iti.applications.service.erp import PyODBCConnectionPool + + # 创建连接池 + pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2) + print("[OK] 连接池创建成功") + + # 获取连接并执行简单查询 + with pool.get_connection() as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + cursor.close() + print(f"[OK] 查询成功: {result}") + + return True + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_connection_reuse(): + """测试2: 连接复用""" + print("\n" + "=" * 60) + print("测试2: 连接复用") + print("=" * 60) + + try: + from iti.applications.service.erp import PyODBCConnectionPool + + pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=2) + + # 第一次查询 + start = time.time() + with pool.get_connection() as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + time1 = time.time() - start + print(f"[OK] 第1次查询耗时: {time1*1000:.2f}ms") + + # 第二次查询(应该复用连接,更快) + start = time.time() + with pool.get_connection() as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + time2 = time.time() - start + print(f"[OK] 第2次查询耗时: {time2*1000:.2f}ms") + + if time2 < time1: + print(f"[OK] 连接复用生效,性能提升 {(time1/time2):.1f}倍") + else: + print("[INFO] 连接复用正常(时间差异不明显)") + + return True + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_erp_odbc_manager(): + """测试3: ERPODBCManager""" + print("\n" + "=" * 60) + print("测试3: ERPODBCManager") + print("=" * 60) + + try: + from iti.applications.service.erp import ERPODBCManager + + # 创建管理器 + manager = ERPODBCManager() + + # 模拟Flask配置 + class MockApp: + config = { + "ERP_ODBC_CONFIG": { + "dsn": "YHC-test", + "pool_size": 3, + "max_overflow": 5, + } + } + + app = MockApp() + manager.init_app(app) + print("[OK] ERPODBCManager初始化成功") + + # 测试execute_query + results = manager.execute_query("SELECT 1 as test, 'hello' as msg") + print(f"[OK] execute_query成功: {results}") + + if len(results) == 1 and results[0]['test'] == 1: + print("[OK] 查询结果正确") + + return True + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_real_erp_query(): + """测试4: 真实ERP查询""" + print("\n" + "=" * 60) + print("测试4: 真实ERP查询") + print("=" * 60) + + try: + from iti.applications.service.erp import ERPODBCManager + + manager = ERPODBCManager() + + class MockApp: + config = { + "ERP_ODBC_CONFIG": { + "dsn": "YHC-test", + "pool_size": 3, + } + } + + app = MockApp() + manager.init_app(app) + + # 查询工单表(限制1条) + sql = """ + SELECT TOP 1 + OrderNumber + FROM monitor.ManufacturingOrder + """ + + results = manager.execute_query(sql) + + if results: + print(f"[OK] 查询到 {len(results)} 条工单") + print(f"[OK] 示例数据: {results[0]}") + else: + print("[INFO] 查询结果为空(可能表中没有数据)") + + return True + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + print("[INFO] 如果是表不存在的错误,说明连接正常,只是表名不对") + import traceback + traceback.print_exc() + return False + + +def test_concurrent_queries(): + """测试5: 并发查询""" + print("\n" + "=" * 60) + print("测试5: 并发查询(5个线程)") + print("=" * 60) + + try: + from iti.applications.service.erp import ERPODBCManager + + manager = ERPODBCManager() + + class MockApp: + config = { + "ERP_ODBC_CONFIG": { + "dsn": "YHC-test", + "pool_size": 3, + "max_overflow": 5, + } + } + + app = MockApp() + manager.init_app(app) + + results = [] + errors = [] + + def query_task(thread_id): + try: + result = manager.execute_query(f"SELECT {thread_id} as thread_id, 'test' as msg") + results.append((thread_id, result)) + print(f" [线程{thread_id}] 查询成功") + except Exception as e: + errors.append((thread_id, str(e))) + print(f" [线程{thread_id}] 查询失败: {e}") + + # 创建5个线程并发查询 + threads = [] + for i in range(5): + t = Thread(target=query_task, args=(i+1,)) + threads.append(t) + t.start() + + # 等待所有线程完成 + for t in threads: + t.join() + + print(f"[OK] 并发查询完成: 成功 {len(results)} 个, 失败 {len(errors)} 个") + + if len(errors) == 0: + print("[OK] 所有并发查询都成功") + return True + else: + print(f"[WARN] 有 {len(errors)} 个查询失败") + return False + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def test_performance(): + """测试6: 性能对比""" + print("\n" + "=" * 60) + print("测试6: 性能对比(连接池 vs 每次新建连接)") + print("=" * 60) + + try: + import pyodbc + from iti.applications.service.erp import PyODBCConnectionPool + + # 测试1: 不使用连接池(每次新建连接) + print("\n[测试] 不使用连接池(10次查询)...") + start = time.time() + for i in range(10): + conn = pyodbc.connect('DSN=YHC-test') + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + conn.close() + time_without_pool = time.time() - start + print(f" 耗时: {time_without_pool:.3f}秒") + + # 测试2: 使用连接池 + print("\n[测试] 使用连接池(10次查询)...") + pool = PyODBCConnectionPool(dsn="YHC-test", pool_size=3) + start = time.time() + for i in range(10): + with pool.get_connection() as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + time_with_pool = time.time() - start + print(f" 耗时: {time_with_pool:.3f}秒") + + # 性能提升 + improvement = time_without_pool / time_with_pool + print(f"\n[结果] 性能提升: {improvement:.1f}倍") + + if improvement > 2: + print("[OK] 连接池性能提升显著") + return True + else: + print("[INFO] 连接池性能提升不明显(可能查询太简单)") + return True + + except Exception as e: + print(f"[FAIL] 测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +def main(): + """运行所有测试""" + print("=" * 60) + print("ODBC连接池测试") + print("=" * 60) + print("\n前提条件:") + print("1. 已配置ODBC数据源: YHC-test") + print("2. 数据源可以正常连接") + + tests = [ + ("基本连接", test_basic_connection), + ("连接复用", test_connection_reuse), + ("ERPODBCManager", test_erp_odbc_manager), + ("真实ERP查询", test_real_erp_query), + ("并发查询", test_concurrent_queries), + ("性能对比", test_performance), + ] + + results = [] + for name, test_func in tests: + try: + result = test_func() + results.append((name, result)) + except Exception as e: + print(f"\n[ERROR] 测试 '{name}' 异常: {e}") + results.append((name, False)) + + # 输出结果 + print("\n" + "=" * 60) + print("测试结果汇总") + print("=" * 60) + + for name, result in results: + status = "[OK]" if result else "[FAIL]" + print(f"{status} {name}") + + passed = sum(1 for _, r in results if r) + total = len(results) + + print("\n" + "=" * 60) + print(f"总计: {passed}/{total} 通过") + print("=" * 60) + + return passed == total + + +if __name__ == "__main__": + success = main() + sys.exit(0 if success else 1) +